Commit f3642dbd authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

E2E Orchestrator component:

- Remove unneeded files
parent 8a7a1ffc
Loading
Loading
Loading
Loading
+0 −37
Original line number Diff line number Diff line
    def retrieve_external_topologies(self):
        i = 1
        while True:
            try:
                ADD  = str(get_setting(f'EXT_CONTROLLER{i}_ADD'))
                PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT'))
            except: # pylint: disable=bare-except
                break

            try:
                LOGGER.info('Retrieving external controller #{:d}'.format(i))
                url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format(
                    ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
                )
                LOGGER.info('url={:s}'.format(str(url)))
                topo = requests.get(url).json()
                LOGGER.info('Retrieved external controller #{:d}'.format(i))
            except: # pylint: disable=bare-except
                LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i))

            topology_details = TopologyDetails(**topo)
            context = Context()
            context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
            context_client.SetContext(context)

            topology = Topology()
            topology.topology_id.context_id.CopyFrom(context.context_id)
            topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
            context_client.SetTopology(topology)

            for device in topology_details.devices:
                context_client.SetDevice(device)

            for link in topology_details.links:
                context_client.SetLink(link)

            i+=1
+0 −268
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, grpc, json, logging, networkx, requests, threading
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import (
    Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId,
    ServiceTypeEnum, ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from context.service.database.uuids.Device import device_get_uuid
from service.client.ServiceClient import ServiceClient
from websockets.sync.client import connect
from websockets.sync.server import serve


LOGGER = logging.getLogger(__name__)
logging.getLogger("websockets").propagate = True
logging.getLogger("requests.packages.urllib3").propagate = True

METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")


context_client: ContextClient = ContextClient()
service_client: ServiceClient = ServiceClient()

EXT_HOST = str(get_setting('WS_IP_HOST'))
EXT_PORT = int(get_setting('WS_IP_PORT'))
EXT_URL  = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT)

OWN_HOST = str(get_setting('WS_E2E_HOST'))
OWN_PORT = int(get_setting('WS_E2E_PORT'))

ALL_HOSTS = '0.0.0.0'

class SubscriptionServer(threading.Thread):
    def run(self):
        request = VNTSubscriptionRequest()
        request.host = OWN_HOST
        request.port = OWN_PORT
        try: 
            LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL))
            websocket = connect(EXT_URL)
        except: # pylint: disable=bare-except
            LOGGER.exception('Error connecting to {:s}'.format(EXT_URL))
        else:
            with websocket:
                LOGGER.debug('Connected to {:s}'.format(EXT_URL))
                send = grpc_message_to_json_string(request)
                websocket.send(send)
                LOGGER.debug('Sent: {:s}'.format(send))
                try:
                    message = websocket.recv()
                    LOGGER.debug('Received message from WebSocket: {:s}'.format(message))
                except Exception as ex:
                    LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex))
            self._events_server()


    def _events_server(self):
        try:
            server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT))
        except: # pylint: disable=bare-except
            LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
        else:
            with server:
                LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
                server.serve_forever()


    def _event_received(self, connection):
        LOGGER.debug('Event received')
        for message in connection:
            message_json = json.loads(message)

            # Link creation
            if 'link_id' in message_json:
                LOGGER.debug('Link creation')
                link = Link(**message_json)

                service = Service()
                service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
                service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
                service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
                service_client.CreateService(service)

                a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
                a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
                z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
                z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]

                links = context_client.ListLinks(Empty()).links
                for _link in links:
                    for _endpoint_id in _link.link_endpoint_ids:
                        if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
                        _endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
                            a_ep_id = _endpoint_id
                        elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
                        _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
                            z_ep_id = _endpoint_id

                if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()):
                    error_msg = f'Could not get VNT link endpoints\
                                    \n\ta_endpoint_uuid= {a_endpoint_uuid}\
                                    \n\tz_endpoint_uuid= {z_device_uuid}'
                    LOGGER.error(error_msg)
                    connection.send(error_msg)
                    return

                service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
                service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))

                service_client.UpdateService(service)
                re_svc = context_client.GetService(service.service_id)
                connection.send(grpc_message_to_json_string(link))
                context_client.SetLink(link)
            elif 'link_uuid' in message_json:
                LOGGER.debug('Link removal')
                link_id = LinkId(**message_json)

                service_id = ServiceId()
                service_id.service_uuid.uuid = link_id.link_uuid.uuid
                service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
                service_client.DeleteService(service_id)
                connection.send(grpc_message_to_json_string(link_id))
                context_client.RemoveLink(link_id)
            else:
                LOGGER.debug('Topology received')
                topology_details = TopologyDetails(**message_json)

                context = Context()
                context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
                context_client.SetContext(context)

                topology = Topology()
                topology.topology_id.context_id.CopyFrom(context.context_id)
                topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
                context_client.SetTopology(topology)

                for device in topology_details.devices:
                    context_client.SetDevice(device)

                for link in topology_details.links:
                    context_client.SetLink(link)



class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
    def __init__(self):
        LOGGER.debug('Creating Servicer...')
        try:
            LOGGER.debug('Requesting subscription')
            sub_server = SubscriptionServer()
            sub_server.start()
            LOGGER.debug('Servicer Created')
            self.retrieve_external_topologies()
        except:
            LOGGER.exception('Unhandled Exception')

    def retrieve_external_topologies(self):
        i = 1
        while True:
            try:
                ADD  = str(get_setting(f'EXT_CONTROLLER{i}_ADD'))
                PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT'))
            except: # pylint: disable=bare-except
                break

            try:
                LOGGER.info('Retrieving external controller #{:d}'.format(i))
                url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format(
                    ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
                )
                LOGGER.info('url={:s}'.format(str(url)))
                topo = requests.get(url).json()
                LOGGER.info('Retrieved external controller #{:d}'.format(i))
            except: # pylint: disable=bare-except
                LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i))

            topology_details = TopologyDetails(**topo)
            context = Context()
            context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
            context_client.SetContext(context)

            topology = Topology()
            topology.topology_id.context_id.CopyFrom(context.context_id)
            topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
            context_client.SetTopology(topology)

            for device in topology_details.devices:
                context_client.SetDevice(device)

            for link in topology_details.links:
                context_client.SetLink(link)

            i+=1

        
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
        endpoints_ids = []
        for endpoint_id in request.service.service_endpoint_ids:
            endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2])

        graph = networkx.Graph()

        devices = context_client.ListDevices(Empty()).devices

        for device in devices:
            endpoints_uuids = [endpoint.endpoint_id.endpoint_uuid.uuid
                               for endpoint in device.device_endpoints]
            for ep in endpoints_uuids:
                graph.add_node(ep)

            for ep in endpoints_uuids:
                for ep_i in endpoints_uuids:
                    if ep == ep_i:
                        continue
                    graph.add_edge(ep, ep_i)

        links = context_client.ListLinks(Empty()).links
        for link in links:
            eps = []
            for endpoint_id in link.link_endpoint_ids:
                eps.append(endpoint_id.endpoint_uuid.uuid)
            graph.add_edge(eps[0], eps[1])


        shortest = networkx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])

        path = E2EOrchestratorReply()
        path.services.append(copy.deepcopy(request.service))
        for i in range(0, int(len(shortest)/2)):
            conn = Connection()
            ep_a_uuid = str(shortest[i*2])
            ep_z_uuid = str(shortest[i*2+1])

            conn.connection_id.connection_uuid.uuid = str(ep_a_uuid) + '_->_' + str(ep_z_uuid)

            ep_a_id = EndPointId()
            ep_a_id.endpoint_uuid.uuid = ep_a_uuid
            conn.path_hops_endpoint_ids.append(ep_a_id)

            ep_z_id = EndPointId()
            ep_z_id.endpoint_uuid.uuid = ep_z_uuid
            conn.path_hops_endpoint_ids.append(ep_z_id)

            path.connections.append(conn)

        return path