Skip to content
Snippets Groups Projects
Commit 37977f22 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Pre-merge code cleanup

parent 51765439
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!259Resolve "(CTTC) Replace DLT Gateway functionality with an opensource and Hyper Ledger v2.4+ compliant version"
......@@ -38,6 +38,8 @@ spec:
value: "INFO"
- name: TOPOLOGY_ABSTRACTOR
value: "DISABLE"
- name: DLT_INTEGRATION
value: "DISABLE"
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10010"]
......
......@@ -62,12 +62,13 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
# Uncomment to activate E2E Orchestrator
#export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator"
# Uncomment to activate DLT
export TFS_COMPONENTS="${TFS_COMPONENTS} interdomain dlt"
export KEY_DIRECTORY_PATH="src/dlt/gateway/keys/priv_sk"
export CERT_DIRECTORY_PATH="src/dlt/gateway/keys/cert.pem"
export TLS_CERT_PATH="src/dlt/gateway/keys/ca.crt"
# Uncomment to activate DLT and Interdomain
#export TFS_COMPONENTS="${TFS_COMPONENTS} interdomain dlt"
#if [[ "$TFS_COMPONENTS" == *"dlt"* ]]; then
# export KEY_DIRECTORY_PATH="src/dlt/gateway/keys/priv_sk"
# export CERT_DIRECTORY_PATH="src/dlt/gateway/keys/cert.pem"
# export TLS_CERT_PATH="src/dlt/gateway/keys/ca.crt"
#fi
# Set the tag you want to use for your images.
export TFS_IMAGE_TAG="dev"
......@@ -116,7 +117,7 @@ export CRDB_DATABASE="tfs"
export CRDB_DEPLOY_MODE="single"
# Disable flag for dropping database, if it exists.
export CRDB_DROP_DATABASE_IF_EXISTS="YES"
export CRDB_DROP_DATABASE_IF_EXISTS=""
# Disable flag for re-deploying CockroachDB from scratch.
export CRDB_REDEPLOY=""
......@@ -168,7 +169,7 @@ export QDB_TABLE_MONITORING_KPIS="tfs_monitoring_kpis"
export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups"
# Disable flag for dropping tables if they exist.
export QDB_DROP_TABLES_IF_EXIST="YES"
export QDB_DROP_TABLES_IF_EXIST=""
# Disable flag for re-deploying QuestDB from scratch.
export QDB_REDEPLOY=""
......
......@@ -19,7 +19,7 @@ from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables)
from interdomain.Config import is_dlt_enabled
#from .topology_abstractor.TopologyAbstractor import TopologyAbstractor
from .topology_abstractor.TopologyAbstractor import TopologyAbstractor
from .topology_abstractor.DltRecorder import DLTRecorder
from .InterdomainService import InterdomainService
from .RemoteDomainClients import RemoteDomainClients
......@@ -65,14 +65,13 @@ def main():
grpc_service.start()
# Subscribe to Context Events
# topology_abstractor_enabled = is_topology_abstractor_enabled()
# if topology_abstractor_enabled:
# topology_abstractor = TopologyAbstractor()
# topology_abstractor.start()
# Subscribe to Context Events
#dlt_enabled = is_dlt_enabled() #How to change the config?
dlt_enabled = True
topology_abstractor_enabled = is_topology_abstractor_enabled()
if topology_abstractor_enabled:
topology_abstractor = TopologyAbstractor()
topology_abstractor.start()
# Subscribe to Context Events
dlt_enabled = is_dlt_enabled()
if dlt_enabled:
LOGGER.info('Starting DLT functionality...')
dlt_recorder = DLTRecorder()
......@@ -82,8 +81,8 @@ def main():
while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...')
# if topology_abstractor_enabled:
# topology_abstractor.stop()
if topology_abstractor_enabled:
topology_abstractor.stop()
if dlt_enabled:
dlt_recorder.stop()
grpc_service.stop()
......
......@@ -12,10 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import asyncio
import asyncio, logging
from typing import Dict, List, Tuple
from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
......@@ -25,42 +22,42 @@ from dlt.connector.client.DltConnectorClientAsync import DltConnectorClientAsync
LOGGER = logging.getLogger(__name__)
class DltRecordSender:
def __init__(self, context_client: ContextClient) -> None:
def __init__(self, context_client : ContextClient) -> None:
self.context_client = context_client
LOGGER.debug('Creating Servicer...')
self.dlt_connector_client = DltConnectorClientAsync()
LOGGER.debug('Servicer Created')
self.dlt_record_uuids: List[str] = list()
self.dlt_record_uuid_to_data: Dict[str, Tuple[TopologyId, object]] = dict()
self.dlt_record_uuids : List[str] = list()
self.dlt_record_uuid_to_data : Dict[str, Tuple[TopologyId, object]] = dict()
async def initialize(self):
await self.dlt_connector_client.connect()
def _add_record(self, record_uuid: str, data: Tuple[TopologyId, object]) -> None:
def _add_record(self, record_uuid : str, data : Tuple[TopologyId, object]) -> None:
if record_uuid in self.dlt_record_uuid_to_data: return
self.dlt_record_uuid_to_data[record_uuid] = data
self.dlt_record_uuids.append(record_uuid)
def add_device(self, topology_id: TopologyId, device: Device) -> None:
def add_device(self, topology_id : TopologyId, device : Device) -> None:
topology_uuid = topology_id.topology_uuid.uuid
device_uuid = device.device_id.device_uuid.uuid
record_uuid = '{:s}:device:{:s}'.format(topology_uuid, device_uuid)
self._add_record(record_uuid, (topology_id, device))
def add_link(self, topology_id: TopologyId, link: Link) -> None:
def add_link(self, topology_id : TopologyId, link : Link) -> None:
topology_uuid = topology_id.topology_uuid.uuid
link_uuid = link.link_id.link_uuid.uuid
record_uuid = '{:s}:link:{:s}'.format(topology_uuid, link_uuid)
self._add_record(record_uuid, (topology_id, link))
def add_service(self, topology_id: TopologyId, service: Service) -> None:
def add_service(self, topology_id : TopologyId, service : Service) -> None:
topology_uuid = topology_id.topology_uuid.uuid
context_uuid = service.service_id.context_id.context_uuid.uuid
service_uuid = service.service_id.service_uuid.uuid
record_uuid = '{:s}:service:{:s}/{:s}'.format(topology_uuid, context_uuid, service_uuid)
self._add_record(record_uuid, (topology_id, service))
def add_slice(self, topology_id: TopologyId, slice_: Slice) -> None:
def add_slice(self, topology_id : TopologyId, slice_ : Slice) -> None:
topology_uuid = topology_id.topology_uuid.uuid
context_uuid = slice_.slice_id.context_id.context_uuid.uuid
slice_uuid = slice_.slice_id.slice_uuid.uuid
......@@ -71,6 +68,7 @@ class DltRecordSender:
if not self.dlt_connector_client:
LOGGER.error('DLT Connector Client is None, cannot commit records.')
return
tasks = [] # List to hold all the async tasks
for dlt_record_uuid in self.dlt_record_uuids:
......@@ -108,4 +106,3 @@ class DltRecordSender:
if tasks:
await asyncio.gather(*tasks) # Run all the tasks concurrently
......@@ -14,8 +14,10 @@
import logging, threading, asyncio, time
from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
from common.proto.context_pb2 import ContextEvent, ContextId, Device, DeviceEvent, DeviceId, LinkId, LinkEvent, TopologyId, TopologyEvent
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME
from common.proto.context_pb2 import (
ContextEvent, ContextId, DeviceEvent, DeviceId, LinkId, LinkEvent, TopologyId, TopologyEvent
)
from common.tools.context_queries.Context import create_context
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
......@@ -44,7 +46,6 @@ class DLTRecorder(threading.Thread):
self.update_event_queue = asyncio.Queue()
self.remove_event_queue = asyncio.Queue()
def stop(self):
self.terminate.set()
......@@ -57,10 +58,9 @@ class DLTRecorder(threading.Thread):
#self.create_topologies()
self.context_event_collector.start()
batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available
last_task_time = time.time()
while not self.terminate.is_set():
event = self.context_event_collector.get_event(timeout=0.1)
if event:
......@@ -91,7 +91,7 @@ class DLTRecorder(threading.Thread):
# Finally, process REMOVE events
await self.process_queue(self.remove_event_queue)
async def process_queue(self, queue: asyncio.Queue):
async def process_queue(self, queue : asyncio.Queue):
tasks = []
while not queue.empty():
event = await queue.get()
......@@ -106,7 +106,7 @@ class DLTRecorder(threading.Thread):
except Exception as e:
LOGGER.error(f"Error while processing tasks: {e}")
async def update_record(self, event: EventTypes) -> None:
async def update_record(self, event : EventTypes) -> None:
dlt_record_sender = DltRecordSender(self.context_client)
await dlt_record_sender.initialize() # Ensure DltRecordSender is initialized asynchronously
LOGGER.debug('STARTING processing event: {:s}'.format(grpc_message_to_json_string(event)))
......@@ -135,7 +135,7 @@ class DLTRecorder(threading.Thread):
LOGGER.debug('Finished processing event: {:s}'.format(grpc_message_to_json_string(event)))
def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
def process_topology_event(self, event : TopologyEvent, dlt_record_sender : DltRecordSender) -> None:
topology_id = event.topology_id
topology_uuid = topology_id.topology_uuid.uuid
context_id = topology_id.context_id
......@@ -167,7 +167,7 @@ class DLTRecorder(threading.Thread):
args = context_uuid, context_name, topology_uuid, topology_name, grpc_message_to_json_string(event)
LOGGER.warning(MSG.format(*args))
def find_topology_for_device(self, device_id: DeviceId) -> Optional[TopologyId]:
def find_topology_for_device(self, device_id : DeviceId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id)
for device in details.devices:
......@@ -175,7 +175,7 @@ class DLTRecorder(threading.Thread):
return topology_id
return None
def find_topology_for_link(self, link_id: LinkId) -> Optional[TopologyId]:
def find_topology_for_link(self, link_id : LinkId) -> Optional[TopologyId]:
for topology_uuid, topology_id in self.topology_cache.items():
details = self.context_client.GetTopologyDetails(topology_id)
for link in details.links:
......@@ -183,16 +183,18 @@ class DLTRecorder(threading.Thread):
return topology_id
return None
def process_device_event(self, event: DeviceEvent, dlt_record_sender: DltRecordSender) -> None:
def process_device_event(self, event : DeviceEvent, dlt_record_sender : DltRecordSender) -> None:
device_id = event.device_id
device = self.context_client.GetDevice(device_id)
topology_id = self.find_topology_for_device(device_id)
if topology_id:
LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(str(device.device_id.device_uuid.uuid), grpc_message_to_json_string(device_id)))
LOGGER.debug('DEVICE_INFO({:s}), DEVICE_ID ({:s})'.format(
str(device.device_id.device_uuid.uuid),
grpc_message_to_json_string(device_id)
))
dlt_record_sender.add_device(topology_id, device)
else:
LOGGER.warning(f"Topology not found for device {device_id.device_uuid.uuid}")
LOGGER.warning("Topology not found for device {:s}".format(str(device_id.device_uuid.uuid)))
def process_link_event(self, event: LinkEvent, dlt_record_sender: DltRecordSender) -> None:
link_id = event.link_id
......@@ -201,5 +203,4 @@ class DLTRecorder(threading.Thread):
if topology_id:
dlt_record_sender.add_link(topology_id, link)
else:
LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}")
LOGGER.warning("Topology not found for link {:s}".format(str(link_id.link_uuid.uuid)))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment