Skip to content
Snippets Groups Projects
InterdomainServiceServicerImpl.py 9.44 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging, uuid
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId
from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.InterDomain import (
    compute_interdomain_path, compute_traversed_domains, get_local_device_uuids, is_inter_domain)
from common.tools.context_queries.Topology import create_topology
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from interdomain.service.topology_abstractor.DltRecordSender import DltRecordSender
from pathcomp.frontend.client.PathCompClient import PathCompClient
from slice.client.SliceClient import SliceClient
from .RemoteDomainClients import RemoteDomainClients
from .Tools import compose_slice, compute_slice_owner, map_abstract_endpoints_to_real

LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICS_POOL = MetricsPool('Interdomain', 'RPC')

class InterdomainServiceServicerImpl(InterdomainServiceServicer):
    def __init__(self, remote_domain_clients : RemoteDomainClients):
        LOGGER.debug('Creating Servicer...')
        self.remote_domain_clients = remote_domain_clients
        LOGGER.debug('Servicer Created')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        context_client = ContextClient()
        pathcomp_client = PathCompClient()
        slice_client = SliceClient()
        dlt_connector_client = DltConnectorClient()
        local_device_uuids = get_local_device_uuids(context_client)
        slice_owner_uuid = request.slice_owner.owner_uuid.uuid
        not_inter_domain = not is_inter_domain(context_client, request.slice_endpoint_ids)
        no_slice_owner = len(slice_owner_uuid) == 0
        is_local_slice_owner = slice_owner_uuid in local_device_uuids
        if not_inter_domain and (no_slice_owner or is_local_slice_owner):
            str_slice = grpc_message_to_json_string(request)
            raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice))

        interdomain_path = compute_interdomain_path(pathcomp_client, request)
        str_interdomain_path = [
            [device_uuid, [
                (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
                for endpoint_id in endpoint_ids
            ]]
            for device_uuid, endpoint_ids in interdomain_path
        ]
        LOGGER.info('interdomain_path={:s}'.format(str(str_interdomain_path)))

        traversed_domains = compute_traversed_domains(context_client, interdomain_path)
        str_traversed_domains = [
            (domain_uuid, is_local_domain, [
                (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
                for endpoint_id in endpoint_ids
            ])
            for domain_uuid,is_local_domain,endpoint_ids in traversed_domains
        ]
        LOGGER.info('traversed_domains={:s}'.format(str(str_traversed_domains)))

        slice_owner_uuid = compute_slice_owner(context_client, traversed_domains)
        LOGGER.info('slice_owner_uuid={:s}'.format(str(slice_owner_uuid)))
        if slice_owner_uuid is None:
            raise Exception('Unable to identify slice owner')

        reply = Slice()
        reply.CopyFrom(request)

        dlt_record_sender = DltRecordSender(context_client, dlt_connector_client)

        for domain_uuid, is_local_domain, endpoint_ids in traversed_domains:
            if is_local_domain:
                slice_uuid = str(uuid.uuid4())
                LOGGER.info('[loop] [local] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format(
                    str(domain_uuid), str(is_local_domain), str(slice_uuid)))

                # local slices always in DEFAULT_CONTEXT_UUID
                #context_uuid = request.slice_id.context_id.context_uuid.uuid
                context_uuid = DEFAULT_CONTEXT_UUID
                endpoint_ids = map_abstract_endpoints_to_real(context_client, domain_uuid, endpoint_ids)
                sub_slice = compose_slice(
                    context_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints,
                    config_rules=request.slice_config.config_rules)
                LOGGER.info('[loop] [local] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
                sub_slice_id = slice_client.CreateSlice(sub_slice)
                if sub_slice_id != sub_slice.slice_id: # pylint: disable=no-member
                    raise Exception('Local Slice creation failed. Wrong Slice Id was returned')
                slice_uuid = request.slice_id.slice_uuid.uuid
                LOGGER.info('[loop] [remote] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format(
                    str(domain_uuid), str(is_local_domain), str(slice_uuid)))

                # create context/topology for the remote domains where we are creating slices
                create_context(context_client, domain_uuid)
                create_topology(context_client, domain_uuid, DEFAULT_TOPOLOGY_UUID)
                sub_slice = compose_slice(
                    domain_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints,
                    config_rules=request.slice_config.config_rules, owner_uuid=slice_owner_uuid)
                LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
                sub_slice_id = context_client.SetSlice(sub_slice)

                if USE_DLT:
                    topology_id = TopologyId(**json_topology_id(domain_uuid))
                    dlt_record_sender.add_slice(topology_id, sub_slice)
                else:
                    interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow')
                    sub_slice_reply = interdomain_client.LookUpSlice(sub_slice)
                    if sub_slice_reply == sub_slice.slice_id: # pylint: disable=no-member
                        # successful case
                        remote_sub_slice = interdomain_client.OrderSliceFromCatalog(sub_slice)
                    else:
                        # not in catalog
                        remote_sub_slice = interdomain_client.CreateSliceAndAddToCatalog(sub_slice)
                    if remote_sub_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
                        raise Exception('Remote Slice creation failed. Wrong Slice status returned')
            LOGGER.info('[loop] adding sub-slice')
            reply.slice_subslice_ids.add().CopyFrom(sub_slice_id)   # pylint: disable=no-member
        LOGGER.info('Recording Remote Slice requests to DLT')
        dlt_record_sender.commit()

        LOGGER.info('Activating interdomain slice')
        reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member

        LOGGER.info('Updating interdomain slice')
        slice_id = context_client.SetSlice(reply)
        return slice_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult:
        auth_result = AuthenticationResult()
        auth_result.context_id.CopyFrom(request.context_id) # pylint: disable=no-member
Ricard Vilalta's avatar
Ricard Vilalta committed
        auth_result.authenticated = True
        return auth_result
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        try:
            context_client = ContextClient()
            slice_ = context_client.GetSlice(request.slice_id)
            return slice_.slice_id
        except grpc.RpcError:
            #LOGGER.exception('Unable to get slice({:s})'.format(grpc_message_to_json_string(request.slice_id)))
            return SliceId()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def OrderSliceFromCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
        raise NotImplementedError('OrderSliceFromCatalog')
        #return Slice()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
        context_client = ContextClient()
        slice_client = SliceClient()
        reply = slice_client.CreateSlice(request)
            raise Exception('Slice creation failed. Wrong Slice Id was returned')
        return context_client.GetSlice(request.slice_id)