# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import uuid

import grpc

from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatusEnum, TeraFlowController, TopologyId
from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.InterDomain import (
    compute_interdomain_path, compute_traversed_domains, get_local_device_uuids, is_inter_domain)
from common.tools.context_queries.Topology import create_topology
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from interdomain.service.topology_abstractor.DltRecordSender import DltRecordSender
from pathcomp.frontend.client.PathCompClient import PathCompClient
from slice.client.SliceClient import SliceClient
from .RemoteDomainClients import RemoteDomainClients
from .Tools import compose_slice, compute_slice_owner, map_abstract_endpoints_to_real
# Module-level logger shared by every RPC handler in this file.
LOGGER = logging.getLogger(__name__)

# Selects how sub-slices for remote domains are propagated in RequestSlice:
# when True they are queued on a DltRecordSender; when False the
# 'remote-teraflow' peer is called directly through RemoteDomainClients.
USE_DLT = True
class InterdomainServiceServicerImpl(InterdomainServiceServicer):
    """Implementation of the Interdomain gRPC service.

    Handles inter-domain slice requests by splitting them into one sub-slice
    per traversed domain, and exposes the peer-facing RPCs (Authenticate,
    LookUpSlice, OrderSliceFromCatalog, CreateSliceAndAddToCatalog).
    """

    def __init__(self, remote_domain_clients : RemoteDomainClients):
        """Store the registry of remote-domain clients used to reach peers.

        Args:
            remote_domain_clients: object providing ``get_peer(name)`` to
                obtain a client towards a remote Interdomain service.
        """
        LOGGER.debug('Creating Servicer...')
        self.remote_domain_clients = remote_domain_clients
        LOGGER.debug('Servicer Created')

    def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        """Create an inter-domain slice made of one sub-slice per traversed domain.

        Args:
            request: the slice being requested.
            context: gRPC servicer context (unused by this handler).

        Returns:
            The SliceId returned by the Context service after storing the
            inter-domain slice (a copy of ``request`` with its sub-slice ids
            attached and status set to ACTIVE).

        Raises:
            Exception: if the request is not an inter-domain request, if no
                slice owner can be identified, if the local Slice service
                returns an unexpected slice id, or (non-DLT mode) if the
                remote peer does not report the sub-slice as ACTIVE.
        """
        # Clients are created per call, as the other RPCs of this servicer do.
        context_client = ContextClient()
        pathcomp_client = PathCompClient()
        slice_client = SliceClient()
        dlt_connector_client = DltConnectorClient()

        local_device_uuids = get_local_device_uuids(context_client)
        slice_owner_uuid = request.slice_owner.owner_uuid.uuid
        not_inter_domain = not is_inter_domain(context_client, request.slice_endpoint_ids)
        no_slice_owner = len(slice_owner_uuid) == 0
        is_local_slice_owner = slice_owner_uuid in local_device_uuids
        if not_inter_domain and (no_slice_owner or is_local_slice_owner):
            str_slice = grpc_message_to_json_string(request)
            raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice))

        interdomain_path = compute_interdomain_path(pathcomp_client, request)
        # Flatten endpoint ids to (device_uuid, endpoint_uuid) pairs for logging only.
        str_interdomain_path = [
            [device_uuid, [
                (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
                for endpoint_id in endpoint_ids
            ]]
            for device_uuid, endpoint_ids in interdomain_path
        ]
        LOGGER.info('interdomain_path={:s}'.format(str(str_interdomain_path)))

        traversed_domains = compute_traversed_domains(context_client, interdomain_path)
        str_traversed_domains = [
            (domain_uuid, is_local_domain, [
                (endpoint_id.device_id.device_uuid.uuid, endpoint_id.endpoint_uuid.uuid)
                for endpoint_id in endpoint_ids
            ])
            for domain_uuid, is_local_domain, endpoint_ids in traversed_domains
        ]
        LOGGER.info('traversed_domains={:s}'.format(str(str_traversed_domains)))

        slice_owner_uuid = compute_slice_owner(context_client, traversed_domains)
        LOGGER.info('slice_owner_uuid={:s}'.format(str(slice_owner_uuid)))
        if slice_owner_uuid is None:
            raise Exception('Unable to identify slice owner')

        reply = Slice()
        reply.CopyFrom(request)

        dlt_record_sender = DltRecordSender(context_client, dlt_connector_client)

        for domain_uuid, is_local_domain, endpoint_ids in traversed_domains:
            if is_local_domain:
                # Local sub-slices get a fresh UUID and are created through the Slice service.
                slice_uuid = str(uuid.uuid4())
                LOGGER.info('[loop] [local] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format(
                    str(domain_uuid), str(is_local_domain), str(slice_uuid)))

                # local slices always in DEFAULT_CONTEXT_UUID
                context_uuid = DEFAULT_CONTEXT_UUID
                endpoint_ids = map_abstract_endpoints_to_real(context_client, domain_uuid, endpoint_ids)
                sub_slice = compose_slice(
                    context_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints,
                    config_rules=request.slice_config.config_rules)
                LOGGER.info('[loop] [local] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
                sub_slice_id = slice_client.CreateSlice(sub_slice)
                if sub_slice_id != sub_slice.slice_id: # pylint: disable=no-member
                    raise Exception('Local Slice creation failed. Wrong Slice Id was returned')
            else:
                # Remote sub-slices reuse the UUID of the requested slice and live in a
                # context named after the remote domain.
                slice_uuid = request.slice_id.slice_uuid.uuid
                LOGGER.info('[loop] [remote] domain_uuid={:s} is_local_domain={:s} slice_uuid={:s}'.format(
                    str(domain_uuid), str(is_local_domain), str(slice_uuid)))

                # create context/topology for the remote domains where we are creating slices
                # NOTE(review): create_context signature assumed to be
                # (context_client, context_uuid), mirroring create_topology -- confirm.
                create_context(context_client, domain_uuid)
                create_topology(context_client, domain_uuid, DEFAULT_TOPOLOGY_UUID)

                sub_slice = compose_slice(
                    domain_uuid, slice_uuid, endpoint_ids, constraints=request.slice_constraints,
                    config_rules=request.slice_config.config_rules, owner_uuid=slice_owner_uuid)
                LOGGER.info('[loop] [remote] sub_slice={:s}'.format(grpc_message_to_json_string(sub_slice)))
                sub_slice_id = context_client.SetSlice(sub_slice)

                if USE_DLT:
                    # Queue the record; it is sent when commit() is called after the loop.
                    topology_id = TopologyId(**json_topology_id(domain_uuid))
                    dlt_record_sender.add_slice(topology_id, sub_slice)
                else:
                    interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow')
                    sub_slice_reply = interdomain_client.LookUpSlice(sub_slice)
                    if sub_slice_reply == sub_slice.slice_id: # pylint: disable=no-member
                        # successful case
                        remote_sub_slice = interdomain_client.OrderSliceFromCatalog(sub_slice)
                    else:
                        # not in catalog
                        remote_sub_slice = interdomain_client.CreateSliceAndAddToCatalog(sub_slice)

                    # remote_sub_slice only exists on this (non-DLT) path.
                    if remote_sub_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
                        raise Exception('Remote Slice creation failed. Wrong Slice status returned')

            reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member

        LOGGER.info('Recording Remote Slice requests to DLT')
        dlt_record_sender.commit()

        LOGGER.info('Activating interdomain slice')
        reply.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE # pylint: disable=no-member

        LOGGER.info('Updating interdomain slice')
        slice_id = context_client.SetSlice(reply)
        return slice_id

    def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult:
        """Return an AuthenticationResult echoing the request's context id.

        The result is always marked as authenticated; no credential check is
        performed here.
        """
        auth_result = AuthenticationResult()
        auth_result.context_id.CopyFrom(request.context_id) # pylint: disable=no-member
        auth_result.authenticated = True
        return auth_result

    def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
        """Look the requested slice up in the Context service.

        Returns:
            The stored slice's id, or an empty SliceId when the Context
            lookup fails with a gRPC error.
        """
        try:
            context_client = ContextClient()
            slice_ = context_client.GetSlice(request.slice_id)
            return slice_.slice_id
        except grpc.RpcError:
            # Deliberately quiet: a failed lookup is reported to the caller as an empty id.
            return SliceId()

    def OrderSliceFromCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
        """Not implemented.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError('OrderSliceFromCatalog')

    def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
        """Create the requested slice through the Slice service and return it.

        Returns:
            The slice as read back from the Context service.

        Raises:
            Exception: if the Slice service returns an id different from the
                one in the request.
        """
        context_client = ContextClient()
        slice_client = SliceClient()
        reply = slice_client.CreateSlice(request)
        if reply != request.slice_id:
            raise Exception('Slice creation failed. Wrong Slice Id was returned')
        return context_client.GetSlice(request.slice_id)