Skip to content
SubscriptionServer.py 7.35 KiB
Newer Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, grpc, json, logging, networkx, requests, threading
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import (
    Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId,
    ServiceTypeEnum, ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from context.service.database.uuids.Device import device_get_uuid
from service.client.ServiceClient import ServiceClient
from websockets.sync.client import connect
from websockets.sync.server import serve


LOGGER = logging.getLogger(__name__)
logging.getLogger("websockets").propagate = True
logging.getLogger("requests.packages.urllib3").propagate = True

METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")


context_client: ContextClient = ContextClient()
service_client: ServiceClient = ServiceClient()

EXT_HOST = str(get_setting('WS_IP_HOST'))
EXT_PORT = int(get_setting('WS_IP_PORT'))
EXT_URL  = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT)

OWN_HOST = str(get_setting('WS_E2E_HOST'))
OWN_PORT = int(get_setting('WS_E2E_PORT'))

ALL_HOSTS = '0.0.0.0'

class SubscriptionServer(threading.Thread):
    def run(self):
        request = VNTSubscriptionRequest()
        request.host = OWN_HOST
        request.port = OWN_PORT
        try: 
            LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL))
            websocket = connect(EXT_URL)
        except: # pylint: disable=bare-except
            LOGGER.exception('Error connecting to {:s}'.format(EXT_URL))
        else:
            with websocket:
                LOGGER.debug('Connected to {:s}'.format(EXT_URL))
                send = grpc_message_to_json_string(request)
                websocket.send(send)
                LOGGER.debug('Sent: {:s}'.format(send))
                try:
                    message = websocket.recv()
                    LOGGER.debug('Received message from WebSocket: {:s}'.format(message))
                except Exception as ex:
                    LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex))
            self._events_server()


    def _events_server(self):
        try:
            server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT))
        except: # pylint: disable=bare-except
            LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
        else:
            with server:
                LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
                server.serve_forever()


    def _event_received(self, connection):
        LOGGER.debug('Event received')
        for message in connection:
            message_json = json.loads(message)

            # Link creation
            if 'link_id' in message_json:
                LOGGER.debug('Link creation')
                link = Link(**message_json)

                service = Service()
                service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
                service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
                service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
                service_client.CreateService(service)

                a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
                a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
                z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
                z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]

                links = context_client.ListLinks(Empty()).links
                for _link in links:
                    for _endpoint_id in _link.link_endpoint_ids:
                        if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
                        _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
                            a_ep_id = _endpoint_id
                        elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
                        _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
                            z_ep_id = _endpoint_id

                if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()):
                    error_msg = f'Could not get VNT link endpoints\
                                    \n\ta_endpoint_uuid= {a_endpoint_uuid}\
                                    \n\tz_endpoint_uuid= {z_device_uuid}'
                    LOGGER.error(error_msg)
                    connection.send(error_msg)
                    return

                service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
                service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))

                service_client.UpdateService(service)
                re_svc = context_client.GetService(service.service_id)
                connection.send(grpc_message_to_json_string(link))
                context_client.SetLink(link)
            elif 'link_uuid' in message_json:
                LOGGER.debug('Link removal')
                link_id = LinkId(**message_json)

                service_id = ServiceId()
                service_id.service_uuid.uuid = link_id.link_uuid.uuid
                service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                service_client.DeleteService(service_id)
                connection.send(grpc_message_to_json_string(link_id))
                context_client.RemoveLink(link_id)
            else:
                LOGGER.debug('Topology received')
                topology_details = TopologyDetails(**message_json)

                context = Context()
                context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
                context_client.SetContext(context)

                topology = Topology()
                topology.topology_id.context_id.CopyFrom(context.context_id)
                topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
                context_client.SetTopology(topology)

                for device in topology_details.devices:
                    context_client.SetDevice(device)

                for link in topology_details.links:
                    context_client.SetLink(link)