Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: tfs/controller

Commits on Source (2)
@@ -15,6 +15,7 @@
 import logging
 import threading
 import asyncio
+import time
 from typing import Dict, Optional
 from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
@@ -47,6 +48,12 @@ class DLTRecorder(threading.Thread):
         self.context_event_collector = EventsCollector(self.context_client)
         self.topology_cache: Dict[str, TopologyId] = {}

+        # Queues for each event type
+        self.create_event_queue = asyncio.Queue()
+        self.update_event_queue = asyncio.Queue()
+        self.remove_event_queue = asyncio.Queue()
+
     def stop(self):
         self.terminate.set()
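
Note: the three asyncio.Queue instances added above give CREATE, UPDATE, and
REMOVE events separate buffers so they can be drained in a fixed priority
order. A minimal, self-contained sketch of that pattern with plain asyncio
(illustrative names only, not code from this MR), mirroring the
process_events() introduced in the next hunk:

    import asyncio

    async def drain(queue: asyncio.Queue) -> None:
        # Consume only what is currently buffered; do not block for new items.
        while not queue.empty():
            print('processing', await queue.get())

    async def main() -> None:
        create_q, update_q, remove_q = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
        # Enqueue out of order to show that draining imposes the priority.
        await update_q.put('update-1')
        await remove_q.put('remove-1')
        await create_q.put('create-1')
        # CREATE first, then UPDATE, then REMOVE.
        for q in (create_q, update_q, remove_q):
            await drain(q)

    asyncio.run(main())  # prints create-1, update-1, remove-1
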
@@ -58,42 +65,55 @@ class DLTRecorder(threading.Thread):
         create_context(self.context_client, DEFAULT_CONTEXT_NAME)
         #self.create_topologies()
         self.context_event_collector.start()
-        tasks = []
         batch_timeout = 1  # Time in seconds to wait before processing whatever tasks are available
+        last_task_time = time.time()
         while not self.terminate.is_set():
             event = self.context_event_collector.get_event(timeout=0.1)
-            if event is None:
-                continue
-            LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
+            if event:
+                LOGGER.info('Received Event({:s})...'.format(grpc_message_to_json_string(event)))
+                # Prioritize the event based on its type
+                if event.event.event_type == 1:    # CREATE
+                    await self.create_event_queue.put(event)
+                elif event.event.event_type == 2:  # UPDATE
+                    await self.update_event_queue.put(event)
+                elif event.event.event_type == 3:  # REMOVE
+                    await self.remove_event_queue.put(event)
+            # Flush the queued events once the batch timeout has elapsed
+            current_time = time.time()
+            if current_time - last_task_time >= batch_timeout:
+                await self.process_events()
+                last_task_time = current_time  # Reset the timer after processing
         self.context_event_collector.stop()
         self.context_client.close()

+    async def process_events(self):
+        # Process CREATE events first
+        await self.process_queue(self.create_event_queue)
+        # Then process UPDATE events
+        await self.process_queue(self.update_event_queue)
+        # Finally, process REMOVE events
+        await self.process_queue(self.remove_event_queue)
+
+    async def process_queue(self, queue: asyncio.Queue):
+        tasks = []
+        while not queue.empty():
+            event = await queue.get()
+            LOGGER.info('Processing Event({:s}) from queue...'.format(grpc_message_to_json_string(event)))
+            task = asyncio.create_task(self.update_record(event))
+            tasks.append(task)
+            LOGGER.debug('Task for event scheduled.')
-            # Limit the number of concurrent tasks
-            # If we have enough tasks or it's time to process them
-            if len(tasks) >= 10 or (tasks and len(tasks) > 0 and await asyncio.sleep(batch_timeout)):
-                try:
-                    await asyncio.gather(*tasks)
-                except Exception as e:
-                    LOGGER.error(f"Error while processing tasks: {e}")
-                finally:
-                    tasks = []  # Clear the list after processing
-        # Process any remaining tasks when stopping
+        # Execute tasks concurrently
         if tasks:
             try:
                 await asyncio.gather(*tasks)
             except Exception as e:
-                LOGGER.error(f"Error while processing remaining tasks: {e}")
+                LOGGER.error(f"Error while processing tasks: {e}")
-        self.context_event_collector.stop()
-        self.context_client.close()

     #def create_topologies(self):
     #    topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
     #    create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)

     async def update_record(self, event: EventTypes) -> None:
         dlt_record_sender = DltRecordSender(self.context_client)
...
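
Note: the run() loop above flushes the queues at most once per batch_timeout
seconds. A self-contained sketch of that timer pattern, with the same
log-and-continue error handling as process_queue() (all names illustrative,
not MR code):

    import asyncio
    import logging
    import time

    logging.basicConfig(level=logging.INFO)
    LOGGER = logging.getLogger(__name__)

    async def handle(item: str) -> None:
        await asyncio.sleep(0.1)  # stand-in for the work done by update_record()
        LOGGER.info('handled %s', item)

    async def main() -> None:
        queue: asyncio.Queue = asyncio.Queue()
        for i in range(5):
            await queue.put(f'event-{i}')

        batch_timeout = 1            # seconds to wait before flushing the queue
        last_flush = time.time()
        deadline = time.time() + 3   # stand-in for the terminate flag

        while time.time() < deadline:
            await asyncio.sleep(0.1)  # stand-in for get_event(timeout=0.1)
            if time.time() - last_flush >= batch_timeout:
                tasks = []
                while not queue.empty():
                    tasks.append(asyncio.create_task(handle(await queue.get())))
                if tasks:
                    try:
                        await asyncio.gather(*tasks)
                    except Exception as e:  # same policy as process_queue()
                        LOGGER.error(f"Error while processing tasks: {e}")
                last_flush = time.time()

    asyncio.run(main())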