# Copyright 2022-2024 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import networkx as nx
import grpc
Carlos Manso's avatar
Carlos Manso committed
import time
from websockets.sync.client import connect
Carlos Manso's avatar
Carlos Manso committed
from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method)
Carlos Manso's avatar
Carlos Manso committed
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer
from context.client.ContextClient import ContextClient
from common.proto.context_pb2 import (
    Empty,
    Event, EventTypeEnum,  
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
)
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from common.proto.context_pb2 import ContextId, TopologyId
import threading
from common.proto.context_pb2 import (
    ConnectionEvent, ContextEvent, DeviceEvent, EventTypeEnum, ServiceEvent, TopologyEvent)
Carlos Manso's avatar
Carlos Manso committed
from context.client.ContextClient import ContextClient
Carlos Manso's avatar
Carlos Manso committed
from context.client.EventsCollector import EventsCollector
from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from typing import Any, Dict, Set
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
Carlos Manso's avatar
Carlos Manso committed
from common.tools.grpc.Tools import grpc_message_to_json
Carlos Manso's avatar
Carlos Manso committed
import json

Carlos Manso's avatar
Carlos Manso committed
# Module-wide logger and the metrics pool handed to the RPC method decorator.
LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool("VNTManager", "RPC")

# Single Context client shared by the event dispatcher thread and the RPC handlers.
context_client: ContextClient = ContextClient()

# Identifiers built from the default context and topology names.
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))

# Timeout (seconds) passed to each blocking poll of the events collector.
GET_EVENT_TIMEOUT = 0.5

# NOTE(review): hard-coded endpoint that is not referenced by the visible code;
# the dispatcher takes host/port from the subscription request instead.
# Confirm whether these are still needed before relying on them.
HOST = "10.1.1.83"
PORT = str(8765)


# WebSocket connection shared across this module. It is assigned by
# VNTMEventDispatcher.run() once the connection succeeds and stays None until then.
WEBSOCKET = None

def send_msg(msg):
    """Send ``msg`` over the module-level WebSocket on a best-effort basis.

    Failures never propagate to the caller; they are logged instead.

    Args:
        msg: Payload handed to the WebSocket's ``send`` method.
    """
    # Snapshot the global so the check and the send act on the same object
    # even if another thread rebinds WEBSOCKET in between.
    websocket = WEBSOCKET
    if websocket is None:
        # Without this guard the unconnected case showed up as an opaque
        # AttributeError on NoneType logged at INFO level.
        LOGGER.warning('WebSocket is not connected; message not sent')
        return
    try:
        websocket.send(msg)
    except Exception:
        # Keep the best-effort contract, but record the traceback.
        LOGGER.exception('Error sending message through WebSocket')
Carlos Manso's avatar
Carlos Manso committed

Carlos Manso's avatar
Carlos Manso committed

class VNTMEventDispatcher(threading.Thread):
    """Daemon thread that pushes the default topology to a WebSocket peer.

    After connecting to ``ws://<host>:<port>`` the thread sends the details of
    the default topology once, and then again every time the events collector
    delivers an event, until ``stop()`` is called.
    """

    # Seconds to wait before collecting events and connecting to the peer.
    STARTUP_DELAY = 5

    def __init__(self, host, port) -> None:
        """Store the peer endpoint and prepare the termination flag.

        Args:
            host: Host name or address of the WebSocket server.
            port: Port of the WebSocket server.
        """
        LOGGER.debug('Creating VTNM connector...')
        super().__init__(name='VNTMEventDispatcher', daemon=True)
        self.host = host
        self.port = port
        self._terminate = threading.Event()
        LOGGER.debug('VNTM connector created')

    def start(self) -> None:
        """Clear the termination flag and start the thread."""
        self._terminate.clear()
        return super().start()

    def stop(self):
        """Ask the thread to leave its loop; does not wait for it to finish."""
        self._terminate.set()

    def _publish_topology(self, topology_id) -> None:
        """Fetch the topology details and send them to the peer (best effort)."""
        try:
            topology_details = context_client.GetTopologyDetails(topology_id)
        except Exception:
            # A failed lookup must not kill the dispatcher thread.
            LOGGER.warning('No topology found', exc_info=True)
            return
        send_msg(grpc_message_to_json_string(topology_details))

    def run(self) -> None:
        """Connect to the peer and forward the topology on every collected event.

        The events collector is always stopped and the WebSocket opened here is
        always closed when this method returns, whatever the exit path.
        """
        global WEBSOCKET

        # Interruptible replacement for a plain sleep: stop() during the
        # startup delay ends the thread right away.
        if self._terminate.wait(self.STARTUP_DELAY):
            return

        events_collector = EventsCollector(
            context_client, log_events_received=True,
            activate_context_collector     = False,
            activate_topology_collector    = True,
            activate_device_collector      = False,
            activate_link_collector        = False,
            activate_service_collector     = False,
            activate_slice_collector       = False,
            activate_connection_collector  = False,)
        events_collector.start()

        url = "ws://" + str(self.host) + ":" + str(self.port)
        LOGGER.debug('Connecting to {}'.format(url))

        websocket = None
        try:
            LOGGER.info("Connecting to events server...: {}".format(url))
            try:
                websocket = connect(url)
            except Exception:
                LOGGER.exception('Error connecting to {}'.format(url))
                return
            WEBSOCKET = websocket
            LOGGER.info('Connected to {}'.format(url))

            context_id = json_context_id(DEFAULT_CONTEXT_NAME)
            topology_id = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id))

            # Initial snapshot, then one refresh per collected event.
            self._publish_topology(topology_id)
            while not self._terminate.is_set():
                event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
                if event is None:
                    continue
                self._publish_topology(topology_id)
        finally:
            LOGGER.info('Exiting')
            events_collector.stop()
            if websocket is not None:
                # Only clear the shared reference if it still points at our
                # connection; a newer dispatcher may have replaced it.
                if WEBSOCKET is websocket:
                    WEBSOCKET = None
                try:
                    websocket.close()
                except Exception:
                    LOGGER.exception('Error closing connection to {}'.format(url))
