Skip to content
VNTManagerServiceServicerImpl.py 7.02 KiB
Newer Older
Carlos Manso's avatar
Carlos Manso committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import networkx as nx
import grpc
Carlos Manso's avatar
Carlos Manso committed
import time
from websockets.sync.client import connect
Carlos Manso's avatar
Carlos Manso committed
from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method)
Carlos Manso's avatar
Carlos Manso committed
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer
from context.client.ContextClient import ContextClient
from common.proto.context_pb2 import (
    Empty,
    Event, EventTypeEnum,  
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
)
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from common.proto.context_pb2 import ContextId, TopologyId
import threading
from common.proto.context_pb2 import (
    ConnectionEvent, ContextEvent, DeviceEvent, EventTypeEnum, ServiceEvent, TopologyEvent)
Carlos Manso's avatar
Carlos Manso committed
from context.client.ContextClient import ContextClient
Carlos Manso's avatar
Carlos Manso committed
from context.client.EventsCollector import EventsCollector
from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from typing import Any, Dict, Set
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
Carlos Manso's avatar
Carlos Manso committed
from common.tools.grpc.Tools import grpc_message_to_json
import time
Carlos Manso's avatar
Carlos Manso committed

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool("VNTManager", "RPC")

context_client: ContextClient = ContextClient()

Carlos Manso's avatar
Carlos Manso committed
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
Carlos Manso's avatar
Carlos Manso committed
GET_EVENT_TIMEOUT = 0.5
Carlos Manso's avatar
Carlos Manso committed
HOST = "10.1.1.83"
PORT = str(8765)


Carlos Manso's avatar
Carlos Manso committed
def send_msg(websocket, msg):
    send = grpc_message_to_json_string(msg)
    websocket.send(send)
    message = websocket.recv()
Carlos Manso's avatar
Carlos Manso committed


Carlos Manso's avatar
Carlos Manso committed

class VNTMEventDispatcher(threading.Thread):
    def __init__(self, host, port) -> None:
        LOGGER.debug('Creating VTNM connector...')
        self.host = host
        self.port = port
        super().__init__(name='VNTMEventDispatcher', daemon=True)
        self._terminate = threading.Event()
        LOGGER.debug('VNTM connector created')

    def start(self) -> None:
        self._terminate.clear()
        return super().start()

    def stop(self):
        self._terminate.set()

    def run(self) -> None:
Carlos Manso's avatar
Carlos Manso committed
        time.sleep(10)
Carlos Manso's avatar
Carlos Manso committed
        events_collector = EventsCollector(
            context_client, log_events_received=True,
            activate_context_collector     = True,
            activate_topology_collector    = True,
            activate_device_collector      = False,
            activate_link_collector        = False,
            activate_service_collector     = False,
            activate_slice_collector       = False,
            activate_connection_collector  = False,)
        events_collector.start()


Carlos Manso's avatar
Carlos Manso committed
        url = "ws://" + str(self.host) + ":" + str(self.port)
Carlos Manso's avatar
Carlos Manso committed
        LOGGER.debug('Connecting to {}'.format(url))
Carlos Manso's avatar
Carlos Manso committed
        try:
            websocket = connect(url)
        except Exception as ex:
            LOGGER.error('Error connecting to {}'.format(url))
        else:
            LOGGER.debug('Connected to {}'.format(url))
            with websocket:
                send_msg(websocket, "HOLA")
Carlos Manso's avatar
Carlos Manso committed
                while not self._terminate.is_set():
                    event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
                    if event is None: continue
Carlos Manso's avatar
Carlos Manso committed

Carlos Manso's avatar
Carlos Manso committed
                    send_msg(websocket, event)
            
                events_collector.stop()
Carlos Manso's avatar
Carlos Manso committed
class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
    def __init__(self):
        LOGGER.debug("Creating Servicer...")
        LOGGER.debug("Servicer Created")
        self.links = []


    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply:
Carlos Manso's avatar
Carlos Manso committed
        LOGGER.info("Subscript request: {:s}".format(str(grpc_message_to_json(request))))
Carlos Manso's avatar
Carlos Manso committed
        reply = VNTSubscriptionReply()
        reply.subscription = "OK"

        event_dispatcher = VNTMEventDispatcher(request.host, int(request.port))
Carlos Manso's avatar
Carlos Manso committed

Carlos Manso's avatar
Carlos Manso committed
        self.host = request.host
        self.port = request.port
Carlos Manso's avatar
Carlos Manso committed
        event_dispatcher.start()
Carlos Manso's avatar
Carlos Manso committed
        return reply
Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListVirtualLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList:
        return LinkIdList(link_ids=[link.link_id for link in self.links])

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
        return LinkList(link=self.links)
Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
Carlos Manso's avatar
Carlos Manso committed
        try:
            url = "ws://" + str(self.host) + ":" + str(self.port)
            send_msg(url, request)
        except Exception as e:
Carlos Manso's avatar
Carlos Manso committed
            LOGGER.error('Exception getting virtual link={}\n\t{}'.format(request.link_uuid.uuid, e))
Carlos Manso's avatar
Carlos Manso committed
        else:
            for link in  self.links:
                if link.link_uuid.uuid == request.uuid:
                    return link
Carlos Manso's avatar
Carlos Manso committed
        return Empty()
Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
Carlos Manso's avatar
Carlos Manso committed
        try:
            url = "ws://" + str(self.host) + ":" + str(self.port)
            send_msg(url, request)
        except Exception as e:
Carlos Manso's avatar
Carlos Manso committed
            LOGGER.error('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e))
Carlos Manso's avatar
Carlos Manso committed
        else:
            self.links.append(request)
            return request.linkd_id
Carlos Manso's avatar
Carlos Manso committed

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
Carlos Manso's avatar
Carlos Manso committed
        try:
            url = "ws://" + str(self.host) + ":" + str(self.port)
            send_msg(url, request)
        except Exception as e:
Carlos Manso's avatar
Carlos Manso committed
            LOGGER.error('Exception removing virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e))
Carlos Manso's avatar
Carlos Manso committed
        else:
            for link in  self.links:
                if link.link_uuid.uuid == request.uuid:
                    self.links.remove(link)
                    return Empty()
Carlos Manso's avatar
Carlos Manso committed
        return Empty()