Skip to content
Snippets Groups Projects
Commit d5d975d9 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Interdomain component:

- Migrated logic to Release 2.0 and used new common Interdomain methods.
- Improvements detection of remote domain settings and instantiation of interdomain clients.
- Updated Topology Abstractor to consider abstract device and link names and uuids.
- Improved logging messages of Topology Abstractor
- Corrected local topology retrieval in Topology Abstractor
- Implemented DeleteSlice RPC method
- Updated helper methods to improve detection of slice owner and store it as parameter of the slice.
- Added new unitary tests
- Added new dependency on service component's client
parent 38fd7f22
No related branches found
No related tags found
2 merge requests!142Release TeraFlowSDN 2.1,!119Migration of Interdomain component and OECC/PSC'22 test to Release 2
......@@ -68,7 +68,7 @@ COPY src/dlt/. dlt/
COPY src/interdomain/. interdomain/
#COPY src/monitoring/. monitoring/
COPY src/pathcomp/. pathcomp/
#COPY src/service/. service/
COPY src/service/. service/
COPY src/slice/. slice/
# Start the service
......
This diff is collapsed.
......@@ -27,12 +27,13 @@ from interdomain.client.InterdomainClient import InterdomainClient
LOGGER = logging.getLogger(__name__)
def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Optional[Tuple[str, str, int]]:
def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Optional[Tuple[str, str, str, int]]:
device_uuid = event.device_id.device_uuid.uuid
device = get_device(
context_client, device_uuid, include_endpoints=False,
include_components=False, include_config_rules=True)
if device.device_type != DeviceTypeEnum.NETWORK.value: return None
idc_domain_uuid = device_uuid
idc_domain_name = device.name
idc_domain_address = None
idc_domain_port = None
......@@ -45,7 +46,7 @@ def get_domain_data(context_client : ContextClient, event : DeviceEvent) -> Opti
idc_domain_port = int(config_rule.custom.resource_value)
if idc_domain_address is None: return None
if idc_domain_port is None: return None
return idc_domain_name, idc_domain_address, idc_domain_port
return idc_domain_uuid, idc_domain_name, idc_domain_address, idc_domain_port
class RemoteDomainClients(threading.Thread):
def __init__(self) -> None:
......@@ -67,21 +68,22 @@ class RemoteDomainClients(threading.Thread):
event = self.context_event_collector.get_event(timeout=0.1)
if event is None: continue
if not isinstance(event, DeviceEvent): continue
LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
LOGGER.info('Processing DeviceEvent({:s})...'.format(grpc_message_to_json_string(event)))
domain_data = get_domain_data(self.context_client, event)
if domain_data is None: continue
domain_name, domain_address, domain_port = domain_data
domain_uuid, domain_name, domain_address, domain_port = domain_data
try:
self.add_peer(domain_name, domain_address, domain_port)
self.add_peer(domain_uuid, domain_name, domain_address, domain_port)
except: # pylint: disable=bare-except
MSG = 'Unable to connect to remote domain {:s} ({:s}:{:d})'
LOGGER.exception(MSG.format(domain_name, domain_address, domain_port))
MSG = 'Unable to connect to remote domain {:s} {:s} ({:s}:{:d})'
LOGGER.exception(MSG.format(domain_uuid, domain_name, domain_address, domain_port))
self.context_event_collector.stop()
self.context_client.close()
def add_peer(
self, domain_name : str, domain_address : str, domain_port : int, context_uuid : str = DEFAULT_CONTEXT_NAME
self, domain_uuid : str, domain_name : str, domain_address : str, domain_port : int,
context_uuid : str = DEFAULT_CONTEXT_NAME
) -> None:
request = TeraFlowController()
request.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
......@@ -96,18 +98,22 @@ class RemoteDomainClients(threading.Thread):
if not reply.authenticated:
MSG = 'Authentication against {:s}:{:d} with Context({:s}) rejected'
# pylint: disable=broad-exception-raised
raise Exception(MSG.format(domain_address, domain_port, domain_name))
raise Exception(MSG.format(domain_address, domain_port, context_uuid))
with self.lock:
self.peer_domains[domain_uuid] = interdomain_client
self.peer_domains[domain_name] = interdomain_client
LOGGER.info('Added peer domain {:s} ({:s}:{:d})'.format(domain_name, domain_address, domain_port))
MSG = 'Added peer domain {:s} {:s} ({:s}:{:d})'
LOGGER.info(MSG.format(domain_uuid, domain_name, domain_address, domain_port))
def get_peer(self, domain_name : str) -> InterdomainClient:
def get_peer(self, domain_uuid_or_name : str) -> Optional[InterdomainClient]:
with self.lock:
LOGGER.warning('peers: {:s}'.format(str(self.peer_domains)))
return self.peer_domains.get(domain_name)
LOGGER.debug('domain_uuid_or_name: {:s}'.format(str(domain_uuid_or_name)))
LOGGER.debug('peers: {:s}'.format(str(self.peer_domains)))
return self.peer_domains.get(domain_uuid_or_name)
def remove_peer(self, domain_name : str) -> None:
def remove_peer(self, domain_uuid_or_name : str) -> None:
with self.lock:
self.peer_domains.pop(domain_name, None)
LOGGER.info('Removed peer domain {:s}'.format(domain_name))
LOGGER.debug('domain_uuid_or_name: {:s}'.format(str(domain_uuid_or_name)))
self.peer_domains.pop(domain_uuid_or_name, None)
LOGGER.info('Removed peer domain {:s}'.format(domain_uuid_or_name))
......@@ -13,10 +13,11 @@
# limitations under the License.
import json, logging
from typing import List, Optional, Tuple
from typing import List, Optional, Set
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import (
ConfigRule, Constraint, ContextId, Device, Empty, EndPointId, Slice, SliceStatusEnum)
ConfigRule, Constraint, ContextId, Empty, EndPointId, Slice, SliceStatusEnum)
from common.tools.context_queries.CheckType import device_type_is_network, endpoint_type_is_border
from common.tools.context_queries.InterDomain import get_local_device_uuids
from common.tools.grpc.ConfigRules import copy_config_rules
......@@ -28,27 +29,32 @@ from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
def compute_slice_owner(
context_client : ContextClient, traversed_domains : List[Tuple[str, Device, bool, List[EndPointId]]]
context_client : ContextClient, traversed_domain_uuids : Set[str]
) -> Optional[str]:
traversed_domain_uuids = {traversed_domain[0] for traversed_domain in traversed_domains}
existing_topologies = context_client.ListTopologies(ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)))
existing_topology_uuids_names = set()
domain_uuids_names = set()
DISCARD_TOPOLOGY_NAMES = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}
for topology in existing_topologies.topologies:
topology_uuid = topology.topology_id.topology_uuid.uuid
if topology_uuid in DISCARD_TOPOLOGY_NAMES: continue
topology_name = topology.name
if topology_name in DISCARD_TOPOLOGY_NAMES: continue
existing_topology_uuids_names.add(topology_uuid)
existing_topology_uuids_names.add(topology_name)
domain_uuids_names.add(topology_uuid)
domain_uuids_names.add(topology_name)
for topology in existing_topologies.topologies:
topology_details = context_client.GetTopologyDetails(topology.topology_id)
for device in topology_details.devices:
if device.device_type != DeviceTypeEnum.NETWORK.value: continue
domain_uuids_names.discard(device.device_id.device_uuid.uuid)
domain_uuids_names.discard(device.name)
candidate_owner_uuids = traversed_domain_uuids.intersection(existing_topology_uuids_names)
candidate_owner_uuids = traversed_domain_uuids.intersection(domain_uuids_names)
if len(candidate_owner_uuids) != 1:
data = {
'traversed_domain_uuids' : [td_uuid for td_uuid in traversed_domain_uuids ],
'existing_topology_uuids_names': [et_uuid for et_uuid in existing_topology_uuids_names],
'candidate_owner_uuids' : [co_uuid for co_uuid in candidate_owner_uuids ],
'traversed_domain_uuids': [td_uuid for td_uuid in traversed_domain_uuids],
'domain_uuids_names' : [et_uuid for et_uuid in domain_uuids_names ],
'candidate_owner_uuids' : [co_uuid for co_uuid in candidate_owner_uuids ],
}
LOGGER.warning('Unable to identify slice owner: {:s}'.format(json.dumps(data)))
return None
......@@ -56,17 +62,24 @@ def compute_slice_owner(
return candidate_owner_uuids.pop()
def compose_slice(
context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], constraints : List[Constraint] = [],
config_rules : List[ConfigRule] = [], owner_uuid : Optional[str] = None
context_uuid : str, slice_uuid : str, endpoint_ids : List[EndPointId], slice_name : Optional[str] = None,
constraints : List[Constraint] = [], config_rules : List[ConfigRule] = [], owner_uuid : Optional[str] = None,
owner_string : Optional[str] = None
) -> Slice:
slice_ = Slice()
slice_.slice_id.context_id.context_uuid.uuid = context_uuid # pylint: disable=no-member
slice_.slice_id.slice_uuid.uuid = slice_uuid # pylint: disable=no-member
slice_.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_PLANNED # pylint: disable=no-member
if slice_name is not None:
slice_.name = slice_name
if owner_uuid is not None:
slice_.slice_owner.owner_uuid.uuid = owner_uuid # pylint: disable=no-member
if owner_string is not None:
slice_.slice_owner.owner_string = owner_string # pylint: disable=no-member
if len(endpoint_ids) >= 2:
slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[0]) # pylint: disable=no-member
slice_.slice_endpoint_ids.add().CopyFrom(endpoint_ids[-1]) # pylint: disable=no-member
......
......@@ -24,13 +24,15 @@ from common.tools.context_queries.Device import add_device_to_topology, get_exis
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device, json_device_id
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
LOGGER = logging.getLogger(__name__)
class AbstractDevice:
def __init__(self, device_uuid : str, device_type : DeviceTypeEnum):
def __init__(self, device_uuid : str, device_name : str, device_type : DeviceTypeEnum):
self.__context_client = ContextClient()
self.__device_uuid : str = device_uuid
self.__device_name : str = device_name
self.__device_type : DeviceTypeEnum = device_type
self.__device : Optional[Device] = None
self.__device_id : Optional[DeviceId] = None
......@@ -41,9 +43,23 @@ class AbstractDevice:
# Dict[endpoint_uuid, device_uuid]
self.__abstract_endpoint_to_device : Dict[str, str] = dict()
def to_json(self) -> Dict:
return {
'device_uuid' : self.__device_uuid,
'device_name' : self.__device_name,
'device_type' : self.__device_type,
'device' : self.__device,
'device_id' : self.__device_id,
'device_endpoint_to_abstract' : self.__device_endpoint_to_abstract,
'abstract_endpoint_to_device' : self.__abstract_endpoint_to_device,
}
@property
def uuid(self) -> str: return self.__device_uuid
@property
def name(self) -> str: return self.__device_name
@property
def device_id(self) -> Optional[DeviceId]: return self.__device_id
......@@ -92,7 +108,7 @@ class AbstractDevice:
device = Device(**json_device(
device_uuid, self.__device_type.value, DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED,
endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED]
name=self.__device_name, endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED]
))
self.__context_client.SetDevice(device)
self.__device = device
......@@ -126,6 +142,14 @@ class AbstractDevice:
self.__abstract_endpoint_to_device\
.setdefault(endpoint_uuid, device_uuid)
def _update_endpoint_name(self, device_uuid : str, endpoint_uuid : str, endpoint_name : str) -> bool:
device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {})
interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid)
interdomain_endpoint_name = interdomain_endpoint.name
if endpoint_name == interdomain_endpoint_name: return False
interdomain_endpoint.name = endpoint_name
return True
def _update_endpoint_type(self, device_uuid : str, endpoint_uuid : str, endpoint_type : str) -> bool:
device_endpoint_to_abstract = self.__device_endpoint_to_abstract.get(device_uuid, {})
interdomain_endpoint = device_endpoint_to_abstract.get(endpoint_uuid)
......@@ -134,16 +158,24 @@ class AbstractDevice:
interdomain_endpoint.endpoint_type = endpoint_type
return True
def _add_endpoint(self, device_uuid : str, endpoint_uuid : str, endpoint_type : str) -> EndPoint:
def _add_endpoint(
self, device_uuid : str, endpoint_uuid : str, endpoint_name : str, endpoint_type : str
) -> EndPoint:
interdomain_endpoint = self.__device.device_endpoints.add()
interdomain_endpoint.endpoint_id.topology_id.topology_uuid.uuid = INTERDOMAIN_TOPOLOGY_NAME
interdomain_endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
interdomain_endpoint.endpoint_id.device_id.CopyFrom(self.__device_id)
interdomain_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid
interdomain_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_name
interdomain_endpoint.name = endpoint_name
interdomain_endpoint.endpoint_type = endpoint_type
uuids = endpoint_get_uuid(interdomain_endpoint.endpoint_id, endpoint_name=endpoint_name, allow_random=False)
_, _, interdomain_endpoint_uuid = uuids
self.__device_endpoint_to_abstract\
.setdefault(device_uuid, {}).setdefault(endpoint_uuid, interdomain_endpoint)
self.__abstract_endpoint_to_device\
.setdefault(endpoint_uuid, device_uuid)
.setdefault(interdomain_endpoint_uuid, device_uuid)
return interdomain_endpoint
......@@ -160,7 +192,7 @@ class AbstractDevice:
device_uuid = device.device_id.device_uuid.uuid
device_border_endpoint_uuids = {
endpoint.endpoint_id.endpoint_uuid.uuid : endpoint.endpoint_type
endpoint.endpoint_id.endpoint_uuid.uuid : (endpoint.name, endpoint.endpoint_type)
for endpoint in device.device_endpoints
if endpoint_type_is_border(endpoint.endpoint_type)
}
......@@ -177,14 +209,15 @@ class AbstractDevice:
updated = True
# for each border endpoint in device that is not in abstract device; add to abstract device
for endpoint_uuid,endpoint_type in device_border_endpoint_uuids.items():
# if already added; just check endpoint type is not modified
for endpoint_uuid,(endpoint_name, endpoint_type) in device_border_endpoint_uuids.items():
# if already added; just check endpoint name and type are not modified
if endpoint_uuid in self.__abstract_endpoint_to_device:
updated = updated or self._update_endpoint_name(device_uuid, endpoint_uuid, endpoint_name)
updated = updated or self._update_endpoint_type(device_uuid, endpoint_uuid, endpoint_type)
continue
# otherwise, add it to the abstract device
self._add_endpoint(device_uuid, endpoint_uuid, endpoint_type)
self._add_endpoint(device_uuid, endpoint_uuid, endpoint_name, endpoint_type)
updated = True
return updated
......@@ -33,6 +33,14 @@ class AbstractLink:
# Dict[(device_uuid, endpoint_uuid), abstract EndPointId]
self.__device_endpoint_to_abstract : Dict[Tuple[str, str], EndPointId] = dict()
def to_json(self) -> Dict:
return {
'link_uuid' : self.__link_uuid,
'link' : self.__link,
'link_id' : self.__link_id,
'device_endpoint_to_abstract' : self.__device_endpoint_to_abstract,
}
@property
def uuid(self) -> str: return self.__link_uuid
......@@ -95,6 +103,8 @@ class AbstractLink:
def _add_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None:
endpoint_id = self.__link.link_endpoint_ids.add()
endpoint_id.topology_id.topology_uuid.uuid = INTERDOMAIN_TOPOLOGY_NAME
endpoint_id.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
endpoint_id.device_id.device_uuid.uuid = device_uuid
endpoint_id.endpoint_uuid.uuid = endpoint_uuid
self.__device_endpoint_to_abstract.setdefault((device_uuid, endpoint_uuid), endpoint_id)
......
......@@ -16,15 +16,16 @@ import logging, threading
from typing import Dict, Optional, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.DeviceTypes import DeviceTypeEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_missing_environment_variables, get_env_var_name
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, find_environment_variables, get_env_var_name)
from common.proto.context_pb2 import (
ContextEvent, ContextId, Device, DeviceEvent, DeviceId, EndPoint, EndPointId, Link, LinkEvent, TopologyId,
TopologyEvent)
from common.tools.context_queries.CheckType import (
device_type_is_datacenter, device_type_is_network, endpoint_type_is_border)
from common.tools.context_queries.Context import create_context
from common.tools.context_queries.Device import get_devices_in_topology, get_uuids_of_devices_in_topology
from common.tools.context_queries.Link import get_links_in_topology
from common.tools.context_queries.Device import get_uuids_of_devices_in_topology #, get_devices_in_topology
#from common.tools.context_queries.Link import get_links_in_topology
from common.tools.context_queries.Topology import create_missing_topologies
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
......@@ -99,13 +100,13 @@ class TopologyAbstractor(threading.Thread):
# return False
def _get_or_create_abstract_device(
self, device_uuid : str, device_type : DeviceTypeEnum, dlt_record_sender : DltRecordSender,
self, device_uuid : str, device_name : str, device_type : DeviceTypeEnum, dlt_record_sender : DltRecordSender,
abstract_topology_id : TopologyId
) -> AbstractDevice:
abstract_device = self.abstract_devices.get(device_uuid)
changed = False
if abstract_device is None:
abstract_device = AbstractDevice(device_uuid, device_type)
abstract_device = AbstractDevice(device_uuid, device_name, device_type)
changed = abstract_device.initialize()
if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device)
self.abstract_devices[device_uuid] = abstract_device
......@@ -117,16 +118,17 @@ class TopologyAbstractor(threading.Thread):
abstract_device_uuid : Optional[str] = None
) -> None:
device_uuid = device.device_id.device_uuid.uuid
device_name = device.name
if device_type_is_datacenter(device.device_type):
abstract_device_uuid = device_uuid
abstract_device = self._get_or_create_abstract_device(
device_uuid, DeviceTypeEnum.EMULATED_DATACENTER, dlt_record_sender, abstract_topology_id)
device_uuid, device_name, DeviceTypeEnum.EMULATED_DATACENTER, dlt_record_sender, abstract_topology_id)
elif device_type_is_network(device.device_type):
LOGGER.warning('device_type is network; not implemented')
return
else:
abstract_device = self._get_or_create_abstract_device(
abstract_device_uuid, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id)
abstract_device_uuid, None, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id)
self.real_to_abstract_device_uuid[device_uuid] = abstract_device_uuid
changed = abstract_device.update_endpoints(device)
if changed: dlt_record_sender.add_device(abstract_topology_id, abstract_device.device)
......@@ -224,11 +226,11 @@ class TopologyAbstractor(threading.Thread):
if changed: dlt_record_sender.add_link(INTERDOMAIN_TOPOLOGY_ID, abstract_link.link)
def update_abstraction(self, event : EventTypes) -> None:
missing_env_vars = find_missing_environment_variables([
env_vars = find_environment_variables([
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
if len(missing_env_vars) == 0:
if len(env_vars) == 2:
# DLT available
dlt_connector_client = DltConnectorClient()
dlt_connector_client.connect()
......@@ -238,41 +240,55 @@ class TopologyAbstractor(threading.Thread):
dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
if isinstance(event, ContextEvent):
LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event)))
LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event)))
LOGGER.warning('Ignoring ContextEvent({:s})'.format(grpc_message_to_json_string(event)))
elif isinstance(event, TopologyEvent):
LOGGER.debug('Processing TopologyEvent({:s})'.format(grpc_message_to_json_string(event)))
topology_id = event.topology_id
topology_uuid = topology_id.topology_uuid.uuid
context_id = topology_id.context_id
context_uuid = context_id.context_uuid.uuid
topology_uuids = {DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME}
if (context_uuid == DEFAULT_CONTEXT_NAME) and (topology_uuid not in topology_uuids):
context = self.context_client.GetContext(context_id)
context_name = context.name
topology_details = self.context_client.GetTopologyDetails(topology_id)
topology_name = topology_details.name
if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \
(topology_uuid not in topology_uuids) and (topology_name not in topology_uuids):
abstract_topology_id = TopologyId(**json_topology_id(topology_uuid, context_id=ADMIN_CONTEXT_ID))
self._get_or_create_abstract_device(
topology_uuid, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id)
topology_uuid, topology_name, DeviceTypeEnum.NETWORK, dlt_record_sender, abstract_topology_id)
devices = get_devices_in_topology(self.context_client, context_id, topology_uuid)
for device in devices:
#devices = get_devices_in_topology(self.context_client, context_id, topology_uuid)
for device in topology_details.devices:
self._update_abstract_device(
device, dlt_record_sender, abstract_topology_id, abstract_device_uuid=topology_uuid)
links = get_links_in_topology(self.context_client, context_id, topology_uuid)
for link in links:
#links = get_links_in_topology(self.context_client, context_id, topology_uuid)
for link in topology_details.links:
self._update_abstract_link(link, dlt_record_sender, abstract_topology_id)
for device in devices:
for device in topology_details.devices:
self._infer_abstract_links(device, dlt_record_sender)
else:
LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event)))
MSG = 'Ignoring ({:s}/{:s})({:s}/{:s}) TopologyEvent({:s})'
args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
LOGGER.warning(MSG.format(*args))
elif isinstance(event, DeviceEvent):
LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event)))
device_id = event.device_id
device_uuid = device_id.device_uuid.uuid
abstract_device_uuid = self.real_to_abstract_device_uuid.get(device_uuid)
device = self.context_client.GetDevice(device_id)
if abstract_device_uuid is None:
LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event)))
LOGGER.warning('Ignoring DeviceEvent({:s})'.format(grpc_message_to_json_string(event)))
else:
abstract_topology_id = self.abstract_device_to_topology_id[abstract_device_uuid]
self._update_abstract_device(
......@@ -281,11 +297,12 @@ class TopologyAbstractor(threading.Thread):
self._infer_abstract_links(device, dlt_record_sender)
elif isinstance(event, LinkEvent):
LOGGER.debug('Processing LinkEvent({:s})'.format(grpc_message_to_json_string(event)))
link_id = event.link_id
link_uuid = link_id.link_uuid.uuid
abstract_link_uuid = self.real_to_abstract_link_uuid.get(link_uuid)
if abstract_link_uuid is None:
LOGGER.warning('Ignoring Event({:s})'.format(grpc_message_to_json_string(event)))
LOGGER.warning('Ignoring LinkEvent({:s})'.format(grpc_message_to_json_string(event)))
else:
abstract_topology_id = self.abstract_link_to_topology_id[abstract_link_uuid]
link = self.context_client.GetLink(link_id)
......
......@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#import logging, grpc
#import os
#import sqlite3
......
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, pytest
from typing import Dict, List, Tuple
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import EndPointId
from common.proto.pathcomp_pb2 import PathCompRequest
from common.tools.context_queries.Device import get_device
from common.tools.context_queries.InterDomain import get_device_to_domain_map, get_local_device_uuids
from common.tools.grpc.Tools import grpc_message_list_to_json, grpc_message_list_to_json_string, grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope='session')
def context_client():
_client = ContextClient()
yield _client
_client.close()
@pytest.fixture(scope='session')
def pathcomp_client():
_client = PathCompClient()
yield _client
_client.close()
def test_interdomain_topology_abstractor(
context_client : ContextClient, # pylint: disable=redefined-outer-name
pathcomp_client : PathCompClient, # pylint: disable=redefined-outer-name
) -> None:
pathcomp_req = PathCompRequest(**{
"services": [
{"name": "", "service_constraints": [{"sla_capacity": {"capacity_gbps": 10.0}}, {"sla_latency": {"e2e_latency_ms": 100.0}}], "service_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "cda90d2f-e7b0-5837-8f2e-2fb29dd9b367"}}, "endpoint_uuid": {"uuid": "37ab67ef-0064-54e3-ae9b-d40100953834"}, "topology_id": {"context_id": {"context_uuid": {"uuid": "43813baf-195e-5da6-af20-b3d0922e71a7"}}, "topology_uuid": {"uuid": "c76135e3-24a8-5e92-9bed-c3c9139359c8"}}},
{"device_id": {"device_uuid": {"uuid": "800d5bd4-a7a3-5a66-82ab-d399767ca3d8"}}, "endpoint_uuid": {"uuid": "97f57787-cfec-5315-9718-7e850905f11a"}, "topology_id": {"context_id": {"context_uuid": {"uuid": "43813baf-195e-5da6-af20-b3d0922e71a7"}}, "topology_uuid": {"uuid": "c76135e3-24a8-5e92-9bed-c3c9139359c8"}}}
], "service_id": {"context_id": {"context_uuid": {"uuid": "43813baf-195e-5da6-af20-b3d0922e71a7"}}, "service_uuid": {"uuid": "77277b43-f9cd-5e01-a3e7-6c5fa4577137"}}, "service_type": "SERVICETYPE_L2NM"}
],
"shortest_path": {}
})
pathcomp_req_svc = pathcomp_req.services[0]
pathcomp_rep = pathcomp_client.Compute(pathcomp_req)
LOGGER.warning('pathcomp_rep = {:s}'.format(grpc_message_to_json_string(pathcomp_rep)))
num_services = len(pathcomp_rep.services)
if num_services == 0:
raise Exception('No services received : {:s}'.format(grpc_message_to_json_string(pathcomp_rep)))
num_connections = len(pathcomp_rep.connections)
if num_connections == 0:
raise Exception('No connections received : {:s}'.format(grpc_message_to_json_string(pathcomp_rep)))
local_device_uuids = get_local_device_uuids(context_client)
LOGGER.warning('local_device_uuids={:s}'.format(str(local_device_uuids)))
device_to_domain_map = get_device_to_domain_map(context_client)
LOGGER.warning('device_to_domain_map={:s}'.format(str(device_to_domain_map)))
local_slices : List[List[EndPointId]] = list()
remote_slices : List[List[EndPointId]] = list()
req_service_uuid = pathcomp_req_svc.service_id.service_uuid.uuid
for service in pathcomp_rep.services:
service_uuid = service.service_id.service_uuid.uuid
if service_uuid == req_service_uuid: continue # main synthetic service; we don't care
device_uuids = {
endpoint_id.device_id.device_uuid.uuid
for endpoint_id in service.service_endpoint_ids
}
local_domain_uuids = set()
remote_domain_uuids = set()
for device_uuid in device_uuids:
if device_uuid in local_device_uuids:
domain_uuid = device_to_domain_map.get(device_uuid)
if domain_uuid is None:
raise Exception('Unable to map device({:s}) to a domain'.format(str(device_uuid)))
local_domain_uuids.add(domain_uuid)
else:
device = get_device(
context_client, device_uuid, include_endpoints=True, include_config_rules=False,
include_components=False)
if device is None: raise Exception('Device({:s}) not found'.format(str(device_uuid)))
device_type = DeviceTypeEnum._value2member_map_.get(device.device_type)
is_remote = device_type == DeviceTypeEnum.NETWORK
if not is_remote:
MSG = 'Weird device({:s}) is not local and not network'
raise Exception(MSG.format(grpc_message_to_json_string(device)))
remote_domain_uuids.add(device_uuid)
is_local = len(local_domain_uuids) > 0
is_remote = len(remote_domain_uuids) > 0
if is_local == is_remote:
MSG = 'Weird service combines local and remote devices: {:s}'
raise Exception(MSG.format(grpc_message_to_json_string(service)))
elif is_local:
local_slices.append(service.service_endpoint_ids)
else:
remote_slices.append(service.service_endpoint_ids)
str_local_slices = [grpc_message_list_to_json(endpoint_ids) for endpoint_ids in local_slices]
LOGGER.warning('local_slices={:s}'.format(str(str_local_slices)))
str_remote_slices = [grpc_message_list_to_json(endpoint_ids) for endpoint_ids in remote_slices]
LOGGER.warning('remote_slices={:s}'.format(str(str_remote_slices)))
raise Exception()
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, pytest, time
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import ContextId, Empty
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId
from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from interdomain.service.topology_abstractor.TopologyAbstractor import TopologyAbstractor
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope='session')
def context_client():
_client = ContextClient()
yield _client
_client.close()
@pytest.fixture(scope='session')
def device_client():
_client = DeviceClient()
yield _client
_client.close()
@pytest.fixture(scope='session')
def topology_abstractor():
_topology_abstractor = TopologyAbstractor()
_topology_abstractor.start()
yield _topology_abstractor
_topology_abstractor.stop()
_topology_abstractor.join()
def test_pre_cleanup_scenario(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
) -> None:
for link_id in context_client.ListLinkIds(Empty()).link_ids: context_client.RemoveLink(link_id)
for device_id in context_client.ListDeviceIds(Empty()).device_ids: device_client.DeleteDevice(device_id)
contexts = context_client.ListContexts(Empty())
for context in contexts.contexts:
assert len(context.slice_ids) == 0, 'Found Slices: {:s}'.format(grpc_message_to_json_string(context))
assert len(context.service_ids) == 0, 'Found Services: {:s}'.format(grpc_message_to_json_string(context))
for topology_id in context.topology_ids: context_client.RemoveTopology(topology_id)
context_client.RemoveContext(context.context_id)
DESCRIPTOR_FILE = 'oeccpsc22/descriptors/domain1.json'
#DESCRIPTOR_FILE = 'oeccpsc22/descriptors/domain2.json'
def test_interdomain_topology_abstractor(
context_client : ContextClient, # pylint: disable=redefined-outer-name
device_client : DeviceClient, # pylint: disable=redefined-outer-name
topology_abstractor : TopologyAbstractor, # pylint: disable=redefined-outer-name
) -> None:
#validate_empty_scenario(context_client)
time.sleep(3)
descriptor_loader = DescriptorLoader(
descriptors_file=DESCRIPTOR_FILE, context_client=context_client, device_client=device_client)
results = descriptor_loader.process()
check_descriptor_load_results(results, descriptor_loader)
#descriptor_loader.validate()
time.sleep(3)
LOGGER.warning('real_to_abstract_device_uuid={:s}'.format(str(topology_abstractor.real_to_abstract_device_uuid)))
LOGGER.warning('real_to_abstract_link_uuid={:s}'.format(str(topology_abstractor.real_to_abstract_link_uuid)))
LOGGER.warning('abstract_device_to_topology_id={:s}'.format(str(topology_abstractor.abstract_device_to_topology_id)))
LOGGER.warning('abstract_link_to_topology_id={:s}'.format(str(topology_abstractor.abstract_link_to_topology_id)))
LOGGER.warning('abstract_devices={:s}'.format(str({
k:v.to_json()
for k,v in topology_abstractor.abstract_devices.items()
})))
LOGGER.warning('abstract_links={:s}'.format(str({
k:v.to_json()
for k,v in topology_abstractor.abstract_links.items()
})))
raise Exception()
#def test_post_cleanup_scenario(
# context_client : ContextClient, # pylint: disable=redefined-outer-name
# device_client : DeviceClient, # pylint: disable=redefined-outer-name
#) -> None:
# test_pre_cleanup_scenario(context_client, device_client)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment