Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
controller
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
TFS
controller
Compare revisions
34f8bb97487572bb01143ac022fe9d477e007bde to ea35c20d9777a2673fe4bbb0307e4a2262c582b2
Compare revisions
Changes are shown as if the
source
revision was being merged into the
target
revision.
Learn more about comparing revisions.
Source
tfs/controller
Select target project
No results found
ea35c20d9777a2673fe4bbb0307e4a2262c582b2
Select Git revision
Swap
Target
tfs/controller
Select target project
tfs/controller
1 result
34f8bb97487572bb01143ac022fe9d477e007bde
Select Git revision
Show changes
Only incoming changes from source
Include changes to target since source was created
Compare
Commits on Source (2)
Async implementation
· 113a54dd
Javier Diaz
authored
7 months ago
113a54dd
Async Implementation, Batches
· ea35c20d
Javier Diaz
authored
7 months ago
ea35c20d
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/interdomain/service/topology_abstractor/DltRecorder.py
+47
-27
47 additions, 27 deletions
src/interdomain/service/topology_abstractor/DltRecorder.py
with
47 additions
and
27 deletions
src/interdomain/service/topology_abstractor/DltRecorder.py
View file @
ea35c20d
...
...
@@ -15,6 +15,7 @@
import
logging
import
threading
import
asyncio
import
time
from
typing
import
Dict
,
Optional
from
common.Constants
import
DEFAULT_CONTEXT_NAME
,
DEFAULT_TOPOLOGY_NAME
,
INTERDOMAIN_TOPOLOGY_NAME
,
ServiceNameEnum
...
...
@@ -47,6 +48,12 @@ class DLTRecorder(threading.Thread):
self
.
context_event_collector
=
EventsCollector
(
self
.
context_client
)
self
.
topology_cache
:
Dict
[
str
,
TopologyId
]
=
{}
# Queues for each event type
self
.
create_event_queue
=
asyncio
.
Queue
()
self
.
update_event_queue
=
asyncio
.
Queue
()
self
.
remove_event_queue
=
asyncio
.
Queue
()
def stop(self):
    """Signal the recorder loop to shut down by setting the terminate event."""
    self.terminate.set()
...
...
@@ -58,42 +65,55 @@ class DLTRecorder(threading.Thread):
create_context
(
self
.
context_client
,
DEFAULT_CONTEXT_NAME
)
#self.create_topologies()
self
.
context_event_collector
.
start
()
tasks
=
[]
batch_timeout
=
1
# Time in seconds to wait before processing whatever tasks are available
batch_timeout
=
1
# Time in seconds to wait before processing whatever tasks are available
last_task_time
=
time
.
time
()
while
not
self
.
terminate
.
is_set
():
event
=
self
.
context_event_collector
.
get_event
(
timeout
=
0.1
)
if
event
is
None
:
continue
LOGGER
.
info
(
'
Processing Event({:s})...
'
.
format
(
grpc_message_to_json_string
(
event
)))
if
event
:
LOGGER
.
info
(
'
Received Event({:s})...
'
.
format
(
grpc_message_to_json_string
(
event
)))
# Prioritize the event based on its type
if
event
.
event
.
event_type
==
1
:
# CREATE
await
self
.
create_event_queue
.
put
(
event
)
elif
event
.
event
.
event_type
==
2
:
# UPDATE
await
self
.
update_event_queue
.
put
(
event
)
elif
event
.
event
.
event_type
==
3
:
# REMOVE
await
self
.
remove_event_queue
.
put
(
event
)
# Check if it's time to process the tasks or if we have enough tasks
current_time
=
time
.
time
()
if
current_time
-
last_task_time
>=
batch_timeout
:
await
self
.
process_events
()
last_task_time
=
current_time
# Reset the timer after processing
self
.
context_event_collector
.
stop
()
self
.
context_client
.
close
()
async def process_events(self):
    """Drain the event queues in priority order: CREATE, then UPDATE, then REMOVE."""
    ordered_queues = (
        self.create_event_queue,
        self.update_event_queue,
        self.remove_event_queue,
    )
    for pending_queue in ordered_queues:
        await self.process_queue(pending_queue)
async def process_queue(self, queue: asyncio.Queue):
    """Drain *queue*, dispatching each event to ``update_record``.

    Each event is scheduled as a concurrent task; tasks are awaited in
    batches of up to 10, and any remainder is awaited once the queue is
    empty. Failures inside a batch are logged and do not abort draining.

    :param queue: the asyncio.Queue of events to drain (presumably gRPC
        event messages — they are passed to grpc_message_to_json_string).
    """
    tasks = []
    while not queue.empty():
        event = await queue.get()
        LOGGER.info('Processing Event({:s}) from queue...'.format(grpc_message_to_json_string(event)))
        # Schedule the record update concurrently instead of awaiting inline.
        task = asyncio.create_task(self.update_record(event))
        tasks.append(task)
        LOGGER.debug('Task for event scheduled.')
        # Flush once a full batch is ready.
        # NOTE(review): the previous condition also evaluated
        # `await asyncio.sleep(batch_timeout)` inside the boolean expression;
        # asyncio.sleep() returns None (falsy) so that clause could never be
        # true, and `batch_timeout` was not defined in this scope, which made
        # evaluating it raise NameError. The batch-size check alone is the
        # effective, intended behavior.
        if len(tasks) >= 10:
            try:
                await asyncio.gather(*tasks)
            except Exception as e:
                LOGGER.error(f"Error while processing tasks: {e}")
            finally:
                tasks = []  # Clear the list after processing
    # Process any remaining tasks once the queue is drained.
    if tasks:
        try:
            await asyncio.gather(*tasks)
        except Exception as e:
            LOGGER.error(f"Error while processing remaining tasks: {e}")
async
def
update_record
(
self
,
event
:
EventTypes
)
->
None
:
dlt_record_sender
=
DltRecordSender
(
self
.
context_client
)
...
...
This diff is collapsed.
Click to expand it.