Commit 6eb0f6a9 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Pre-merge code cleanup

parent d82241f9
Loading
Loading
Loading
Loading
+0 −108
Original line number Diff line number Diff line
{
    "contexts": [
        {
            "context_id": {"context_uuid": {"uuid": "admin"}},
            "topology_ids": [],
            "service_ids": []
        }
    ],
    "topologies": [
        {
            "topology_id": {"topology_uuid": {"uuid": "admin"}, "context_id": {"context_uuid": {"uuid": "admin"}}},
            "device_ids": [],
            "link_ids": []
        }
    ],
    "devices": [
        {
            "device_id": {"device_uuid": {"uuid": "R1-EMU"}},
            "device_type": "emu-packet-router",
            "device_config": {"config_rules": [
                {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}},
                {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}},
                {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": "{\"endpoints\": [{\"sample_types\": [], \"type\": \"optical\", \"uuid\": \"13/0/0\"}, {\"sample_types\": [101, 102, 201, 202], \"type\": \"copper\", \"uuid\": \"13/1/2\"}]}"}}
            ]},
            "device_operational_status": 1,
            "device_drivers": [0],
            "device_endpoints": []
        },
        {
            "device_id": {"device_uuid": {"uuid": "R2-EMU"}},
            "device_type": "emu-packet-router",
            "device_config": {"config_rules": [
                {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}},
                {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}},
                {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": "{\"endpoints\": [{\"sample_types\": [], \"type\": \"optical\", \"uuid\": \"13/0/0\"}, {\"sample_types\": [101, 102, 201, 202], \"type\": \"copper\", \"uuid\": \"13/1/2\"}]}"}}
            ]},
            "device_operational_status": 1,
            "device_drivers": [0],
            "device_endpoints": []
        },
        {
            "device_id": {"device_uuid": {"uuid": "R3-EMU"}},
            "device_type": "emu-packet-router",
            "device_config": {"config_rules": [
                {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}},
                {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}},
                {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": "{\"endpoints\": [{\"sample_types\": [], \"type\": \"optical\", \"uuid\": \"13/0/0\"}, {\"sample_types\": [101, 102, 201, 202], \"type\": \"copper\", \"uuid\": \"13/1/2\"}]}"}}
            ]},
            "device_operational_status": 1,
            "device_drivers": [0],
            "device_endpoints": []
        },
        {
            "device_id": {"device_uuid": {"uuid": "R4-EMU"}},
            "device_type": "emu-packet-router",
            "device_config": {"config_rules": [
                {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}},
                {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}},
                {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": "{\"endpoints\": [{\"sample_types\": [], \"type\": \"optical\", \"uuid\": \"13/0/0\"}, {\"sample_types\": [101, 102, 201, 202], \"type\": \"copper\", \"uuid\": \"13/1/2\"}]}"}}
            ]},
            "device_operational_status": 1,
            "device_drivers": [0],
            "device_endpoints": []
        },
        {
            "device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}},
            "device_type": "xr-constellation",
            "device_config": {"config_rules": [
                {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "172.19.219.44"}},
                {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "443"}},
                {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": "{\"username\": \"xr-user-1\", \"password\": \"xr-user-1\", \"hub_module_name\": \"XR HUB 1\"}"}}
            ]},
            "device_operational_status": 1,
            "device_drivers": [6],
            "device_endpoints": []
        }
    ],
    "links": [
        {
            "link_id": {"link_uuid": {"uuid": "R1-EMU/13/0/0==XR HUB 1|XR-T4"}},
            "link_endpoint_ids": [
                {"device_id": {"device_uuid": {"uuid": "R1-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
                {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR HUB 1|XR-T4"}}
            ]
        },
        {
            "link_id": {"link_uuid": {"uuid": "R2-EMU/13/0/0==XR HUB 1|XR-T3"}},
            "link_endpoint_ids": [
                {"device_id": {"device_uuid": {"uuid": "R2-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
                {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR HUB 1|XR-T3"}}
            ]
        },
        {
            "link_id": {"link_uuid": {"uuid": "R3-EMU/13/0/0==XR1-XR LEAF 1|XR-T1"}},
            "link_endpoint_ids": [
                {"device_id": {"device_uuid": {"uuid": "R3-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
                {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR LEAF 1|XR-T1"}}
            ]
        },
        {
            "link_id": {"link_uuid": {"uuid": "R4-EMU/13/0/0==XR LEAF 2|XR-T1"}},
            "link_endpoint_ids": [
                {"device_id": {"device_uuid": {"uuid": "R4-EMU"}}, "endpoint_uuid": {"uuid": "13/0/0"}},
                {"device_id": {"device_uuid": {"uuid": "X1-XR-CONSTELLATION"}}, "endpoint_uuid": {"uuid": "XR LEAF 2|XR-T1"}}
            ]
        }
    ]
}
+0 −238
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List, Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Device import (
    json_device_connect_rules, json_device_emulated_connect_rules, json_device_emulated_packet_router_disabled,
    json_device_emulated_tapi_disabled, json_device_id, json_device_packetrouter_disabled, json_device_tapi_disabled)
from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id
from common.tools.object_factory.Link import json_link, json_link_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
from common.proto.kpi_sample_types_pb2 import KpiSampleType

# ----- Context --------------------------------------------------------------------------------------------------------
CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_UUID)
CONTEXT    = json_context(DEFAULT_CONTEXT_UUID)

# ----- Topology -------------------------------------------------------------------------------------------------------
TOPOLOGY_ID = json_topology_id(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ID)
TOPOLOGY    = json_topology(DEFAULT_TOPOLOGY_UUID, context_id=CONTEXT_ID)

# ----- Monitoring Samples ---------------------------------------------------------------------------------------------
PACKET_PORT_SAMPLE_TYPES = [
    KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED,
    KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED,
    KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED,
    KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED,
]

# ----- Device Credentials and Settings --------------------------------------------------------------------------------
try:
    from .Credentials import DEVICE_R1_ADDRESS, DEVICE_R1_PORT, DEVICE_R1_USERNAME, DEVICE_R1_PASSWORD
    from .Credentials import DEVICE_R3_ADDRESS, DEVICE_R3_PORT, DEVICE_R3_USERNAME, DEVICE_R3_PASSWORD
    #from .Credentials import DEVICE_O1_ADDRESS, DEVICE_O1_PORT
    USE_REAL_DEVICES = True     # Use real devices
except ImportError:
    USE_REAL_DEVICES = False    # Use emulated devices

    DEVICE_R1_ADDRESS  = '0.0.0.0'
    DEVICE_R1_PORT     = 830
    DEVICE_R1_USERNAME = 'admin'
    DEVICE_R1_PASSWORD = 'admin'

    DEVICE_R3_ADDRESS  = '0.0.0.0'
    DEVICE_R3_PORT     = 830
    DEVICE_R3_USERNAME = 'admin'
    DEVICE_R3_PASSWORD = 'admin'

DEVICE_X1_ADDRESS  = '172.19.219.44'
DEVICE_X1_PORT     = 443

#USE_REAL_DEVICES = False     # Uncomment to force to use emulated devices

def json_endpoint_ids(device_id : Dict, endpoint_descriptors : List[Tuple[str, str, List[int]]]):
    return [
        json_endpoint_id(device_id, ep_uuid, topology_id=None)
        for ep_uuid, _, _ in endpoint_descriptors
    ]

def json_endpoints(device_id : Dict, endpoint_descriptors : List[Tuple[str, str, List[int]]]):
    return [
        json_endpoint(device_id, ep_uuid, ep_type, topology_id=None, kpi_sample_types=ep_sample_types)
        for ep_uuid, ep_type, ep_sample_types in endpoint_descriptors
    ]

def get_link_uuid(a_device_id : Dict, a_endpoint_id : Dict, z_device_id : Dict, z_endpoint_id : Dict) -> str:
    return '{:s}/{:s}=={:s}/{:s}'.format(
        a_device_id['device_uuid']['uuid'], a_endpoint_id['endpoint_uuid']['uuid'],
        z_device_id['device_uuid']['uuid'], z_endpoint_id['endpoint_uuid']['uuid'])


# ----- Devices --------------------------------------------------------------------------------------------------------
if not USE_REAL_DEVICES:
    json_device_packetrouter_disabled = json_device_emulated_packet_router_disabled
    json_device_tapi_disabled         = json_device_emulated_tapi_disabled

DEVICE_R1_UUID          = 'R1-EMU'
DEVICE_R1_TIMEOUT       = 120
DEVICE_R1_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R1_ID            = json_device_id(DEVICE_R1_UUID)
#DEVICE_R1_ENDPOINTS     = json_endpoints(DEVICE_R1_ID, DEVICE_R1_ENDPOINT_DEFS)
DEVICE_R1_ENDPOINT_IDS  = json_endpoint_ids(DEVICE_R1_ID, DEVICE_R1_ENDPOINT_DEFS)
DEVICE_R1               = json_device_packetrouter_disabled(DEVICE_R1_UUID)
ENDPOINT_ID_R1_13_0_0   = DEVICE_R1_ENDPOINT_IDS[0]
ENDPOINT_ID_R1_13_1_2   = DEVICE_R1_ENDPOINT_IDS[1]
DEVICE_R1_CONNECT_RULES = json_device_connect_rules(DEVICE_R1_ADDRESS, DEVICE_R1_PORT, {
    'username': DEVICE_R1_USERNAME,
    'password': DEVICE_R1_PASSWORD,
    'timeout' : DEVICE_R1_TIMEOUT,
}) if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_R1_ENDPOINT_DEFS)


DEVICE_R2_UUID          = 'R2-EMU'
DEVICE_R2_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R2_ID            = json_device_id(DEVICE_R2_UUID)
#DEVICE_R2_ENDPOINTS     = json_endpoints(DEVICE_R2_ID, DEVICE_R2_ENDPOINT_DEFS)
DEVICE_R2_ENDPOINT_IDS  = json_endpoint_ids(DEVICE_R2_ID, DEVICE_R2_ENDPOINT_DEFS)
DEVICE_R2               = json_device_emulated_packet_router_disabled(DEVICE_R2_UUID)
ENDPOINT_ID_R2_13_0_0   = DEVICE_R2_ENDPOINT_IDS[0]
ENDPOINT_ID_R2_13_1_2   = DEVICE_R2_ENDPOINT_IDS[1]
DEVICE_R2_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_R2_ENDPOINT_DEFS)


DEVICE_R3_UUID          = 'R3-EMU'
DEVICE_R3_TIMEOUT       = 120
DEVICE_R3_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R3_ID            = json_device_id(DEVICE_R3_UUID)
#DEVICE_R3_ENDPOINTS     = json_endpoints(DEVICE_R3_ID, DEVICE_R3_ENDPOINT_DEFS)
DEVICE_R3_ENDPOINT_IDS  = json_endpoint_ids(DEVICE_R3_ID, DEVICE_R3_ENDPOINT_DEFS)
DEVICE_R3               = json_device_packetrouter_disabled(DEVICE_R3_UUID)
ENDPOINT_ID_R3_13_0_0   = DEVICE_R3_ENDPOINT_IDS[0]
ENDPOINT_ID_R3_13_1_2   = DEVICE_R3_ENDPOINT_IDS[1]
DEVICE_R3_CONNECT_RULES = json_device_connect_rules(DEVICE_R3_ADDRESS, DEVICE_R3_PORT, {
    'username': DEVICE_R3_USERNAME,
    'password': DEVICE_R3_PASSWORD,
    'timeout' : DEVICE_R3_TIMEOUT,
}) if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_R3_ENDPOINT_DEFS)


DEVICE_R4_UUID          = 'R4-EMU'
DEVICE_R4_ENDPOINT_DEFS = [('13/0/0', 'optical', []), ('13/1/2', 'copper', PACKET_PORT_SAMPLE_TYPES)]
DEVICE_R4_ID            = json_device_id(DEVICE_R4_UUID)
#DEVICE_R4_ENDPOINTS     = json_endpoints(DEVICE_R4_ID, DEVICE_R4_ENDPOINT_DEFS)
DEVICE_R4_ENDPOINT_IDS  = json_endpoint_ids(DEVICE_R4_ID, DEVICE_R4_ENDPOINT_DEFS)
DEVICE_R4               = json_device_emulated_packet_router_disabled(DEVICE_R4_UUID)
ENDPOINT_ID_R4_13_0_0   = DEVICE_R4_ENDPOINT_IDS[0]
ENDPOINT_ID_R4_13_1_2   = DEVICE_R4_ENDPOINT_IDS[1]
DEVICE_R4_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_R4_ENDPOINT_DEFS)


DEVICE_X1_UUID          = 'X1-XR-CONSTELLATION'
DEVICE_X1_TIMEOUT       = 120
DEVICE_X1_ENDPOINT_DEFS = [
    ('XR HUB 1|XR-T1', 'optical', []),
    ('XR HUB 1|XR-T2', 'optical', []),
    ('XR HUB 1|XR-T3', 'optical', []),
    ('XR HUB 1|XR-T4', 'optical', []),
    ('XR LEAF 1|XR-T1', 'optical', []),
    ('XR LEAF 2|XR-T1', 'optical', []),
]
DEVICE_X1_ID            = json_device_id(DEVICE_X1_UUID)
DEVICE_X1               = json_device_tapi_disabled(DEVICE_X1_UUID)
DEVICE_X1_ENDPOINT_IDS  = json_endpoint_ids(DEVICE_X1_ID, DEVICE_X1_ENDPOINT_DEFS)
# These match JSON, hence indexes are what theyt are
ENDPOINT_ID_X1_EP1      = DEVICE_X1_ENDPOINT_IDS[3]
ENDPOINT_ID_X1_EP2      = DEVICE_X1_ENDPOINT_IDS[2]
ENDPOINT_ID_X1_EP3      = DEVICE_X1_ENDPOINT_IDS[4]
ENDPOINT_ID_X1_EP4      = DEVICE_X1_ENDPOINT_IDS[5]
DEVICE_X1_CONNECT_RULES = json_device_connect_rules(DEVICE_X1_ADDRESS, DEVICE_X1_PORT, {
    'timeout' : DEVICE_X1_TIMEOUT,
    "username": "xr-user-1",
    "password": "xr-user-1",
    "hub_module_name": "XR HUB 1"
})
# Always using real device (CM, whether CM has emulated backend is another story)
#if USE_REAL_DEVICES else json_device_emulated_connect_rules(DEVICE_X1_ENDPOINT_DEFS)


# ----- Links ----------------------------------------------------------------------------------------------------------
LINK_R1_X1_UUID = get_link_uuid(DEVICE_R1_ID, ENDPOINT_ID_R1_13_0_0, DEVICE_X1_ID, ENDPOINT_ID_X1_EP1)
LINK_R1_X1_ID   = json_link_id(LINK_R1_X1_UUID)
LINK_R1_X1      = json_link(LINK_R1_X1_UUID, [ENDPOINT_ID_R1_13_0_0, ENDPOINT_ID_X1_EP1])

LINK_R2_X1_UUID = get_link_uuid(DEVICE_R2_ID, ENDPOINT_ID_R2_13_0_0, DEVICE_X1_ID, ENDPOINT_ID_X1_EP2)
LINK_R2_X1_ID   = json_link_id(LINK_R2_X1_UUID)
LINK_R2_X1      = json_link(LINK_R2_X1_UUID, [ENDPOINT_ID_R2_13_0_0, ENDPOINT_ID_X1_EP2])

LINK_R3_X1_UUID = get_link_uuid(DEVICE_R3_ID, ENDPOINT_ID_R3_13_0_0, DEVICE_X1_ID, ENDPOINT_ID_X1_EP3)
LINK_R3_X1_ID   = json_link_id(LINK_R3_X1_UUID)
LINK_R3_X1      = json_link(LINK_R3_X1_UUID, [ENDPOINT_ID_R3_13_0_0, ENDPOINT_ID_X1_EP3])

LINK_R4_X1_UUID = get_link_uuid(DEVICE_R4_ID, ENDPOINT_ID_R4_13_0_0, DEVICE_X1_ID, ENDPOINT_ID_X1_EP4)
LINK_R4_X1_ID   = json_link_id(LINK_R4_X1_UUID)
LINK_R4_X1      = json_link(LINK_R4_X1_UUID, [ENDPOINT_ID_R4_13_0_0, ENDPOINT_ID_X1_EP4])


# ----- WIM Service Settings -------------------------------------------------------------------------------------------

def compose_service_endpoint_id(endpoint_id):
    device_uuid = endpoint_id['device_id']['device_uuid']['uuid']
    endpoint_uuid = endpoint_id['endpoint_uuid']['uuid']
    return ':'.join([device_uuid, endpoint_uuid])

WIM_SEP_R1_ID      = compose_service_endpoint_id(ENDPOINT_ID_R1_13_1_2)
WIM_SEP_R1_SITE_ID = '1'
WIM_SEP_R1_BEARER  = WIM_SEP_R1_ID
WIM_SRV_R1_VLAN_ID = 400

WIM_SEP_R3_ID      = compose_service_endpoint_id(ENDPOINT_ID_R3_13_1_2)
WIM_SEP_R3_SITE_ID = '2'
WIM_SEP_R3_BEARER  = WIM_SEP_R3_ID
WIM_SRV_R3_VLAN_ID = 500

WIM_USERNAME = 'admin'
WIM_PASSWORD = 'admin'

WIM_MAPPING  = [
    {'device-id': DEVICE_R1_UUID, 'service_endpoint_id': WIM_SEP_R1_ID,
     'service_mapping_info': {'bearer': {'bearer-reference': WIM_SEP_R1_BEARER}, 'site-id': WIM_SEP_R1_SITE_ID}},
    {'device-id': DEVICE_R3_UUID, 'service_endpoint_id': WIM_SEP_R3_ID,
     'service_mapping_info': {'bearer': {'bearer-reference': WIM_SEP_R3_BEARER}, 'site-id': WIM_SEP_R3_SITE_ID}},
]
WIM_SERVICE_TYPE = 'ELINE'
WIM_SERVICE_CONNECTION_POINTS = [
    {'service_endpoint_id': WIM_SEP_R1_ID,
        'service_endpoint_encapsulation_type': 'dot1q',
        'service_endpoint_encapsulation_info': {'vlan': WIM_SRV_R1_VLAN_ID}},
    {'service_endpoint_id': WIM_SEP_R3_ID,
        'service_endpoint_encapsulation_type': 'dot1q',
        'service_endpoint_encapsulation_info': {'vlan': WIM_SRV_R3_VLAN_ID}},
]

# ----- Object Collections ---------------------------------------------------------------------------------------------

CONTEXTS = [CONTEXT]
TOPOLOGIES = [TOPOLOGY]

DEVICES = [
    (DEVICE_R1, DEVICE_R1_CONNECT_RULES),
    (DEVICE_R2, DEVICE_R2_CONNECT_RULES),
    (DEVICE_R3, DEVICE_R3_CONNECT_RULES),
    (DEVICE_R4, DEVICE_R4_CONNECT_RULES),
    (DEVICE_X1, DEVICE_X1_CONNECT_RULES),
]

LINKS = [LINK_R1_X1, LINK_R2_X1, LINK_R3_X1, LINK_R4_X1]
+0 −129
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, pytest
from common.DeviceTypes import DeviceTypeEnum
from common.Settings import get_setting
from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events
from common.tools.object_factory.Connection import json_connection_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Service import json_service_id
from common.tools.grpc.Tools import grpc_message_to_json_string
from compute.tests.mock_osm.MockOSM import MockOSM
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from common.proto.context_pb2 import ContextId, Empty
from .ObjectsXr import (
    CONTEXT_ID, CONTEXTS, DEVICE_X1_UUID, DEVICE_R1_UUID, DEVICE_R3_UUID, DEVICES, LINKS, TOPOLOGIES,
    WIM_MAPPING, WIM_PASSWORD, WIM_SERVICE_CONNECTION_POINTS, WIM_SERVICE_TYPE, WIM_USERNAME)

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

DEVTYPE_EMU_PR  = DeviceTypeEnum.EMULATED_PACKET_ROUTER.value
DEVTYPE_XR_CONSTELLATION = DeviceTypeEnum.XR_CONSTELLATION.value


@pytest.fixture(scope='session')
def context_client():
    _client = ContextClient(get_setting('CONTEXTSERVICE_SERVICE_HOST'), get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC'))
    yield _client
    _client.close()


@pytest.fixture(scope='session')
def osm_wim():
    wim_url = 'http://{:s}:{:s}'.format(
        get_setting('COMPUTESERVICE_SERVICE_HOST'), str(get_setting('COMPUTESERVICE_SERVICE_PORT_HTTP')))
    return MockOSM(wim_url, WIM_MAPPING, WIM_USERNAME, WIM_PASSWORD)


def test_scenario_is_correct(context_client : ContextClient):  # pylint: disable=redefined-outer-name
    # ----- List entities - Ensure links are created -------------------------------------------------------------------
    response = context_client.ListContexts(Empty())
    assert len(response.contexts) == len(CONTEXTS)

    response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
    assert len(response.topologies) == len(TOPOLOGIES)

    response = context_client.ListDevices(Empty())
    assert len(response.devices) == len(DEVICES)

    response = context_client.ListLinks(Empty())
    assert len(response.links) == len(LINKS)

    response = context_client.ListServices(ContextId(**CONTEXT_ID))
    assert len(response.services) == 0


def test_service_creation(context_client : ContextClient, osm_wim : MockOSM): # pylint: disable=redefined-outer-name
    # ----- Start the EventsCollector ----------------------------------------------------------------------------------
    # events_collector = EventsCollector(context_client, log_events_received=True)
    # events_collector.start()

    # ----- Create Service ---------------------------------------------------------------------------------------------
    service_uuid = osm_wim.create_connectivity_service(WIM_SERVICE_TYPE, WIM_SERVICE_CONNECTION_POINTS)
    osm_wim.get_connectivity_service_status(service_uuid)

    # ----- Validate collected events ----------------------------------------------------------------------------------

    # packet_connection_uuid = '{:s}:{:s}'.format(service_uuid, DEVTYPE_EMU_PR)
    # optical_connection_uuid = '{:s}:optical:{:s}'.format(service_uuid, DEVTYPE_XR_CONSTELLATION)
    # optical_service_uuid = '{:s}:optical'.format(service_uuid)

    # expected_events = [
    #     # Create packet service and add first endpoint
    #     ('ServiceEvent',    EVENT_CREATE, json_service_id(service_uuid, context_id=CONTEXT_ID)),
    #     ('ServiceEvent',    EVENT_UPDATE, json_service_id(service_uuid, context_id=CONTEXT_ID)),

    #     # Configure OLS controller, create optical service, create optical connection
    #     ('DeviceEvent',     EVENT_UPDATE, json_device_id(DEVICE_X1_UUID)),
    #     ('ServiceEvent',    EVENT_CREATE, json_service_id(optical_service_uuid, context_id=CONTEXT_ID)),
    #     ('ConnectionEvent', EVENT_CREATE, json_connection_id(optical_connection_uuid)),

    #     # Configure endpoint packet devices, add second endpoint to service, create connection
    #     ('DeviceEvent',     EVENT_UPDATE, json_device_id(DEVICE_R1_UUID)),
    #     ('DeviceEvent',     EVENT_UPDATE, json_device_id(DEVICE_R3_UUID)),
    #     ('ServiceEvent',    EVENT_UPDATE, json_service_id(service_uuid, context_id=CONTEXT_ID)),
    #     ('ConnectionEvent', EVENT_CREATE, json_connection_id(packet_connection_uuid)),
    # ]
    # check_events(events_collector, expected_events)

    # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
    # events_collector.stop()


def test_scenario_service_created(context_client : ContextClient):  # pylint: disable=redefined-outer-name
    # ----- List entities - Ensure service is created ------------------------------------------------------------------
    response = context_client.ListContexts(Empty())
    assert len(response.contexts) == len(CONTEXTS)

    response = context_client.ListTopologies(ContextId(**CONTEXT_ID))
    assert len(response.topologies) == len(TOPOLOGIES)

    response = context_client.ListDevices(Empty())
    assert len(response.devices) == len(DEVICES)

    response = context_client.ListLinks(Empty())
    assert len(response.links) == len(LINKS)

    response = context_client.ListServices(ContextId(**CONTEXT_ID))
    LOGGER.info('Services[{:d}] = {:s}'.format(len(response.services), grpc_message_to_json_string(response)))
    assert len(response.services) == 2 # L3NM + TAPI
    for service in response.services:
        service_id = service.service_id
        response = context_client.ListConnections(service_id)
        LOGGER.info('  ServiceId[{:s}] => Connections[{:d}] = {:s}'.format(
            grpc_message_to_json_string(service_id), len(response.connections), grpc_message_to_json_string(response)))
        assert len(response.connections) == 1 # one connection per service
+0 −133

File deleted.

Preview size limit exceeded, changes collapsed.