# Skip to content
# Snippets Groups Projects
# E2EOrchestratorServiceServicerImpl.py 5.66 KiB
# Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import networkx as nx
import grpc
import copy
# Carlos Manso's avatar
# Carlos Manso committed
from websockets.sync.client import connect
import time
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
# Carlos Manso's avatar
# Carlos Manso committed
from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
# Carlos Manso's avatar
# Carlos Manso committed
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.tools.grpc.Tools import grpc_message_to_json_string
from websockets.sync.server import serve
# Carlos Manso's avatar
# Carlos Manso committed
import json

# Module-level logger shared by the servicer below.
LOGGER = logging.getLogger(__name__)

# Metrics pool handed to the safe_and_metered_rpc_method decorator on Compute.
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")

# Context client instantiated once at import time; Compute uses it to list
# devices and links.
context_client: ContextClient = ContextClient()


class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
    """Servicer implementing the E2E Orchestrator ``Compute`` RPC.

    On construction it also tries to subscribe to an external WebSocket
    endpoint and then to run a WebSocket server that receives the events
    pushed back (see ``RequestSubscription``).
    """

    def __init__(self):
        """Create the servicer and attempt the WebSocket subscription.

        Any exception raised by ``RequestSubscription`` is logged and
        swallowed, so construction itself never fails because of it.
        """
        LOGGER.debug("Creating Servicer...")
        LOGGER.debug("Servicer Created")

        # NOTE(review): fixed 5 s delay before subscribing; presumably gives
        # the remote WebSocket endpoint time to come up -- confirm.
        time.sleep(5)
        try:
            LOGGER.info("Requesting subscription")
            self.RequestSubscription()
        except Exception as exc:
            # Best effort: the subscription is optional for the servicer.
            LOGGER.info("Exception0!: {}".format(exc))

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
        """Compute a path between the first two endpoints of a service.

        An undirected graph is built from the data returned by the Context
        client: every device endpoint UUID is a node, all endpoints of one
        device are connected to each other, and every link adds an edge
        between its first two endpoints. The shortest (unweighted) path
        between the first two service endpoints is then cut into consecutive
        (a, z) pairs and each pair is reported as one ``Connection``.

        Args:
            request: Request carrying the service whose endpoints must be
                connected.
            context: gRPC servicer context (not used by this method).

        Returns:
            A reply holding a copy of the requested service and one
            ``Connection`` per pair of hops of the computed path.

        Note:
            An ``IndexError`` (fewer than two service endpoints, or a link
            with fewer than two endpoints) and the errors raised by
            ``nx.shortest_path`` propagate out of this body; how the
            ``safe_and_metered_rpc_method`` decorator reports them is not
            visible from this file.
        """
        # Third element of the tuple returned by endpoint_get_uuid;
        # presumably the endpoint UUID -- TODO confirm against the helper.
        endpoints_ids = [
            endpoint_get_uuid(endpoint_id)[2]
            for endpoint_id in request.service.service_endpoint_ids
        ]

        graph = nx.Graph()

        devices = context_client.ListDevices(Empty()).devices
        for device in devices:
            endpoints_uuids = [
                endpoint.endpoint_id.endpoint_uuid.uuid
                for endpoint in device.device_endpoints
            ]
            for ep in endpoints_uuids:
                graph.add_node(ep)

            # Endpoints of the same device are mutually reachable.
            for ep in endpoints_uuids:
                for ep_i in endpoints_uuids:
                    if ep != ep_i:
                        graph.add_edge(ep, ep_i)

        links = context_client.ListLinks(Empty()).links
        for link in links:
            eps = [
                endpoint_id.endpoint_uuid.uuid
                for endpoint_id in link.link_endpoint_ids
            ]
            # Only the first two endpoints of a link are considered.
            graph.add_edge(eps[0], eps[1])

        shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])

        path = E2EOrchestratorReply()
        path.services.append(copy.deepcopy(request.service))

        # Pair hops as (0, 1), (2, 3), ...; a trailing unpaired hop is dropped,
        # exactly like the former ``range(len(shortest) / 2)`` index loop.
        for hop_a, hop_z in zip(shortest[0::2], shortest[1::2]):
            ep_a_uuid = str(hop_a)
            ep_z_uuid = str(hop_z)

            conn = Connection()
            conn.connection_id.connection_uuid.uuid = ep_a_uuid + '_->_' + ep_z_uuid

            ep_a_id = EndPointId()
            ep_a_id.endpoint_uuid.uuid = ep_a_uuid
            conn.path_hops_endpoint_ids.append(ep_a_id)

            ep_z_id = EndPointId()
            ep_z_id.endpoint_uuid.uuid = ep_z_uuid
            conn.path_hops_endpoint_ids.append(ep_z_id)

            path.connections.append(conn)

        # The reply was previously built but never returned.
        return path

    def RequestSubscription(self):
        """Subscribe to the external WebSocket endpoint and serve events.

        A ``VNTSubscriptionRequest`` advertising this service's host and port
        is sent as JSON to the external endpoint and its answer is logged.
        Afterwards a WebSocket server is started on ``0.0.0.0`` with
        ``_event_received`` as connection handler.

        Exceptions raised while connecting, sending or starting the server
        propagate to the caller; only a failing ``recv()`` is handled here.
        """
        LOGGER.info("Trying to connect...!!!")
        EXT_HOST = "nbiservice.tfs-ip.svc.cluster.local"
        EXT_PORT = "8762"

        OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local"
        OWN_PORT = "8761"

        url = "ws://" + EXT_HOST + ":" + EXT_PORT
        request = VNTSubscriptionRequest()
        request.host = OWN_HOST
        request.port = OWN_PORT
        LOGGER.info("Trying to connect... to {}".format(url))
        with connect(url) as websocket:
            LOGGER.info("CONNECTED!!! {}")
            send = grpc_message_to_json_string(request)
            LOGGER.info("Sending {}".format(send))
            websocket.send(send)

            try:
                message = websocket.recv()
            except Exception as e:
                LOGGER.info('Exception1!: {}'.format(e))
            else:
                # Log the answer only when one was received; previously a
                # failed recv() left ``message`` unbound and the log call
                # itself raised a NameError.
                LOGGER.info("Received ws: {}".format(message))

        # NOTE(review): OWN_PORT is a string here. Binding a TCP socket
        # normally requires an integer port, so this call likely raises and
        # ends up in the caller's "Exception0!" handler -- confirm. Do not
        # simply convert it to int: serve_forever() blocks, and this method
        # runs inside __init__, so the server must first be moved off the
        # constructor's thread.
        with serve(self._event_received, "0.0.0.0", OWN_PORT, logger=LOGGER) as server:
            LOGGER.info("Running subscription server...: {}:{}".format("0.0.0.0", OWN_PORT))
            server.serve_forever()
            LOGGER.info("Exiting subscription server...")

    def _event_received(self, websocket):
        """Handle one WebSocket connection of the subscription server.

        Every received message is parsed as JSON, inspected, and then echoed
        back unchanged to the sender.

        Args:
            websocket: Connection object handed in by the WebSocket server;
                iterating it yields the incoming messages.
        """
        for message in websocket:
            LOGGER.info("Message received!!!: {}".format(message))
            message_json = json.loads(message)
            if 'event_type' in message_json:
                # Events are recognised but not acted upon yet.
                pass
            elif 'link_id' in message_json:
                obj = Link(**message_json)
                if self._check_policies(obj):
                    # Accepted links are not processed any further yet.
                    pass
            elif 'link_uuid' in message_json:
                # Built only to parse the payload; the result is not used.
                obj = LinkId(**message_json)

            websocket.send(message)

    def _check_policies(self, link):
        """Tell whether ``link`` is accepted; currently always ``True``."""
        return True