diff --git a/manifests/e2e_orchestratorservice.yaml b/manifests/e2e_orchestratorservice.yaml index c02823c9f6e5217d722574ea07c5fc9fa3041a13..be2277408e443051e8aabc065bd5344580059739 100644 --- a/manifests/e2e_orchestratorservice.yaml +++ b/manifests/e2e_orchestratorservice.yaml @@ -25,6 +25,9 @@ spec: metadata: annotations: config.linkerd.io/skip-outbound-ports: "8761" + config.linkerd.io/skip-inbound-ports: "8761" + + labels: app: e2e-orchestratorservice spec: diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index b00bbb73284d1f72b835e3fe6698494b4430157f..fe8d27fe549898b0dd02195911248054a092e262 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -12,23 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - -import networkx as nx -import grpc import copy -from websockets.sync.client import connect -import time from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from context.client.ContextClient import ContextClient from context.service.database.uuids.EndPoint import endpoint_get_uuid -from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest from common.tools.grpc.Tools import grpc_message_to_json_string -from websockets.sync.server import serve +import grpc import json +import logging +import networkx as nx +from threading import Thread +import time +from websockets.sync.client import connect +from websockets.sync.server import serve LOGGER = logging.getLogger(__name__) @@ -37,6 +37,78 @@ METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") context_client: ContextClient = ContextClient() +EXT_HOST = "nbiservice.tfs-ip.svc.cluster.local" +EXT_PORT = "8762" + +OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local" +OWN_PORT = "8761" + + + +def _event_received(websocket): + for message in websocket: + LOGGER.info("Message received!!!: {}".format(message)) + message_json = json.loads(message) + + if 'event_type' in message_json: + pass + + elif 'link_id' in message_json: + obj = Link(**message_json) + if _check_policies(obj): + pass + + elif 'link_uuid' in message_json: + obj = LinkId(**message_json) + + websocket.send(message) + + +def _check_policies(link): + return True + + + +def requestSubscription(): + url = "ws://" + EXT_HOST + ":" + EXT_PORT + request = VNTSubscriptionRequest() + request.host = OWN_HOST + request.port = OWN_PORT + LOGGER.debug("Trying to connect to {}".format(url)) + try: + websocket = connect(url) + LOGGER.debug("Connected to {}".format(url)) + except Exception as ex: + LOGGER.error('Error connecting to {}'.format(url)) + else: + with websocket: + LOGGER.debug("Connected to {}".format(url)) + send = grpc_message_to_json_string(request) + LOGGER.debug("Sending {}".format(send)) + websocket.send(send) + try: + message = websocket.recv() + LOGGER.debug("Received message from WebSocket: {}".format(message)) + except Exception as ex: + LOGGER.info('Exception receiving from WebSocket: {}'.format(ex)) + + events_server() + LOGGER.info('Subscription requested') + + +def events_server(): + all_hosts = "0.0.0.0" + + try: + server = serve(_event_received, all_hosts, int(OWN_PORT)) + except Exception as ex: + LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT)) + LOGGER.error('Exception: {}'.format(ex)) + with server: + LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) + server.serve_forever() + LOGGER.info("Exiting events server...") + class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): @@ -47,9 +119,10 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): time.sleep(5) try: LOGGER.info("Requesting subscription") - self.RequestSubscription() - except Exception as E: - LOGGER.info("Exception0!: {}".format(E)) + subscription_thread = Thread(target=requestSubscription) + subscription_thread.start() + except Exception as ex: + LOGGER.info("Exception!: {}".format(ex)) @@ -104,66 +177,4 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): path.connections.append(conn) - def RequestSubscription(self): - LOGGER.info("Trying to connect...!!!") - - EXT_HOST = "nbiservice.tfs-ip.svc.cluster.local" - EXT_PORT = "8762" - - OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local" - OWN_PORT = "8761" - - - - url = "ws://" + EXT_HOST + ":" + EXT_PORT - request = VNTSubscriptionRequest() - request.host = OWN_HOST - request.port = OWN_PORT - LOGGER.info("Trying to connect... to {}".format(url)) - with connect(url) as websocket: - LOGGER.info("CONNECTED!!! {}") - send = grpc_message_to_json_string(request) - LOGGER.info("Sending {}".format(send)) - websocket.send(send) - - try: - message = websocket.recv() - except Exception as e: - LOGGER.info('Exception1!: {}'.format(e)) - - try: - LOGGER.info("Received ws: {}".format(message)) - except Exception as e: - LOGGER.info('Exception2!: {}'.format(e)) - - - - with serve(self._event_received, "0.0.0.0", OWN_PORT, logger=LOGGER) as server: - LOGGER.info("Running subscription server...: {}:{}".format("0.0.0.0", OWN_PORT)) - server.serve_forever() - LOGGER.info("Exiting subscription server...") - - - - - def _event_received(self, websocket): - for message in websocket: - LOGGER.info("Message received!!!: {}".format(message)) - message_json = json.loads(message) - if 'event_type' in message_json: - - pass - elif 'link_id' in message_json: - obj = Link(**message_json) - if self._check_policies(obj): - pass - elif 'link_uuid' in message_json: - obj = LinkId(**message_json) - - websocket.send(message) - - - def _check_policies(self, link): - return True - diff --git a/src/nbi/Dockerfile b/src/nbi/Dockerfile index 33f22953464d56725344df96fff1d41c4c7688b4..06e8c21a6f9c0ce01814398f3379832f3054ca5d 100644 --- a/src/nbi/Dockerfile +++ b/src/nbi/Dockerfile @@ -89,9 +89,8 @@ COPY src/service/__init__.py service/__init__.py COPY src/service/client/. service/client/ COPY src/slice/__init__.py slice/__init__.py COPY src/slice/client/. slice/client/ -# COPY src/vnt_manager/__init__.py vnt_manager/__init__.py -# COPY src/vnt_manager/client/. vnt_manager/client/ -COPY --chown=teraflow:teraflow ./src/vnt_manager/. vnt_manager +COPY src/vnt_manager/__init__.py vnt_manager/__init__.py +COPY src/vnt_manager/client/. vnt_manager/client/ RUN mkdir -p /var/teraflow/tests/tools COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/ diff --git a/src/nbi/service/__main__.py b/src/nbi/service/__main__.py index 67bf06267c1570c777671848467711147807494e..9fb439e16213045109466708185b7ae2545ba673 100644 --- a/src/nbi/service/__main__.py +++ b/src/nbi/service/__main__.py @@ -26,7 +26,7 @@ from .rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn from .rest_server.nbi_plugins.ietf_network import register_ietf_network from .rest_server.nbi_plugins.ietf_network_slice import register_ietf_nss from .rest_server.nbi_plugins.tfs_api import register_tfs_api -from .rest_server.nbi_plugins.context_subscription import register_context_subscription +from .context_subscription import register_context_subscription terminate = threading.Event() LOGGER = None diff --git a/src/nbi/service/rest_server/nbi_plugins/context_subscription/__init__.py b/src/nbi/service/context_subscription/__init__.py similarity index 86% rename from src/nbi/service/rest_server/nbi_plugins/context_subscription/__init__.py rename to src/nbi/service/context_subscription/__init__.py index eb841cb7fda0fe52c123719cead5776843deb447..33451e2fb218b1507ecc024e07ab5c6b75dacfc7 100644 --- a/src/nbi/service/rest_server/nbi_plugins/context_subscription/__init__.py +++ b/src/nbi/service/context_subscription/__init__.py @@ -50,25 +50,17 @@ def register_context_subscription(): def subcript_to_vnt_manager(websocket): for message in websocket: - LOGGER.info("Message received: {}".format(message)) + LOGGER.debug("Message received: {}".format(message)) message_json = json.loads(message) request = VNTSubscriptionRequest() request.host = message_json['host'] request.port = message_json['port'] - LOGGER.info("Received gRPC from ws: {}".format(request)) + LOGGER.debug("Received gRPC from ws: {}".format(request)) - reply = VNTSubscriptionReply() try: vntm_reply = vnt_manager_client.VNTSubscript(request) - LOGGER.info("Received gRPC from vntm: {}".format(vntm_reply)) + LOGGER.debug("Received gRPC from vntm: {}".format(vntm_reply)) except Exception as e: LOGGER.error('Could not subscript to VTNManager: {}'.format(e)) - reply.subscription = "NOT OK" - else: - reply.subscription = "OK" - - - websocket.send(reply.subscription) - - + websocket.send(vntm_reply.subscription) diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index 9d2e8364a795edf22dae80f39e7a0e8cf111817f..c4d3388df9d9a7225cc39449847c054e4a560836 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -40,7 +40,8 @@ from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERD from typing import Any, Dict, Set from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.tools.grpc.Tools import grpc_message_to_json_string - +from common.tools.grpc.Tools import grpc_message_to_json +import time LOGGER = logging.getLogger(__name__) @@ -60,14 +61,10 @@ HOST = "10.1.1.83" PORT = str(8765) -def send_msg(url, msg): - LOGGER.info("Sending event to {}".format(url)) - with connect(url, logger=LOGGER) as websocket: - send = grpc_message_to_json_string(msg) - LOGGER.info("Sending {}".format(send)) - websocket.send(send) - message = websocket.recv() - LOGGER.info("Received ws: {}".format(message)) +def send_msg(websocket, msg): + send = grpc_message_to_json_string(msg) + websocket.send(send) + message = websocket.recv() @@ -88,7 +85,7 @@ class VNTMEventDispatcher(threading.Thread): self._terminate.set() def run(self) -> None: - LOGGER.info('Thread running!') + time.sleep(10) events_collector = EventsCollector( context_client, log_events_received=True, activate_context_collector = True, @@ -102,22 +99,24 @@ class VNTMEventDispatcher(threading.Thread): url = "ws://" + str(self.host) + ":" + str(self.port) + LOGGER.debug('Connecting to {}'.format(url)) + try: + websocket = connect(url) + except Exception as ex: + LOGGER.error('Error connecting to {}'.format(url)) + else: + LOGGER.debug('Connected to {}'.format(url)) + with websocket: + send_msg(websocket, "HOLA") - request = VNTSubscriptionRequest() - request.host = str(self.host) - request.port = str(self.port) - - - send_msg(url, request) - - while not self._terminate.is_set(): - event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) - if event is None: continue + while not self._terminate.is_set(): + event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + if event is None: continue - send_msg(url, event) - - events_collector.stop() + send_msg(websocket, event) + + events_collector.stop() class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @@ -129,13 +128,12 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply: - LOGGER.info('----------------') - LOGGER.info(request) - LOGGER.info('----------------') + LOGGER.info("Subscript request: {:s}".format(str(grpc_message_to_json(request)))) reply = VNTSubscriptionReply() reply.subscription = "OK" event_dispatcher = VNTMEventDispatcher(request.host, int(request.port)) + self.host = request.host self.port = request.port event_dispatcher.start() @@ -157,7 +155,7 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): url = "ws://" + str(self.host) + ":" + str(self.port) send_msg(url, request) except Exception as e: - LOGGER.error('Exection getting virtual link={}\n\t{}'.format(request.link_uuid.uuid, e)) + LOGGER.error('Exception getting virtual link={}\n\t{}'.format(request.link_uuid.uuid, e)) else: for link in self.links: if link.link_uuid.uuid == request.uuid: @@ -170,7 +168,7 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): url = "ws://" + str(self.host) + ":" + str(self.port) send_msg(url, request) except Exception as e: - LOGGER.error('Exection setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) + LOGGER.error('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) else: self.links.append(request) return request.linkd_id @@ -181,7 +179,7 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): url = "ws://" + str(self.host) + ":" + str(self.port) send_msg(url, request) except Exception as e: - LOGGER.error('Exection removing virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) + LOGGER.error('Exception removing virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) else: for link in self.links: if link.link_uuid.uuid == request.uuid: