diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index 991ad149a8fbd3b38321e37dbee535ebdc07d66b..2a303851c342d126c9387da6354d5d16ba338db5 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -15,11 +15,12 @@ import copy from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply -from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, TopologyId, Device, Topology, Context, Service +from common.proto.context_pb2 import Empty, Connection, ContextId, EndPointId, Link, LinkId, TopologyDetails, TopologyId, Device, Topology, Context, Service, ServiceStatus, DeviceId, ServiceTypeEnum, ServiceStatusEnum from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from context.client.ContextClient import ContextClient from service.client.ServiceClient import ServiceClient from context.service.database.uuids.EndPoint import endpoint_get_uuid +from context.service.database.uuids.Device import device_get_uuid from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest from common.tools.grpc.Tools import grpc_message_to_json_string import grpc @@ -27,9 +28,10 @@ import json import logging import networkx as nx from threading import Thread -import time from websockets.sync.client import connect from websockets.sync.server import serve +from common.Constants import DEFAULT_CONTEXT_NAME + LOGGER = logging.getLogger(__name__) logging.getLogger("websockets").propagate = False @@ -55,50 +57,61 @@ def _event_received(websocket): link = Link(**message_json) service = Service() - service.service_id = link.link_id.link_uuid - service.serivice_type = 2 # Optical - service.service_status = 1 - - # service_client.CreateService(service) + service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid + service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME + service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + service_client.CreateService(service) + + links = context_client.ListLinks(Empty()).links + a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id) + a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2] + z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) + z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] + + + for _link in links: + for _endpoint_id in _link.link_endpoint_ids: + if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid: + a_ep_id = _endpoint_id + elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \ + _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: + z_ep_id = _endpoint_id - websocket.send(message) + service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) + service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) + + service_client.UpdateService(service) + websocket.send(grpc_message_to_json_string(link)) else: topology_details = TopologyDetails(**message_json) - context_id = topology_details.topology_id.context_id context = Context() - context.context_id.CopyFrom(context_id) + context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid context_client.SetContext(context) - topology_id = topology_details.topology_id topology = Topology() - topology.topology_id.CopyFrom(topology_id) + topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid context_client.SetTopology(topology) for device in topology_details.devices: - LOGGER.info('Setting Device: {}'.format(device)) context_client.SetDevice(device) for link in topology_details.links: - LOGGER.info('Setting Link: {}'.format(link)) context_client.SetLink(link) -def _check_policies(link): - return True - - - def requestSubscription(): url = "ws://" + EXT_HOST + ":" + EXT_PORT request = VNTSubscriptionRequest() request.host = OWN_HOST request.port = OWN_PORT - LOGGER.debug("Trying to connect to {}".format(url)) + LOGGER.debug("Connecting to {}".format(url)) try: websocket = connect(url) except Exception as ex: @@ -108,12 +121,11 @@ def requestSubscription(): LOGGER.debug("Connected to {}".format(url)) send = grpc_message_to_json_string(request) websocket.send(send) - LOGGER.debug("Sent: {}".format(send)) try: message = websocket.recv() LOGGER.debug("Received message from WebSocket: {}".format(message)) except Exception as ex: - LOGGER.info('Exception receiving from WebSocket: {}'.format(ex)) + LOGGER.error('Exception receiving from WebSocket: {}'.format(ex)) events_server() LOGGER.info('Subscription requested') @@ -127,22 +139,31 @@ def events_server(): except Exception as ex: LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT)) LOGGER.error('Exception!: {}'.format(ex)) - with server: - LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) - server.serve_forever() - LOGGER.info("Exiting events server...") + else: + with server: + LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) + server.serve_forever() + LOGGER.info("Exiting events server...") + +class SubscriptionServer(): + class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): def __init__(self): LOGGER.debug("Creating Servicer...") LOGGER.debug("Servicer Created") try: - LOGGER.info("Requesting subscription") + LOGGER.debug("Requesting subscription") subscription_thread = Thread(target=requestSubscription) subscription_thread.start() + + + # import_optical() + + except Exception as ex: LOGGER.info("Exception!: {}".format(ex)) diff --git a/src/nbi/service/context_subscription/__init__.py b/src/nbi/service/context_subscription/__init__.py index 33451e2fb218b1507ecc024e07ab5c6b75dacfc7..17c8ccf5f583c484fcaf9c1546e9be7f2192eb77 100644 --- a/src/nbi/service/context_subscription/__init__.py +++ b/src/nbi/service/context_subscription/__init__.py @@ -15,10 +15,9 @@ import logging from websockets.sync.server import serve -from common.proto.vnt_manager_pb2 import VNTSubscriptionReply, VNTSubscriptionRequest +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest from common.proto.context_pb2 import Empty -# from vnt_manager.client.VNTManagerClient import VNTManagerClient from context.client.ContextClient import ContextClient from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.tools.object_factory.Topology import json_topology_id diff --git a/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py b/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py index 126885e70e3420969d189da28f234d0a435d041e..9f6b68962368257d7dee811d72a4943ef19ebf74 100644 --- a/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py +++ b/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py @@ -207,18 +207,9 @@ class VirtualLink(_Resource): def get(self, virtual_link_uuid : str): return format_grpc_to_json(self.vntmanager_client.GetVirtualLink(grpc_link_id(virtual_link_uuid))) def post(self, virtual_link_uuid : str): # pylint: disable=unused-argument - link = request.get_json() - LOGGER.info('---------------------------LINK received------------------------------') - LOGGER.info(link) - LOGGER.info('----------------------------------------------------------------------') - LOGGER.info(link['link_id']) - LOGGER.info('type: {}'.format(type(link['link_id']))) - LOGGER.info('---------------------------LINK received------------------------------') - link = grpc_link(link) - - - - return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(grpc_link(link))) + link_json = request.get_json() + link = grpc_link(link_json) + return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(link)) def put(self, virtual_link_uuid : str): # pylint: disable=unused-argument link = request.get_json() return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(grpc_link(link))) diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index ce77bc9eec169cfb89bb3bfc6276af075bfb211c..74f233ad6b0d36ddb33a16213c70a1e71bbfed2d 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -41,10 +41,8 @@ from typing import Any, Dict, Set from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json -import time import json - LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool("VNTManager", "RPC") @@ -66,11 +64,10 @@ PORT = str(8765) WEBSOCKET = None def send_msg(msg): - LOGGER.info('-------------------------SENDING------------------------------') - LOGGER.info(msg) - LOGGER.info('-------------------------------------------------------') - WEBSOCKET.send(msg) - + try: + WEBSOCKET.send(msg) + except Exception as e: + LOGGER.info(e) class VNTMEventDispatcher(threading.Thread): @@ -109,6 +106,7 @@ class VNTMEventDispatcher(threading.Thread): LOGGER.debug('Connecting to {}'.format(url)) try: + LOGGER.info("Connecting to events server...: {}".format(url)) WEBSOCKET = connect(url) except Exception as ex: LOGGER.error('Error connecting to {}'.format(url)) @@ -128,9 +126,7 @@ class VNTMEventDispatcher(threading.Thread): event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) if event is None: continue - LOGGER.info('event!!!!!!!!!!!!!!!!!!!!!!: {}'.format(event)) topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) - LOGGER.info('topodetails..................................... ') to_send = grpc_message_to_json_string(topology_details) @@ -180,15 +176,22 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: try: - send_msg(request) + send_msg(grpc_message_to_json_string(request)) message = WEBSOCKET.recv() + + + + + + + + message_json = json.loads(message) link = Link(**message_json) context_client.SetLink(link) - except Exception as e: LOGGER.error('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) - return request.linkd_id + return request.link_id @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: