# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import grpc, logging, threading from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.context_pb2 import ContextId, Empty from common.proto.pathcomp_pb2 import PathCompReply, PathCompRequest from common.proto.pathcomp_pb2_grpc import PathCompServiceServicer from common.tools.context_queries.Device import get_devices_in_topology from common.tools.context_queries.Link import get_links_in_topology from common.tools.context_queries.InterDomain import is_inter_domain from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id from context.client.ContextClient import ContextClient from pathcomp.frontend.service.algorithms.Factory import get_algorithm LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('PathComp', 'RPC') ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)) class PathCompServiceServicerImpl(PathCompServiceServicer): def __init__(self) -> None: LOGGER.debug('Creating Servicer...') self._lock = threading.Lock() LOGGER.debug('Servicer Created') @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def Compute(self, request : PathCompRequest, context : grpc.ServicerContext) -> PathCompReply: LOGGER.debug('[Compute] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) context_client = ContextClient() if (len(request.services) == 1) and is_inter_domain(context_client, request.services[0].service_endpoint_ids): devices = get_devices_in_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID) links = get_links_in_topology(context_client, ADMIN_CONTEXT_ID, INTERDOMAIN_TOPOLOGY_UUID) else: # TODO: improve filtering of devices and links # TODO: add contexts, topologies, and membership of devices/links in topologies devices = context_client.ListDevices(Empty()) links = context_client.ListLinks(Empty()) algorithm = get_algorithm(request) algorithm.add_devices(devices) algorithm.add_links(links) algorithm.add_service_requests(request) #LOGGER.debug('device_list = {:s}' .format(str(algorithm.device_list ))) #LOGGER.debug('endpoint_dict = {:s}'.format(str(algorithm.endpoint_dict))) #LOGGER.debug('link_list = {:s}' .format(str(algorithm.link_list ))) #LOGGER.debug('service_list = {:s}' .format(str(algorithm.service_list ))) #LOGGER.debug('service_dict = {:s}' .format(str(algorithm.service_dict ))) #import time #ts = time.time() #algorithm.execute('request-{:f}.json'.format(ts), 'reply-{:f}.json'.format(ts)) with self._lock: # ensure backend receives requests one at a time algorithm.execute() reply = algorithm.get_reply() LOGGER.debug('[Compute] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply