diff --git a/manifests/interdomainservice.yaml b/manifests/interdomainservice.yaml
index 9be6032cfbb59cb580219ca71451be24dac93205..8926dcdafdea90ad7dea41eca854cbcb30853553 100644
--- a/manifests/interdomainservice.yaml
+++ b/manifests/interdomainservice.yaml
@@ -38,6 +38,8 @@ spec:
           value: "INFO"
         - name: TOPOLOGY_ABSTRACTOR
           value: "DISABLE"
+        - name: DLT_INTEGRATION
+          value: "DISABLE"
         readinessProbe:
           exec:
             command: ["/bin/grpc_health_probe", "-addr=:10010"]
diff --git a/my_deploy.sh b/my_deploy.sh
index 3d4572e3ea2bc4f161338f6300c12246743db73e..67ec8615e16f41fde24254316ae7a16171bd2e86 100755
--- a/my_deploy.sh
+++ b/my_deploy.sh
@@ -62,12 +62,13 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
 # Uncomment to activate E2E Orchestrator
 #export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator"
 
-# Uncomment to activate DLT
-export TFS_COMPONENTS="${TFS_COMPONENTS} interdomain dlt"
-export KEY_DIRECTORY_PATH="src/dlt/gateway/keys/priv_sk"
-export CERT_DIRECTORY_PATH="src/dlt/gateway/keys/cert.pem"
-export TLS_CERT_PATH="src/dlt/gateway/keys/ca.crt"
-
+# Uncomment to activate DLT and Interdomain
+#export TFS_COMPONENTS="${TFS_COMPONENTS} interdomain dlt"
+#if [[ "$TFS_COMPONENTS" == *"dlt"* ]]; then
+#    export KEY_DIRECTORY_PATH="src/dlt/gateway/keys/priv_sk"
+#    export CERT_DIRECTORY_PATH="src/dlt/gateway/keys/cert.pem"
+#    export TLS_CERT_PATH="src/dlt/gateway/keys/ca.crt"
+#fi
 
 # Set the tag you want to use for your images.
 export TFS_IMAGE_TAG="dev"
@@ -116,7 +117,7 @@ export CRDB_DATABASE="tfs"
 export CRDB_DEPLOY_MODE="single"
 
 # Disable flag for dropping database, if it exists.
-export CRDB_DROP_DATABASE_IF_EXISTS="YES"
+export CRDB_DROP_DATABASE_IF_EXISTS=""
 
 # Disable flag for re-deploying CockroachDB from scratch.
 export CRDB_REDEPLOY=""
@@ -168,7 +169,7 @@ export QDB_TABLE_MONITORING_KPIS="tfs_monitoring_kpis"
 export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups"
 
 # Disable flag for dropping tables if they exist.
-export QDB_DROP_TABLES_IF_EXIST="YES"
+export QDB_DROP_TABLES_IF_EXIST=""
 
 # Disable flag for re-deploying QuestDB from scratch.
 export QDB_REDEPLOY=""
diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py
index 4181fa73ab1db12bdc5c32b2241a19a2f61bdb5d..dc58603b2bff902f77d169956dd7d8526dcc4a85 100644
--- a/src/interdomain/service/__main__.py
+++ b/src/interdomain/service/__main__.py
@@ -19,7 +19,7 @@ from common.Settings import (
     ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
     wait_for_environment_variables)
 from interdomain.Config import is_dlt_enabled
-#from .topology_abstractor.TopologyAbstractor import TopologyAbstractor
+from .topology_abstractor.TopologyAbstractor import TopologyAbstractor
 from .topology_abstractor.DltRecorder import DLTRecorder
 from .InterdomainService import InterdomainService
 from .RemoteDomainClients import RemoteDomainClients
@@ -65,14 +65,13 @@ def main():
     grpc_service.start()
 
     # Subscribe to Context Events
-    # topology_abstractor_enabled = is_topology_abstractor_enabled()
-    # if topology_abstractor_enabled:
-    #     topology_abstractor = TopologyAbstractor()
-    #     topology_abstractor.start()
-
-    # Subscribe to Context Events
-    #dlt_enabled = is_dlt_enabled() #How to change the config?
-    dlt_enabled = True
+    topology_abstractor_enabled = is_topology_abstractor_enabled()
+    if topology_abstractor_enabled:
+        topology_abstractor = TopologyAbstractor()
+        topology_abstractor.start()
+
+    # Subscribe to Context Events
+    dlt_enabled = is_dlt_enabled()
     if dlt_enabled:
         LOGGER.info('Starting DLT functionality...')
         dlt_recorder = DLTRecorder()
@@ -82,8 +81,8 @@ def main():
     while not terminate.wait(timeout=1.0): pass
 
     LOGGER.info('Terminating...')
-    # if topology_abstractor_enabled:
-    #     topology_abstractor.stop()
+    if topology_abstractor_enabled:
+        topology_abstractor.stop()
     if dlt_enabled:
         dlt_recorder.stop()
     grpc_service.stop()
diff --git a/src/interdomain/service/topology_abstractor/DltRecordSender.py b/src/interdomain/service/topology_abstractor/DltRecordSender.py
index ae9fd440b6979c6423712ec48f35d73b98dcb1e8..363cda72ada524c229c606f5435e14fbb42a3cc7 100644
--- a/src/interdomain/service/topology_abstractor/DltRecordSender.py
+++ b/src/interdomain/service/topology_abstractor/DltRecordSender.py
@@ -12,10 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-import asyncio
-
-
+import asyncio, logging
 from typing import Dict, List, Tuple
 from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId
 from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
@@ -25,42 +22,42 @@ from dlt.connector.client.DltConnectorClientAsync import DltConnectorClientAsync
 LOGGER = logging.getLogger(__name__)
 
 class DltRecordSender:
-    def __init__(self, context_client: ContextClient) -> None:
+    def __init__(self, context_client : ContextClient) -> None:
         self.context_client = context_client
         LOGGER.debug('Creating Servicer...')
         self.dlt_connector_client = DltConnectorClientAsync()
         LOGGER.debug('Servicer Created')
-        self.dlt_record_uuids: List[str] = list()
-        self.dlt_record_uuid_to_data: Dict[str, Tuple[TopologyId, object]] = dict()
-
+        self.dlt_record_uuids : List[str] = list()
+        self.dlt_record_uuid_to_data : Dict[str, Tuple[TopologyId, object]] = dict()
+
     async def initialize(self):
         await self.dlt_connector_client.connect()
 
-    def _add_record(self, record_uuid: str, data: Tuple[TopologyId, object]) -> None:
+    def _add_record(self, record_uuid : str, data : Tuple[TopologyId, object]) -> None:
         if record_uuid in self.dlt_record_uuid_to_data: return
         self.dlt_record_uuid_to_data[record_uuid] = data
         self.dlt_record_uuids.append(record_uuid)
 
-    def add_device(self, topology_id: TopologyId, device: Device) -> None:
+    def add_device(self, topology_id : TopologyId, device : Device) -> None:
         topology_uuid = topology_id.topology_uuid.uuid
         device_uuid = device.device_id.device_uuid.uuid
         record_uuid = '{:s}:device:{:s}'.format(topology_uuid, device_uuid)
         self._add_record(record_uuid, (topology_id, device))
 
-    def add_link(self, topology_id: TopologyId, link: Link) -> None:
+    def add_link(self, topology_id : TopologyId, link : Link) -> None:
         topology_uuid = topology_id.topology_uuid.uuid
         link_uuid = link.link_id.link_uuid.uuid
         record_uuid = '{:s}:link:{:s}'.format(topology_uuid, link_uuid)
         self._add_record(record_uuid, (topology_id, link))
 
-    def add_service(self, topology_id: TopologyId, service: Service) -> None:
+    def add_service(self, topology_id : TopologyId, service : Service) -> None:
         topology_uuid = topology_id.topology_uuid.uuid
         context_uuid = service.service_id.context_id.context_uuid.uuid
         service_uuid = service.service_id.service_uuid.uuid
         record_uuid = '{:s}:service:{:s}/{:s}'.format(topology_uuid, context_uuid, service_uuid)
         self._add_record(record_uuid, (topology_id, service))
 
-    def add_slice(self, topology_id: TopologyId, slice_: Slice) -> None:
+    def add_slice(self, topology_id : TopologyId, slice_ : Slice) -> None:
         topology_uuid = topology_id.topology_uuid.uuid
         context_uuid = slice_.slice_id.context_id.context_uuid.uuid
         slice_uuid = slice_.slice_id.slice_uuid.uuid
@@ -71,6 +68,7 @@ class DltRecordSender:
         if not self.dlt_connector_client:
             LOGGER.error('DLT Connector Client is None, cannot commit records.')
             return
+
         tasks = [] # List to hold all the async tasks
         for dlt_record_uuid in self.dlt_record_uuids:
@@ -108,4 +106,3 @@ class DltRecordSender:
 
         if tasks:
             await asyncio.gather(*tasks) # Run all the tasks concurrently
-
diff --git a/src/interdomain/service/topology_abstractor/DltRecorder.py b/src/interdomain/service/topology_abstractor/DltRecorder.py
index ae869b7c0b4290b691f1b976e668b6d0fc9eab60..22c436363b009810815b1cf3fa011fd5cbbc6a13 100644
--- a/src/interdomain/service/topology_abstractor/DltRecorder.py
+++ b/src/interdomain/service/topology_abstractor/DltRecorder.py
@@ -14,8 +14,10 @@
 
 import logging, threading, asyncio, time
 from typing import Dict, Optional
-from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
-from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, LinkId, LinkEvent, TopologyId, TopologyEvent
+from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
+from common.proto.context_pb2 import (
+    ContextEvent, ContextId, DeviceEvent, DeviceId, LinkId, LinkEvent, TopologyId, TopologyEvent
+)
 from common.tools.context_queries.Context import create_context
 from common.tools.grpc.Tools import grpc_message_to_json_string
 from common.tools.object_factory.Context import json_context_id
@@ -44,7 +46,6 @@ class DLTRecorder(threading.Thread):
         self.update_event_queue = asyncio.Queue()
         self.remove_event_queue = asyncio.Queue()
 
-
     def stop(self):
         self.terminate.set()
 
@@ -57,10 +58,9 @@ class DLTRecorder(threading.Thread):
         #self.create_topologies()
         self.context_event_collector.start()
 
-
         batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available
         last_task_time = time.time()
-
+
         while not self.terminate.is_set():
             event = self.context_event_collector.get_event(timeout=0.1)
             if event:
@@ -91,7 +91,7 @@ class DLTRecorder(threading.Thread):
         # Finally, process REMOVE events
         await self.process_queue(self.remove_event_queue)
 
-    async def process_queue(self, queue: asyncio.Queue):
+    async def process_queue(self, queue : asyncio.Queue):
         tasks = []
         while not queue.empty():
             event = await queue.get()
@@ -106,7 +106,7 @@ class DLTRecorder(threading.Thread):
         except Exception as e:
             LOGGER.error(f"Error while processing tasks: {e}")
 
-    async def update_record(self, event: EventTypes) -> None:
+    async def update_record(self, event : EventTypes) -> None:
         dlt_record_sender = DltRecordSender(self.context_client)
         await dlt_record_sender.initialize() # Ensure DltRecordSender is initialized asynchronously
         LOGGER.debug('STARTING processing event: {:s}'.format(grpc_message_to_json_string(event)))
@@ -135,7 +135,7 @@ class DLTRecorder(threading.Thread):
 
         LOGGER.debug('Finished processing event: {:s}'.format(grpc_message_to_json_string(event)))
 
-    def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
+    def process_topology_event(self, event : TopologyEvent, dlt_record_sender : DltRecordSender) -> None:
         topology_id = event.topology_id
         topology_uuid = topology_id.topology_uuid.uuid
         context_id = topology_id.context_id
@@ -167,7 +167,7 @@ class DLTRecorder(threading.Thread):
             args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
             LOGGER.warning(MSG.format(*args))
 
-    def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]:
+    def find_topology_for_device(self, device_id : DeviceId) -> Optional[TopologyId]:
         for topology_uuid, topology_id in self.topology_cache.items():
             details = self.context_client.GetTopologyDetails(topology_id)
             for device in details.devices:
@@ -175,7 +175,7 @@ class DLTRecorder(threading.Thread):
                 return topology_id
         return None
 
-    def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]:
+    def find_topology_for_link(self, link_id : LinkId) -> Optional[TopologyId]:
         for topology_uuid, topology_id in self.topology_cache.items():
             details = self.context_client.GetTopologyDetails(topology_id)
             for link in details.links:
@@ -183,16 +183,18 @@ class DLTRecorder(threading.Thread):
                 return topology_id
         return None
 
-    def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None:
+    def process_device_event(self, event : DeviceEvent, dlt_record_sender : DltRecordSender) -> None:
         device_id = event.device_id
         device = self.context_client.GetDevice(device_id)
         topology_id = self.find_topology_for_device(device_id)
         if topology_id:
-            LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id)))
-
+            LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(
+                str(device.device_id.device_uuid.uuid),
+                grpc_message_to_json_string(device_id)
+            ))
             dlt_record_sender.add_device(topology_id, device)
         else:
-            LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}")
+            LOGGER.warning("Topology not found for device {:s}".format(str(device_id.device_uuid.uuid)))
 
     def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None:
         link_id = event.link_id
@@ -201,5 +203,4 @@ class DLTRecorder(threading.Thread):
         if topology_id:
             dlt_record_sender.add_link(topology_id, link)
         else:
-            LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}")
-
+            LOGGER.warning("Topology not found for link {:s}".format(str(link_id.link_uuid.uuid)))
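
Note: the patch replaces the hardcoded "dlt_enabled = True" with is_dlt_enabled() and restores
is_topology_abstractor_enabled(), both resolved from interdomain.Config (not part of this diff),
and wires a new DLT_INTEGRATION variable next to the existing TOPOLOGY_ABSTRACTOR one in
manifests/interdomainservice.yaml. A minimal sketch of how such flag helpers could behave,
assuming they simply read those environment variables (names _flag_enabled and _TRUE_VALUES
are hypothetical, not the actual Config.py implementation):

    import os

    _TRUE_VALUES = {'TRUE', 'YES', 'ENABLE', 'ENABLED', '1'}

    def _flag_enabled(env_var_name : str, default : bool = False) -> bool:
        # A missing or empty variable falls back to the default, so a pod without the
        # variable behaves like the "DISABLE" setting in the manifest above.
        value = os.environ.get(env_var_name)
        if value is None or len(value.strip()) == 0: return default
        return value.strip().upper() in _TRUE_VALUES

    def is_dlt_enabled() -> bool:
        return _flag_enabled('DLT_INTEGRATION')

    def is_topology_abstractor_enabled() -> bool:
        return _flag_enabled('TOPOLOGY_ABSTRACTOR')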
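
For reference, the DltRecordSender.commit() path touched above builds a list of per-record
coroutines and awaits asyncio.gather(*tasks) so all DLT submissions run concurrently. A
standalone, hypothetical illustration of that pattern (the _record_to_dlt stub stands in for
the gRPC calls made through DltConnectorClientAsync and is not TFS code):

    import asyncio

    async def _record_to_dlt(record_uuid : str) -> str:
        # Stand-in for the per-record asynchronous DLT submission.
        await asyncio.sleep(0.01)
        return record_uuid

    async def commit_records(record_uuids):
        # One task per record, executed concurrently, mirroring commit() above.
        tasks = [ _record_to_dlt(record_uuid) for record_uuid in record_uuids ]
        if not tasks: return []
        return await asyncio.gather(*tasks)

    if __name__ == '__main__':
        committed = asyncio.run(commit_records(['topo1:device:dev1', 'topo1:link:link1']))
        print(committed)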