Skip to content
Snippets Groups Projects
Commit 430382ca authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Interdomain component:

- removed unneeded code in Dockerfile
- created first partial version of TopologyAbstractor
parent 416038a7
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!24Integrate NFV-SDN'22 demo
......@@ -63,10 +63,11 @@ RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/context/. context/
COPY src/device/. device/
#COPY src/device/. device/
COPY src/dlt/. dlt/
COPY src/interdomain/. interdomain/
COPY src/monitoring/. monitoring/
COPY src/service/. service/
#COPY src/monitoring/. monitoring/
#COPY src/service/. service/
COPY src/slice/. slice/
# Start the service
......
......@@ -18,6 +18,7 @@ from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
get_service_port_grpc, wait_for_environment_variables)
from .topology_abstractor.TopologyAbstractor import TopologyAbstractor
from .InterdomainService import InterdomainService
from .RemoteDomainClients import RemoteDomainClients
......@@ -40,6 +41,8 @@ def main():
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
......@@ -58,14 +61,19 @@ def main():
grpc_service = InterdomainService(remote_domain_clients)
grpc_service.start()
# Subscribe to Context Events
topology_abstractor = TopologyAbstractor()
topology_abstractor.start()
# TODO: improve with configuration the definition of the remote peers
interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc)
#interdomain_service_port_grpc = get_service_port_grpc(ServiceNameEnum.INTERDOMAIN)
#remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', interdomain_service_port_grpc)
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=0.1): pass
LOGGER.info('Terminating...')
topology_abstractor.stop()
grpc_service.stop()
LOGGER.info('Bye')
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import logging, threading
from typing import List, Optional, Union
from common.Constants import AGGREGATED_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID
from common.proto.context_pb2 import (
ConnectionEvent, ContextEvent, ContextId, DeviceEvent, DeviceId, LinkEvent, ServiceEvent, ServiceId,
SliceEvent, SliceId, TopologyEvent)
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .tools.ContextMethods import create_abstracted_device_if_not_exists, create_interdomain_entities
LOGGER = logging.getLogger(__name__)
DltRecordIdTypes = Union[DeviceId, SliceId, ServiceId]
EventTypes = Union[
ContextEvent, TopologyEvent, DeviceEvent, LinkEvent, ServiceEvent, SliceEvent, ConnectionEvent
]
class TopologyAbstractor(threading.Thread):
def __init__(self) -> None:
super().__init__(daemon=True)
self.terminate = threading.Event()
self.context_client = ContextClient()
self.dlt_connector_client = DltConnectorClient()
self.context_event_collector = EventsCollector(self.context_client)
self.own_context_id : Optional[ContextId] = None
self.own_abstract_device : Optional[ContextId] = None
def stop(self):
self.terminate.set()
def run(self) -> None:
self.context_client.connect()
self.dlt_connector_client.connect()
self.context_event_collector.start()
while not self.terminate.is_set():
event = self.context_event_collector.get_event(timeout=0.1)
if event is None: continue
if self.ignore_event(event): continue
# TODO: filter events resulting from abstraction computation
# TODO: filter events resulting from updating remote abstractions
LOGGER.info('Processing Event({:s})...'.format(str(event)))
dlt_records = self.update_abstraction(event)
self.send_dlt_records(dlt_records)
self.context_event_collector.stop()
self.context_client.close()
self.dlt_connector_client.close()
def ignore_event(self, event : EventTypes) -> List[DltRecordIdTypes]:
if isinstance(event, ContextEvent):
context_uuid = event.context_id.context_uuid.uuid
if self.own_context_id is None: return False
own_context_uuid = self.own_context_id.context_uuid.uuid
return context_uuid == own_context_uuid
elif isinstance(event, TopologyEvent):
context_uuid = event.topology_id.context_id.context_uuid.uuid
if self.own_context_id is None: return False
own_context_uuid = self.own_context_id.context_uuid.uuid
if context_uuid != own_context_uuid: return True
topology_uuid = event.topology_id.topology_uuid.uuid
if topology_uuid in {DOMAINS_TOPOLOGY_UUID, AGGREGATED_TOPOLOGY_UUID}: return True
return False
def send_dlt_records(self, dlt_records : Union[DltRecordIdTypes, List[DltRecordIdTypes]]) -> None:
for dlt_record_id in dlt_records:
if isinstance(dlt_record_id, DeviceId):
self.dlt_connector_client.RecordDevice(dlt_record_id)
elif isinstance(dlt_record_id, ServiceId):
self.dlt_connector_client.RecordService(dlt_record_id)
elif isinstance(dlt_record_id, SliceId):
self.dlt_connector_client.RecordSlice(dlt_record_id)
else:
LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record_id)))
def update_abstraction(self, event : Optional[EventTypes] = None) -> List[DltRecordIdTypes]:
dlt_record_ids_with_changes = []
if self.own_context_id is None:
self.own_context_id = create_interdomain_entities(self.context_client)
if self.own_abstract_device is None:
self.own_abstract_device = create_abstracted_device_if_not_exists(self.context_client, self.own_context_id)
dlt_record_ids_with_changes.append(self.own_abstract_device.device_id)
if event is None:
# TODO: identify initial status from topology and update endpoints accordingly
pass
else:
# TODO: identify changes from event and update endpoints accordingly
pass
return dlt_record_ids_with_changes
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from common.Constants import (
AGGREGATED_TOPOLOGY_UUID, DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID)
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import (
Context, ContextId, Device, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, Empty, Topology, TopologyId)
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Device import json_device, json_device_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
from context.client.ContextClient import ContextClient
def create_interdomain_entities(context_client : ContextClient) -> ContextId:
existing_context_ids = context_client.ListContextIds(Empty())
existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids}
# Detect local context name (will be used as abstracted device name); exclude DEFAULT_CONTEXT_UUID
existing_non_admin_context_uuids = copy.deepcopy(existing_context_uuids)
existing_non_admin_context_uuids.discard(DEFAULT_CONTEXT_UUID)
if len(existing_non_admin_context_uuids) != 1:
MSG = 'Unable to identify own domain name. Existing Contexts({:s})'
raise Exception(MSG.format(str(existing_context_uuids)))
own_domain_uuid = existing_non_admin_context_uuids.pop()
own_context_id = ContextId(**json_context_id(own_domain_uuid))
#if DEFAULT_CONTEXT_UUID not in existing_context_uuids:
# context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_UUID)))
#admin_context_id = ContextId(**json_context_id(DEFAULT_CONTEXT_UUID))
# Create topologies "admin", "domains", and "aggregated"
existing_topology_ids = context_client.ListTopologyIds(own_context_id)
existing_topology_uuids = {topology_id.topology_uuid.uuid for topology_id in existing_topology_ids.topology_ids}
topology_uuids = [DEFAULT_TOPOLOGY_UUID, DOMAINS_TOPOLOGY_UUID, AGGREGATED_TOPOLOGY_UUID]
for topology_uuid in topology_uuids:
if topology_uuid in existing_topology_uuids: continue
context_client.SetTopology(Topology(**json_topology(topology_uuid, context_id=own_context_id)))
return own_context_id
def create_abstracted_device_if_not_exists(context_client : ContextClient, own_context_id : ContextId) -> Device:
own_domain_uuid = own_context_id.context_uuid.uuid
# Create device representing abstracted local domain
existing_device_ids = context_client.ListDeviceIds(Empty())
existing_device_uuids = {device_id.device_uuid.uuid for device_id in existing_device_ids.device_ids}
own_abstract_device_id = DeviceId(**json_device_id(own_domain_uuid))
if own_domain_uuid in existing_device_uuids:
own_abstract_device = context_client.GetDevice(own_abstract_device_id)
else:
own_abstracted_device_uuid = own_domain_uuid
own_abstract_device = Device(**json_device(
own_abstracted_device_uuid, DeviceTypeEnum.NETWORK.value,
DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED,
endpoints=[], config_rules=[], drivers=[DeviceDriverEnum.DEVICEDRIVER_UNDEFINED]
))
context_client.SetDevice(own_abstract_device)
# Add own abstracted device to topologies ["domains"]
topology_uuids = [DOMAINS_TOPOLOGY_UUID]
for topology_uuid in topology_uuids:
topology_id = TopologyId(**json_topology_id(topology_uuid, own_context_id))
topology_ro = context_client.GetTopology(topology_id)
device_uuids = {device_id.device_uuid.uuid for device_id in topology_ro.device_ids}
if own_abstracted_device_uuid in device_uuids: continue
topology_rw = Topology()
topology_rw.CopyFrom(topology_ro)
topology_rw.device_ids.add().device_uuid.uuid = own_abstracted_device_uuid
context_client.SetTopology(topology_rw)
return own_abstract_device
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment