Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
PathCompServiceServicerImpl.py 4.29 KiB
Newer Older
  • Learn to ignore specific revisions
  • Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    import grpc, logging, threading
    
    from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
    
    from common.proto.context_pb2 import ContextId, Empty, TopologyId
    
    from common.proto.pathcomp_pb2 import PathCompReply, PathCompRequest
    from common.proto.pathcomp_pb2_grpc import PathCompServiceServicer
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.tools.context_queries.Device import get_devices_in_topology
    from common.tools.context_queries.Link import get_links_in_topology
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.tools.context_queries.InterDomain import is_inter_domain
    
    from common.tools.grpc.Tools import grpc_message_to_json_string
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from common.tools.object_factory.Context import json_context_id
    
    from common.tools.object_factory.Topology import json_topology_id
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from context.client.ContextClient import ContextClient
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    from pathcomp.frontend.service.algorithms.Factory import get_algorithm
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
    LOGGER = logging.getLogger(__name__)
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    METRICS_POOL = MetricsPool('PathComp', 'RPC')
    
    #ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    class PathCompServiceServicerImpl(PathCompServiceServicer):
        def __init__(self) -> None:
            LOGGER.debug('Creating Servicer...')
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            self._lock = threading.Lock()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            LOGGER.debug('Servicer Created')
    
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
        def Compute(self, request : PathCompRequest, context : grpc.ServicerContext) -> PathCompReply:
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            LOGGER.debug('[Compute] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            context_client = ContextClient()
    
    
            context_id = json_context_id(DEFAULT_CONTEXT_NAME)
    
            # TODO: improve definition of topologies; for interdomain the current topology design might be not convenient
            #if (len(request.services) == 1) and is_inter_domain(context_client, request.services[0].service_endpoint_ids):
            #    #devices = get_devices_in_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME)
            #    #links = get_links_in_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_NAME)
            #    topology_id = json_topology_id(INTERDOMAIN_TOPOLOGY_NAME, context_id)
            #else:
            #    # TODO: improve filtering of devices and links
            #    # TODO: add contexts, topologies, and membership of devices/links in topologies
            #    #devices = context_client.ListDevices(Empty())
            #    #links = context_client.ListLinks(Empty())
            #    topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id)
            topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id)
    
    
            topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            algorithm = get_algorithm(request)
    
            algorithm.add_devices(topology_details.devices)
            algorithm.add_links(topology_details.links)
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            algorithm.add_service_requests(request)
    
            #LOGGER.debug('device_list = {:s}'  .format(str(algorithm.device_list  )))
            #LOGGER.debug('endpoint_dict = {:s}'.format(str(algorithm.endpoint_dict)))
            #LOGGER.debug('link_list = {:s}'    .format(str(algorithm.link_list    )))
            #LOGGER.debug('service_list = {:s}' .format(str(algorithm.service_list )))
            #LOGGER.debug('service_dict = {:s}' .format(str(algorithm.service_dict )))
    
            #import time
            #ts = time.time()
            #algorithm.execute('request-{:f}.json'.format(ts), 'reply-{:f}.json'.format(ts))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            with self._lock:
                # ensure backend receives requests one at a time
                algorithm.execute()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
    
            reply = algorithm.get_reply()
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            LOGGER.debug('[Compute] end ; reply = {:s}'.format(grpc_message_to_json_string(reply)))
    
    Lluis Gifre Renom's avatar
    Lluis Gifre Renom committed
            return reply