Skip to content
Snippets Groups Projects
VNTManagerServiceServicerImpl.py 7.34 KiB
Newer Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import json
import logging
import threading
Carlos Manso's avatar
Carlos Manso committed
import time
from websockets.sync.client import connect
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import ContextId, Empty, Link, LinkId, LinkList, TopologyId
Carlos Manso's avatar
Carlos Manso committed
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
Carlos Manso's avatar
Carlos Manso committed
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
Carlos Manso's avatar
Carlos Manso committed
from context.client.EventsCollector import EventsCollector
Carlos Manso's avatar
Carlos Manso committed
from .vntm_config_device import configure, deconfigure
Carlos Manso's avatar
Carlos Manso committed

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool("VNTManager", "RPC")

context_client: ContextClient = ContextClient()

Carlos Manso's avatar
Carlos Manso committed
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
Carlos Manso's avatar
Carlos Manso committed
GET_EVENT_TIMEOUT = 0.5
Carlos Manso's avatar
Carlos Manso committed
class VNTMEventDispatcher(threading.Thread):
    def __init__(self, host, port) -> None:
Carlos Manso's avatar
Carlos Manso committed
        LOGGER.debug('Creating VNTM connector...')
Carlos Manso's avatar
Carlos Manso committed
        self.host = host
        self.port = port
        super().__init__(name='VNTMEventDispatcher', daemon=True)
        self._terminate = threading.Event()
        LOGGER.debug('VNTM connector created')

    def start(self) -> None:
        self._terminate.clear()
        return super().start()

    def stop(self):
        self._terminate.set()

Carlos Manso's avatar
Carlos Manso committed
    def send_msg(self, msg):
        try:
            self.websocket.send(msg)
        except Exception as e:
            LOGGER.exception('Unable to send message')
Carlos Manso's avatar
Carlos Manso committed

    def recv_msg(self):
        message = self.websocket.recv()
        return message

Carlos Manso's avatar
Carlos Manso committed
    def run(self) -> None:
        events_collector = EventsCollector(
            context_client,
            log_events_received            = True,
Carlos Manso's avatar
Carlos Manso committed
            activate_context_collector     = True,
Carlos Manso's avatar
Carlos Manso committed
            activate_topology_collector    = True,
Carlos Manso's avatar
Carlos Manso committed
            activate_device_collector      = True,
            activate_link_collector        = True,
Carlos Manso's avatar
Carlos Manso committed
            activate_service_collector     = False,
            activate_slice_collector       = False,
            activate_connection_collector  = False,
        )
Carlos Manso's avatar
Carlos Manso committed
        events_collector.start()

Carlos Manso's avatar
Carlos Manso committed
        try:
            url = "ws://" + str(self.host) + ":" + str(self.port)
            LOGGER.info("Connecting to events server...: {:s}".format(url))
Carlos Manso's avatar
Carlos Manso committed
            self.websocket = connect(url)
Carlos Manso's avatar
Carlos Manso committed
        except Exception as ex:
            MSG = 'Error connecting to {:s}'
            LOGGER.exception(MSG.format(str(url)))
Carlos Manso's avatar
Carlos Manso committed
        else:
            LOGGER.info('Connected to {:s}'.format(url))
            context_id = json_context_id(DEFAULT_CONTEXT_NAME)
            topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id)
            
            try:
                topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
            except Exception as ex:
                LOGGER.warning('No topology found')
            else:
Carlos Manso's avatar
Carlos Manso committed
                self.send_msg(grpc_message_to_json_string(topology_details))
            while not self._terminate.is_set():
                event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
                if event is None: continue
Carlos Manso's avatar
Carlos Manso committed
                LOGGER.debug('Event type: {}'.format(event))
                topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
                to_send = grpc_message_to_json_string(topology_details)
Carlos Manso's avatar
Carlos Manso committed
                self.send_msg(to_send)
Carlos Manso's avatar
Carlos Manso committed
            LOGGER.info('Exiting')
            events_collector.stop()
Carlos Manso's avatar
Carlos Manso committed
class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
    def __init__(self):
        LOGGER.debug("Creating Servicer...")
        LOGGER.debug("Servicer Created")
        self.links = []

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply:
Carlos Manso's avatar
Carlos Manso committed
        LOGGER.info("Subscript request: {:s}".format(str(grpc_message_to_json(request))))
Carlos Manso's avatar
Carlos Manso committed
        reply = VNTSubscriptionReply()
        reply.subscription = "OK"

Carlos Manso's avatar
Carlos Manso committed
        self.event_dispatcher = VNTMEventDispatcher(request.host, int(request.port))
Carlos Manso's avatar
Carlos Manso committed
        self.host = request.host
        self.port = request.port
Carlos Manso's avatar
Carlos Manso committed
        LOGGER.info('sleeping 5...')
        time.sleep(5)
Carlos Manso's avatar
Carlos Manso committed
        self.event_dispatcher.start()
Carlos Manso's avatar
Carlos Manso committed
        return reply
Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
Carlos Manso's avatar
Carlos Manso committed
        return [link for link in context_client.ListLinks(Empty()).links if link.virtual]
Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
Carlos Manso's avatar
Carlos Manso committed
        link = context_client.GetLink(request)
        return link if link.virtual else Empty()

Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
Carlos Manso's avatar
Carlos Manso committed
        try:
Carlos Manso's avatar
Carlos Manso committed
            LOGGER.info('SETTING virtual link')
            self.event_dispatcher.send_msg(grpc_message_to_json_string(request))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            # configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
Carlos Manso's avatar
Carlos Manso committed
            response = self.event_dispatcher.recv_msg()
            message_json = json.loads(response)
Carlos Manso's avatar
Carlos Manso committed
            link = Link(**message_json)
            context_client.SetLink(link)
Carlos Manso's avatar
Carlos Manso committed
        except Exception as e:
            MSG = 'Exception setting virtual link={:s}')
            LOGGER.exception(MSG.format(str(request.link_id.link_uuid.uuid)))
Carlos Manso's avatar
Carlos Manso committed
        return request.link_id
Carlos Manso's avatar
Carlos Manso committed

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
Carlos Manso's avatar
Carlos Manso committed
        try:
Carlos Manso's avatar
Carlos Manso committed
            LOGGER.debug('Removing virtual link')
Carlos Manso's avatar
Carlos Manso committed
            self.event_dispatcher.send_msg(grpc_message_to_json_string(request))
            # deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
            response = self.event_dispatcher.recv_msg()
            message_json = json.loads(response)
            link_id = LinkId(**message_json)
            context_client.RemoveLink(link_id)

            LOGGER.info('Removed')
        except Exception as e:
            MSG = 'Exception removing virtual link={:s}'
            LOGGER.exception(MSG.format(str(request.link_uuid.uuid)))
Carlos Manso's avatar
Carlos Manso committed
            return msg_error
        else:
            context_client.RemoveLink(request)
            LOGGER.info('Removed')

Carlos Manso's avatar
Carlos Manso committed
        return Empty()