Carlos Manso's avatar
Carlos Manso committed
class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
    """gRPC servicer exposing virtual-link operations of the VNT Manager.

    Virtual links are the links stored in the Context component whose
    ``virtual`` field is set. Link creation is negotiated with the peer reached
    through the module-level WebSocket.
    """

    def __init__(self):
        LOGGER.debug("Creating Servicer...")
        self.links = []
        # Endpoint of the most recent subscription; filled in by VNTSubscript.
        self.host = None
        self.port = None
        # Dispatcher thread started by the most recent subscription, if any.
        self._event_dispatcher = None
        LOGGER.debug("Servicer Created")

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply:
        """Start an event dispatcher towards the host/port given in the request.

        Args:
            request: Subscription request carrying the peer ``host`` and ``port``.
            context: gRPC servicer context (unused).

        Returns:
            A reply whose ``subscription`` field is ``"OK"``.
        """
        LOGGER.info("Subscript request: {:s}".format(str(grpc_message_to_json(request))))
        reply = VNTSubscriptionReply()
        reply.subscription = "OK"

        # Stop the dispatcher of a previous subscription so threads do not
        # accumulate (they would all publish through the same shared WebSocket).
        previous_dispatcher = self._event_dispatcher
        if previous_dispatcher is not None:
            previous_dispatcher.stop()

        event_dispatcher = VNTMEventDispatcher(request.host, int(request.port))

        self.host = request.host
        self.port = request.port
        self._event_dispatcher = event_dispatcher
        event_dispatcher.start()
        return reply

    # TODO: ListVirtualLinkIds is not implemented yet.

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
        """Return every link known to Context whose ``virtual`` field is set."""
        virtual_links = [link for link in context_client.ListLinks(Empty()).links if link.virtual]
        # Wrap in the declared response message; a bare list is not a LinkList.
        return LinkList(links=virtual_links)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
        """Return the requested link if it is virtual, otherwise an empty Link."""
        link = context_client.GetLink(request)
        # The declared response type is Link, so the "not virtual" answer is an
        # empty Link rather than an Empty message.
        return link if link.virtual else Link()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
        """Forward the link to the WebSocket peer and store its reply in Context.

        Failures are logged and not raised; the request's link id is returned
        in every case.

        Args:
            request: Link to be created or updated.
            context: gRPC servicer context (unused).

        Returns:
            ``request.link_id``.
        """
        # Snapshot so the send and the receive use the same connection.
        websocket = WEBSOCKET
        if websocket is None:
            LOGGER.error('Exception setting virtual link={}\n\t{}'.format(
                request.link_id.link_uuid.uuid, 'WebSocket is not connected'))
            return request.link_id

        try:
            # Send directly instead of through send_msg(): a swallowed send
            # error would leave recv() waiting for a reply that never comes.
            websocket.send(grpc_message_to_json_string(request))
            message = websocket.recv()
            message_json = json.loads(message)
            link = Link(**message_json)
            context_client.SetLink(link)
        except Exception as e:
            LOGGER.exception('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e))
        return request.link_id

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: removal is not implemented; always returns Empty."""
        # TODO
        return Empty()