diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index 47307b12c635514bff61caaafe2188fc2a69fe7a..4878d4788276857600478b20c060c75e29d39015 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -12,29 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -import requests +import copy, grpc, json, logging, networkx, requests, threading from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply from common.proto.context_pb2 import ( Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId, ServiceTypeEnum, ServiceStatusEnum) from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.Settings import get_setting from context.client.ContextClient import ContextClient -from service.client.ServiceClient import ServiceClient from context.service.database.uuids.EndPoint import endpoint_get_uuid from context.service.database.uuids.Device import device_get_uuid -from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest -from common.tools.grpc.Tools import grpc_message_to_json_string -import grpc -import json -import logging -import networkx as nx -from threading import Thread +from service.client.ServiceClient import ServiceClient from websockets.sync.client import connect from websockets.sync.server import serve -from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME LOGGER = logging.getLogger(__name__) @@ -48,53 +42,46 @@ context_client: ContextClient = ContextClient() service_client: ServiceClient = ServiceClient() EXT_HOST = str(get_setting('WS_IP_HOST')) -EXT_PORT = str(get_setting('WS_IP_PORT')) +EXT_PORT = int(get_setting('WS_IP_PORT')) +EXT_URL = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT) OWN_HOST = str(get_setting('WS_E2E_HOST')) -OWN_PORT = str(get_setting('WS_E2E_PORT')) - +OWN_PORT = int(get_setting('WS_E2E_PORT')) -ALL_HOSTS = "0.0.0.0" +ALL_HOSTS = '0.0.0.0' -class SubscriptionServer(Thread): - def __init__(self): - Thread.__init__(self) - +class SubscriptionServer(threading.Thread): def run(self): - url = "ws://" + EXT_HOST + ":" + EXT_PORT request = VNTSubscriptionRequest() request.host = OWN_HOST request.port = OWN_PORT try: - LOGGER.debug("Trying to connect to {}".format(url)) - websocket = connect(url) - except Exception as ex: - LOGGER.error('Error connecting to {}'.format(url)) + LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL)) + websocket = connect(EXT_URL) + except: # pylint: disable=bare-except + LOGGER.exception('Error connecting to {:s}'.format(EXT_URL)) else: with websocket: - LOGGER.debug("Connected to {}".format(url)) + LOGGER.debug('Connected to {:s}'.format(EXT_URL)) send = grpc_message_to_json_string(request) websocket.send(send) - LOGGER.debug("Sent: {}".format(send)) + LOGGER.debug('Sent: {:s}'.format(send)) try: message = websocket.recv() - LOGGER.debug("Received message from WebSocket: {}".format(message)) + LOGGER.debug('Received message from WebSocket: {:s}'.format(message)) except Exception as ex: - LOGGER.error('Exception receiving from WebSocket: {}'.format(ex)) + LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex)) self._events_server() def _events_server(self): - all_hosts = "0.0.0.0" - try: - server = serve(self._event_received, all_hosts, int(OWN_PORT)) - except Exception as ex: - LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT)) - LOGGER.error('Exception!: {}'.format(ex)) + server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT)) + except: # pylint: disable=bare-except + LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) else: with server: - LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) + LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT)) server.serve_forever() @@ -178,32 +165,36 @@ class SubscriptionServer(Thread): class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): def __init__(self): - LOGGER.debug("Creating Servicer...") + LOGGER.debug('Creating Servicer...') try: - LOGGER.debug("Requesting subscription") + LOGGER.debug('Requesting subscription') sub_server = SubscriptionServer() sub_server.start() - LOGGER.debug("Servicer Created") + LOGGER.debug('Servicer Created') self.retrieve_external_topologies() - except Exception as ex: - LOGGER.info("Exception!: {}".format(ex)) + except: + LOGGER.exception('Unhandled Exception') def retrieve_external_topologies(self): i = 1 while True: try: - ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD')) - PORT = str(get_setting(f'EXT_CONTROLLER{i}_PORT')) - except Exception as e: + ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD')) + PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT')) + except: # pylint: disable=bare-except break + try: - LOGGER.info(f'Retrieving external controller #{i}') - url = f'http://{ADD}:{PORT}/tfs-api/context/{DEFAULT_CONTEXT_NAME}/topology_details/{DEFAULT_TOPOLOGY_NAME}' - LOGGER.info(f'url= {url}') + LOGGER.info('Retrieving external controller #{:d}'.format(i)) + url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format( + ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME + ) + LOGGER.info('url={:s}'.format(str(url))) topo = requests.get(url).json() - LOGGER.info(f'Retrieved external controller #{i}') - except Exception as e: - LOGGER.info(f'Exception retrieven topology from external controler #{i}: {e}') + LOGGER.info('Retrieved external controller #{:d}'.format(i)) + except: # pylint: disable=bare-except + LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i)) + topology_details = TopologyDetails(**topo) context = Context() context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid @@ -229,7 +220,7 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): for endpoint_id in request.service.service_endpoint_ids: endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2]) - graph = nx.Graph() + graph = networkx.Graph() devices = context_client.ListDevices(Empty()).devices @@ -253,7 +244,7 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): graph.add_edge(eps[0], eps[1]) - shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1]) + shortest = networkx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1]) path = E2EOrchestratorReply() path.services.append(copy.deepcopy(request.service))