Commit ea35c20d authored by Javier Diaz

Async Implementation, Batches

parent 113a54dd
Related merge requests: !294 "Release TeraFlowSDN 4.0", !259 "Resolve "(CTTC) Replace DLT Gateway functionality with an opensource and Hyper Ledger v2.4+ compliant version""
@@ -65,40 +65,55 @@ class DLTRecorder(threading.Thread):
         create_context(self.context_client, DEFAULT_CONTEXT_NAME)
         #self.create_topologies()
         self.context_event_collector.start()
-        tasks = []
         batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available
         last_task_time = time.time()
         while not self.terminate.is_set():
             event = self.context_event_collector.get_event(timeout=0.1)
             if event:
-                LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
-                task = asyncio.create_task(self.update_record(event))
-                tasks.append(task)
-                LOGGER.debug('Task for event scheduled.')
-                # Update the last task time since we've added a new task
-                last_task_time = time.time()
-            # Check if it's time to process the tasks or if we have enough tasks
-            if tasks and (len(tasks) >= 10 or (time.time() - last_task_time >= batch_timeout)):
-                try:
-                    await asyncio.gather(*tasks)
-                except Exception as e:
-                    LOGGER.error(f"Error while processing tasks: {e}")
-                finally:
-                    tasks = [] # Clear the list after processing
-        # Process any remaining tasks when stopping
-        if tasks:
-            try:
-                await asyncio.gather(*tasks)
-            except Exception as e:
-                LOGGER.error(f"Error while processing remaining tasks: {e}")
+                LOGGER.info('Received Event({:s})...'.format(grpc_message_to_json_string(event)))
+                # Prioritize the event based on its type
+                if event.event.event_type == 1: # CREATE
+                    await self.create_event_queue.put(event)
+                elif event.event.event_type == 2: # UPDATE
+                    await self.update_event_queue.put(event)
+                elif event.event.event_type == 3: # REMOVE
+                    await self.remove_event_queue.put(event)
+            # Check if it's time to process the tasks or if we have enough tasks
+            current_time = time.time()
+            if current_time - last_task_time >= batch_timeout:
+                await self.process_events()
+                last_task_time = current_time # Reset the timer after processing
         self.context_event_collector.stop()
         self.context_client.close()
 
+    async def process_events(self):
+        # Process CREATE events first
+        await self.process_queue(self.create_event_queue)
+        # Then process UPDATE events
+        await self.process_queue(self.update_event_queue)
+        # Finally, process REMOVE events
+        await self.process_queue(self.remove_event_queue)
+
+    async def process_queue(self, queue: asyncio.Queue):
+        tasks = []
+        while not queue.empty():
+            event = await queue.get()
+            LOGGER.info('Processing Event({:s}) from queue...'.format(grpc_message_to_json_string(event)))
+            task = asyncio.create_task(self.update_record(event))
+            tasks.append(task)
+        # Execute tasks concurrently
+        if tasks:
+            try:
+                await asyncio.gather(*tasks)
+            except Exception as e:
+                LOGGER.error(f"Error while processing tasks: {e}")
+
     async def update_record(self, event: EventTypes) -> None:
         dlt_record_sender = DltRecordSender(self.context_client)
......
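For readers skimming the diff, the standalone sketch below illustrates the pattern this commit introduces: incoming events are routed into per-type asyncio queues and periodically flushed in priority order (CREATE, then UPDATE, then REMOVE), with each batch dispatched concurrently via asyncio.gather. The attribute and method names create_event_queue, update_event_queue, remove_event_queue, process_events, process_queue, update_record and batch_timeout mirror the diff; the DemoEvent class, the PrioritizedRecorder wrapper and the main() driver are illustrative stand-ins and are not part of the actual dlt_connector code.

# Minimal, runnable sketch of the queue-prioritized batching pattern (assumptions noted above).
import asyncio
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

CREATE, UPDATE, REMOVE = 1, 2, 3  # mirrors event.event.event_type values used in the diff

@dataclass
class DemoEvent:      # illustrative stand-in for the gRPC event object
    event_type: int
    name: str

class PrioritizedRecorder:
    def __init__(self, batch_timeout: float = 1.0) -> None:
        self.batch_timeout = batch_timeout
        self.create_event_queue: asyncio.Queue = asyncio.Queue()
        self.update_event_queue: asyncio.Queue = asyncio.Queue()
        self.remove_event_queue: asyncio.Queue = asyncio.Queue()

    async def enqueue(self, event: DemoEvent) -> None:
        # Route each event to the queue that matches its type.
        queue = {
            CREATE: self.create_event_queue,
            UPDATE: self.update_event_queue,
            REMOVE: self.remove_event_queue,
        }[event.event_type]
        await queue.put(event)

    async def process_events(self) -> None:
        # Flush queues in priority order: CREATE first, then UPDATE, then REMOVE.
        await self.process_queue(self.create_event_queue)
        await self.process_queue(self.update_event_queue)
        await self.process_queue(self.remove_event_queue)

    async def process_queue(self, queue: asyncio.Queue) -> None:
        # Drain the queue and run all record updates of this batch concurrently.
        tasks = []
        while not queue.empty():
            event = await queue.get()
            tasks.append(asyncio.create_task(self.update_record(event)))
        if tasks:
            try:
                await asyncio.gather(*tasks)
            except Exception as e:
                LOGGER.error(f"Error while processing tasks: {e}")

    async def update_record(self, event: DemoEvent) -> None:
        # Placeholder for the real DLT record update.
        await asyncio.sleep(0.01)
        LOGGER.info("recorded event_type=%s for '%s'", event.event_type, event.name)

async def main() -> None:
    recorder = PrioritizedRecorder(batch_timeout=1.0)
    for ev in (DemoEvent(REMOVE, "dev1"), DemoEvent(CREATE, "dev2"), DemoEvent(UPDATE, "dev2")):
        await recorder.enqueue(ev)
    await asyncio.sleep(recorder.batch_timeout)  # stand-in for the timer check in the run loop
    await recorder.process_events()              # CREATE is handled before UPDATE and REMOVE

if __name__ == "__main__":
    asyncio.run(main())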