diff --git a/src/e2e_orchestrator/service/subscriptions/Subscriptions.py b/src/e2e_orchestrator/service/subscriptions/Subscriptions.py index 683aead3b6d25aea2990025168d659c43709a9a4..0ef8c010968997f3440b32fd99f3b4951e35598e 100644 --- a/src/e2e_orchestrator/service/subscriptions/Subscriptions.py +++ b/src/e2e_orchestrator/service/subscriptions/Subscriptions.py @@ -31,7 +31,7 @@ class Subscriptions: device_uuid = tfs_ctrl_settings.device_uuid with self._lock: subscription = self._subscriptions.get(device_uuid) - if (subscription is not None) and subscription.is_running: return + if subscription is not None: return subscription = Subscription(tfs_ctrl_settings, self._dispatchers, self._terminate) self._subscriptions[device_uuid] = subscription subscription.start() diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py index ab702acf63b546b1940ec374b6353ae6677dc6b5..168a136de75d542a8b9741b7401b0c65d225d4aa 100644 --- a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py +++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py @@ -31,25 +31,60 @@ class ClientNamespace(socketio.ClientNamespace): MSG = '[on_disconnect] Disconnected!, reason: {:s}' LOGGER.info(MSG.format(str(reason))) - def on_recommendation(self, data): - MSG = '[on_recommendation] begin data={:s}' + def on_vlink_create(self, data): + MSG = '[on_vlink_create] begin data={:s}' LOGGER.info(MSG.format(str(data))) json_data = json.loads(data) + request_key = json_data.pop('_request_key') + + recommendation = Recommendation( + action = RecommendationAction.VLINK_CREATE, + data = json_data, + ) + result = Future() + + MSG = '[on_vlink_create] Recommendation ({:s}): {:s}' + LOGGER.info(MSG.format(str(request_key), str(recommendation))) + + LOGGER.debug('[on_vlink_create] Queuing recommendation...') + self._dispatcher_queue.put_nowait((recommendation, result)) + + reply = dict() + reply['_request_key'] = request_key + try: + reply['result'] = result.result() + event = reply['result']['event'] + except Exception as e: + reply['error'] = str(e) + #reply['stacktrace'] = str(e) + event = 'error' + + LOGGER.debug('[on_vlink_create] Replying...') + self.emit(event, json.dumps(reply)) + LOGGER.debug('[on_vlink_create] end') + + def on_vlink_remove(self, data): + MSG = '[on_vlink_remove] begin data={:s}' + LOGGER.info(MSG.format(str(data))) + + json_data = json.loads(data) + request_key = json_data.pop('_request_key') + recommendation = Recommendation( - action = RecommendationAction._value2member_map_[json_data['action']], - data = json.loads(json_data['data']), + action = RecommendationAction.VLINK_REMOVE, + data = json_data, ) result = Future() - MSG = '[on_recommendation] Recommendation: {:s}' - LOGGER.info(MSG.format(str(recommendation))) + MSG = '[on_vlink_remove] Recommendation ({:s}): {:s}' + LOGGER.info(MSG.format(str(request_key), str(recommendation))) - LOGGER.debug('[on_recommendation] Queuing recommendation...') + LOGGER.debug('[on_vlink_remove] Queuing recommendation...') self._dispatcher_queue.put_nowait((recommendation, result)) - LOGGER.debug('[on_recommendation] Recommendation processed...') reply = dict() + reply['_request_key'] = request_key try: reply['result'] = result.result() event = reply['result']['event'] @@ -58,6 +93,6 @@ class ClientNamespace(socketio.ClientNamespace): #reply['stacktrace'] = str(e) event = 'error' - LOGGER.debug('[on_recommendation] Replying...') + LOGGER.debug('[on_vlink_remove] Replying...') self.emit(event, json.dumps(reply)) - LOGGER.debug('[on_recommendation] end') + LOGGER.debug('[on_vlink_remove] end') diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py index 72e79e6b5900c29d6ed0fbe19c600426b07ff17d..4fd45285707eec0357bf7805137a6b1179731d71 100644 --- a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py +++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py @@ -46,7 +46,7 @@ class RecommendationDispatcher(_Dispatcher): vlink_optical_service['service_id']['service_uuid']['uuid'] = service_id.service_uuid.uuid service_id = service_client.UpdateService(Service(**vlink_optical_service)) - result = {'event': 'vlink-created'} + result = {'event': 'vlink_created'} elif request.action == RecommendationAction.VLINK_REMOVE: vlink_service_uuid = request.data['link_id']['link_uuid']['uuid'] context_id = json_context_id(DEFAULT_CONTEXT_NAME) @@ -58,7 +58,7 @@ class RecommendationDispatcher(_Dispatcher): if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1': service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id)) - result = {'event': 'vlink-removed'} + result = {'event': 'vlink_removed'} else: MSG = 'RecommendationAction not supported in Recommendation({:s})' raise NotImplementedError(MSG.format(str(request))) diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py index 48720767c87e48e15b1dadd540af56a83fe1249c..20665fcb5d2141b7efc3aca458b46fdbece599f4 100644 --- a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py +++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py @@ -14,11 +14,11 @@ import logging, networkx from dataclasses import dataclass, field -from typing import Dict, List +from typing import Dict, List, Set from common.proto.context_pb2 import ServiceTypeEnum from common.tools.context_queries.Topology import get_topology_details from common.tools.object_factory.Constraint import json_constraint_custom -from common.tools.object_factory.Context import json_context +from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Device import json_device_id from common.tools.object_factory.EndPoint import json_endpoint_id from common.tools.object_factory.Service import json_service @@ -39,6 +39,21 @@ class GraphAndMapping: endpoint_to_device_uuid : Dict[str, str] = field(default_factory=dict) +EXCLUDED_DEVICE_TYPES : Set[str] = { + DeviceTypeEnum.EMULATED_IP_SDN_CONTROLLER.value, + DeviceTypeEnum.EMULATED_MICROWAVE_RADIO_SYSTEM.value, + DeviceTypeEnum.EMULATED_OPEN_LINE_SYSTEM.value, + DeviceTypeEnum.EMULATED_XR_CONSTELLATION.value, + DeviceTypeEnum.IETF_SLICE.value, + DeviceTypeEnum.IP_SDN_CONTROLLER.value, + DeviceTypeEnum.MICROWAVE_RADIO_SYSTEM.value, + DeviceTypeEnum.NCE.value, + DeviceTypeEnum.OPEN_LINE_SYSTEM.value, + DeviceTypeEnum.TERAFLOWSDN_CONTROLLER.value, + DeviceTypeEnum.XR_CONSTELLATION.value, +} + + def compose_graph_from_topology() -> GraphAndMapping: context_client = ContextClient() topology_details = get_topology_details( @@ -48,12 +63,18 @@ def compose_graph_from_topology() -> GraphAndMapping: graph_and_mapping = GraphAndMapping() + excluded_device_uuids : Set[str] = set() + for device in topology_details.devices: device_uuid = device.device_id.device_uuid.uuid graph_and_mapping.device_name_to_uuid.setdefault(device.name, device_uuid) graph_and_mapping.device_name_to_uuid.setdefault(device_uuid, device_uuid) graph_and_mapping.device_to_type.setdefault(device_uuid, device.device_type) + if device.device_type in EXCLUDED_DEVICE_TYPES: + excluded_device_uuids.add(device_uuid) + continue + endpoint_uuids = list() for endpoint in device.device_endpoints: endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid @@ -70,9 +91,18 @@ def compose_graph_from_topology() -> GraphAndMapping: graph_and_mapping.graph.add_edge(endpoint_uuid_i, endpoint_uuid_j) for link in topology_details.links: + endpoint_id_a = link.link_endpoint_ids[ 0] + endpoint_id_z = link.link_endpoint_ids[-1] + + device_uuid_a = endpoint_id_a.device_id.device_uuid.uuid + if device_uuid_a in excluded_device_uuids: continue + + device_uuid_z = endpoint_id_z.device_id.device_uuid.uuid + if device_uuid_z in excluded_device_uuids: continue + graph_and_mapping.graph.add_edge( - link.link_endpoint_ids[ 0].endpoint_uuid.uuid, - link.link_endpoint_ids[-1].endpoint_uuid.uuid, + endpoint_id_a.endpoint_uuid.uuid, + endpoint_id_z.endpoint_uuid.uuid, ) return graph_and_mapping @@ -98,15 +128,23 @@ def compose_optical_service(vlink_request : Dict) -> Dict: graph_and_mapping.graph, endpoint_uuid_a, endpoint_uuid_b ) + LOGGER.info('[compose_optical_service] path_hops={:s}'.format(str(path_hops))) + optical_border_endpoint_ids : List[str] = list() for endpoint_uuid in path_hops: + LOGGER.info('[compose_optical_service] endpoint_uuid={:s}'.format(str(endpoint_uuid))) device_uuid = graph_and_mapping.endpoint_to_device_uuid[endpoint_uuid] + LOGGER.info('[compose_optical_service] device_uuid={:s}'.format(str(device_uuid))) device_type = graph_and_mapping.device_to_type[device_uuid] + LOGGER.info('[compose_optical_service] device_type={:s}'.format(str(device_type))) if device_type != DeviceTypeEnum.EMULATED_OPTICAL_TRANSPONDER.value: continue device_id = json_device_id(device_uuid) endpoint_id = json_endpoint_id(device_id, endpoint_uuid) + LOGGER.info('[compose_optical_service] endpoint_id={:s}'.format(str(endpoint_id))) optical_border_endpoint_ids.append(endpoint_id) + LOGGER.info('[compose_optical_service] optical_border_endpoint_ids={:s}'.format(str(optical_border_endpoint_ids))) + constraints = [ json_constraint_custom('bandwidth[gbps]', str(vlink_request['attributes']['total_capacity_gbps'])), json_constraint_custom('bidirectionality', '1'), @@ -120,8 +158,10 @@ def compose_optical_service(vlink_request : Dict) -> Dict: vlink_optical_service = json_service( vlink_service_uuid, ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY, - context_id=json_context(DEFAULT_CONTEXT_NAME), - endpoint_ids=optical_border_endpoint_ids, + context_id=json_context_id(DEFAULT_CONTEXT_NAME), + endpoint_ids=[ + optical_border_endpoint_ids[1], optical_border_endpoint_ids[2] + ], constraints=constraints, ) return vlink_optical_service