# VNTManagerServiceServicerImpl.py
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import networkx as nx
import grpc
Carlos Manso's avatar
Carlos Manso committed
import time
from websockets.sync.client import connect
Carlos Manso's avatar
Carlos Manso committed
from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method)
Carlos Manso's avatar
Carlos Manso committed
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer
from context.client.ContextClient import ContextClient
from common.proto.context_pb2 import (
    Empty,
    Event, EventTypeEnum,  
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
)
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from common.proto.context_pb2 import ContextId, TopologyId
import threading
from common.proto.context_pb2 import (
    ConnectionEvent, ContextEvent, DeviceEvent, EventTypeEnum, ServiceEvent, TopologyEvent)
Carlos Manso's avatar
Carlos Manso committed
from context.client.ContextClient import ContextClient
Carlos Manso's avatar
Carlos Manso committed
from context.client.EventsCollector import EventsCollector
from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from typing import Any, Dict, Set
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
Carlos Manso's avatar
Carlos Manso committed


# Module-wide logger and the metrics pool handed to the RPC method decorator.
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool("VNTManager", "RPC")

# Context client instantiated once at import time; the event dispatcher
# thread passes it to its EventsCollector.
context_client: ContextClient = ContextClient()

# Identifiers of the default ("admin") context and topology.
# They are not referenced by the code visible in this module.
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(
    **json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID)
)

# Timeout given to EventsCollector.get_event(); it bounds how long the
# dispatcher loop blocks before re-checking its termination flag.
GET_EVENT_TIMEOUT = 0.5

# Hard-coded endpoint values; not referenced by the code visible in this
# module. NOTE(review): presumably a default WebSocket peer -- confirm.
HOST = "10.1.1.83"
PORT = "8765"



class VNTMEventDispatcher(threading.Thread):
    """Daemon thread that pings a WebSocket peer whenever a context event arrives.

    The thread collects context and topology events through an
    ``EventsCollector`` bound to the module-level ``context_client``. For every
    event received it opens a WebSocket connection to ``ws://<host>:<port>``,
    sends a JSON-encoded ``VNTSubscriptionRequest`` carrying this dispatcher's
    host and port, and logs the peer's reply. The event itself is not
    forwarded; it only triggers the exchange.
    """

    def __init__(self, host, port) -> None:
        """Store the WebSocket endpoint and prepare the termination flag.

        Args:
            host: Host of the WebSocket peer to notify.
            port: Port of the WebSocket peer to notify.
        """
        LOGGER.debug('Creating VTNM connector...')
        # Initialise the Thread machinery before attaching our own attributes.
        super().__init__(name='VNTMEventDispatcher', daemon=True)
        self.host = host
        self.port = port
        self._terminate = threading.Event()
        LOGGER.debug('VNTM connector created')

    def start(self) -> None:
        """Clear the termination flag and start the thread."""
        self._terminate.clear()
        return super().start()

    def stop(self):
        """Ask the run loop to exit once its current iteration finishes."""
        self._terminate.set()

    def run(self) -> None:
        """Collect events and notify the WebSocket peer until ``stop()`` is called.

        The events collector is always stopped on exit, including when the
        WebSocket exchange or message construction raises; the exception
        itself still propagates and terminates the thread.
        """
        LOGGER.info('Thread running!')
        events_collector = EventsCollector(
            context_client, log_events_received=True,
            activate_context_collector     = True,
            activate_topology_collector    = True,
            activate_device_collector      = False,
            activate_link_collector        = False,
            activate_service_collector     = False,
            activate_slice_collector       = False,
            activate_connection_collector  = False,)
        events_collector.start()

        # The endpoint does not change while the thread runs; build it once.
        url = "ws://" + str(self.host) + ":" + str(self.port)

        try:
            while not self._terminate.is_set():
                # A bounded wait lets the loop notice stop() promptly.
                event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
                if event is None:
                    continue

                request = VNTSubscriptionRequest()
                request.host = self.host
                # NOTE(review): the servicer builds this dispatcher with
                # int(request.port); if the proto field is a string this
                # assignment raises TypeError -- confirm the field type.
                request.port = self.port
                LOGGER.info("Sending event to %s", url)
                with connect(url, logger=LOGGER) as websocket:
                    send = grpc_message_to_json_string(request)
                    LOGGER.info("Sending %s", send)
                    websocket.send(send)
                    message = websocket.recv()
                    LOGGER.info("Received ws: %s", message)
        finally:
            # Without this, any exception above would leave the collector
            # running after the dispatcher thread has died.
            events_collector.stop()
class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
    """gRPC servicer keeping virtual links in memory and spawning event dispatchers.

    Virtual links are stored in ``self.links`` as ``Link`` messages and are
    matched by ``link.link_id.link_uuid.uuid``. ``Link.link_id`` is the field
    already used by ``ListVirtualLinkIds``; the ``LinkId.link_uuid.uuid`` path
    is assumed from the project's context proto -- TODO confirm.
    """

    def __init__(self):
        LOGGER.debug("Creating Servicer...")
        # In-memory store of Link messages; lost when the process exits.
        self.links = []
        LOGGER.debug("Servicer Created")

    def _find_link_index(self, link_uuid):
        """Return the index of the stored link whose UUID is ``link_uuid``, or -1."""
        for index, link in enumerate(self.links):
            if link.link_id.link_uuid.uuid == link_uuid:
                return index
        return -1

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply:
        """Start an event dispatcher towards ``request.host:request.port``.

        Returns:
            A reply whose ``subscription`` field is ``"OK"``.

        Raises:
            ValueError: if ``request.port`` cannot be converted to ``int``.
        """
        LOGGER.info('----------------')
        LOGGER.info(request)
        LOGGER.info('----------------')
        reply = VNTSubscriptionReply()
        reply.subscription = "OK"

        # The dispatcher is a daemon thread; no reference is kept, so it runs
        # until the process exits.
        event_dispatcher = VNTMEventDispatcher(request.host, int(request.port))
        event_dispatcher.start()

        return reply

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListVirtualLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
        """Return the identifiers of every stored virtual link."""
        return LinkIdList(link_ids=[link.link_id for link in self.links])

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
        """Return every stored virtual link."""
        return LinkList(link=self.links)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
        """Return the stored link identified by ``request``.

        When no link matches, an empty ``Link`` is returned so that the reply
        has the message type declared by the RPC.
        """
        index = self._find_link_index(request.link_uuid.uuid)
        if index < 0:
            return Link()
        return self.links[index]

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
        """Store ``request``, replacing any stored link with the same UUID.

        Returns:
            The ``link_id`` of the stored link.
        """
        index = self._find_link_index(request.link_id.link_uuid.uuid)
        if index < 0:
            self.links.append(request)
        else:
            # Replace rather than append, so listings never contain
            # duplicates of the same link.
            self.links[index] = request
        return request.link_id

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
        """Remove the stored link identified by ``request``; no-op when absent."""
        index = self._find_link_index(request.link_uuid.uuid)
        if index >= 0:
            del self.links[index]
        return Empty()