Skip to content
Snippets Groups Projects
Commit fc04ad0b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/scenario2-workflow3-tests' into 'develop'

Changes in scenario2 workflow3 tests

See merge request !66
parents c8315657 604c15c6
No related branches found
No related tags found
2 merge requests!142Release TeraFlowSDN 2.1,!66Changes in scenario2 workflow3 tests
Showing
with 1114 additions and 0 deletions
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# make sure to source the following scripts:
# - my_deploy.sh
source tfs_runtime_env_vars.sh
pytest --verbose src/tests/scenario2/tests/test_functional_bootstrap.py
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
source tfs_runtime_env_vars.sh
pytest --verbose src/tests/scenario2/tests/test_functional_create_service.py
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
source tfs_runtime_env_vars.sh
pytest --verbose src/tests/scenario2/tests/test_functional_delete_service.py
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
source tfs_runtime_env_vars.sh
pytest --verbose src/tests/scenario2/tests/test_functional_cleanup.py
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
COVERAGEFILE=$PROJECTDIR/coverage/.coverage
# Configure the correct folder on the .coveragerc file
cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/teraflow/controller+$PROJECTDIR+g > $RCFILE
# Destroy old coverage file
rm -f $COVERAGEFILE
# Force a flush of Context database
kubectl --namespace $TFS_K8S_NAMESPACE exec -it deployment/contextservice --container redis -- redis-cli FLUSHALL
source tfs_runtime_env_vars.sh
# Run functional tests and analyze code coverage at the same time
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
tests/ofc22/tests/test_functional_bootstrap.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
tests/ofc22/tests/test_functional_create_service.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
tests/ofc22/tests/test_functional_delete_service.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
tests/ofc22/tests/test_functional_cleanup.py
# Add here your files containing confidential testbed details such as IP addresses, ports, usernames, passwords, etc.
Credentials.py
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, json, sys
from .Objects import CONTEXTS, DEVICES, LINKS, TOPOLOGIES
def main():
with open('tests/ofc22/descriptors_emulated.json', 'w', encoding='UTF-8') as f:
devices = []
for device,connect_rules in DEVICES:
device = copy.deepcopy(device)
device['device_config']['config_rules'].extend(connect_rules)
devices.append(device)
f.write(json.dumps({
'contexts': CONTEXTS,
'topologies': TOPOLOGIES,
'devices': devices,
'links': LINKS
}))
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from common.Settings import get_setting
from common.proto.monitoring_pb2 import AlarmDescriptor, AlarmSubscription
from compute.tests.mock_osm.MockOSM import MockOSM
from .Objects import WIM_MAPPING, WIM_PASSWORD, WIM_USERNAME
@pytest.fixture(scope='session')
def osm_wim():
wim_url = 'http://{:s}:{:s}'.format(
get_setting('COMPUTESERVICE_SERVICE_HOST'), str(get_setting('COMPUTESERVICE_SERVICE_PORT_HTTP')))
return MockOSM(wim_url, WIM_MAPPING, WIM_USERNAME, WIM_PASSWORD)
@pytest.fixture(scope='session')
def alarm_descriptor():
alarm_descriptor = AlarmDescriptor()
alarm_descriptor.alarm_description = "Default Alarm Description"
alarm_descriptor.name = "Default Alarm Name"
alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = 0.0
alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = 250.0
alarm_descriptor.kpi_value_range.inRange = True
alarm_descriptor.kpi_value_range.includeMinValue = False
alarm_descriptor.kpi_value_range.includeMaxValue = True
return alarm_descriptor
@pytest.fixture(scope='session')
def alarm_subscription():
alarm_subscription = AlarmSubscription()
alarm_subscription.subscription_timeout_s = 10
alarm_subscription.subscription_frequency_ms = 2000
return alarm_subscription
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, sys
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from common.proto.context_pb2 import Context, Device, Link, Topology
from device.client.DeviceClient import DeviceClient
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def main():
context_client = ContextClient(
get_setting('CONTEXTSERVICE_SERVICE_HOST'), get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC'))
device_client = DeviceClient(
get_setting('DEVICESERVICE_SERVICE_HOST'), get_setting('DEVICESERVICE_SERVICE_PORT_GRPC'))
with open('tests/ofc22/descriptors.json', 'r', encoding='UTF-8') as f:
descriptors = json.loads(f.read())
for context in descriptors['contexts' ]: context_client.SetContext (Context (**context ))
for topology in descriptors['topologies']: context_client.SetTopology(Topology(**topology))
for device in descriptors['devices' ]: device_client .AddDevice (Device (**device ))
for link in descriptors['links' ]: context_client.SetLink (Link (**link ))
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Device import (
json_device_connect_rules, json_device_emulated_connect_rules, json_device_emulated_packet_router_disabled,
json_device_emulated_tapi_disabled, json_device_id, json_device_packetrouter_disabled, json_device_tapi_disabled)
from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id
from common.tools.object_factory.Link import json_link, json_link_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
from common.proto.kpi_sample_types_pb2 import KpiSampleType
# ----- Context --------------------------------------------------------------------------------------------------------
CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_UUID)
CONTEXT = json_context(DEFAULT_CONTEXT_UUID)
# ----- Topology -------------------------------------------------------------------------------------------------------
TOPOLOGY_ID = json_topology_id(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ID)
TOPOLOGY = json_topology(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ID)
# ----- Monitoring Samples ---------------------------------------------------------------------------------------------
PACKET_PORT_SAMPLE_TYPES = [
KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED,
KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED,
KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED,
KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED,
]
# ----- Device Credentials and Settings --------------------------------------------------------------------------------
try:
from .Credentials import DEVICE_R1_ADDRESS, DEVICE_R1_PORT, DEVICE_R1_USERNAME, DEVICE_R1_PASSWORD
from .Credentials import DEVICE_R3_ADDRESS, DEVICE_R3_PORT, DEVICE_R3_USERNAME, DEVICE_R3_PASSWORD
from .Credentials import DEVICE_O1_ADDRESS, DEVICE_O1_PORT
USE_REAL_DEVICES = True # Use real devices
except ImportError:
USE_REAL_DEVICES = False # Use emulated devices
DEVICE_R1_ADDRESS = '0.0.0.0'
DEVICE_R1_PORT = 830
DEVICE_R1_USERNAME = 'admin'
DEVICE_R1_PASSWORD = 'admin'
DEVICE_R3_ADDRESS = '0.0.0.0'
DEVICE_R3_PORT = 830
DEVICE_R3_USERNAME = 'admin'
DEVICE_R3_PASSWORD = 'admin'
DEVICE_O1_ADDRESS = '0.0.0.0'
DEVICE_O1_PORT = 4900
#USE_REAL_DEVICES = False # Uncomment to force to use emulated devices
def json_endpoint_ids(device_id : Dict, endpoint_descriptors : List[Tuple[str, str, List[int]]]):
return [
json_endpoint_id(device_id, ep_uuid, topology_id=None)
for ep_uuid, _, _ in endpoint_descriptors
]
def json_endpoints(device_id : Dict, endpoint_descriptors : List[Tuple[str, str, List[int]]]):
return [
json_endpoint(device_id, ep_uuid, ep_type, topology_id=None, kpi_sample_types=ep_sample_types)
for ep_uuid, ep_type, ep_sample_types in endpoint_descriptors
]
def get_link_uuid(a_device_id : Dict, a_endpoint_id : Dict, z_device_id : Dict, z_endpoint_id : Dict) -> str:
return '{:s}/{:s}=={:s}/{:s}'.format(
a_device_id['device_uuid']['uuid'], a_endpoint_id['endpoint_uuid']['uuid'],
z_device_id['device_uuid']['uuid'], z_endpoint_id['endpoint_uuid']['uuid'])
# ----- Devices --------------------------------------------------------------------------------------------------------
if not USE_REAL_DEVICES:
json_device_packetrouter_disabled = json_device_emulated_packet_router_disabled
json_device_tapi_disabled = json_device_emulated_tapi_disabled
DEVICE_R1_UUID = 'R1-EMU'
DEVICE_R1_TIMEOUT = 120
DEVICE_R1_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R1_ID = json_device_id(DEVICE_R1_UUID)
#DEVICE_R1_ENDPOINTS = json_endpoints(DEVICE_R1_ID, DEVICE_R1_ENDPOINT_DEFS)
DEVICE_R1_ENDPOINT_IDS = json_endpoint_ids(DEVICE_R1_ID, DEVICE_R1_ENDPOINT_DEFS)
DEVICE_R1 = json_device_packetrouter_disabled(DEVICE_R1_UUID)
ENDPOINT_ID_R1_13_0_0 = DEVICE_R1_ENDPOINT_IDS[0]
ENDPOINT_ID_R1_13_1_2 = DEVICE_R1_ENDPOINT_IDS[1]
DEVICE_R1_CONNECT_RULES = json_device_connect_rules(DEVICE_R1_ADDRESS, DEVICE_R1_PORT, {
'username': DEVICE_R1_USERNAME,
'password': DEVICE_R1_PASSWORD,
'timeout' : DEVICE_R1_TIMEOUT,
}) if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_R1_ENDPOINT_DEFS)
DEVICE_R2_UUID = 'R2-EMU'
DEVICE_R2_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R2_ID = json_device_id(DEVICE_R2_UUID)
#DEVICE_R2_ENDPOINTS = json_endpoints(DEVICE_R2_ID, DEVICE_R2_ENDPOINT_DEFS)
DEVICE_R2_ENDPOINT_IDS = json_endpoint_ids(DEVICE_R2_ID, DEVICE_R2_ENDPOINT_DEFS)
DEVICE_R2 = json_device_emulated_packet_router_disabled(DEVICE_R2_UUID)
ENDPOINT_ID_R2_13_0_0 = DEVICE_R2_ENDPOINT_IDS[0]
ENDPOINT_ID_R2_13_1_2 = DEVICE_R2_ENDPOINT_IDS[1]
DEVICE_R2_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_R2_ENDPOINT_DEFS)
DEVICE_R3_UUID = 'R3-EMU'
DEVICE_R3_TIMEOUT = 120
DEVICE_R3_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R3_ID = json_device_id(DEVICE_R3_UUID)
#DEVICE_R3_ENDPOINTS = json_endpoints(DEVICE_R3_ID, DEVICE_R3_ENDPOINT_DEFS)
DEVICE_R3_ENDPOINT_IDS = json_endpoint_ids(DEVICE_R3_ID, DEVICE_R3_ENDPOINT_DEFS)
DEVICE_R3 = json_device_packetrouter_disabled(DEVICE_R3_UUID)
ENDPOINT_ID_R3_13_0_0 = DEVICE_R3_ENDPOINT_IDS[0]
ENDPOINT_ID_R3_13_1_2 = DEVICE_R3_ENDPOINT_IDS[1]
DEVICE_R3_CONNECT_RULES = json_device_connect_rules(DEVICE_R3_ADDRESS, DEVICE_R3_PORT, {
'username': DEVICE_R3_USERNAME,
'password': DEVICE_R3_PASSWORD,
'timeout' : DEVICE_R3_TIMEOUT,
}) if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_R3_ENDPOINT_DEFS)
DEVICE_R4_UUID = 'R4-EMU'
DEVICE_R4_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R4_ID = json_device_id(DEVICE_R4_UUID)
#DEVICE_R4_ENDPOINTS = json_endpoints(DEVICE_R4_ID, DEVICE_R4_ENDPOINT_DEFS)
DEVICE_R4_ENDPOINT_IDS = json_endpoint_ids(DEVICE_R4_ID, DEVICE_R4_ENDPOINT_DEFS)
DEVICE_R4 = json_device_emulated_packet_router_disabled(DEVICE_R4_UUID)
ENDPOINT_ID_R4_13_0_0 = DEVICE_R4_ENDPOINT_IDS[0]
ENDPOINT_ID_R4_13_1_2 = DEVICE_R4_ENDPOINT_IDS[1]
DEVICE_R4_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_R4_ENDPOINT_DEFS)
DEVICE_O1_UUID = 'O1-OLS'
DEVICE_O1_TIMEOUT = 120
DEVICE_O1_ENDPOINT_DEFS = [
('aade6001-f00b-5e2f-a357-6a0a9d3de870', 'optical', []), # node_1_port_13
('eb287d83-f05e-53ec-ab5a-adf6bd2b5418', 'optical', []), # node_2_port_13
('0ef74f99-1acc-57bd-ab9d-4b958b06c513', 'optical', []), # node_3_port_13
('50296d99-58cc-5ce7-82f5-fc8ee4eec2ec', 'optical', []), # node_4_port_13
]
DEVICE_O1_ID = json_device_id(DEVICE_O1_UUID)
DEVICE_O1 = json_device_tapi_disabled(DEVICE_O1_UUID)
#DEVICE_O1_ENDPOINTS = json_endpoints(DEVICE_O1_ID, DEVICE_O1_ENDPOINT_DEFS)
DEVICE_O1_ENDPOINT_IDS = json_endpoint_ids(DEVICE_O1_ID, DEVICE_O1_ENDPOINT_DEFS)
ENDPOINT_ID_O1_EP1 = DEVICE_O1_ENDPOINT_IDS[0]
ENDPOINT_ID_O1_EP2 = DEVICE_O1_ENDPOINT_IDS[1]
ENDPOINT_ID_O1_EP3 = DEVICE_O1_ENDPOINT_IDS[2]
ENDPOINT_ID_O1_EP4 = DEVICE_O1_ENDPOINT_IDS[3]
DEVICE_O1_CONNECT_RULES = json_device_connect_rules(DEVICE_O1_ADDRESS, DEVICE_O1_PORT, {
'timeout' : DEVICE_O1_TIMEOUT,
}) if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_O1_ENDPOINT_DEFS)
# ----- Links ----------------------------------------------------------------------------------------------------------
LINK_R1_O1_UUID = get_link_uuid(DEVICE_R1_ID, ENDPOINT_ID_R1_13_0_0, DEVICE_O1_ID, ENDPOINT_ID_O1_EP1)
LINK_R1_O1_ID = json_link_id(LINK_R1_O1_UUID)
LINK_R1_O1 = json_link(LINK_R1_O1_UUID, [ENDPOINT_ID_R1_13_0_0, ENDPOINT_ID_O1_EP1])
LINK_R2_O1_UUID = get_link_uuid(DEVICE_R2_ID, ENDPOINT_ID_R2_13_0_0, DEVICE_O1_ID, ENDPOINT_ID_O1_EP2)
LINK_R2_O1_ID = json_link_id(LINK_R2_O1_UUID)
LINK_R2_O1 = json_link(LINK_R2_O1_UUID, [ENDPOINT_ID_R2_13_0_0, ENDPOINT_ID_O1_EP2])
LINK_R3_O1_UUID = get_link_uuid(DEVICE_R3_ID, ENDPOINT_ID_R3_13_0_0, DEVICE_O1_ID, ENDPOINT_ID_O1_EP3)
LINK_R3_O1_ID = json_link_id(LINK_R3_O1_UUID)
LINK_R3_O1 = json_link(LINK_R3_O1_UUID, [ENDPOINT_ID_R3_13_0_0, ENDPOINT_ID_O1_EP3])
LINK_R4_O1_UUID = get_link_uuid(DEVICE_R4_ID, ENDPOINT_ID_R4_13_0_0, DEVICE_O1_ID, ENDPOINT_ID_O1_EP4)
LINK_R4_O1_ID = json_link_id(LINK_R4_O1_UUID)
LINK_R4_O1 = json_link(LINK_R4_O1_UUID, [ENDPOINT_ID_R4_13_0_0, ENDPOINT_ID_O1_EP4])
# ----- WIM Service Settings -------------------------------------------------------------------------------------------
def compose_service_endpoint_id(endpoint_id):
device_uuid = endpoint_id['device_id']['device_uuid']['uuid']
endpoint_uuid = endpoint_id['endpoint_uuid']['uuid']
return ':'.join([device_uuid, endpoint_uuid])
WIM_SEP_R1_ID = compose_service_endpoint_id(ENDPOINT_ID_R1_13_1_2)
WIM_SEP_R1_SITE_ID = '1'
WIM_SEP_R1_BEARER = WIM_SEP_R1_ID
WIM_SRV_R1_VLAN_ID = 400
WIM_SEP_R3_ID = compose_service_endpoint_id(ENDPOINT_ID_R3_13_1_2)
WIM_SEP_R3_SITE_ID = '2'
WIM_SEP_R3_BEARER = WIM_SEP_R3_ID
WIM_SRV_R3_VLAN_ID = 500
WIM_USERNAME = 'admin'
WIM_PASSWORD = 'admin'
WIM_MAPPING = [
{'device-id': DEVICE_R1_UUID, 'service_endpoint_id': WIM_SEP_R1_ID,
'service_mapping_info': {'bearer': {'bearer-reference': WIM_SEP_R1_BEARER}, 'site-id': WIM_SEP_R1_SITE_ID}},
{'device-id': DEVICE_R3_UUID, 'service_endpoint_id': WIM_SEP_R3_ID,
'service_mapping_info': {'bearer': {'bearer-reference': WIM_SEP_R3_BEARER}, 'site-id': WIM_SEP_R3_SITE_ID}},
]
WIM_SERVICE_TYPE = 'ELINE'
WIM_SERVICE_CONNECTION_POINTS = [
{'service_endpoint_id': WIM_SEP_R1_ID,
'service_endpoint_encapsulation_type': 'dot1q',
'service_endpoint_encapsulation_info': {'vlan': WIM_SRV_R1_VLAN_ID}},
{'service_endpoint_id': WIM_SEP_R3_ID,
'service_endpoint_encapsulation_type': 'dot1q',
'service_endpoint_encapsulation_info': {'vlan': WIM_SRV_R3_VLAN_ID}},
]
# ----- Object Collections ---------------------------------------------------------------------------------------------
CONTEXTS = [CONTEXT]
TOPOLOGIES = [TOPOLOGY]
DEVICES = [
(DEVICE_R1, DEVICE_R1_CONNECT_RULES),
(DEVICE_R2, DEVICE_R2_CONNECT_RULES),
(DEVICE_R3, DEVICE_R3_CONNECT_RULES),
(DEVICE_R4, DEVICE_R4_CONNECT_RULES),
(DEVICE_O1, DEVICE_O1_CONNECT_RULES),
]
LINKS = [LINK_R1_O1, LINK_R2_O1, LINK_R3_O1, LINK_R4_O1]
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, logging, pytest
from common.Settings import get_setting
from common.proto.monitoring_pb2 import KpiDescriptorList
from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Link import json_link_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from monitoring.client.MonitoringClient import MonitoringClient
from context.client.EventsCollector import EventsCollector
from common.proto.context_pb2 import Context, ContextId, Device, Empty, Link, Topology
from common.proto.monitoring_pb2 import AlarmDescriptor, AlarmList
from device.client.DeviceClient import DeviceClient
from .Objects import CONTEXT_ID, CONTEXTS, DEVICES, LINKS, TOPOLOGIES
from tests.Fixtures import context_client, device_client, monitoring_client
from .Fixtures import alarm_descriptor
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def test_scenario_empty(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure database is empty -------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == 0
response = context_client.ListDevices(Empty())
assert len(response.devices) == 0
response = context_client.ListLinks(Empty())
assert len(response.links) == 0
def test_prepare_scenario(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- Start the EventsCollector ----------------------------------------------------------------------------------
#events_collector = EventsCollector(context_client)
#events_collector.start()
#expected_events = []
# ----- Create Contexts and Topologies -----------------------------------------------------------------------------
for context in CONTEXTS:
context_uuid = context['context_id']['context_uuid']['uuid']
LOGGER.info('Adding Context {:s}'.format(context_uuid))
response = context_client.SetContext(Context(**context))
assert response.context_uuid.uuid == context_uuid
#expected_events.append(('ContextEvent', EVENT_CREATE, json_context_id(context_uuid)))
for topology in TOPOLOGIES:
context_uuid = topology['topology_id']['context_id']['context_uuid']['uuid']
topology_uuid = topology['topology_id']['topology_uuid']['uuid']
LOGGER.info('Adding Topology {:s}/{:s}'.format(context_uuid, topology_uuid))
response = context_client.SetTopology(Topology(**topology))
assert response.context_id.context_uuid.uuid == context_uuid
assert response.topology_uuid.uuid == topology_uuid
context_id = json_context_id(context_uuid)
#expected_events.append(('TopologyEvent', EVENT_CREATE, json_topology_id(topology_uuid, context_id=context_id)))
# ----- Validate Collected Events ----------------------------------------------------------------------------------
#check_events(events_collector, expected_events)
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
#events_collector.stop()
def test_scenario_ready(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure scenario is ready -------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == 0
response = context_client.ListLinks(Empty())
assert len(response.links) == 0
response = context_client.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 0
def test_devices_bootstraping(
context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name
# ----- Start the EventsCollector ----------------------------------------------------------------------------------
#events_collector = EventsCollector(context_client, log_events_received=True)
#events_collector.start()
#expected_events = []
# ----- Create Devices and Validate Collected Events ---------------------------------------------------------------
for device, connect_rules in DEVICES:
device_uuid = device['device_id']['device_uuid']['uuid']
LOGGER.info('Adding Device {:s}'.format(device_uuid))
device_with_connect_rules = copy.deepcopy(device)
device_with_connect_rules['device_config']['config_rules'].extend(connect_rules)
response = device_client.AddDevice(Device(**device_with_connect_rules))
assert response.device_uuid.uuid == device_uuid
#expected_events.extend([
# # Device creation, update for automation to start the device
# ('DeviceEvent', EVENT_CREATE, json_device_id(device_uuid)),
# #('DeviceEvent', EVENT_UPDATE, json_device_id(device_uuid)),
#])
#response = context_client.GetDevice(response)
#for endpoint in response.device_endpoints:
# for _ in endpoint.kpi_sample_types:
# # Monitoring configures monitoring for endpoint
# expected_events.append(('DeviceEvent', EVENT_UPDATE, json_device_id(device_uuid)))
# ----- Validate Collected Events ----------------------------------------------------------------------------------
#check_events(events_collector, expected_events)
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
#events_collector.stop()
def test_devices_bootstrapped(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure bevices are created -----------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == 0
response = context_client.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 0
def test_links_creation(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- Start the EventsCollector ----------------------------------------------------------------------------------
#events_collector = EventsCollector(context_client)
#events_collector.start()
#expected_events = []
# ----- Create Links and Validate Collected Events -----------------------------------------------------------------
for link in LINKS:
link_uuid = link['link_id']['link_uuid']['uuid']
LOGGER.info('Adding Link {:s}'.format(link_uuid))
response = context_client.SetLink(Link(**link))
assert response.link_uuid.uuid == link_uuid
#expected_events.append(('LinkEvent', EVENT_CREATE, json_link_id(link_uuid)))
# ----- Validate Collected Events ----------------------------------------------------------------------------------
#check_events(events_collector, expected_events)
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
#events_collector.stop()
def test_links_created(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure links are created -------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 0
def test_scenario_kpis_created(monitoring_client: MonitoringClient):
"""
This test validates that KPIs related to the service/device/endpoint were created
during the service creation process.
"""
response: KpiDescriptorList = monitoring_client.GetKpiDescriptorList(Empty())
LOGGER.info("Number of KPIs created: {}".format(len(response.kpi_descriptor_list)))
# TODO: replace the magic number `16` below for a formula that adapts to the number
# of links and devices
assert len(response.kpi_descriptor_list) == 16
def test_scenario_alarms_created(monitoring_client: MonitoringClient, alarm_descriptor : AlarmDescriptor):
# ----- Create one alarm per Kpi created ----------------------------------------------------------------------------
response: KpiDescriptorList = monitoring_client.GetKpiDescriptorList(Empty())
for kpi_descriptor in response.kpi_descriptor_list:
new_alarm_descriptor = alarm_descriptor
new_alarm_descriptor.kpi_id.kpi_id.uuid = kpi_descriptor.kpi_id.kpi_id.uuid
monitoring_client.SetKpiAlarm(new_alarm_descriptor)
response: AlarmList = monitoring_client.GetAlarms(Empty())
assert len(response.alarm_descriptor) == 16
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, pytest
from common.Settings import get_setting
from common.tests.EventTools import EVENT_REMOVE, check_events
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Link import json_link_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from common.proto.context_pb2 import ContextId, DeviceId, Empty, LinkId, TopologyId
from common.proto.monitoring_pb2 import KpiId, KpiDescriptorList, AlarmList, AlarmID
from device.client.DeviceClient import DeviceClient
from monitoring.client.MonitoringClient import MonitoringClient
from tests.Fixtures import context_client, device_client, monitoring_client
from .Objects import CONTEXT_ID, CONTEXTS, DEVICES, LINKS, TOPOLOGIES
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def test_services_removed(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure service is removed ------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 0
def test_scenario_cleanup(
context_client : ContextClient, device_client : DeviceClient, monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name
# ----- Start the EventsCollector ----------------------------------------------------------------------------------
#events_collector = EventsCollector(context_client)
#events_collector.start()
#expected_events = []
# ----- Delete Links and Validate Collected Events -----------------------------------------------------------------
for link in LINKS:
link_id = link['link_id']
link_uuid = link_id['link_uuid']['uuid']
LOGGER.info('Deleting Link {:s}'.format(link_uuid))
context_client.RemoveLink(LinkId(**link_id))
#expected_events.append(('LinkEvent', EVENT_REMOVE, json_link_id(link_uuid)))
# ----- Delete Devices and Validate Collected Events ---------------------------------------------------------------
for device, _ in DEVICES:
device_id = device['device_id']
device_uuid = device_id['device_uuid']['uuid']
LOGGER.info('Deleting Device {:s}'.format(device_uuid))
device_client.DeleteDevice(DeviceId(**device_id))
#expected_events.append(('DeviceEvent', EVENT_REMOVE, json_device_id(device_uuid)))
# ----- Delete Topologies and Validate Collected Events ------------------------------------------------------------
for topology in TOPOLOGIES:
topology_id = topology['topology_id']
context_uuid = topology_id['context_id']['context_uuid']['uuid']
topology_uuid = topology_id['topology_uuid']['uuid']
LOGGER.info('Deleting Topology {:s}/{:s}'.format(context_uuid, topology_uuid))
context_client.RemoveTopology(TopologyId(**topology_id))
context_id = json_context_id(context_uuid)
#expected_events.append(('TopologyEvent', EVENT_REMOVE, json_topology_id(topology_uuid, context_id=context_id)))
# ----- Delete Contexts and Validate Collected Events --------------------------------------------------------------
for context in CONTEXTS:
context_id = context['context_id']
context_uuid = context_id['context_uuid']['uuid']
LOGGER.info('Deleting Context {:s}'.format(context_uuid))
context_client.RemoveContext(ContextId(**context_id))
#expected_events.append(('ContextEvent', EVENT_REMOVE, json_context_id(context_uuid)))
# ----- Delete Alarms ----------------------------------------------------------------------------------------------
response: AlarmList = monitoring_client.GetAlarms(Empty())
for alarm in response.alarm_descriptor:
alarm_id = AlarmID()
alarm_id.alarm_id.uuid = alarm.alarm_id.alarm_id.uuid
monitoring_client.DeleteAlarm(alarm_id)
# ----- Delete Kpis ------------------------------------------------------------------------------------------------
response: KpiDescriptorList = monitoring_client.GetKpiDescriptorList(Empty())
for kpi_descriptor in response.kpi_descriptor_list:
kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_descriptor.kpi_id.kpi_id.uuid
monitoring_client.DeleteKpi(kpi_id)
# ----- Validate Collected Events ----------------------------------------------------------------------------------
#check_events(events_collector, expected_events)
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
#events_collector.stop()
def test_scenario_empty_again(context_client : ContextClient, monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure database is empty again -------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == 0
response = context_client.ListDevices(Empty())
assert len(response.devices) == 0
response = context_client.ListLinks(Empty())
assert len(response.links) == 0
response = monitoring_client.GetAlarms(Empty())
assert len(response.alarm_descriptor) == 0
response = monitoring_client.GetKpiDescriptorList(Empty())
assert len(response.kpi_descriptor_list) == 0
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, pytest, random, time
from grpc._channel import _MultiThreadedRendezvous
from common.DeviceTypes import DeviceTypeEnum
from common.Settings import get_setting
from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events
from common.tools.object_factory.Connection import json_connection_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Service import json_service_id
from common.tools.grpc.Tools import grpc_message_to_json_string
from compute.tests.mock_osm.MockOSM import MockOSM
from context.client.ContextClient import ContextClient
from monitoring.client.MonitoringClient import MonitoringClient
from context.client.EventsCollector import EventsCollector
from common.proto.context_pb2 import ContextId, Empty
from common.proto.monitoring_pb2 import AlarmList, AlarmSubscription
from tests.Fixtures import context_client, monitoring_client
from .Fixtures import osm_wim, alarm_subscription
from .Objects import (
CONTEXT_ID, CONTEXTS, DEVICE_O1_UUID, DEVICE_R1_UUID, DEVICE_R3_UUID, DEVICES, LINKS, TOPOLOGIES,
WIM_SERVICE_CONNECTION_POINTS, WIM_SERVICE_TYPE)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
DEVTYPE_EMU_PR = DeviceTypeEnum.EMULATED_PACKET_ROUTER.value
DEVTYPE_EMU_OLS = DeviceTypeEnum.EMULATED_OPEN_LINE_SYSTEM.value
def test_scenario_is_correct(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure links are created -------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 0
def test_service_creation(context_client : ContextClient, osm_wim : MockOSM): # pylint: disable=redefined-outer-name
# ----- Start the EventsCollector ----------------------------------------------------------------------------------
# TODO: restablish the tests of the events
# events_collector = EventsCollector(context_client, log_events_received=True)
# events_collector.start()
# ----- Create Service ---------------------------------------------------------------------------------------------
service_uuid = osm_wim.create_connectivity_service(WIM_SERVICE_TYPE, WIM_SERVICE_CONNECTION_POINTS)
osm_wim.get_connectivity_service_status(service_uuid)
# ----- Validate collected events ----------------------------------------------------------------------------------
# packet_connection_uuid = '{:s}:{:s}'.format(service_uuid, DEVTYPE_EMU_PR)
# optical_connection_uuid = '{:s}:optical:{:s}'.format(service_uuid, DEVTYPE_EMU_OLS)
# optical_service_uuid = '{:s}:optical'.format(service_uuid)
# expected_events = [
# # Create packet service and add first endpoint
# ('ServiceEvent', EVENT_CREATE, json_service_id(service_uuid, context_id=CONTEXT_ID)),
# ('ServiceEvent', EVENT_UPDATE, json_service_id(service_uuid, context_id=CONTEXT_ID)),
# # Configure OLS controller, create optical service, create optical connection
# ('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_O1_UUID)),
# ('ServiceEvent', EVENT_CREATE, json_service_id(optical_service_uuid, context_id=CONTEXT_ID)),
# ('ConnectionEvent', EVENT_CREATE, json_connection_id(optical_connection_uuid)),
# # Configure endpoint packet devices, add second endpoint to service, create connection
# ('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_R1_UUID)),
# ('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_R3_UUID)),
# ('ServiceEvent', EVENT_UPDATE, json_service_id(service_uuid, context_id=CONTEXT_ID)),
# ('ConnectionEvent', EVENT_CREATE, json_connection_id(packet_connection_uuid)),
# ]
# check_events(events_collector, expected_events)
# # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
# events_collector.stop()
def test_scenario_service_created(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure service is created ------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
LOGGER.info('Services[{:d}] = {:s}'.format(len(response.services), grpc_message_to_json_string(response)))
assert len(response.services) == 2 # L3NM + TAPI
for service in response.services:
service_id = service.service_id
response = context_client.ListConnections(service_id)
LOGGER.info(' ServiceId[{:s}] => Connections[{:d}] = {:s}'.format(
grpc_message_to_json_string(service_id), len(response.connections), grpc_message_to_json_string(response)))
assert len(response.connections) == 1 # one connection per service
def test_scenario_kpi_values_created(monitoring_client: MonitoringClient):
"""
This test validates that KPI values have been inserted into the monitoring database.
We short k KPI descriptors to test.
"""
response = monitoring_client.GetKpiDescriptorList(Empty())
kpi_descriptors = random.choices(response.kpi_descriptor_list, k=2)
time.sleep(5)
for kpi_descriptor in kpi_descriptors:
response = monitoring_client.GetInstantKpi(kpi_descriptor.kpi_id)
assert response.kpi_id.kpi_id.uuid == kpi_descriptor.kpi_id.kpi_id.uuid
assert response.timestamp.timestamp > 0
def test_scenario_alarm_subscriptions_created(monitoring_client: MonitoringClient, alarm_subscription: AlarmSubscription):
response: AlarmList = monitoring_client.GetAlarms(Empty())
for alarm in response.alarm_descriptor:
new_alarm_subscription = alarm_subscription
new_alarm_subscription.alarm_id.alarm_id.uuid = alarm.alarm_id.alarm_id.uuid
response = monitoring_client.GetAlarmResponseStream(new_alarm_subscription)
assert isinstance(response, _MultiThreadedRendezvous)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, pytest
from common.DeviceTypes import DeviceTypeEnum
from common.Settings import get_setting
from common.tests.EventTools import EVENT_REMOVE, EVENT_UPDATE, check_events
from common.tools.object_factory.Connection import json_connection_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Service import json_service_id
from common.tools.grpc.Tools import grpc_message_to_json_string
from compute.tests.mock_osm.MockOSM import MockOSM
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from common.proto.context_pb2 import ContextId, Empty, ServiceTypeEnum
from tests.Fixtures import context_client
from .Fixtures import osm_wim
from .Objects import (
CONTEXT_ID, CONTEXTS, DEVICE_O1_UUID, DEVICE_R1_UUID, DEVICE_R3_UUID, DEVICES, LINKS, TOPOLOGIES, WIM_MAPPING,
WIM_PASSWORD, WIM_USERNAME)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
DEVTYPE_EMU_PR = DeviceTypeEnum.EMULATED_PACKET_ROUTER.value
DEVTYPE_EMU_OLS = DeviceTypeEnum.EMULATED_OPEN_LINE_SYSTEM.value
def test_scenario_is_correct(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure service is created ------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
LOGGER.info('Services[{:d}] = {:s}'.format(len(response.services), grpc_message_to_json_string(response)))
assert len(response.services) == 2 # L3NM + TAPI
for service in response.services:
service_id = service.service_id
response = context_client.ListConnections(service_id)
LOGGER.info(' ServiceId[{:s}] => Connections[{:d}] = {:s}'.format(
grpc_message_to_json_string(service_id), len(response.connections), grpc_message_to_json_string(response)))
assert len(response.connections) == 1 # one connection per service
def test_service_removal(context_client : ContextClient, osm_wim : MockOSM): # pylint: disable=redefined-outer-name
# ----- Start the EventsCollector ----------------------------------------------------------------------------------
#events_collector = EventsCollector(context_client, log_events_received=True)
#events_collector.start()
# ----- Delete Service ---------------------------------------------------------------------------------------------
response = context_client.ListServices(ContextId(**CONTEXT_ID))
LOGGER.info('Services[{:d}] = {:s}'.format(len(response.services), grpc_message_to_json_string(response)))
assert len(response.services) == 2 # L3NM + TAPI
service_uuids = set()
for service in response.services:
if service.service_type != ServiceTypeEnum.SERVICETYPE_L3NM: continue
service_uuid = service.service_id.service_uuid.uuid
service_uuids.add(service_uuid)
osm_wim.conn_info[service_uuid] = {}
assert len(service_uuids) == 1 # assume a single L3NM service has been created
service_uuid = set(service_uuids).pop()
osm_wim.delete_connectivity_service(service_uuid)
# ----- Validate collected events ----------------------------------------------------------------------------------
#packet_connection_uuid = '{:s}:{:s}'.format(service_uuid, DEVTYPE_EMU_PR)
#optical_connection_uuid = '{:s}:optical:{:s}'.format(service_uuid, DEVTYPE_EMU_OLS)
#optical_service_uuid = '{:s}:optical'.format(service_uuid)
#expected_events = [
# ('ConnectionEvent', EVENT_REMOVE, json_connection_id(packet_connection_uuid)),
# ('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_R1_UUID)),
# ('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_R3_UUID)),
# ('ServiceEvent', EVENT_REMOVE, json_service_id(service_uuid, context_id=CONTEXT_ID)),
# ('ConnectionEvent', EVENT_REMOVE, json_connection_id(optical_connection_uuid)),
# ('DeviceEvent', EVENT_UPDATE, json_device_id(DEVICE_O1_UUID)),
# ('ServiceEvent', EVENT_REMOVE, json_service_id(optical_service_uuid, context_id=CONTEXT_ID)),
#]
#check_events(events_collector, expected_events)
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
#events_collector.stop()
def test_services_removed(context_client : ContextClient): # pylint: disable=redefined-outer-name
# ----- List entities - Ensure service is removed ------------------------------------------------------------------
response = context_client.ListContexts(Empty())
assert len(response.contexts) == len(CONTEXTS)
response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == len(TOPOLOGIES)
response = context_client.ListDevices(Empty())
assert len(response.devices) == len(DEVICES)
response = context_client.ListLinks(Empty())
assert len(response.links) == len(LINKS)
response = context_client.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment