diff --git a/src/e2e_orchestrator/requirements.in b/src/e2e_orchestrator/requirements.in index 53f9028a7ed13cfc9d8949ef739e2c96af544797..2b34cad189fd9e9950589c0dfd4b4f40cf50c8b3 100644 --- a/src/e2e_orchestrator/requirements.in +++ b/src/e2e_orchestrator/requirements.in @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -networkx -websockets==12.0 +networkx==3.2.1 +python-socketio==5.12.1 requests==2.27.* +#websockets==12.0 diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index 4878d4788276857600478b20c060c75e29d39015..2284abe4cf578d58310cf8666b9b67896e81e199 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -12,229 +12,49 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy, grpc, json, logging, networkx, requests, threading +import copy, grpc, logging, networkx from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply -from common.proto.context_pb2 import ( - Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId, - ServiceTypeEnum, ServiceStatusEnum) +from common.proto.context_pb2 import Empty, Connection, EndPointId from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer -from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest -from common.tools.grpc.Tools import grpc_message_to_json_string -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME -from common.Settings import get_setting from context.client.ContextClient import ContextClient from context.service.database.uuids.EndPoint import endpoint_get_uuid -from context.service.database.uuids.Device import device_get_uuid -from service.client.ServiceClient import ServiceClient -from websockets.sync.client import connect -from websockets.sync.server import serve - LOGGER = logging.getLogger(__name__) -logging.getLogger("websockets").propagate = True -logging.getLogger("requests.packages.urllib3").propagate = True METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") - -context_client: ContextClient = ContextClient() -service_client: ServiceClient = ServiceClient() - -EXT_HOST = str(get_setting('WS_IP_HOST')) -EXT_PORT = int(get_setting('WS_IP_PORT')) -EXT_URL = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT) - -OWN_HOST = str(get_setting('WS_E2E_HOST')) -OWN_PORT = int(get_setting('WS_E2E_PORT')) - -ALL_HOSTS = '0.0.0.0' - -class SubscriptionServer(threading.Thread): - def run(self): - request = VNTSubscriptionRequest() - request.host = OWN_HOST - request.port = OWN_PORT - try: - LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL)) - websocket = connect(EXT_URL) - except: # pylint: disable=bare-except - LOGGER.exception('Error connecting to {:s}'.format(EXT_URL)) - else: - with websocket: - LOGGER.debug('Connected to {:s}'.format(EXT_URL)) - send = grpc_message_to_json_string(request) - websocket.send(send) - LOGGER.debug('Sent: {:s}'.format(send)) - try: - message = websocket.recv() - LOGGER.debug('Received message from WebSocket: {:s}'.format(message)) - except Exception as ex: - LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex)) - self._events_server() - - - def _events_server(self): - try: - server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT)) - except: # pylint: disable=bare-except - LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) - else: - with server: - LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) - server.serve_forever() - - - def _event_received(self, connection): - LOGGER.debug('Event received') - for message in connection: - message_json = json.loads(message) - - # Link creation - if 'link_id' in message_json: - LOGGER.debug('Link creation') - link = Link(**message_json) - - service = Service() - service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid - service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME - service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY - service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED - service_client.CreateService(service) - - a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id) - a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2] - z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) - z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] - - links = context_client.ListLinks(Empty()).links - for _link in links: - for _endpoint_id in _link.link_endpoint_ids: - if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ - _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid: - a_ep_id = _endpoint_id - elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \ - _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: - z_ep_id = _endpoint_id - - if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()): - error_msg = f'Could not get VNT link endpoints\ - \n\ta_endpoint_uuid= {a_endpoint_uuid}\ - \n\tz_endpoint_uuid= {z_device_uuid}' - LOGGER.error(error_msg) - connection.send(error_msg) - return - - service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) - service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) - - service_client.UpdateService(service) - re_svc = context_client.GetService(service.service_id) - connection.send(grpc_message_to_json_string(link)) - context_client.SetLink(link) - elif 'link_uuid' in message_json: - LOGGER.debug('Link removal') - link_id = LinkId(**message_json) - - service_id = ServiceId() - service_id.service_uuid.uuid = link_id.link_uuid.uuid - service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME - service_client.DeleteService(service_id) - connection.send(grpc_message_to_json_string(link_id)) - context_client.RemoveLink(link_id) - else: - LOGGER.debug('Topology received') - topology_details = TopologyDetails(**message_json) - - context = Context() - context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid - context_client.SetContext(context) - - topology = Topology() - topology.topology_id.context_id.CopyFrom(context.context_id) - topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid - context_client.SetTopology(topology) - - for device in topology_details.devices: - context_client.SetDevice(device) - - for link in topology_details.links: - context_client.SetLink(link) - - - class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): def __init__(self): LOGGER.debug('Creating Servicer...') - try: - LOGGER.debug('Requesting subscription') - sub_server = SubscriptionServer() - sub_server.start() - LOGGER.debug('Servicer Created') - self.retrieve_external_topologies() - except: - LOGGER.exception('Unhandled Exception') - - def retrieve_external_topologies(self): - i = 1 - while True: - try: - ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD')) - PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT')) - except: # pylint: disable=bare-except - break + LOGGER.debug('Servicer Created') - try: - LOGGER.info('Retrieving external controller #{:d}'.format(i)) - url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format( - ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME - ) - LOGGER.info('url={:s}'.format(str(url))) - topo = requests.get(url).json() - LOGGER.info('Retrieved external controller #{:d}'.format(i)) - except: # pylint: disable=bare-except - LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i)) - - topology_details = TopologyDetails(**topo) - context = Context() - context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid - context_client.SetContext(context) - - topology = Topology() - topology.topology_id.context_id.CopyFrom(context.context_id) - topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid - context_client.SetTopology(topology) - - for device in topology_details.devices: - context_client.SetDevice(device) - - for link in topology_details.links: - context_client.SetLink(link) - - i+=1 - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply: - endpoints_ids = [] - for endpoint_id in request.service.service_endpoint_ids: - endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2]) + def Compute( + self, request: E2EOrchestratorRequest, context: grpc.ServicerContext + ) -> E2EOrchestratorReply: + endpoints_ids = [ + endpoint_get_uuid(endpoint_id)[2] + for endpoint_id in request.service.service_endpoint_ids + ] graph = networkx.Graph() + context_client = ContextClient() devices = context_client.ListDevices(Empty()).devices - for device in devices: - endpoints_uuids = [endpoint.endpoint_id.endpoint_uuid.uuid - for endpoint in device.device_endpoints] + endpoints_uuids = [ + endpoint.endpoint_id.endpoint_uuid.uuid + for endpoint in device.device_endpoints + ] for ep in endpoints_uuids: graph.add_node(ep) - for ep in endpoints_uuids: - for ep_i in endpoints_uuids: - if ep == ep_i: + for ep_i in endpoints_uuids: + for ep_j in endpoints_uuids: + if ep_i == ep_j: continue - graph.add_edge(ep, ep_i) + graph.add_edge(ep_i, ep_j) links = context_client.ListLinks(Empty()).links for link in links: @@ -244,7 +64,9 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): graph.add_edge(eps[0], eps[1]) - shortest = networkx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1]) + shortest = networkx.shortest_path( + graph, endpoints_ids[0], endpoints_ids[1] + ) path = E2EOrchestratorReply() path.services.append(copy.deepcopy(request.service)) diff --git a/src/e2e_orchestrator/service/SubscriptionServer.py b/src/e2e_orchestrator/service/SubscriptionServer.py new file mode 100644 index 0000000000000000000000000000000000000000..ab1c37dbdaef656c0babf9eee896f15ff76053a8 --- /dev/null +++ b/src/e2e_orchestrator/service/SubscriptionServer.py @@ -0,0 +1,162 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy, grpc, json, logging, networkx, requests, threading +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply +from common.proto.context_pb2 import ( + Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId, + ServiceTypeEnum, ServiceStatusEnum) +from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from common.Settings import get_setting +from context.client.ContextClient import ContextClient +from context.service.database.uuids.EndPoint import endpoint_get_uuid +from context.service.database.uuids.Device import device_get_uuid +from service.client.ServiceClient import ServiceClient +from websockets.sync.client import connect +from websockets.sync.server import serve + + +LOGGER = logging.getLogger(__name__) +logging.getLogger("websockets").propagate = True +logging.getLogger("requests.packages.urllib3").propagate = True + +METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") + + +context_client: ContextClient = ContextClient() +service_client: ServiceClient = ServiceClient() + +EXT_HOST = str(get_setting('WS_IP_HOST')) +EXT_PORT = int(get_setting('WS_IP_PORT')) +EXT_URL = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT) + +OWN_HOST = str(get_setting('WS_E2E_HOST')) +OWN_PORT = int(get_setting('WS_E2E_PORT')) + +ALL_HOSTS = '0.0.0.0' + +class SubscriptionServer(threading.Thread): + def run(self): + request = VNTSubscriptionRequest() + request.host = OWN_HOST + request.port = OWN_PORT + try: + LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL)) + websocket = connect(EXT_URL) + except: # pylint: disable=bare-except + LOGGER.exception('Error connecting to {:s}'.format(EXT_URL)) + else: + with websocket: + LOGGER.debug('Connected to {:s}'.format(EXT_URL)) + send = grpc_message_to_json_string(request) + websocket.send(send) + LOGGER.debug('Sent: {:s}'.format(send)) + try: + message = websocket.recv() + LOGGER.debug('Received message from WebSocket: {:s}'.format(message)) + except Exception as ex: + LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex)) + self._events_server() + + + def _events_server(self): + try: + server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT)) + except: # pylint: disable=bare-except + LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) + else: + with server: + LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) + server.serve_forever() + + + def _event_received(self, connection): + LOGGER.debug('Event received') + for message in connection: + message_json = json.loads(message) + + # Link creation + if 'link_id' in message_json: + LOGGER.debug('Link creation') + link = Link(**message_json) + + service = Service() + service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid + service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + service_client.CreateService(service) + + a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id) + a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2] + z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) + z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] + + links = context_client.ListLinks(Empty()).links + for _link in links: + for _endpoint_id in _link.link_endpoint_ids: + if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid: + a_ep_id = _endpoint_id + elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: + z_ep_id = _endpoint_id + + if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()): + error_msg = f'Could not get VNT link endpoints\ + \n\ta_endpoint_uuid= {a_endpoint_uuid}\ + \n\tz_endpoint_uuid= {z_device_uuid}' + LOGGER.error(error_msg) + connection.send(error_msg) + return + + service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) + service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) + + service_client.UpdateService(service) + re_svc = context_client.GetService(service.service_id) + connection.send(grpc_message_to_json_string(link)) + context_client.SetLink(link) + elif 'link_uuid' in message_json: + LOGGER.debug('Link removal') + link_id = LinkId(**message_json) + + service_id = ServiceId() + service_id.service_uuid.uuid = link_id.link_uuid.uuid + service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + service_client.DeleteService(service_id) + connection.send(grpc_message_to_json_string(link_id)) + context_client.RemoveLink(link_id) + else: + LOGGER.debug('Topology received') + topology_details = TopologyDetails(**message_json) + + context = Context() + context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid + context_client.SetContext(context) + + topology = Topology() + topology.topology_id.context_id.CopyFrom(context.context_id) + topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid + context_client.SetTopology(topology) + + for device in topology_details.devices: + context_client.SetDevice(device) + + for link in topology_details.links: + context_client.SetLink(link) diff --git a/src/e2e_orchestrator/service/TopologyTools.py b/src/e2e_orchestrator/service/TopologyTools.py new file mode 100644 index 0000000000000000000000000000000000000000..eaa0e2dbb7c839dde67040fa7ee89ab568a57a78 --- /dev/null +++ b/src/e2e_orchestrator/service/TopologyTools.py @@ -0,0 +1,37 @@ + def retrieve_external_topologies(self): + i = 1 + while True: + try: + ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD')) + PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT')) + except: # pylint: disable=bare-except + break + + try: + LOGGER.info('Retrieving external controller #{:d}'.format(i)) + url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format( + ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME + ) + LOGGER.info('url={:s}'.format(str(url))) + topo = requests.get(url).json() + LOGGER.info('Retrieved external controller #{:d}'.format(i)) + except: # pylint: disable=bare-except + LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i)) + + topology_details = TopologyDetails(**topo) + context = Context() + context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid + context_client.SetContext(context) + + topology = Topology() + topology.topology_id.context_id.CopyFrom(context.context_id) + topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid + context_client.SetTopology(topology) + + for device in topology_details.devices: + context_client.SetDevice(device) + + for link in topology_details.links: + context_client.SetLink(link) + + i+=1 diff --git a/src/e2e_orchestrator/service/__main__.py b/src/e2e_orchestrator/service/__main__.py index 4c0a6d471e2b6d7ae87aee666695ebdca6938491..d984add76f453cfcec7839afe27af37a3417f0b0 100644 --- a/src/e2e_orchestrator/service/__main__.py +++ b/src/e2e_orchestrator/service/__main__.py @@ -24,6 +24,7 @@ from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables) +from e2e_orchestrator.service.subscriptions.ControllerDiscovererThread import ControllerDiscoverer from .E2EOrchestratorService import E2EOrchestratorService @@ -49,17 +50,21 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) - # Starting CentralizedCybersecurity service grpc_service = E2EOrchestratorService() grpc_service.start() - LOGGER.info("Started...") - # Wait for Ctrl+C or termination signal + controller_discoverer = ControllerDiscoverer( + terminate=terminate + ) + controller_discoverer.start() + + LOGGER.info("Running...") + # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1): pass - LOGGER.info("Terminating...") + controller_discoverer.stop() grpc_service.stop() LOGGER.info("Bye") diff --git a/src/e2e_orchestrator/service/old_E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/old_E2EOrchestratorServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..4878d4788276857600478b20c060c75e29d39015 --- /dev/null +++ b/src/e2e_orchestrator/service/old_E2EOrchestratorServiceServicerImpl.py @@ -0,0 +1,268 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy, grpc, json, logging, networkx, requests, threading +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply +from common.proto.context_pb2 import ( + Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId, + ServiceTypeEnum, ServiceStatusEnum) +from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from common.Settings import get_setting +from context.client.ContextClient import ContextClient +from context.service.database.uuids.EndPoint import endpoint_get_uuid +from context.service.database.uuids.Device import device_get_uuid +from service.client.ServiceClient import ServiceClient +from websockets.sync.client import connect +from websockets.sync.server import serve + + +LOGGER = logging.getLogger(__name__) +logging.getLogger("websockets").propagate = True +logging.getLogger("requests.packages.urllib3").propagate = True + +METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") + + +context_client: ContextClient = ContextClient() +service_client: ServiceClient = ServiceClient() + +EXT_HOST = str(get_setting('WS_IP_HOST')) +EXT_PORT = int(get_setting('WS_IP_PORT')) +EXT_URL = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT) + +OWN_HOST = str(get_setting('WS_E2E_HOST')) +OWN_PORT = int(get_setting('WS_E2E_PORT')) + +ALL_HOSTS = '0.0.0.0' + +class SubscriptionServer(threading.Thread): + def run(self): + request = VNTSubscriptionRequest() + request.host = OWN_HOST + request.port = OWN_PORT + try: + LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL)) + websocket = connect(EXT_URL) + except: # pylint: disable=bare-except + LOGGER.exception('Error connecting to {:s}'.format(EXT_URL)) + else: + with websocket: + LOGGER.debug('Connected to {:s}'.format(EXT_URL)) + send = grpc_message_to_json_string(request) + websocket.send(send) + LOGGER.debug('Sent: {:s}'.format(send)) + try: + message = websocket.recv() + LOGGER.debug('Received message from WebSocket: {:s}'.format(message)) + except Exception as ex: + LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex)) + self._events_server() + + + def _events_server(self): + try: + server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT)) + except: # pylint: disable=bare-except + LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) + else: + with server: + LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) + server.serve_forever() + + + def _event_received(self, connection): + LOGGER.debug('Event received') + for message in connection: + message_json = json.loads(message) + + # Link creation + if 'link_id' in message_json: + LOGGER.debug('Link creation') + link = Link(**message_json) + + service = Service() + service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid + service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + service_client.CreateService(service) + + a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id) + a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2] + z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) + z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] + + links = context_client.ListLinks(Empty()).links + for _link in links: + for _endpoint_id in _link.link_endpoint_ids: + if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid: + a_ep_id = _endpoint_id + elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: + z_ep_id = _endpoint_id + + if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()): + error_msg = f'Could not get VNT link endpoints\ + \n\ta_endpoint_uuid= {a_endpoint_uuid}\ + \n\tz_endpoint_uuid= {z_device_uuid}' + LOGGER.error(error_msg) + connection.send(error_msg) + return + + service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) + service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) + + service_client.UpdateService(service) + re_svc = context_client.GetService(service.service_id) + connection.send(grpc_message_to_json_string(link)) + context_client.SetLink(link) + elif 'link_uuid' in message_json: + LOGGER.debug('Link removal') + link_id = LinkId(**message_json) + + service_id = ServiceId() + service_id.service_uuid.uuid = link_id.link_uuid.uuid + service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + service_client.DeleteService(service_id) + connection.send(grpc_message_to_json_string(link_id)) + context_client.RemoveLink(link_id) + else: + LOGGER.debug('Topology received') + topology_details = TopologyDetails(**message_json) + + context = Context() + context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid + context_client.SetContext(context) + + topology = Topology() + topology.topology_id.context_id.CopyFrom(context.context_id) + topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid + context_client.SetTopology(topology) + + for device in topology_details.devices: + context_client.SetDevice(device) + + for link in topology_details.links: + context_client.SetLink(link) + + + +class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): + def __init__(self): + LOGGER.debug('Creating Servicer...') + try: + LOGGER.debug('Requesting subscription') + sub_server = SubscriptionServer() + sub_server.start() + LOGGER.debug('Servicer Created') + self.retrieve_external_topologies() + except: + LOGGER.exception('Unhandled Exception') + + def retrieve_external_topologies(self): + i = 1 + while True: + try: + ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD')) + PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT')) + except: # pylint: disable=bare-except + break + + try: + LOGGER.info('Retrieving external controller #{:d}'.format(i)) + url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format( + ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME + ) + LOGGER.info('url={:s}'.format(str(url))) + topo = requests.get(url).json() + LOGGER.info('Retrieved external controller #{:d}'.format(i)) + except: # pylint: disable=bare-except + LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i)) + + topology_details = TopologyDetails(**topo) + context = Context() + context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid + context_client.SetContext(context) + + topology = Topology() + topology.topology_id.context_id.CopyFrom(context.context_id) + topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid + context_client.SetTopology(topology) + + for device in topology_details.devices: + context_client.SetDevice(device) + + for link in topology_details.links: + context_client.SetLink(link) + + i+=1 + + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply: + endpoints_ids = [] + for endpoint_id in request.service.service_endpoint_ids: + endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2]) + + graph = networkx.Graph() + + devices = context_client.ListDevices(Empty()).devices + + for device in devices: + endpoints_uuids = [endpoint.endpoint_id.endpoint_uuid.uuid + for endpoint in device.device_endpoints] + for ep in endpoints_uuids: + graph.add_node(ep) + + for ep in endpoints_uuids: + for ep_i in endpoints_uuids: + if ep == ep_i: + continue + graph.add_edge(ep, ep_i) + + links = context_client.ListLinks(Empty()).links + for link in links: + eps = [] + for endpoint_id in link.link_endpoint_ids: + eps.append(endpoint_id.endpoint_uuid.uuid) + graph.add_edge(eps[0], eps[1]) + + + shortest = networkx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1]) + + path = E2EOrchestratorReply() + path.services.append(copy.deepcopy(request.service)) + for i in range(0, int(len(shortest)/2)): + conn = Connection() + ep_a_uuid = str(shortest[i*2]) + ep_z_uuid = str(shortest[i*2+1]) + + conn.connection_id.connection_uuid.uuid = str(ep_a_uuid) + '_->_' + str(ep_z_uuid) + + ep_a_id = EndPointId() + ep_a_id.endpoint_uuid.uuid = ep_a_uuid + conn.path_hops_endpoint_ids.append(ep_a_id) + + ep_z_id = EndPointId() + ep_z_id.endpoint_uuid.uuid = ep_z_uuid + conn.path_hops_endpoint_ids.append(ep_z_id) + + path.connections.append(conn) + + return path diff --git a/src/e2e_orchestrator/service/subscriptions/ControllerDiscovererThread.py b/src/e2e_orchestrator/service/subscriptions/ControllerDiscovererThread.py new file mode 100644 index 0000000000000000000000000000000000000000..e12917f7d4e25314c292dd662340ccc6a5df4c02 --- /dev/null +++ b/src/e2e_orchestrator/service/subscriptions/ControllerDiscovererThread.py @@ -0,0 +1,100 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging, queue, threading +from typing import Any, Optional +from common.proto.context_pb2 import DeviceEvent, Empty +from common.tools.grpc.BaseEventCollector import BaseEventCollector +from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from .Subscriptions import Subscriptions +from .TFSControllerSettings import get_tfs_controller_settings + + +LOGGER = logging.getLogger(__name__) + + +class EventDispatcher(BaseEventDispatcher): + def __init__( + self, events_queue : queue.PriorityQueue, + context_client : ContextClient, + subscriptions : Subscriptions, + terminate : Optional[threading.Event] = None + ) -> None: + super().__init__(events_queue, terminate) + self._context_client = context_client + self._subscriptions = subscriptions + + def dispatch_device_create(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Create: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + tfs_ctrl_settings = get_tfs_controller_settings( + self._context_client, device_event + ) + if tfs_ctrl_settings is None: return + self._subscriptions.add_subscription(tfs_ctrl_settings) + + def dispatch_device_update(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Update: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + tfs_ctrl_settings = get_tfs_controller_settings( + self._context_client, device_event + ) + if tfs_ctrl_settings is None: return + self._subscriptions.add_subscription(tfs_ctrl_settings) + + def dispatch_device_remove(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Remove: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + device_uuid = device_event.device_id.device_uuid.uuid + self._subscriptions.remove_subscription(device_uuid) + + def dispatch(self, event : Any) -> None: + MSG = 'Unexpected Event: {:s}' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + +class ControllerDiscoverer: + def __init__( + self, terminate : Optional[threading.Event] = None + ) -> None: + self._context_client = ContextClient() + + self._event_collector = BaseEventCollector( + terminate=terminate + ) + self._event_collector.install_collector( + self._context_client.GetDeviceEvents, + Empty(), log_events_received=True + ) + + self._subscriptions = Subscriptions() + + self._event_dispatcher = EventDispatcher( + self._event_collector.get_events_queue(), + self._context_client, + self._subscriptions, + terminate=terminate + ) + + def start(self) -> None: + self._context_client.connect() + self._event_dispatcher.start() + self._event_collector.start() + + def stop(self): + self._event_collector.stop() + self._event_dispatcher.stop() + self._context_client.close() diff --git a/src/e2e_orchestrator/service/subscriptions/RecommendationsClientNamespace.py b/src/e2e_orchestrator/service/subscriptions/RecommendationsClientNamespace.py new file mode 100644 index 0000000000000000000000000000000000000000..590524eeab8f2338c5e21cd9f2c5de3f6daf475a --- /dev/null +++ b/src/e2e_orchestrator/service/subscriptions/RecommendationsClientNamespace.py @@ -0,0 +1,40 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, queue, socketio + +LOGGER = logging.getLogger(__name__) + +class RecommendationsClientNamespace(socketio.ClientNamespace): + def __init__(self, request_queue : queue.Queue, reply_queue : queue.Queue): + self._request_queue = request_queue + self._reply_queue = reply_queue + super().__init__(namespace='/recommendations') + + def on_connect(self): + LOGGER.info('[on_connect] Connected') + + def on_disconnect(self, reason): + MSG = '[on_disconnect] Disconnected!, reason: {:s}' + LOGGER.info(MSG.format(str(reason))) + + def on_recommendation(self, data): + MSG = '[on_recommendation] data={:s}' + LOGGER.info(MSG.format(str(data))) + + #MSG = '[on_recommendation] Recommendation: {:s}' + #LOGGER.info(MSG.format(str(recommendation))) + + #request = (self._device_uuid, *sample) + #self._request_queue.put_nowait(request) diff --git a/src/e2e_orchestrator/service/subscriptions/Subscription.py b/src/e2e_orchestrator/service/subscriptions/Subscription.py new file mode 100644 index 0000000000000000000000000000000000000000..8e1866bac2b89d69422d581a3cebae12778b7d9d --- /dev/null +++ b/src/e2e_orchestrator/service/subscriptions/Subscription.py @@ -0,0 +1,71 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import queue, socketio, threading +from common.Constants import ServiceNameEnum +from common.Settings import get_service_baseurl_http +from .RecommendationsClientNamespace import RecommendationsClientNamespace +from .TFSControllerSettings import TFSControllerSettings + + +NBI_SERVICE_PREFIX_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or '' +CHILD_SOCKETIO_URL = 'http://{:s}:{:s}@{:s}:{:d}{:s}' + + +class Subscription(threading.Thread): + def __init__( + self, tfs_ctrl_settings : TFSControllerSettings, + terminate : threading.Event + ) -> None: + super().__init__(daemon=True) + self._settings = tfs_ctrl_settings + self._terminate = terminate + self._request_queue = queue.Queue() + self._reply_queue = queue.Queue() + self._is_running = threading.Event() + + @property + def is_running(self): return self._is_running.is_set() + + @property + def request_queue(self): return self._request_queue + + @property + def reply_queue(self): return self._reply_queue + + def run(self) -> None: + child_socketio_url = CHILD_SOCKETIO_URL.format( + self._settings.nbi_username, + self._settings.nbi_password, + self._settings.nbi_address, + self._settings.nbi_port, + NBI_SERVICE_PREFIX_URL + ) + + namespace = RecommendationsClientNamespace( + self._request_queue, self._reply_queue + ) + + sio = socketio.Client(logger=True, engineio_logger=True) + sio.register_namespace(namespace) + sio.connect(child_socketio_url) + + while not self._terminate.is_set(): + sio.sleep(seconds=0.5) + + sio.shutdown() + + def stop(self): + self._terminate.set() diff --git a/src/e2e_orchestrator/service/subscriptions/Subscriptions.py b/src/e2e_orchestrator/service/subscriptions/Subscriptions.py new file mode 100644 index 0000000000000000000000000000000000000000..f4676ff8dbd1d1f9bee4f6693454749100dfa93d --- /dev/null +++ b/src/e2e_orchestrator/service/subscriptions/Subscriptions.py @@ -0,0 +1,47 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, queue, threading +from typing import Dict +from .Subscription import Subscription +from .TFSControllerSettings import TFSControllerSettings + +LOGGER = logging.getLogger(__name__) + +class Subscriptions: + def __init__(self) -> None: + self._terminate = threading.Event() + self._lock = threading.Lock() + self._subscriptions : Dict[str, Subscription] = dict() + + def add_subscription(self, tfs_ctrl_settings : TFSControllerSettings) -> None: + device_uuid = tfs_ctrl_settings.device_uuid + with self._lock: + subscription = self._subscriptions.get(device_uuid) + if (subscription is not None) and subscription.is_running: return + subscription = Subscription(tfs_ctrl_settings, self._terminate) + self._subscriptions[device_uuid] = subscription + subscription.start() + + def remove_subscription(self, device_uuid : str) -> None: + with self._lock: + subscription = self._subscriptions.get(device_uuid) + if subscription is None: return + if subscription.is_running: subscription.stop() + self._subscriptions.pop(device_uuid, None) + + def stop(self): + self._terminate.set() + for device_uuid in self._subscriptions: + self.remove_subscription(device_uuid) diff --git a/src/e2e_orchestrator/service/subscriptions/TFSControllerSettings.py b/src/e2e_orchestrator/service/subscriptions/TFSControllerSettings.py new file mode 100644 index 0000000000000000000000000000000000000000..00613e3d44fa5d25f0640b1e12521329956c0e7f --- /dev/null +++ b/src/e2e_orchestrator/service/subscriptions/TFSControllerSettings.py @@ -0,0 +1,72 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +from dataclasses import dataclass +from typing import Optional +from common.DeviceTypes import DeviceTypeEnum +from common.proto.context_pb2 import ConfigActionEnum, DeviceEvent +from common.tools.context_queries.Device import get_device +from context.client.ContextClient import ContextClient + + +@dataclass +class TFSControllerSettings: + device_uuid : str + device_type : DeviceTypeEnum + nbi_address : str + nbi_port : int + nbi_username : str + nbi_password : str + + +SELECTED_DEVICE_TYPES = { + DeviceTypeEnum.TERAFLOWSDN_CONTROLLER.value +} + + +def get_tfs_controller_settings( + context_client : ContextClient, device_event : DeviceEvent +) -> Optional[TFSControllerSettings]: + device_uuid = device_event.device_id.device_uuid.uuid + device = get_device( + context_client, device_uuid, rw_copy=False, + include_endpoints=False, include_config_rules=True, + include_components=False + ) + device_type = device.device_type + if device_type not in SELECTED_DEVICE_TYPES: return None + + connect_rules = dict() + for config_rule in device.device_config.config_rules: + if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue + if config_rule.WhichOneof('config_rule') != 'custom': continue + if not config_rule.custom.resource_key.startswith('_connect/'): continue + connect_attribute = config_rule.custom.resource_key.replace('_connect/', '') + if connect_attribute == 'settings': + settings = json.loads(config_rule.custom.resource_value) + for field in ['username', 'password']: + connect_rules[field] = settings[field] + else: + connect_rules[connect_attribute] = config_rule.custom.resource_value + + return TFSControllerSettings( + device_uuid = device_uuid, + device_type = device_type, + nbi_address = str(connect_rules['address' ]), + nbi_port = int(connect_rules['port' ]), + nbi_username = str(connect_rules['username']), + nbi_password = str(connect_rules['password']), + ) diff --git a/src/e2e_orchestrator/service/subscriptions/__init__.py b/src/e2e_orchestrator/service/subscriptions/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..023830645e0fcb60e3f8583674a954810af222f2 --- /dev/null +++ b/src/e2e_orchestrator/service/subscriptions/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.