diff --git a/src/interdomain/service/topology_abstractor/DltRecorder.py b/src/interdomain/service/topology_abstractor/DltRecorder.py index 418a53612852fd7d2a33b2a09b95e059e0bbcccf..97b48628a03b3da99db134a126b5a010a411902b 100644 --- a/src/interdomain/service/topology_abstractor/DltRecorder.py +++ b/src/interdomain/service/topology_abstractor/DltRecorder.py @@ -65,40 +65,55 @@ class DLTRecorder(threading.Thread): create_context(self.context_client, DEFAULT_CONTEXT_NAME) #self.create_topologies() self.context_event_collector.start() - - tasks = [] + + batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available last_task_time = time.time() - + while not self.terminate.is_set(): event = self.context_event_collector.get_event(timeout=0.1) if event: - LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) - task = asyncio.create_task(self.update_record(event)) - tasks.append(task) - LOGGER.debug('Task for event scheduled.') - - # Update the last task time since we've added a new task - last_task_time = time.time() - - # Check if it's time to process the tasks or if we have enough tasks - if tasks and (len(tasks) >= 10 or (time.time() - last_task_time >= batch_timeout)): - try: - await asyncio.gather(*tasks) - except Exception as e: - LOGGER.error(f"Error while processing tasks: {e}") - finally: - tasks = [] # Clear the list after processing - - # Process any remaining tasks when stopping + LOGGER.info('Received Event({:s})...'.format(grpc_message_to_json_string(event))) + + # Prioritize the event based on its type + if event.event.event_type == 1: # CREATE + await self.create_event_queue.put(event) + elif event.event.event_type == 2: # UPDATE + await self.update_event_queue.put(event) + elif event.event.event_type == 3: # REMOVE + await self.remove_event_queue.put(event) + + # Check if it's time to process the tasks or if we have enough tasks + current_time = time.time() + if current_time - last_task_time >= batch_timeout: + await self.process_events() + last_task_time = current_time # Reset the timer after processing + + self.context_event_collector.stop() + self.context_client.close() + + async def process_events(self): + # Process CREATE events first + await self.process_queue(self.create_event_queue) + # Then process UPDATE events + await self.process_queue(self.update_event_queue) + # Finally, process REMOVE events + await self.process_queue(self.remove_event_queue) + + async def process_queue(self, queue: asyncio.Queue): + tasks = [] + while not queue.empty(): + event = await queue.get() + LOGGER.info('Processing Event({:s}) from queue...'.format(grpc_message_to_json_string(event))) + task = asyncio.create_task(self.update_record(event)) + tasks.append(task) + + # Execute tasks concurrently if tasks: try: await asyncio.gather(*tasks) except Exception as e: - LOGGER.error(f"Error while processing remaining tasks: {e}") - - self.context_event_collector.stop() - self.context_client.close() + LOGGER.error(f"Error while processing tasks: {e}") async def update_record(self, event: EventTypes) -> None: dlt_record_sender = DltRecordSender(self.context_client)