Skip to content
Snippets Groups Projects
Commit 608093ed authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Interdomain Component:

- backed up old servicer
- updated to use new generic Context queries
- added pathcomp as dependency
- updated logic to validate inter-domain requests
- implemented inter-domain pathcomp request
- updated logic to create per-domain slices
parent b152de16
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!24Integrate NFV-SDN'22 demo
...@@ -67,6 +67,7 @@ COPY src/context/. context/ ...@@ -67,6 +67,7 @@ COPY src/context/. context/
COPY src/dlt/. dlt/ COPY src/dlt/. dlt/
COPY src/interdomain/. interdomain/ COPY src/interdomain/. interdomain/
#COPY src/monitoring/. monitoring/ #COPY src/monitoring/. monitoring/
COPY src/pathcomp/. pathcomp/
#COPY src/service/. service/ #COPY src/service/. service/
COPY src/slice/. slice/ COPY src/slice/. slice/
......
...@@ -12,15 +12,18 @@ ...@@ -12,15 +12,18 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import grpc, logging import grpc, logging, uuid
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.proto.context_pb2 import AuthenticationResult, Slice, SliceId, TeraFlowController
from common.proto.context_pb2 import (
AuthenticationResult, Slice, SliceId, SliceStatus, SliceStatusEnum, TeraFlowController)
from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer
#from common.tools.grpc.Tools import grpc_message_to_json_string from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.tools.context_queries.InterDomain import (
compute_interdomain_path, compute_traversed_domains, is_multi_domain)
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from interdomain.service.RemoteDomainClients import RemoteDomainClients from pathcomp.frontend.client.PathCompClient import PathCompClient
from slice.client.SliceClient import SliceClient from slice.client.SliceClient import SliceClient
from .RemoteDomainClients import RemoteDomainClients
from .Tools import compose_slice, compute_slice_owner
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -37,89 +40,37 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer): ...@@ -37,89 +40,37 @@ class InterdomainServiceServicerImpl(InterdomainServiceServicer):
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
context_client = ContextClient() context_client = ContextClient()
pathcomp_client = PathCompClient()
slice_client = SliceClient() slice_client = SliceClient()
domains_to_endpoints = {} if not is_multi_domain(context_client, request.slice_endpoint_ids):
local_domain_uuid = None str_slice = grpc_message_to_json_string(request)
for slice_endpoint_id in request.slice_endpoint_ids: raise Exception('InterDomain can only handle inter-domain slice requests: {:s}'.format(str_slice))
device_uuid = slice_endpoint_id.device_id.device_uuid.uuid
domain_uuid = device_uuid.split('@')[1] interdomain_path = compute_interdomain_path(pathcomp_client, request)
endpoints = domains_to_endpoints.setdefault(domain_uuid, []) traversed_domains = compute_traversed_domains(context_client, interdomain_path)
endpoints.append(slice_endpoint_id) slice_owner_uuid = compute_slice_owner(context_client, traversed_domains)
if local_domain_uuid is None: local_domain_uuid = domain_uuid if slice_owner_uuid is None:
raise Exception('Unable to identify slice owner')
reply = Slice() reply = Slice()
reply.CopyFrom(request) reply.CopyFrom(request)
# decompose remote slices for domain_uuid, _, is_local_domain, endpoint_ids in traversed_domains:
for domain_uuid, slice_endpoint_ids in domains_to_endpoints.items(): slice_uuid = str(uuid.uuid4())
if domain_uuid == local_domain_uuid: continue
if is_local_domain:
remote_slice_request = Slice() context_uuid = request.slice_id.context_id.context_uuid.uuid
remote_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid sub_slice = compose_slice(context_uuid, slice_uuid, endpoint_ids)
remote_slice_request.slice_id.slice_uuid.uuid = \ sub_slice_id = slice_client.CreateSlice(sub_slice)
request.slice_id.slice_uuid.uuid + ':subslice@' + local_domain_uuid
remote_slice_request.slice_status.slice_status = request.slice_status.slice_status
for endpoint_id in slice_endpoint_ids:
slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = endpoint_id.device_id.device_uuid.uuid
slice_endpoint_id.endpoint_uuid.uuid = endpoint_id.endpoint_uuid.uuid
# add endpoint connecting to remote domain
if domain_uuid == 'D1':
slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
elif domain_uuid == 'D2':
slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow')
remote_slice_reply = interdomain_client.LookUpSlice(remote_slice_request)
if remote_slice_reply == remote_slice_request.slice_id: # pylint: disable=no-member
# successful case
remote_slice = interdomain_client.OrderSliceFromCatalog(remote_slice_request)
if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
raise Exception('Remote Slice creation failed. Wrong Slice status returned')
else: else:
# not in catalog sub_slice = compose_slice(domain_uuid, slice_uuid, endpoint_ids, slice_owner_uuid)
remote_slice = interdomain_client.CreateSliceAndAddToCatalog(remote_slice_request) sub_slice_id = context_client.SetSlice(sub_slice)
if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
raise Exception('Remote Slice creation failed. Wrong Slice status returned') reply.slice_subslice_ids.add().CopyFrom(sub_slice_id) # pylint: disable=no-member
#context_client.SetSlice(remote_slice) slice_id = context_client.SetSlice(reply)
#subslice_id = reply.slice_subslice_ids.add() return slice_id
#subslice_id.CopyFrom(remote_slice.slice_id)
local_slice_request = Slice()
local_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid
local_slice_request.slice_id.slice_uuid.uuid = request.slice_id.slice_uuid.uuid + ':subslice'
local_slice_request.slice_status.slice_status = request.slice_status.slice_status
for endpoint_id in domains_to_endpoints[local_domain_uuid]:
slice_endpoint_id = local_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.CopyFrom(endpoint_id)
# add endpoint connecting to remote domain
if local_domain_uuid == 'D1':
slice_endpoint_id = local_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
elif local_domain_uuid == 'D2':
slice_endpoint_id = local_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
local_slice_reply = slice_client.CreateSlice(local_slice_request)
if local_slice_reply != local_slice_request.slice_id: # pylint: disable=no-member
raise Exception('Local Slice creation failed. Wrong Slice Id was returned')
subslice_id = reply.slice_subslice_ids.add()
subslice_id.context_id.context_uuid.uuid = local_slice_request.slice_id.context_id.context_uuid.uuid
subslice_id.slice_uuid.uuid = local_slice_request.slice_id.slice_uuid.uuid
context_client.SetSlice(reply)
return reply.slice_id
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult: def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult:
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging
from typing import List, Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, INTERDOMAIN_TOPOLOGY_UUID
from common.proto.context_pb2 import ContextId, Device, EndPointId, Slice, SliceStatusEnum
from common.tools.context_queries.InterDomain import get_local_domain_devices
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
def compute_slice_owner(
context_client : ContextClient, traversed_domains : List[Tuple[str, Device, bool, List[EndPointId]]]
) -> Optional[str]:
traversed_domain_uuids = {traversed_domain[0] for traversed_domain in traversed_domains}
existing_topology_ids = context_client.ListTopologyIds(ContextId(**json_context_id(DEFAULT_CONTEXT_UUID)))
existing_topology_uuids = {
topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids
}
existing_topology_uuids.discard(DEFAULT_TOPOLOGY_UUID)
existing_topology_uuids.discard(INTERDOMAIN_TOPOLOGY_UUID)
candidate_owner_uuids = traversed_domain_uuids.intersection(existing_topology_uuids)
if len(candidate_owner_uuids) != 1:
data = {
'traversed_domain_uuids' : [td_uuid for td_uuid in traversed_domain_uuids ],
'existing_topology_uuids': [et_uuid for et_uuid in existing_topology_uuids],
'candidate_owner_uuids' : [co_uuid for co_uuid in candidate_owner_uuids ],
}
LOGGER.warning('Unable to identify slice owner: {:s}'.format(json.dumps(data)))
return None
return candidate_owner_uuids.pop()
def compose_slice(
context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], owner_uuid : Optional[str] = None
) -> Slice:
slice_ = Slice()
slice_.slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
slice_.slice_id.slice_uuid.uuid = slice_uuid # pylint: disable=no-member
slice_.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_PLANNED # pylint: disable=no-member
if owner_uuid is not None:
slice_.slice_owner.owner_uuid.uuid = owner_uuid # pylint: disable=no-member
for endpoint_id in endpoint_ids:
slice_.slice_endpoint_ids.append(endpoint_id) # pylint: disable=no-member
return slice_
...@@ -37,12 +37,14 @@ def main(): ...@@ -37,12 +37,14 @@ def main():
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([ wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
]) ])
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.proto.context_pb2 import (
AuthenticationResult, Slice, SliceId, SliceStatus, SliceStatusEnum, TeraFlowController)
from common.proto.interdomain_pb2_grpc import InterdomainServiceServicer
#from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from interdomain.service.RemoteDomainClients import RemoteDomainClients
from slice.client.SliceClient import SliceClient
LOGGER = logging.getLogger(__name__)
SERVICE_NAME = 'Interdomain'
METHOD_NAMES = ['RequestSlice', 'Authenticate', 'LookUpSlice', 'OrderSliceFromCatalog', 'CreateSliceAndAddToCatalog']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class InterdomainServiceServicerImpl(InterdomainServiceServicer):
def __init__(self, remote_domain_clients : RemoteDomainClients):
LOGGER.debug('Creating Servicer...')
self.remote_domain_clients = remote_domain_clients
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
context_client = ContextClient()
slice_client = SliceClient()
domains_to_endpoints = {}
local_domain_uuid = None
for slice_endpoint_id in request.slice_endpoint_ids:
device_uuid = slice_endpoint_id.device_id.device_uuid.uuid
domain_uuid = device_uuid.split('@')[1]
endpoints = domains_to_endpoints.setdefault(domain_uuid, [])
endpoints.append(slice_endpoint_id)
if local_domain_uuid is None: local_domain_uuid = domain_uuid
reply = Slice()
reply.CopyFrom(request)
# decompose remote slices
for domain_uuid, slice_endpoint_ids in domains_to_endpoints.items():
if domain_uuid == local_domain_uuid: continue
remote_slice_request = Slice()
remote_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid
remote_slice_request.slice_id.slice_uuid.uuid = \
request.slice_id.slice_uuid.uuid + ':subslice@' + local_domain_uuid
remote_slice_request.slice_status.slice_status = request.slice_status.slice_status
for endpoint_id in slice_endpoint_ids:
slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = endpoint_id.device_id.device_uuid.uuid
slice_endpoint_id.endpoint_uuid.uuid = endpoint_id.endpoint_uuid.uuid
# add endpoint connecting to remote domain
if domain_uuid == 'D1':
slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
elif domain_uuid == 'D2':
slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow')
remote_slice_reply = interdomain_client.LookUpSlice(remote_slice_request)
if remote_slice_reply == remote_slice_request.slice_id: # pylint: disable=no-member
# successful case
remote_slice = interdomain_client.OrderSliceFromCatalog(remote_slice_request)
if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
raise Exception('Remote Slice creation failed. Wrong Slice status returned')
else:
# not in catalog
remote_slice = interdomain_client.CreateSliceAndAddToCatalog(remote_slice_request)
if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE:
raise Exception('Remote Slice creation failed. Wrong Slice status returned')
#context_client.SetSlice(remote_slice)
#subslice_id = reply.slice_subslice_ids.add()
#subslice_id.CopyFrom(remote_slice.slice_id)
local_slice_request = Slice()
local_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid
local_slice_request.slice_id.slice_uuid.uuid = request.slice_id.slice_uuid.uuid + ':subslice'
local_slice_request.slice_status.slice_status = request.slice_status.slice_status
for endpoint_id in domains_to_endpoints[local_domain_uuid]:
slice_endpoint_id = local_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.CopyFrom(endpoint_id)
# add endpoint connecting to remote domain
if local_domain_uuid == 'D1':
slice_endpoint_id = local_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
elif local_domain_uuid == 'D2':
slice_endpoint_id = local_slice_request.slice_endpoint_ids.add()
slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2'
slice_endpoint_id.endpoint_uuid.uuid = '2/1'
local_slice_reply = slice_client.CreateSlice(local_slice_request)
if local_slice_reply != local_slice_request.slice_id: # pylint: disable=no-member
raise Exception('Local Slice creation failed. Wrong Slice Id was returned')
subslice_id = reply.slice_subslice_ids.add()
subslice_id.context_id.context_uuid.uuid = local_slice_request.slice_id.context_id.context_uuid.uuid
subslice_id.slice_uuid.uuid = local_slice_request.slice_id.slice_uuid.uuid
context_client.SetSlice(reply)
return reply.slice_id
@safe_and_metered_rpc_method(METRICS, LOGGER)
def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult:
auth_result = AuthenticationResult()
auth_result.context_id.CopyFrom(request.context_id) # pylint: disable=no-member
auth_result.authenticated = True
return auth_result
@safe_and_metered_rpc_method(METRICS, LOGGER)
def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
try:
context_client = ContextClient()
slice_ = context_client.GetSlice(request.slice_id)
return slice_.slice_id
except grpc.RpcError:
#LOGGER.exception('Unable to get slice({:s})'.format(grpc_message_to_json_string(request.slice_id)))
return SliceId()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def OrderSliceFromCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
raise NotImplementedError('OrderSliceFromCatalog')
#return Slice()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice:
context_client = ContextClient()
slice_client = SliceClient()
reply = slice_client.CreateSlice(request)
if reply != request.slice_id: # pylint: disable=no-member
raise Exception('Slice creation failed. Wrong Slice Id was returned')
return context_client.GetSlice(request.slice_id)
...@@ -18,12 +18,12 @@ from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID ...@@ -18,12 +18,12 @@ from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID
from common.DeviceTypes import DeviceTypeEnum from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import ( from common.proto.context_pb2 import (
ContextId, Device, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, EndPoint) ContextId, Device, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, EndPoint)
from common.tools.context_queries.CheckType import (
device_type_is_datacenter, device_type_is_network, endpoint_type_is_border)
from common.tools.context_queries.Device import add_device_to_topology, get_existing_device_uuids
from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device, json_device_id from common.tools.object_factory.Device import json_device, json_device_id
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from .Tools import (
add_device_to_topology, device_type_is_datacenter, device_type_is_network, endpoint_type_is_border,
get_existing_device_uuids)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
......
...@@ -16,10 +16,10 @@ import copy, logging ...@@ -16,10 +16,10 @@ import copy, logging
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID from common.Constants import DEFAULT_CONTEXT_UUID, INTERDOMAIN_TOPOLOGY_UUID
from common.proto.context_pb2 import ContextId, EndPointId, Link, LinkId from common.proto.context_pb2 import ContextId, EndPointId, Link, LinkId
from common.tools.context_queries.Link import add_link_to_topology, get_existing_link_uuids
from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Link import json_link, json_link_id from common.tools.object_factory.Link import json_link, json_link_id
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from .Tools import add_link_to_topology, get_existing_link_uuids
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Set, Union
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import Context, ContextId, Device, Empty, Link, Topology, TopologyId
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
def create_context(
context_client : ContextClient, context_uuid : str
) -> None:
existing_context_ids = context_client.ListContextIds(Empty())
existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids}
if context_uuid in existing_context_uuids: return
context_client.SetContext(Context(**json_context(context_uuid)))
def create_topology(
context_client : ContextClient, context_uuid : str, topology_uuid : str
) -> None:
context_id = ContextId(**json_context_id(context_uuid))
existing_topology_ids = context_client.ListTopologyIds(context_id)
existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids}
if topology_uuid in existing_topology_uuids: return
context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=context_id)))
def create_missing_topologies(
context_client : ContextClient, context_id : ContextId, topology_uuids : List[str]
) -> None:
# Find existing topologies within own context
existing_topology_ids = context_client.ListTopologyIds(context_id)
existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids}
# Create topologies within provided context
for topology_uuid in topology_uuids:
if topology_uuid in existing_topology_uuids: continue
grpc_topology = Topology(**json_topology(topology_uuid, context_id=context_id))
context_client.SetTopology(grpc_topology)
def get_existing_device_uuids(context_client : ContextClient) -> Set[str]:
existing_device_ids = context_client.ListDeviceIds(Empty())
existing_device_uuids = {device_id.device_uuid.uuid for device_id in existing_device_ids.device_ids}
return existing_device_uuids
def get_existing_link_uuids(context_client : ContextClient) -> Set[str]:
existing_link_ids = context_client.ListLinkIds(Empty())
existing_link_uuids = {link_id.link_uuid.uuid for link_id in existing_link_ids.link_ids}
return existing_link_uuids
def add_device_to_topology(
context_client : ContextClient, context_id : ContextId, topology_uuid : str, device_uuid : str
) -> bool:
topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
topology_ro = context_client.GetTopology(topology_id)
device_uuids = {device_id.device_uuid.uuid for device_id in topology_ro.device_ids}
if device_uuid in device_uuids: return False # already existed
topology_rw = Topology()
topology_rw.CopyFrom(topology_ro)
topology_rw.device_ids.add().device_uuid.uuid = device_uuid
context_client.SetTopology(topology_rw)
return True
def add_link_to_topology(
context_client : ContextClient, context_id : ContextId, topology_uuid : str, link_uuid : str
) -> bool:
topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
topology_ro = context_client.GetTopology(topology_id)
link_uuids = {link_id.link_uuid.uuid for link_id in topology_ro.link_ids}
if link_uuid in link_uuids: return False # already existed
topology_rw = Topology()
topology_rw.CopyFrom(topology_ro)
topology_rw.link_ids.add().link_uuid.uuid = link_uuid
context_client.SetTopology(topology_rw)
return True
def get_uuids_of_devices_in_topology(
context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[str]:
topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
topology = context_client.GetTopology(topology_id)
device_uuids = [device_id.device_uuid.uuid for device_id in topology.device_ids]
return device_uuids
def get_uuids_of_links_in_topology(
context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[str]:
topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=context_id))
topology = context_client.GetTopology(topology_id)
link_uuids = [link_id.link_uuid.uuid for link_id in topology.link_ids]
return link_uuids
def get_devices_in_topology(
context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[Device]:
device_uuids = get_uuids_of_devices_in_topology(context_client, context_id, topology_uuid)
all_devices = context_client.ListDevices(Empty())
devices_in_topology = list()
for device in all_devices.devices:
device_uuid = device.device_id.device_uuid.uuid
if device_uuid not in device_uuids: continue
devices_in_topology.append(device)
return devices_in_topology
def get_links_in_topology(
context_client : ContextClient, context_id : ContextId, topology_uuid : str
) -> List[Link]:
link_uuids = get_uuids_of_links_in_topology(context_client, context_id, topology_uuid)
all_links = context_client.ListLinks(Empty())
links_in_topology = list()
for link in all_links.links:
link_uuid = link.link_id.link_uuid.uuid
if link_uuid not in link_uuids: continue
links_in_topology.append(link)
return links_in_topology
def device_type_is_datacenter(device_type : Union[str, DeviceTypeEnum]) -> bool:
return device_type in {
DeviceTypeEnum.DATACENTER, DeviceTypeEnum.DATACENTER.value,
DeviceTypeEnum.EMULATED_DATACENTER, DeviceTypeEnum.EMULATED_DATACENTER.value
}
def device_type_is_network(device_type : Union[str, DeviceTypeEnum]) -> bool:
return device_type in {DeviceTypeEnum.NETWORK, DeviceTypeEnum.NETWORK.value}
def endpoint_type_is_border(endpoint_type : str) -> bool:
return str(endpoint_type).endswith('/border')
...@@ -19,6 +19,12 @@ from common.DeviceTypes import DeviceTypeEnum ...@@ -19,6 +19,12 @@ from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import ( from common.proto.context_pb2 import (
ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId,
TopologyEvent) TopologyEvent)
from common.tools.context_queries.CheckType import (
device_type_is_datacenter, device_type_is_network, endpoint_type_is_border)
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import get_devices_in_topology, get_uuids_of_devices_in_topology
from common.tools.context_queries.Link import get_links_in_topology
from common.tools.context_queries.Topology import create_missing_topologies
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id from common.tools.object_factory.Device import json_device_id
...@@ -29,9 +35,6 @@ from dlt.connector.client.DltConnectorClient import DltConnectorClient ...@@ -29,9 +35,6 @@ from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .AbstractDevice import AbstractDevice from .AbstractDevice import AbstractDevice
from .AbstractLink import AbstractLink from .AbstractLink import AbstractLink
from .DltRecordSender import DltRecordSender from .DltRecordSender import DltRecordSender
from .Tools import (
create_context, create_missing_topologies, device_type_is_datacenter, device_type_is_network,
endpoint_type_is_border, get_devices_in_topology, get_links_in_topology, get_uuids_of_devices_in_topology)
from .Types import EventTypes from .Types import EventTypes
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment