Newer
Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# Configure logging to ensure logs appear on the console
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def create_test_configuration():
return {
"config_rules": [
{"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}},
{"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": 8080}},
{"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {
"endpoints": [
{"uuid": "eth0", "type": "ethernet", "sample_types": [101, 102]},
{"uuid": "eth1", "type": "ethernet", "sample_types": []},
{"uuid": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}
]
}}},
{"action": 1, "custom": {"resource_key": "/interface[eth0]/settings", "resource_value": {
"name": "eth0", "enabled": True
}}},
{"action": 1, "custom": {"resource_key": "/interface[eth1]/settings", "resource_value": {
"name": "eth1", "enabled": False
}}},
{"action": 1, "custom": {"resource_key": "/interface[13/1/2]/settings", "resource_value": {
"name": "13/1/2", "enabled": True
}}}
]
}
def create_specific_config_keys():
keys_to_return = ["_connect/settings/endpoints/eth1", "/interface/[13/1/2]/settings", "_connect/address"]
return keys_to_return
def create_config_for_delete():
keys_to_delete = ["_connect/settings/endpoints/eth0", "/interface/[eth1]", "_connect/port"]
return keys_to_delete
def create_test_subscriptions():
return [("_connect/settings/endpoints/eth1", 10, 2),
("_connect/settings/endpoints/13/1/2", 15, 3),
("_connect/settings/endpoints/eth0", 8, 2)]
def create_unscubscribe_subscriptions():
return [("_connect/settings/endpoints/eth1", 10, 2),
("_connect/settings/endpoints/13/1/2", 15, 3),
("_connect/settings/endpoints/eth0", 8, 2)]