diff --git a/manifests/e2e_orchestratorservice.yaml b/manifests/e2e_orchestratorservice.yaml index be2277408e443051e8aabc065bd5344580059739..2c2fbdd085e1d0e6b698458f8be21585d3dfc21b 100644 --- a/manifests/e2e_orchestratorservice.yaml +++ b/manifests/e2e_orchestratorservice.yaml @@ -42,7 +42,7 @@ spec: - containerPort: 8761 env: - name: LOG_LEVEL - value: "DEBUG" + value: "INFO" readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:10050"] diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index fe8d27fe549898b0dd02195911248054a092e262..fb285e50feeac012f13fdd560eb5680c96a17cdb 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -15,7 +15,7 @@ import copy from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply -from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId +from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, TopologyId, Device, Topology, Context from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from context.client.ContextClient import ContextClient from context.service.database.uuids.EndPoint import endpoint_get_uuid @@ -31,9 +31,11 @@ from websockets.sync.client import connect from websockets.sync.server import serve LOGGER = logging.getLogger(__name__) +logging.getLogger("websockets").propagate = False METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") + context_client: ContextClient = ContextClient() @@ -44,16 +46,11 @@ OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local" OWN_PORT = "8761" - def _event_received(websocket): for message in websocket: - LOGGER.info("Message received!!!: {}".format(message)) message_json = json.loads(message) - if 'event_type' in message_json: - pass - - elif 'link_id' in message_json: + if 'link_id' in message_json: obj = Link(**message_json) if _check_policies(obj): pass @@ -61,7 +58,25 @@ def _event_received(websocket): elif 'link_uuid' in message_json: obj = LinkId(**message_json) - websocket.send(message) + else: + topology_details = TopologyDetails(**message_json) + + context_id = topology_details.topology_id.context_id + context = Context() + context.context_id.CopyFrom(context_id) + context_client.SetContext(context) + + topology_id = topology_details.topology_id + topology = Topology() + topology.topology_id.CopyFrom(topology_id) + context_client.SetTopology(topology) + + for device in topology_details.devices: + context_client.SetDevice(device) + + for link in topology_details.links: + context_client.SetLink(link) + def _check_policies(link): @@ -69,6 +84,7 @@ def _check_policies(link): + def requestSubscription(): url = "ws://" + EXT_HOST + ":" + EXT_PORT request = VNTSubscriptionRequest() @@ -77,15 +93,14 @@ def requestSubscription(): LOGGER.debug("Trying to connect to {}".format(url)) try: websocket = connect(url) - LOGGER.debug("Connected to {}".format(url)) except Exception as ex: LOGGER.error('Error connecting to {}'.format(url)) else: with websocket: LOGGER.debug("Connected to {}".format(url)) send = grpc_message_to_json_string(request) - LOGGER.debug("Sending {}".format(send)) websocket.send(send) + LOGGER.debug("Sent: {}".format(send)) try: message = websocket.recv() LOGGER.debug("Received message from WebSocket: {}".format(message)) @@ -103,7 +118,7 @@ def events_server(): server = serve(_event_received, all_hosts, int(OWN_PORT)) except Exception as ex: LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT)) - LOGGER.error('Exception: {}'.format(ex)) + LOGGER.error('Exception!: {}'.format(ex)) with server: LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) server.serve_forever() @@ -116,7 +131,6 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): LOGGER.debug("Creating Servicer...") LOGGER.debug("Servicer Created") - time.sleep(5) try: LOGGER.info("Requesting subscription") subscription_thread = Thread(target=requestSubscription) diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index c4d3388df9d9a7225cc39449847c054e4a560836..a4e84be6c4df66be65159d65507ca06baa7c0db6 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -61,10 +61,14 @@ HOST = "10.1.1.83" PORT = str(8765) -def send_msg(websocket, msg): - send = grpc_message_to_json_string(msg) - websocket.send(send) - message = websocket.recv() +WEBSOCKET = None + +def send_msg(msg): + LOGGER.info('-------------------------SENDING------------------------------') + LOGGER.info(msg) + LOGGER.info('-------------------------------------------------------') + WEBSOCKET.send(msg) + message = WEBSOCKET.recv() @@ -85,10 +89,12 @@ class VNTMEventDispatcher(threading.Thread): self._terminate.set() def run(self) -> None: - time.sleep(10) + global WEBSOCKET + + time.sleep(5) events_collector = EventsCollector( context_client, log_events_received=True, - activate_context_collector = True, + activate_context_collector = False, activate_topology_collector = True, activate_device_collector = False, activate_link_collector = False, @@ -102,21 +108,33 @@ class VNTMEventDispatcher(threading.Thread): LOGGER.debug('Connecting to {}'.format(url)) try: - websocket = connect(url) + WEBSOCKET = connect(url) except Exception as ex: LOGGER.error('Error connecting to {}'.format(url)) else: - LOGGER.debug('Connected to {}'.format(url)) - with websocket: - send_msg(websocket, "HOLA") + LOGGER.info('Connected to {}'.format(url)) + context_id = json_context_id(DEFAULT_CONTEXT_NAME) + topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id) + + try: + topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) + except Exception as ex: + LOGGER.warning('No topology found') + else: + send_msg(grpc_message_to_json_string(topology_details)) - while not self._terminate.is_set(): - event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) - if event is None: continue + LOGGER.info('aaaaaaaaaaaaaaaaa') + while not self._terminate.is_set(): + event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + if event is None: continue + LOGGER.info('event!: {}'.format(event)) + topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) - send_msg(websocket, event) - - events_collector.stop() + to_send = grpc_message_to_json_string(topology_details) + + send_msg(to_send) + + events_collector.stop() class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @@ -138,7 +156,6 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): self.port = request.port event_dispatcher.start() - return reply @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @@ -152,8 +169,7 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: try: - url = "ws://" + str(self.host) + ":" + str(self.port) - send_msg(url, request) + send_msg(request) except Exception as e: LOGGER.error('Exception getting virtual link={}\n\t{}'.format(request.link_uuid.uuid, e)) else: @@ -165,8 +181,7 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: try: - url = "ws://" + str(self.host) + ":" + str(self.port) - send_msg(url, request) + send_msg(request) except Exception as e: LOGGER.error('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) else: @@ -176,8 +191,7 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: try: - url = "ws://" + str(self.host) + ":" + str(self.port) - send_msg(url, request) + send_msg(request) except Exception as e: LOGGER.error('Exception removing virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) else: