diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index 2a303851c342d126c9387da6354d5d16ba338db5..afc62a6865e6c6897f964705ab4a755129833622 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -15,7 +15,9 @@ import copy from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply -from common.proto.context_pb2 import Empty, Connection, ContextId, EndPointId, Link, LinkId, TopologyDetails, TopologyId, Device, Topology, Context, Service, ServiceStatus, DeviceId, ServiceTypeEnum, ServiceStatusEnum +from common.proto.context_pb2 import ( + Empty, Connection, EndPointId, Link, TopologyDetails, Topology, Context, Service, ServiceTypeEnum, + ServiceStatusEnum) from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from context.client.ContextClient import ContextClient from service.client.ServiceClient import ServiceClient @@ -34,7 +36,7 @@ from common.Constants import DEFAULT_CONTEXT_NAME LOGGER = logging.getLogger(__name__) -logging.getLogger("websockets").propagate = False +logging.getLogger("websockets").propagate = True METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") @@ -48,121 +50,115 @@ EXT_PORT = "8762" OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local" OWN_PORT = "8761" +ALL_HOSTS = "0.0.0.0" -def _event_received(websocket): - for message in websocket: - message_json = json.loads(message) - - if 'link_id' in message_json: - link = Link(**message_json) - - service = Service() - service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid - service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME - service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY - service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED - service_client.CreateService(service) - - links = context_client.ListLinks(Empty()).links - a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id) - a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2] - z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) - z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] - - - for _link in links: - for _endpoint_id in _link.link_endpoint_ids: - if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ - _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid: - a_ep_id = _endpoint_id - elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \ - _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: - z_ep_id = _endpoint_id +class SubscriptionServer(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + url = "ws://" + EXT_HOST + ":" + EXT_PORT + request = VNTSubscriptionRequest() + request.host = OWN_HOST + request.port = OWN_PORT + try: + LOGGER.debug("Trying to connect to {}".format(url)) + websocket = connect(url) + except Exception as ex: + LOGGER.error('Error connecting to {}'.format(url)) + else: + with websocket: + LOGGER.debug("Connected to {}".format(url)) + send = grpc_message_to_json_string(request) + websocket.send(send) + LOGGER.debug("Sent: {}".format(send)) + try: + message = websocket.recv() + LOGGER.debug("Received message from WebSocket: {}".format(message)) + except Exception as ex: + LOGGER.info('Exception receiving from WebSocket: {}'.format(ex)) + self._events_server() - service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) - service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) - service_client.UpdateService(service) - websocket.send(grpc_message_to_json_string(link)) + def _events_server(self): + all_hosts = "0.0.0.0" + try: + server = serve(self._event_received, all_hosts, int(OWN_PORT)) + except Exception as ex: + LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT)) + LOGGER.error('Exception!: {}'.format(ex)) else: - topology_details = TopologyDetails(**message_json) + with server: + LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) + server.serve_forever() - context = Context() - context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid - context_client.SetContext(context) - topology = Topology() - topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid - context_client.SetTopology(topology) + def _event_received(self, connection): + for message in connection: + message_json = json.loads(message) - for device in topology_details.devices: - context_client.SetDevice(device) + if 'link_id' in message_json: + link = Link(**message_json) - for link in topology_details.links: - context_client.SetLink(link) + service = Service() + service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid + service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + service_client.CreateService(service) + links = context_client.ListLinks(Empty()).links + a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id) + a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2] + z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) + z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] + for _link in links: + for _endpoint_id in _link.link_endpoint_ids: + if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid: + a_ep_id = _endpoint_id + elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: + z_ep_id = _endpoint_id -def requestSubscription(): - url = "ws://" + EXT_HOST + ":" + EXT_PORT - request = VNTSubscriptionRequest() - request.host = OWN_HOST - request.port = OWN_PORT - LOGGER.debug("Connecting to {}".format(url)) - try: - websocket = connect(url) - except Exception as ex: - LOGGER.error('Error connecting to {}'.format(url)) - else: - with websocket: - LOGGER.debug("Connected to {}".format(url)) - send = grpc_message_to_json_string(request) - websocket.send(send) - try: - message = websocket.recv() - LOGGER.debug("Received message from WebSocket: {}".format(message)) - except Exception as ex: - LOGGER.error('Exception receiving from WebSocket: {}'.format(ex)) - events_server() - LOGGER.info('Subscription requested') + service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) + service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) + service_client.UpdateService(service) + connection.send(grpc_message_to_json_string(link)) -def events_server(): - all_hosts = "0.0.0.0" + else: + topology_details = TopologyDetails(**message_json) - try: - server = serve(_event_received, all_hosts, int(OWN_PORT)) - except Exception as ex: - LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT)) - LOGGER.error('Exception!: {}'.format(ex)) - else: - with server: - LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) - server.serve_forever() - LOGGER.info("Exiting events server...") + context = Context() + context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid + context_client.SetContext(context) + topology = Topology() + topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid + context_client.SetTopology(topology) + for device in topology_details.devices: + context_client.SetDevice(device) + + for link in topology_details.links: + context_client.SetLink(link) -class SubscriptionServer(): class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): def __init__(self): LOGGER.debug("Creating Servicer...") - LOGGER.debug("Servicer Created") - try: LOGGER.debug("Requesting subscription") - subscription_thread = Thread(target=requestSubscription) - subscription_thread.start() - - - # import_optical() - + sub_server = SubscriptionServer() + sub_server.start() + LOGGER.debug("Servicer Created") except Exception as ex: LOGGER.info("Exception!: {}".format(ex))