# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import networkx as nx
import grpc
import copy

from websockets.sync.client import connect
import time
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.tools.grpc.Tools import grpc_message_to_json_string
from websockets.sync.server import serve
import json

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")

# Module-level Context client shared by every servicer instance.
context_client: ContextClient = ContextClient()


class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
    """gRPC servicer for the E2E Orchestrator.

    Exposes the ``Compute`` RPC and, on construction, tries to subscribe to
    the external NBI over WebSocket and then serve incoming events.
    """

    def __init__(self):
        """Create the servicer and attempt the VNT subscription.

        Any exception raised by ``RequestSubscription`` is logged and
        swallowed so that construction itself does not fail.
        """
        LOGGER.debug("Creating Servicer...")
        LOGGER.debug("Servicer Created")

        # Fixed delay before the first connection attempt.
        # NOTE(review): presumably gives the peer service time to start - confirm.
        time.sleep(5)
        try:
            LOGGER.info("Requesting subscription")
            self.RequestSubscription()
        except Exception as E:
            LOGGER.info("Exception0!: {}".format(E))

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
        """Compute a path between the first two endpoints of the requested service.

        A graph is built whose nodes are endpoint UUIDs. Endpoints that belong
        to the same device are fully meshed, and the first two endpoints of
        every link are joined by an edge. The shortest path between the first
        two service endpoints is then split into consecutive endpoint pairs,
        each of which becomes one ``Connection`` in the reply.

        Args:
            request: Request carrying the service whose endpoints are connected.
            context: gRPC servicer context (not used here).

        Returns:
            A reply holding a deep copy of the requested service and one
            ``Connection`` per consecutive pair of hops in the shortest path.
        """
        endpoints_ids = [
            endpoint_get_uuid(endpoint_id)[2]
            for endpoint_id in request.service.service_endpoint_ids
        ]

        graph = nx.Graph()

        devices = context_client.ListDevices(Empty()).devices
        for device in devices:
            endpoints_uuids = [
                endpoint.endpoint_id.endpoint_uuid.uuid
                for endpoint in device.device_endpoints
            ]
            graph.add_nodes_from(endpoints_uuids)
            # Every pair of distinct endpoints on the same device is connected.
            graph.add_edges_from(
                (ep, ep_i)
                for ep in endpoints_uuids
                for ep_i in endpoints_uuids
                if ep != ep_i
            )

        links = context_client.ListLinks(Empty()).links
        for link in links:
            eps = [endpoint_id.endpoint_uuid.uuid for endpoint_id in link.link_endpoint_ids]
            if len(eps) < 2:
                # A link with fewer than two endpoints cannot form an edge.
                LOGGER.warning("Skipping link with fewer than 2 endpoints: {}".format(eps))
                continue
            graph.add_edge(eps[0], eps[1])

        shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])

        path = E2EOrchestratorReply()
        path.services.append(copy.deepcopy(request.service))

        # Hops are consumed two at a time; an unpaired trailing hop is dropped.
        for i in range(len(shortest) // 2):
            ep_a_uuid = str(shortest[i * 2])
            ep_z_uuid = str(shortest[i * 2 + 1])

            conn = Connection()
            conn.connection_id.connection_uuid.uuid = ep_a_uuid + '_->_' + ep_z_uuid

            ep_a_id = EndPointId()
            ep_a_id.endpoint_uuid.uuid = ep_a_uuid
            conn.path_hops_endpoint_ids.append(ep_a_id)

            ep_z_id = EndPointId()
            ep_z_id.endpoint_uuid.uuid = ep_z_uuid
            conn.path_hops_endpoint_ids.append(ep_z_id)

            path.connections.append(conn)

        # The reply was previously built but never returned to the caller.
        return path

    def RequestSubscription(self):
        """Send a VNT subscription request and then run the event server.

        Connects to the external WebSocket endpoint, sends a
        ``VNTSubscriptionRequest`` carrying this service's host and port as
        JSON, logs the first reply, and then starts a WebSocket server whose
        connections are handled by ``_event_received``.
        """
        LOGGER.info("Trying to connect...!!!")

        EXT_HOST = "nbiservice.tfs-ip.svc.cluster.local"
        EXT_PORT = "8762"

        OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local"
        OWN_PORT = "8761"

        url = "ws://" + EXT_HOST + ":" + EXT_PORT
        request = VNTSubscriptionRequest()
        request.host = OWN_HOST
        request.port = OWN_PORT
        LOGGER.info("Trying to connect... to {}".format(url))

        with connect(url) as websocket:
            LOGGER.info("CONNECTED!!! {}".format(url))
            send = grpc_message_to_json_string(request)
            LOGGER.info("Sending {}".format(send))
            websocket.send(send)

            try:
                message = websocket.recv()
            except Exception as e:
                LOGGER.info('Exception1!: {}'.format(e))
            else:
                # Only log the reply when one was actually received.
                LOGGER.info("Received ws: {}".format(message))

        # NOTE(review): the server is started after the client connection is
        # closed; confirm this matches the intended nesting.
        # NOTE(review): OWN_PORT is a str; confirm serve() accepts a string port.
        with serve(self._event_received, "0.0.0.0", OWN_PORT, logger=LOGGER) as server:
            LOGGER.info("Running subscription server...: {}:{}".format("0.0.0.0", OWN_PORT))
            # NOTE(review): serve_forever() is expected to block, so this method
            # (and __init__, which calls it) would not return while serving.
            server.serve_forever()
            LOGGER.info("Exiting subscription server...")

    def _event_received(self, websocket):
        """Handle one WebSocket connection: parse each message and echo it back.

        Messages are decoded as JSON. Objects containing ``link_id`` are turned
        into a ``Link`` and passed to ``_check_policies``; objects containing
        ``link_uuid`` are turned into a ``LinkId``. No further action is taken
        on the parsed objects yet.

        Args:
            websocket: Server-side connection yielding incoming messages.
        """
        for message in websocket:
            LOGGER.info("Message received!!!: {}".format(message))
            message_json = json.loads(message)

            if 'event_type' in message_json:
                pass
            elif 'link_id' in message_json:
                obj = Link(**message_json)
                if self._check_policies(obj):
                    pass
            elif 'link_uuid' in message_json:
                obj = LinkId(**message_json)

            # NOTE(review): every message is echoed back to the sender; confirm
            # the echo is not meant to apply to a single branch only.
            websocket.send(message)

    def _check_policies(self, link):
        """Return whether ``link`` passes the policy check (currently always True)."""
        return True