Skip to content
Snippets Groups Projects
Commit 113a54dd authored by Javier Diaz's avatar Javier Diaz
Browse files

Async implementation

parent 34f8bb97
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!259Resolve "(CTTC) Replace DLT Gateway functionality with an opensource and Hyper Ledger v2.4+ compliant version"
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
import logging import logging
import threading import threading
import asyncio import asyncio
import time
from typing import Dict, Optional from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
...@@ -47,6 +48,12 @@ class DLTRecorder(threading.Thread): ...@@ -47,6 +48,12 @@ class DLTRecorder(threading.Thread):
self.context_event_collector = EventsCollector(self.context_client) self.context_event_collector = EventsCollector(self.context_client)
self.topology_cache: Dict[str, TopologyId] = {} self.topology_cache: Dict[str, TopologyId] = {}
# Queues for each event type
self.create_event_queue = asyncio.Queue()
self.update_event_queue = asyncio.Queue()
self.remove_event_queue = asyncio.Queue()
def stop(self): def stop(self):
self.terminate.set() self.terminate.set()
...@@ -61,27 +68,29 @@ class DLTRecorder(threading.Thread): ...@@ -61,27 +68,29 @@ class DLTRecorder(threading.Thread):
tasks = [] tasks = []
batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available
last_task_time = time.time()
while not self.terminate.is_set(): while not self.terminate.is_set():
event = self.context_event_collector.get_event(timeout=0.1) event = self.context_event_collector.get_event(timeout=0.1)
if event is None: if event:
continue LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) task = asyncio.create_task(self.update_record(event))
task = asyncio.create_task(self.update_record(event)) tasks.append(task)
tasks.append(task) LOGGER.debug('Task for event scheduled.')
LOGGER.debug('Task for event scheduled.')
# Limit the number of concurrent tasks # Update the last task time since we've added a new task
# If we have enough tasks or it's time to process them last_task_time = time.time()
if len(tasks) >= 10 or (tasks and len(tasks) > 0 and await asyncio.sleep(batch_timeout)):
# Check if it's time to process the tasks or if we have enough tasks
if tasks and (len(tasks) >= 10 or (time.time() - last_task_time >= batch_timeout)):
try: try:
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
except Exception as e: except Exception as e:
LOGGER.error(f"Error while processing tasks: {e}") LOGGER.error(f"Error while processing tasks: {e}")
finally: finally:
tasks = [] # Clear the list after processing tasks = [] # Clear the list after processing
await asyncio.gather(*tasks)
tasks = [] # Clear the list after processing # Process any remaining tasks when stopping
# Process any remaining tasks when stopping
if tasks: if tasks:
try: try:
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
...@@ -91,10 +100,6 @@ class DLTRecorder(threading.Thread): ...@@ -91,10 +100,6 @@ class DLTRecorder(threading.Thread):
self.context_event_collector.stop() self.context_event_collector.stop()
self.context_client.close() self.context_client.close()
#def create_topologies(self):
#topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
#create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
async def update_record(self, event: EventTypes) -> None: async def update_record(self, event: EventTypes) -> None:
dlt_record_sender = DltRecordSender(self.context_client) dlt_record_sender = DltRecordSender(self.context_client)
await dlt_record_sender.initialize() # Ensure DltRecordSender is initialized asynchronously await dlt_record_sender.initialize() # Ensure DltRecordSender is initialized asynchronously
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment