Skip to content
Snippets Groups Projects
VNTManagerServiceServicerImpl.py 9.87 KiB
Newer Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Optional
import grpc, json, logging, uuid
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import Empty, Link, LinkId, LinkList, LinkTypeEnum
Carlos Manso's avatar
Carlos Manso committed
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer
#from common.tools.context_queries.EndPoint import get_endpoint_names
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
#from common.tools.object_factory.Device import json_device_id
#from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Link import json_link, json_link_id
from context.client.ContextClient import ContextClient
#from .vntm_config_device import configure, deconfigure
Carlos Manso's avatar
Carlos Manso committed

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('VNTManager', 'RPC')
Carlos Manso's avatar
Carlos Manso committed
class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
    def __init__(self):
        LOGGER.debug('Creating Servicer...')
        self.context_client = ContextClient()
        self.kafka_producer = KafkaProducer({
            'bootstrap.servers' : KafkaConfig.get_kafka_address()
        })
        self.kafka_consumer = KafkaConsumer({
            'bootstrap.servers' : KafkaConfig.get_kafka_address(),
            'group.id'          : str(uuid.uuid4()),
            'auto.offset.reset' : 'latest'
        })
        self.kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value])
Carlos Manso's avatar
Carlos Manso committed
        self.links = []
        LOGGER.debug('Servicer Created')
Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
        links = self.context_client.ListLinks(Empty()).links
        return [link for link in links if link.virtual]
Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
        link = self.context_client.GetLink(request)
Carlos Manso's avatar
Carlos Manso committed
        return link if link.virtual else Empty()

    def send_recommendation(self, vntm_request : Dict) -> str:
        request_key = str(uuid.uuid4())
        vntm_request = json.dumps(vntm_request)
        MSG = '[send_recommendation] request_key={:s} vntm_request={:s}'
        LOGGER.info(MSG.format(str(request_key), str(vntm_request)))
        self.kafka_producer.produce(
            KafkaTopic.VNTMANAGER_REQUEST.value,
            key=request_key.encode('utf-8'),
            value=vntm_request.encode('utf-8'),
        )
        self.kafka_producer.flush()
        return request_key

    def send_vlink_create(self, request : Link) -> str:
        return self.send_recommendation({
            'event': 'vlink_create', 'data': grpc_message_to_json_string(request)
        })

    def send_vlink_remove(self, request : LinkId) -> str:
        return self.send_recommendation({
            'event': 'vlink_remove', 'data': grpc_message_to_json_string(request)
        })

    def wait_for_reply(self, request_key : str) -> Optional[Dict]:
        LOGGER.info('[wait_for_reply] request_key={:s}'.format(str(request_key)))

        while True:
            receive_msg = self.kafka_consumer.poll(2.0)
            if receive_msg is None: continue
            LOGGER.info('[wait_for_reply] receive_msg={:s}'.format(str(receive_msg)))
            if receive_msg.error():
                if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue
                LOGGER.error('[wait_for_reply] Consumer error: {:s}'.format(str(receive_msg.error())))
                return None
            
            reply_key = receive_msg.key().decode('utf-8')
            LOGGER.info('[wait_for_reply] reply_key={:s}'.format(str(reply_key)))
            if reply_key == request_key:
                LOGGER.info('[wait_for_reply] match!')
                break
            LOGGER.info('[wait_for_reply] no match... waiting...')

        json_receive_msg = json.loads(receive_msg.value().decode('utf-8'))
        LOGGER.info('[wait_for_reply] json_receive_msg={:s}'.format(str(json_receive_msg)))

        if 'data' not in json_receive_msg:
            MSG = 'Malformed reply: {:s}'
            raise Exception(MSG.format(str(json_receive_msg)))
        data = json_receive_msg['data']

        if 'error' in data:
            MSG = 'Something went wrong: {:s}'
            raise Exception(MSG.format(str(data['error'])))

        if 'result' not in data:
            MSG = 'Malformed reply: {:s}'
            raise Exception(MSG.format(str(data)))
        return data['result']

Carlos Manso's avatar
Carlos Manso committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
Carlos Manso's avatar
Carlos Manso committed
        try:
            LOGGER.info('[SetVirtualLink] request={:s}'.format(grpc_message_to_json_string(request)))
            request_key = self.send_vlink_create(request)
            reply = self.wait_for_reply(request_key)
            LOGGER.info('[SetVirtualLink] reply={:s}'.format(str(reply)))

            # At this point, we know the request is processed and an optical connection was created

            vlink_uuid = reply['vlink_uuid']
            LOGGER.info('[SetVirtualLink] vlink_uuid={:s}'.format(str(vlink_uuid)))

            vlink_name = request.name
            if len(vlink_name) == 0: vlink_name = request.link_id.link_uuid.uuid
            LOGGER.info('[SetVirtualLink] vlink_name={:s}'.format(str(vlink_name)))

            vlink_endpoint_ids = [
                grpc_message_to_json(endpoint_id)
                for endpoint_id in request.link_endpoint_ids
            ]
            LOGGER.info('[SetVirtualLink] vlink_endpoint_ids={:s}'.format(str(vlink_endpoint_ids)))

            total_capacity_gbps = request.attributes.total_capacity_gbps
            LOGGER.info('[SetVirtualLink] total_capacity_gbps={:s}'.format(str(total_capacity_gbps)))

            vlink = Link(**json_link(
                vlink_uuid, vlink_endpoint_ids, name=vlink_name,
                link_type=LinkTypeEnum.LINKTYPE_VIRTUAL,
                total_capacity_gbps=total_capacity_gbps,
            ))
            LOGGER.info('[SetVirtualLink] vlink={:s}'.format(grpc_message_to_json_string(vlink)))

            #device_names, endpoints_data = get_endpoint_names(self.context_client, request.link_endpoint_ids)

            #device_uuid_or_name_a = vlink_endpoint_ids[ 0]['device_id']['device_uuid']['uuid']
            #device_name_a = device_names.get(device_uuid_or_name_a, device_uuid_or_name_a)

            #device_uuid_or_name_b = vlink_endpoint_ids[-1]['device_id']['device_uuid']['uuid']
            #device_name_b = device_names.get(device_uuid_or_name_b, device_uuid_or_name_b)

            #endpoint_uuid_or_name_a = vlink_endpoint_ids[ 0]['endpoint_uuid']['uuid']
            #endpoint_name_a = endpoints_data.get(endpoint_uuid_or_name_a, (endpoint_uuid_or_name_a, None))
            #endpoint_name_a = endpoint_name_a.replace('PORT-', '')

            #endpoint_uuid_or_name_b = vlink_endpoint_ids[-1]['endpoint_uuid']['uuid']
            #endpoint_name_b = endpoints_data.get(endpoint_uuid_or_name_b, (endpoint_uuid_or_name_b, None))
            #endpoint_name_b = endpoint_name_b.replace('PORT-', '')

            #network_instance_name = '-'.join([
            #    device_name_a, endpoint_name_a, device_name_b, endpoint_name_b
            #])
            #configure(
            #    device_name_a, endpoint_name_a, device_name_b, endpoint_name_b, network_instance_name
            #)

            vlink_id = self.context_client.SetLink(vlink)

            MSG = 'Virtual link created, vlink_id={:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(vlink_id)))
            return vlink_id
        except: # pylint: disable=bare-except
            MSG = 'Exception setting virtual link={:s}'
            LOGGER.exception(MSG.format(str(request.link_id.link_uuid.uuid)))
            raise
Carlos Manso's avatar
Carlos Manso committed

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
Carlos Manso's avatar
Carlos Manso committed
        try:
            LOGGER.info('[RemoveVirtualLink] request={:s}'.format(grpc_message_to_json_string(request)))
            request_key = self.send_vlink_remove(request)
            reply = self.wait_for_reply(request_key)
            LOGGER.info('[RemoveVirtualLink] reply={:s}'.format(str(reply)))

            # At this point, we know the request is processed and an optical connection was removed

            vlink_uuid = request.link_uuid.uuid
            LOGGER.info('[RemoveVirtualLink] vlink_uuid={:s}'.format(str(vlink_uuid)))

            vlink_id = LinkId(**json_link_id(vlink_uuid))
            LOGGER.info('[RemoveVirtualLink] vlink_id={:s}'.format(grpc_message_to_json_string(vlink_id)))
Carlos Manso's avatar
Carlos Manso committed

            # deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
            self.context_client.RemoveLink(vlink_id)

            MSG = 'Virtual link removed, vlink_id={:s}'
            LOGGER.info(MSG.format(grpc_message_to_json_string(vlink_id)))
            return Empty()
        except: # pylint: disable=bare-except
            MSG = 'Exception removing virtual link={:s}'
            LOGGER.exception(MSG.format(str(request.link_uuid.uuid)))
            raise