# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc
import logging
import uuid
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId
from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.InterDomain import (
    compute_interdomain_path, compute_traversed_domains, get_local_device_uuids, is_inter_domain)
from common.tools.context_queries.Topology import create_topology
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from interdomain.service.topology_abstractor.DltRecordSender import DltRecordSender
from pathcomp.frontend.client.PathCompClient import PathCompClient
from slice.client.SliceClient import SliceClient
from .RemoteDomainClients import RemoteDomainClients
from .Tools import compose_slice, compute_slice_owner, map_abstract_endpoints_to_real

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Interdomain', 'RPC')

# When True, remote sub-slices are handed to the DltRecordSender; when False,
# they are requested directly from the peer interdomain client.
USE_DLT = True


def _endpoint_pairs(endpoint_ids):
    """Return a list of ``(device_uuid, endpoint_uuid)`` tuples, one per endpoint id.

    Only used to build printable representations for log messages.
    """
    return [
        (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
        for endpoint_id in endpoint_ids
    ]


class InterdomainServiceServicerImpl(InterdomainServiceServicer):
    """Implementation of the ``InterdomainServiceServicer`` RPC methods."""

    def __init__(self, remote_domain_clients : RemoteDomainClients):
        """Store the collection of remote-domain clients used when DLT is disabled."""
        LOGGER.debug('Creating Servicer...')
        self.remote_domain_clients = remote_domain_clients
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        """Split an inter-domain slice request into one sub-slice per traversed domain.

        The request is rejected when it is not inter-domain and its owner is
        either unset or one of the local devices. Otherwise the inter-domain
        path and the traversed domains are computed, a sub-slice is created for
        every traversed domain, and a copy of the request carrying the
        sub-slice ids and an ACTIVE status is stored through the context client.

        Args:
            request: the slice being requested.
            context: the gRPC servicer context (not used by this method).

        Returns:
            The value returned by ``ContextClient.SetSlice`` for the updated slice.

        Raises:
            Exception: if the request is not an inter-domain request, if no
                slice owner can be identified, or if a remote peer returns a
                sub-slice that is not ACTIVE.
        """
        context_client = ContextClient()
        pathcomp_client = PathCompClient()
        slice_client = SliceClient()
        dlt_connector_client = DltConnectorClient()

        local_uuids = get_local_device_uuids(context_client)
        requested_owner = request.slice_owner.owner_uuid.uuid
        crosses_domains = is_inter_domain(context_client, request.slice_endpoint_ids)
        owner_unset = not requested_owner
        owner_is_local = requested_owner in local_uuids
        if (owner_unset or owner_is_local) and not crosses_domains:
            raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(
                grpc_message_to_json_string(request)))

        path_hops = compute_interdomain_path(pathcomp_client, request)
        printable_path = [
            [hop_device_uuid, _endpoint_pairs(hop_endpoint_ids)]
            for hop_device_uuid, hop_endpoint_ids in path_hops
        ]
        LOGGER.info('interdomain_path={:s}'.format(str(printable_path)))

        domains = compute_traversed_domains(context_client, path_hops)
        printable_domains = [
            (dom_uuid, dom_is_local, _endpoint_pairs(dom_endpoint_ids))
            for dom_uuid, dom_is_local, dom_endpoint_ids in domains
        ]
        LOGGER.info('traversed_domains={:s}'.format(str(printable_domains)))

        # From here on the owner is the one derived from the traversed domains,
        # not the one carried by the request.
        owner_uuid = compute_slice_owner(context_client, domains)
        LOGGER.info('slice_owner_uuid={:s}'.format(str(owner_uuid)))
        if owner_uuid is None:
            raise Exception('Unable to identify slice owner')

        reply = Slice()
        reply.CopyFrom(request)

        dlt_record_sender = DltRecordSender(context_client, dlt_connector_client)

        for domain_uuid, is_local_domain, endpoint_ids in domains:
            if is_local_domain:
                sub_slice_id = self._create_local_sub_slice(
                    request, context_client, slice_client, domain_uuid, is_local_domain, endpoint_ids)
            else:
                sub_slice_id = self._create_remote_sub_slice(
                    request, context_client, dlt_record_sender, owner_uuid,
                    domain_uuid, is_local_domain, endpoint_ids)

            LOGGER.info('[loop] adding sub-slice')
            reply.slice_subslice_ids.add().CopyFrom(sub_slice_id)   # pylint: disable=no-member

        # NOTE(review): commit() runs regardless of USE_DLT; records are only
        # added to the sender when USE_DLT is True.
        LOGGER.info('Recording Remote Slice requests to DLT')
        dlt_record_sender.commit()

        LOGGER.info('Activating interdomain slice')
        reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member

        LOGGER.info('Updating interdomain slice')
        return context_client.SetSlice(reply)

    def _create_local_sub_slice(
        self, request : Slice, context_client : ContextClient, slice_client : SliceClient,
        domain_uuid, is_local_domain, endpoint_ids
    ):
        """Create the sub-slice of a local domain and return what ``SliceClient.CreateSlice`` returns."""
        slice_uuid = str(uuid.uuid4())
        LOGGER.info('[loop] [local] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format(
            str(domain_uuid), str(is_local_domain), str(slice_uuid)))

        # Local slices always live in DEFAULT_CONTEXT_NAME; the context carried
        # by the request's slice id is deliberately not used.
        real_endpoint_ids = map_abstract_endpoints_to_real(context_client, domain_uuid, endpoint_ids)
        sub_slice = compose_slice(
            DEFAULT_CONTEXT_NAME, slice_uuid, real_endpoint_ids,
            constraints=request.slice_constraints,
            config_rules=request.slice_config.config_rules)
        LOGGER.info('[loop] [local] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
        return slice_client.CreateSlice(sub_slice)

    def _create_remote_sub_slice(
        self, request : Slice, context_client : ContextClient, dlt_record_sender : DltRecordSender,
        owner_uuid, domain_uuid, is_local_domain, endpoint_ids
    ):
        """Register the sub-slice of a remote domain and return what ``ContextClient.SetSlice`` returns.

        With ``USE_DLT`` enabled the sub-slice is queued on ``dlt_record_sender``;
        otherwise it is requested from the peer interdomain client.

        Raises:
            Exception: if the peer returns a sub-slice whose status is not ACTIVE.
        """
        # Remote sub-slices reuse the uuid of the requested slice.
        slice_uuid = request.slice_id.slice_uuid.uuid
        LOGGER.info('[loop] [remote] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format(
            str(domain_uuid), str(is_local_domain), str(slice_uuid)))

        # create context/topology for the remote domains where we are creating slices
        create_context(context_client, domain_uuid)
        create_topology(context_client, domain_uuid, DEFAULT_TOPOLOGY_NAME)

        sub_slice = compose_slice(
            domain_uuid, slice_uuid, endpoint_ids,
            constraints=request.slice_constraints,
            config_rules=request.slice_config.config_rules,
            owner_uuid=owner_uuid)
        LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
        sub_slice_id = context_client.SetSlice(sub_slice)

        if USE_DLT:
            dlt_record_sender.add_slice(TopologyId(**json_topology_id(domain_uuid)), sub_slice)
            return sub_slice_id

        # NOTE(review): the peer name is fixed and does not depend on domain_uuid.
        peer_client = self.remote_domain_clients.get_peer('remote-teraflow')
        looked_up_id = peer_client.LookUpSlice(sub_slice)
        if looked_up_id == sub_slice.slice_id: # pylint: disable=no-member
            # the peer already knows this slice: order it from its catalog
            remote_sub_slice = peer_client.OrderSliceFromCatalog(sub_slice)
        else:
            # not in catalog
            remote_sub_slice = peer_client.CreateSliceAndAddToCatalog(sub_slice)

        if remote_sub_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
            raise Exception('Remote Slice creation failed. Wrong Slice status returned')
        return sub_slice_id

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult:
        """Return a result that echoes the request's context id and is always marked authenticated."""
        result = AuthenticationResult()
        result.context_id.CopyFrom(request.context_id)  # pylint: disable=no-member
        result.authenticated = True
        return result

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        """Return the id of the stored slice matching ``request.slice_id``.

        If the lookup raises ``grpc.RpcError`` an empty ``SliceId`` is returned instead.
        """
        try:
            found_slice = ContextClient().GetSlice(request.slice_id)
        except grpc.RpcError:
            # The failure is intentionally not logged.
            #LOGGER.exception('Unable to get slice({:s})'.format(grpc_message_to_json_string(request.slice_id)))
            return SliceId()
        return found_slice.slice_id

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def OrderSliceFromCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
        """Not implemented: always raises ``NotImplementedError``."""
        raise NotImplementedError('OrderSliceFromCatalog')
        #return Slice()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
        """Create the slice through the slice client and return it as stored in the context.

        Raises:
            Exception: if the id returned by ``CreateSlice`` differs from ``request.slice_id``.
        """
        context_client = ContextClient()
        slice_client = SliceClient()
        created_slice_id = slice_client.CreateSlice(request)
        if created_slice_id != request.slice_id:
            raise Exception('Slice creation failed. Wrong Slice Id was returned')
        return context_client.GetSlice(request.slice_id)