# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import networkx as nx import grpc import time from websockets.sync.client import connect from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method) from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer from context.client.ContextClient import ContextClient from common.proto.context_pb2 import ( Empty, Event, EventTypeEnum, Link, LinkEvent, LinkId, LinkIdList, LinkList, ) from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Topology import json_topology_id from common.proto.context_pb2 import ContextId, TopologyId import threading from common.proto.context_pb2 import ( ConnectionEvent, ContextEvent, DeviceEvent, EventTypeEnum, ServiceEvent, TopologyEvent) from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME from typing import Any, Dict, Set from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json import time LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool("VNTManager", "RPC") context_client: ContextClient = ContextClient() JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME) ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID) ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID)) GET_EVENT_TIMEOUT = 0.5 HOST = "10.1.1.83" PORT = str(8765) WEBSOCKET = None def send_msg(msg): LOGGER.info('-------------------------SENDING------------------------------') LOGGER.info(msg) LOGGER.info('-------------------------------------------------------') WEBSOCKET.send(msg) message = WEBSOCKET.recv() class VNTMEventDispatcher(threading.Thread): def __init__(self, host, port) -> None: LOGGER.debug('Creating VTNM connector...') self.host = host self.port = port super().__init__(name='VNTMEventDispatcher', daemon=True) self._terminate = threading.Event() LOGGER.debug('VNTM connector created') def start(self) -> None: self._terminate.clear() return super().start() def stop(self): self._terminate.set() def run(self) -> None: global WEBSOCKET time.sleep(5) events_collector = EventsCollector( context_client, log_events_received=True, activate_context_collector = False, activate_topology_collector = True, activate_device_collector = False, activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False, activate_connection_collector = False,) events_collector.start() url = "ws://" + str(self.host) + ":" + str(self.port) LOGGER.debug('Connecting to {}'.format(url)) try: WEBSOCKET = connect(url) except Exception as ex: LOGGER.error('Error connecting to {}'.format(url)) else: LOGGER.info('Connected to {}'.format(url)) context_id = json_context_id(DEFAULT_CONTEXT_NAME) topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id) try: topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) except Exception as ex: LOGGER.warning('No topology found') else: send_msg(grpc_message_to_json_string(topology_details)) LOGGER.info('aaaaaaaaaaaaaaaaa') while not self._terminate.is_set(): event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) if event is None: continue LOGGER.info('event!: {}'.format(event)) topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) to_send = grpc_message_to_json_string(topology_details) send_msg(to_send) events_collector.stop() class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): def __init__(self): LOGGER.debug("Creating Servicer...") LOGGER.debug("Servicer Created") self.links = [] @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply: LOGGER.info("Subscript request: {:s}".format(str(grpc_message_to_json(request)))) reply = VNTSubscriptionReply() reply.subscription = "OK" event_dispatcher = VNTMEventDispatcher(request.host, int(request.port)) self.host = request.host self.port = request.port event_dispatcher.start() return reply @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListVirtualLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList: return LinkIdList(link_ids=[link.link_id for link in self.links]) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList: return LinkList(link=self.links) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: try: send_msg(request) except Exception as e: LOGGER.error('Exception getting virtual link={}\n\t{}'.format(request.link_uuid.uuid, e)) else: for link in self.links: if link.link_uuid.uuid == request.uuid: return link return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: try: send_msg(request) except Exception as e: LOGGER.error('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) else: self.links.append(request) return request.linkd_id @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: try: send_msg(request) except Exception as e: LOGGER.error('Exception removing virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) else: for link in self.links: if link.link_uuid.uuid == request.uuid: self.links.remove(link) return Empty() return Empty()