Skip to content
Snippets Groups Projects
Commit 34f8bb97 authored by Javier Diaz's avatar Javier Diaz
Browse files

Async refactoring of the code. Initial

parent 42005397
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!259Resolve "(CTTC) Replace DLT Gateway functionality with an opensource and Hyper Ledger v2.4+ compliant version"
Showing
with 565 additions and 166 deletions
......@@ -12,6 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging, threading
from enum import Enum
from prettytable import PrettyTable
......@@ -235,3 +249,35 @@ def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Log
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
return inner_wrapper
return outer_wrapper
def safe_and_metered_rpc_method_async(metrics_pool: MetricsPool, logger: logging.Logger):
    """Decorator factory for asyncio gRPC service methods.

    Wraps the RPC handler to count started/completed/failed invocations in
    `metrics_pool`, log request/reply payloads at DEBUG level, and translate
    exceptions into gRPC aborts on the aio ServicerContext.
    """
    def outer_wrapper(func):
        rpc_name = func.__name__
        histogram_duration, counter_started, counter_completed, counter_failed = \
            metrics_pool.get_metrics(rpc_name)

        async def inner_wrapper(self, request, grpc_context: grpc.aio.ServicerContext):
            counter_started.inc()
            try:
                logger.debug('{:s} request: {:s}'.format(rpc_name, grpc_message_to_json_string(request)))
                reply = await func(self, request, grpc_context)
                logger.debug('{:s} reply: {:s}'.format(rpc_name, grpc_message_to_json_string(reply)))
            except ServiceException as e:  # pragma: no cover (ServiceException not thrown)
                # NOT_FOUND / ALREADY_EXISTS are expected conditions, not failures.
                if e.code in (grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS):
                    counter_completed.inc()
                else:
                    logger.exception('{:s} exception'.format(rpc_name))
                    counter_failed.inc()
                await grpc_context.abort(e.code, e.details)
            except Exception as e:  # pragma: no cover, pylint: disable=broad-except
                logger.exception('{:s} exception'.format(rpc_name))
                counter_failed.inc()
                await grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
            else:
                counter_completed.inc()
                return reply
        return inner_wrapper
    return outer_wrapper
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
import grpc
import logging
from concurrent import futures
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers
class GenericGrpcServiceAsync:
    """Base class for asyncio-based gRPC services.

    Subclasses override `install_servicers()` to register their servicers on
    `self.server`; `start()` builds the aio server, binds the port and begins
    serving, and `stop()` shuts it down within the configured grace period.
    """

    def __init__(
        self, bind_port: Union[str, int], bind_address: Optional[str] = None, max_workers: Optional[int] = None,
        grace_period: Optional[int] = None, enable_health_servicer: bool = True, cls_name: str = __name__
    ) -> None:
        # Parameters left as None fall back to environment-driven settings.
        self.logger = logging.getLogger(cls_name)
        self.bind_port = bind_port
        self.bind_address = get_grpc_bind_address() if bind_address is None else bind_address
        self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers
        self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period
        self.enable_health_servicer = enable_health_servicer
        # Populated by start(): final endpoint, health servicer, thread pool, server.
        self.endpoint = None
        self.health_servicer = None
        self.pool = None
        self.server = None

    async def install_servicers(self):
        """Hook for subclasses: register gRPC servicers on self.server."""
        pass

    async def start(self):
        """Create the aio server, install servicers, bind the port, start serving."""
        self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
            str(self.endpoint), str(self.max_workers)))
        # The pool is passed to grpc.aio.server() as its migration thread pool.
        self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.server = grpc.aio.server(self.pool)
        await self.install_servicers()  # Ensure this is awaited
        if self.enable_health_servicer:
            # NOTE(review): this is the synchronous HealthServicer; confirm it is
            # compatible with grpc.aio servers (an asyncio variant also exists).
            self.health_servicer = HealthServicer(
                experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
            add_HealthServicer_to_server(self.health_servicer, self.server)
        # add_insecure_port returns the actually-bound port (relevant if 0 was requested),
        # so recompute the endpoint from it before logging.
        self.bind_port = self.server.add_insecure_port(self.endpoint)
        self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
        self.logger.info('Listening on {:s}...'.format(str(self.endpoint)))
        await self.server.start()
        if self.enable_health_servicer:
            # Only report SERVING once the server has actually started.
            self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING)
        self.logger.debug('Service started')

    async def stop(self):
        """Gracefully stop the server, honoring the configured grace period."""
        self.logger.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
        if self.enable_health_servicer:
            self.health_servicer.enter_graceful_shutdown()
        await self.server.stop(self.grace_period)
        self.logger.debug('Service stopped')
......@@ -12,7 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
# DltConnectorClient.py
import grpc
import logging
import asyncio
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, TopologyId
......@@ -35,77 +40,90 @@ class DltConnectorClient:
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
#self.connect()
#LOGGER.debug('Channel created')
async def connect(self):
    """Open the asyncio gRPC channel to the DLT connector and create the stub.

    Also invoked by RETRY_DECORATOR (prepare_method_name='connect') before retries.
    """
    # Removed the dead synchronous duplicate definition left over from the
    # sync->async migration; only the async version is live.
    self.channel = grpc.aio.insecure_channel(self.endpoint)
    self.stub = DltConnectorServiceStub(self.channel)
    LOGGER.debug('Channel created')
async def close(self):
    """Close the asyncio channel (if open) and drop the stub."""
    # Removed the dead synchronous duplicate definition left over from the
    # sync->async migration; only the async version is live.
    if self.channel is not None:
        await self.channel.close()
    self.channel = None
    self.stub = None
@RETRY_DECORATOR
async def RecordAll(self, request: TopologyId) -> Empty:
    """Invoke the RecordAll RPC (record an entire topology in the DLT)."""
    # Removed the stale synchronous `def` header (it had no body, which is a
    # syntax error) left over from the sync->async migration.
    LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordAll(request)
    LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordAllDevices(self, request: TopologyId) -> Empty:
    """Invoke the RecordAllDevices RPC for the given topology."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordAllDevices(request)
    LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordDevice(self, request: DltDeviceId) -> Empty:
    """Invoke the RecordDevice RPC for a single device."""
    # Removed the stale synchronous version and the commented-out dummy
    # implementation (simulated-delay stub) left over from development.
    # NOTE(review): log label 'RECORD_DEVICE' differs from the sibling
    # methods' '<MethodName> request' convention — confirm intentional.
    LOGGER.debug('RECORD_DEVICE request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordDevice(request)
    LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordAllLinks(self, request: TopologyId) -> Empty:
    """Invoke the RecordAllLinks RPC for the given topology."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordAllLinks(request)
    LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordLink(self, request: DltLinkId) -> Empty:
    """Invoke the RecordLink RPC for a single link."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordLink(request)
    LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordAllServices(self, request: TopologyId) -> Empty:
    """Invoke the RecordAllServices RPC for the given topology."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordAllServices(request)
    LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordService(self, request: DltServiceId) -> Empty:
    """Invoke the RecordService RPC for a single service."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordService(request)
    LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordAllSlices(self, request: TopologyId) -> Empty:
    """Invoke the RecordAllSlices RPC for the given topology."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordAllSlices(request)
    LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def RecordSlice(self, request: DltSliceId) -> Empty:
    """Invoke the RecordSlice RPC for a single slice."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordSlice(request)
    LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class DltConnectorClientSync:
    """Blocking (non-asyncio) client for the DLT connector gRPC service.

    The channel is opened eagerly in the constructor; RETRY_DECORATOR
    re-invokes connect() between retries.
    """

    def __init__(self, host=None, port=None):
        # Resolve host/port from service settings when not provided.
        host = host or get_service_host(ServiceNameEnum.DLT)
        port = port or get_service_port_grpc(ServiceNameEnum.DLT)
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        """(Re)create the insecure channel and the service stub."""
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = DltConnectorServiceStub(self.channel)

    def close(self):
        """Close the channel if open and drop the stub."""
        if self.channel is not None:
            self.channel.close()
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    def RecordAll(self, request: TopologyId) -> Empty:
        """Record an entire topology in the DLT."""
        LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordAll(request)
        LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordAllDevices(self, request: TopologyId) -> Empty:
        """Record all devices of a topology in the DLT."""
        LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordAllDevices(request)
        LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordDevice(self, request: DltDeviceId) -> Empty:
        """Record a single device in the DLT."""
        LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordDevice(request)
        LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordAllLinks(self, request: TopologyId) -> Empty:
        """Record all links of a topology in the DLT."""
        LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordAllLinks(request)
        LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordLink(self, request: DltLinkId) -> Empty:
        """Record a single link in the DLT."""
        LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordLink(request)
        LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordAllServices(self, request: TopologyId) -> Empty:
        """Record all services of a topology in the DLT."""
        LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordAllServices(request)
        LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordService(self, request: DltServiceId) -> Empty:
        """Record a single service in the DLT."""
        LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordService(request)
        LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordAllSlices(self, request: TopologyId) -> Empty:
        """Record all slices of a topology in the DLT."""
        LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordAllSlices(request)
        LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def RecordSlice(self, request: DltSliceId) -> Empty:
        """Record a single slice in the DLT."""
        LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RecordSlice(request)
        LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply
......@@ -16,7 +16,7 @@ from typing import Callable, Optional
import grpc, logging, queue, threading, time
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from dlt.connector.client.DltGatewayClientEvent import DltGatewayClientEvent
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
......@@ -31,7 +31,7 @@ LOGGER.setLevel(logging.DEBUG)
class DltEventsCollector(threading.Thread):
def __init__(
self, dltgateway_client : DltGatewayClient,
self, dltgateway_client : DltGatewayClientEvent,
log_events_received : bool = False,
event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None,
) -> None:
......
......@@ -12,8 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
import grpc, logging
import grpc
import logging
import asyncio
from typing import Iterator, List
from common.proto.context_pb2 import Empty, TeraFlowController
from common.proto.dlt_gateway_pb2 import (
DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription)
......@@ -36,29 +40,50 @@ class DltGatewayClient:
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
#self.connect()
self.message_queue: List[DltRecord] = []
#LOGGER.debug('Channel created')
async def connect(self):
    """Open the asyncio gRPC channel to the DLT gateway and create the stub.

    Also invoked by RETRY_DECORATOR (prepare_method_name='connect') before retries.
    """
    # Removed the dead synchronous duplicate definition (diff residue).
    self.channel = grpc.aio.insecure_channel(self.endpoint)
    self.stub = DltGatewayServiceStub(self.channel)
    LOGGER.debug('Channel created')
async def close(self):
    """Close the asyncio channel (if open) and drop the stub."""
    # Removed the dead synchronous duplicate and the commented-out
    # dummy_process() development stub (diff residue / dead code).
    if self.channel is not None:
        await self.channel.close()
    self.channel = None
    self.stub = None
@RETRY_DECORATOR
async def RecordToDlt(self, request : DltRecord) -> DltRecordStatus:
    """Submit a DLT record (add/update/delete operation) to the gateway."""
    # Removed the stale bodyless synchronous `def` header and the
    # commented-out queue-based dummy implementation (dead code).
    LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.RecordToDlt(request)
    LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def GetFromDlt(self, request : DltRecordId) -> DltRecord:
    """Fetch a record from the DLT by its record id."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.GetFromDlt(request)
    LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
......@@ -70,15 +95,15 @@ class DltGatewayClient:
return response
@RETRY_DECORATOR
async def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus:
    """Query the DLT status for the given controller."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.GetDltStatus(request)
    LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
@RETRY_DECORATOR
async def GetDltPeers(self, request : Empty) -> DltPeerStatusList:
    """List the DLT peers known to the gateway."""
    # Removed the stale bodyless synchronous `def` header (diff residue).
    LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request)))
    response = await self.stub.GetDltPeers(request)
    LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response)))
    return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import logging
from typing import Iterator
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription
from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.Config import DLT_GATEWAY_HOST, DLT_GATEWAY_PORT
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class DltGatewayClientEvent:
    """Blocking client used to subscribe to DLT record events from the gateway."""

    def __init__(self, host=None, port=None):
        # Fall back to the configured gateway host/port when not given.
        host = host or DLT_GATEWAY_HOST
        port = port or DLT_GATEWAY_PORT
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        """(Re)open the insecure channel; also invoked by the retry decorator."""
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = DltGatewayServiceStub(self.channel)

    def close(self):
        """Close the channel if open and drop the stub."""
        if self.channel is not None:
            self.channel.close()
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]:
        """Open a server-streaming subscription to DLT record events."""
        LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request)))
        events = self.stub.SubscribeToDlt(request)
        # NOTE(review): `events` is a response *stream*, not a single message;
        # confirm grpc_message_to_json_string can serialize it for logging.
        LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(events)))
        return events
......@@ -14,15 +14,16 @@
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.service.GenericGrpcServiceAsync import GenericGrpcServiceAsync
from common.proto.dlt_connector_pb2_grpc import add_DltConnectorServiceServicer_to_server
from .DltConnectorServiceServicerImpl import DltConnectorServiceServicerImpl
class DltConnectorService(GenericGrpcServiceAsync):
    """Async gRPC service exposing the DLT connector API."""
    # Removed the stale duplicate class header (sync GenericGrpcService base)
    # and the duplicate synchronous install_servicers (diff residue).

    def __init__(self, cls_name: str = __name__) -> None:
        port = get_service_port_grpc(ServiceNameEnum.DLT)
        super().__init__(port, cls_name=cls_name)
        self.dltconnector_servicer = DltConnectorServiceServicerImpl()

    async def install_servicers(self):
        """Connect the servicer's gateway client, then register the servicer."""
        await self.dltconnector_servicer.initialize()
        add_DltConnectorServiceServicer_to_server(self.dltconnector_servicer, self.server)
......@@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
import grpc
import asyncio
from grpc.aio import ServicerContext
import logging
from typing import Optional
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method_async
from common.proto.context_pb2 import Empty, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer
......@@ -32,94 +35,109 @@ METRICS_POOL = MetricsPool('DltConnector', 'RPC')
class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
def __init__(self):
    """Create the servicer and its (not yet connected) DLT gateway client.

    The client's channel is opened later via `initialize()`.
    """
    LOGGER.debug('Creating Servicer...')
    self.dltgateway_client = DltGatewayClient()
    LOGGER.debug('Servicer Created')
async def initialize(self):
    """Open the asyncio channel of the embedded DLT gateway client.

    Must be awaited before the servicer handles any RPC.
    """
    # Removed the stale synchronous RecordAll decorator/header that the diff
    # left dangling above this method.
    await self.dltgateway_client.connect()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordAll(self, request: TopologyId, context: ServicerContext) -> Empty:
    # Intentionally a no-op: topology-wide recording is not implemented yet.
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordAllDevices(self, request: TopologyId, context: ServicerContext) -> Empty:
    """No-op: recording all devices at once is not implemented yet."""
    # Removed the stale synchronous duplicate definition (diff residue).
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordDevice(self, request: DltDeviceId, context: ServicerContext) -> Empty:
    """Record a device add/update/delete in the DLT via _record_entity."""
    # Removed the stale synchronous duplicate, the commented-out dummy
    # implementation, and a duplicated `if not request.delete:` guard
    # (diff residue); behavior of the async version is preserved.
    data_json = None
    if not request.delete:
        LOGGER.debug('RECORD_DEVICE = {:s}'.format(grpc_message_to_json_string(request)))
        # Fetch the current device payload from the Context service.
        context_client = ContextClient()
        device = context_client.GetDevice(request.device_id)
        data_json = grpc_message_to_json_string(device)
    await self._record_entity(
        request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_DEVICE,
        request.device_id.device_uuid.uuid, request.delete, data_json)
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordAllLinks(self, request: TopologyId, context: ServicerContext) -> Empty:
    """No-op: recording all links at once is not implemented yet."""
    # Removed the stale synchronous duplicate definition (diff residue).
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordLink(self, request: DltLinkId, context: ServicerContext) -> Empty:
    """Record a link add/update/delete in the DLT via _record_entity."""
    # Removed the stale synchronous duplicate definition (diff residue).
    data_json = None
    LOGGER.debug('RECORD_LINK = {:s}'.format(grpc_message_to_json_string(request)))
    if not request.delete:
        # Fetch the current link payload from the Context service.
        context_client = ContextClient()
        link = context_client.GetLink(request.link_id)
        data_json = grpc_message_to_json_string(link)
    await self._record_entity(
        request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_LINK,
        request.link_id.link_uuid.uuid, request.delete, data_json)
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordAllServices(self, request: TopologyId, context: ServicerContext) -> Empty:
    """No-op: recording all services at once is not implemented yet."""
    # Removed the stale synchronous duplicate definition (diff residue).
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordService(self, request: DltServiceId, context: ServicerContext) -> Empty:
    """Record a service add/update/delete in the DLT via _record_entity."""
    # Removed the stale synchronous duplicate definition (diff residue).
    data_json = None
    if not request.delete:
        # Fetch the current service payload from the Context service.
        context_client = ContextClient()
        service = context_client.GetService(request.service_id)
        data_json = grpc_message_to_json_string(service)
    await self._record_entity(
        request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SERVICE,
        request.service_id.service_uuid.uuid, request.delete, data_json)
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordAllSlices(self, request: TopologyId, context: ServicerContext) -> Empty:
    """No-op: recording all slices at once is not implemented yet."""
    # Removed the stale synchronous duplicate definition (diff residue).
    return Empty()
@safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
async def RecordSlice(self, request: DltSliceId, context: ServicerContext) -> Empty:
    """Record a slice add/update/delete in the DLT via _record_entity."""
    # Removed the stale synchronous duplicate definition (diff residue).
    data_json = None
    if not request.delete:
        # Fetch the current slice payload from the Context service.
        context_client = ContextClient()
        slice_ = context_client.GetSlice(request.slice_id)
        data_json = grpc_message_to_json_string(slice_)
    await self._record_entity(
        request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SLICE,
        request.slice_id.slice_uuid.uuid, request.delete, data_json)
    return Empty()
def _record_entity(
self, dlt_domain_uuid : str, dlt_record_type : DltRecordTypeEnum, dlt_record_uuid : str, delete : bool,
data_json : Optional[str] = None
async def _record_entity(
self, dlt_domain_uuid: str, dlt_record_type: DltRecordTypeEnum, dlt_record_uuid: str, delete: bool,
data_json: Optional[str] = None
) -> None:
dltgateway_client = DltGatewayClient()
dlt_record_id = DltRecordId()
dlt_record_id.domain_uuid.uuid = dlt_domain_uuid # pylint: disable=no-member
dlt_record_id.type = dlt_record_type
dlt_record_id.record_uuid.uuid = dlt_record_uuid # pylint: disable=no-member
dlt_record_id.domain_uuid.uuid = dlt_domain_uuid # pylint: disable=no-member
dlt_record_id.type = dlt_record_type
dlt_record_id.record_uuid.uuid = dlt_record_uuid # pylint: disable=no-member
str_dlt_record_id = grpc_message_to_json_string(dlt_record_id)
LOGGER.debug('[_record_entity] sent dlt_record_id = {:s}'.format(str_dlt_record_id))
dlt_record = dltgateway_client.GetFromDlt(dlt_record_id)
dlt_record = await self.dltgateway_client.GetFromDlt(dlt_record_id)
str_dlt_record = grpc_message_to_json_string(dlt_record)
LOGGER.debug('[_record_entity] recv dlt_record = {:s}'.format(str_dlt_record))
......@@ -127,22 +145,24 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
LOGGER.debug('[_record_entity] exists = {:s}'.format(str(exists)))
dlt_record = DltRecord()
dlt_record.record_id.CopyFrom(dlt_record_id) # pylint: disable=no-member
dlt_record.record_id.CopyFrom(dlt_record_id) # pylint: disable=no-member
if delete and exists:
dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_DELETE
elif not delete and exists:
dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE
if data_json is None: raise Exception('data_json must be provided when updating')
if data_json is None:
raise Exception('data_json must be provided when updating')
dlt_record.data_json = data_json
elif not delete and not exists:
dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_ADD
if data_json is None: raise Exception('data_json must be provided when adding')
if data_json is None:
raise Exception('data_json must be provided when adding')
dlt_record.data_json = data_json
else:
return
str_dlt_record = grpc_message_to_json_string(dlt_record)
LOGGER.debug('[_record_entity] sent dlt_record = {:s}'.format(str_dlt_record))
dlt_record_status = dltgateway_client.RecordToDlt(dlt_record)
dlt_record_status = await self.dltgateway_client.RecordToDlt(dlt_record)
str_dlt_record_status = grpc_message_to_json_string(dlt_record_status)
LOGGER.debug('[_record_entity] recv dlt_record_status = {:s}'.format(str_dlt_record_status))
......@@ -12,7 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
import logging
import signal
import sys
import threading
import asyncio
from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum
from common.Settings import (
......@@ -22,25 +27,25 @@ from .event_dispatcher.DltEventDispatcher import DltEventDispatcher
from .DltConnectorService import DltConnectorService
terminate = threading.Event()
LOGGER : logging.Logger = None
LOGGER: logging.Logger = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
def signal_handler(signal, frame):
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
async def main():
global LOGGER
log_level = get_log_level()
logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
......@@ -53,17 +58,16 @@ def main():
event_dispatcher = DltEventDispatcher()
event_dispatcher.start()
# Context Event dispatcher
# Starting DLT connector service
grpc_service = DltConnectorService()
grpc_service.start()
await grpc_service.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
while not terminate.wait(timeout=1.0):
await asyncio.sleep(1.0)
LOGGER.info('Terminating...')
grpc_service.stop()
await grpc_service.stop()
event_dispatcher.stop()
event_dispatcher.join()
......@@ -71,4 +75,4 @@ def main():
return 0
if __name__ == '__main__':
sys.exit(main())
asyncio.run(main())
......@@ -26,9 +26,9 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from dlt.connector.client.DltConnectorClientSync import DltConnectorClientSync
from dlt.connector.client.DltEventsCollector import DltEventsCollector
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from dlt.connector.client.DltGatewayClientEvent import DltGatewayClientEvent
from interdomain.client.InterdomainClient import InterdomainClient
LOGGER = logging.getLogger(__name__)
......@@ -40,8 +40,8 @@ ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME))
class Clients:
def __init__(self) -> None:
self.context_client = ContextClient()
self.dlt_connector_client = DltConnectorClient()
self.dlt_gateway_client = DltGatewayClient()
self.dlt_connector_client = DltConnectorClientSync()
self.dlt_gateway_client = DltGatewayClientEvent()
self.interdomain_client = InterdomainClient()
def close(self) -> None:
......
......@@ -12,91 +12,108 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# DltRecordSender.py
import logging
import asyncio
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .Types import DltRecordTypes
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
class DltRecordSender:
def __init__(self, context_client : ContextClient, dlt_connector_client : Optional[DltConnectorClient]) -> None:
def __init__(self, context_client: ContextClient) -> None:
self.context_client = context_client
self.dlt_connector_client = dlt_connector_client
self.dlt_record_uuids : List[str] = list()
self.dlt_record_uuid_to_data : Dict[str, Tuple[TopologyId, DltRecordTypes]] = dict()
LOGGER.debug('Creating Servicer...')
self.dlt_connector_client = DltConnectorClient()
LOGGER.debug('Servicer Created')
self.dlt_record_uuids: List[str] = list()
self.dlt_record_uuid_to_data: Dict[str, Tuple[TopologyId, object]] = dict()
async def initialize(self):
await self.dlt_connector_client.connect()
def _add_record(self, record_uuid : str, data : Tuple[TopologyId, DltRecordTypes]) -> None:
def _add_record(self, record_uuid: str, data: Tuple[TopologyId, object]) -> None:
if record_uuid in self.dlt_record_uuid_to_data: return
self.dlt_record_uuid_to_data[record_uuid] = data
self.dlt_record_uuids.append(record_uuid)
def add_device(self, topology_id : TopologyId, device : Device) -> None:
def add_device(self, topology_id: TopologyId, device: Device) -> None:
topology_uuid = topology_id.topology_uuid.uuid
device_uuid = device.device_id.device_uuid.uuid
record_uuid = '{:s}:device:{:s}'.format(topology_uuid, device_uuid)
record_uuid = f'{topology_uuid}:device:{device_uuid}'
self._add_record(record_uuid, (topology_id, device))
def add_link(self, topology_id : TopologyId, link : Link) -> None:
def add_link(self, topology_id: TopologyId, link: Link) -> None:
topology_uuid = topology_id.topology_uuid.uuid
link_uuid = link.link_id.link_uuid.uuid
record_uuid = '{:s}:link:{:s}'.format(topology_uuid, link_uuid)
record_uuid = f'{topology_uuid}:link:{link_uuid}'
self._add_record(record_uuid, (topology_id, link))
def add_service(self, topology_id : TopologyId, service : Service) -> None:
def add_service(self, topology_id: TopologyId, service: Service) -> None:
topology_uuid = topology_id.topology_uuid.uuid
context_uuid = service.service_id.context_id.context_uuid.uuid
service_uuid = service.service_id.service_uuid.uuid
record_uuid = '{:s}:service:{:s}/{:s}'.format(topology_uuid, context_uuid, service_uuid)
record_uuid = f'{topology_uuid}:service:{context_uuid}/{service_uuid}'
self._add_record(record_uuid, (topology_id, service))
def add_slice(self, topology_id : TopologyId, slice_ : Slice) -> None:
def add_slice(self, topology_id: TopologyId, slice_: Slice) -> None:
topology_uuid = topology_id.topology_uuid.uuid
context_uuid = slice_.slice_id.context_id.context_uuid.uuid
slice_uuid = slice_.slice_id.slice_uuid.uuid
record_uuid = '{:s}:slice:{:s}/{:s}'.format(topology_uuid, context_uuid, slice_uuid)
record_uuid = f'{topology_uuid}:slice:{context_uuid}/{slice_uuid}'
self._add_record(record_uuid, (topology_id, slice_))
def commit(self) -> None:
async def commit(self) -> None:
if not self.dlt_connector_client:
LOGGER.error('DLT Connector Client is None, cannot commit records.')
return
tasks = [] # List to hold all the async tasks
for dlt_record_uuid in self.dlt_record_uuids:
topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid]
topology_id, dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid]
if isinstance(dlt_record, Device):
#device_id = self.context_client.SetDevice(dlt_record) # This causes events to be triggered infinitely.
device_id = dlt_record.device_id
#LOGGER.debug('DEVICE_ID: ({:s})'.format(grpc_message_to_json_string(device_id)))
if self.dlt_connector_client is None: continue
dlt_device_id = DltDeviceId()
dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member
self.dlt_connector_client.RecordDevice(dlt_device_id)
dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member
tasks.append(self.dlt_connector_client.RecordDevice(dlt_device_id))
# await self.dlt_connector_client.RecordDevice(dlt_device_id)
elif isinstance(dlt_record, Link):
#link_id = self.context_client.SetLink(dlt_record)
link_id = dlt_record.link_id
if self.dlt_connector_client is None: continue
dlt_link_id = DltLinkId()
dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member
self.dlt_connector_client.RecordLink(dlt_link_id)
dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member
tasks.append(self.dlt_connector_client.RecordLink(dlt_link_id))
#await self.dlt_connector_client.RecordLink(dlt_link_id)
elif isinstance(dlt_record, Service):
#service_id = self.context_client.SetService(dlt_record)
service_id = dlt_record.service_id
if self.dlt_connector_client is None: continue
dlt_service_id = DltServiceId()
dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member
self.dlt_connector_client.RecordService(dlt_service_id)
dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member
tasks.append(self.dlt_connector_client.RecordService(dlt_service_id))
#await self.dlt_connector_client.RecordService(dlt_service_id)
elif isinstance(dlt_record, Slice):
#slice_id = self.context_client.SetSlice(dlt_record)
slice_id = dlt_record.slice_id
if self.dlt_connector_client is None: continue
dlt_slice_id = DltSliceId()
dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member
self.dlt_connector_client.RecordSlice(dlt_slice_id)
dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member
dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member
tasks.append(self.dlt_connector_client.RecordSlice(dlt_slice_id))
#await self.dlt_connector_client.RecordSlice(dlt_slice_id)
else:
LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record)))
LOGGER.error(f'Unsupported Record({str(dlt_record)})')
if tasks:
await asyncio.gather(*tasks) # Run all the tasks concurrently
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
import asyncio
from typing import Dict, Optional
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum
......@@ -14,7 +29,6 @@ from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .DltRecordSender import DltRecordSender
from .Types import EventTypes
......@@ -37,43 +51,54 @@ class DLTRecorder(threading.Thread):
self.terminate.set()
def run(self) -> None:
asyncio.run(self._run())
async def _run(self) -> None:
self.context_client.connect()
create_context(self.context_client, DEFAULT_CONTEXT_NAME)
self.create_topologies()
#self.create_topologies()
self.context_event_collector.start()
tasks = []
batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available
while not self.terminate.is_set():
event = self.context_event_collector.get_event(timeout=0.1)
if event is None:
continue
LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event)))
self.update_record(event)
task = asyncio.create_task(self.update_record(event))
tasks.append(task)
LOGGER.debug('Task for event scheduled.')
# Limit the number of concurrent tasks
# If we have enough tasks or it's time to process them
if len(tasks) >= 10 or (tasks and len(tasks) > 0 and await asyncio.sleep(batch_timeout)):
try:
await asyncio.gather(*tasks)
except Exception as e:
LOGGER.error(f"Error while processing tasks: {e}")
finally:
tasks = [] # Clear the list after processing
await asyncio.gather(*tasks)
tasks = [] # Clear the list after processing
# Process any remaining tasks when stopping
if tasks:
try:
await asyncio.gather(*tasks)
except Exception as e:
LOGGER.error(f"Error while processing remaining tasks: {e}")
self.context_event_collector.stop()
self.context_client.close()
def create_topologies(self):
topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
def get_dlt_connector_client(self) -> Optional[DltConnectorClient]:
# Always enable DLT for testing
dlt_connector_client = DltConnectorClient()
dlt_connector_client.connect()
return dlt_connector_client
# env_vars = find_environment_variables([
# get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST),
# get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
# ])
# if len(env_vars) == 2:
# dlt_connector_client = DltConnectorClient()
# dlt_connector_client.connect()
# return dlt_connector_client
# return None
def update_record(self, event: EventTypes) -> None:
dlt_connector_client = self.get_dlt_connector_client()
dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client)
#def create_topologies(self):
#topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME]
#create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids)
async def update_record(self, event: EventTypes) -> None:
dlt_record_sender = DltRecordSender(self.context_client)
await dlt_record_sender.initialize() # Ensure DltRecordSender is initialized asynchronously
LOGGER.debug('STARTING processing event: {:s}'.format(grpc_message_to_json_string(event)))
if isinstance(event, ContextEvent):
LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event)))
......@@ -84,7 +109,7 @@ class DLTRecorder(threading.Thread):
self.process_topology_event(event, dlt_record_sender)
elif isinstance(event, DeviceEvent):
LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event)))
LOGGER.debug('Processing DeviceEvent ASYNC({:s})'.format(grpc_message_to_json_string(event)))
self.process_device_event(event, dlt_record_sender)
elif isinstance(event, LinkEvent):
......@@ -94,9 +119,10 @@ class DLTRecorder(threading.Thread):
else:
LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event)))
dlt_record_sender.commit()
if dlt_connector_client is not None:
dlt_connector_client.close()
await dlt_record_sender.commit()
#await asyncio.sleep(2) # Simulates processing time
LOGGER.debug('Finished processing event: {:s}'.format(grpc_message_to_json_string(event)))
def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None:
topology_id = event.topology_id
......@@ -117,6 +143,7 @@ class DLTRecorder(threading.Thread):
if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \
(topology_uuid not in topology_uuids) and (topology_name not in topology_uuids):
LOGGER.debug('DEVICES({:s})'.format(grpc_message_to_json_string(topology_details.devices)))
for device in topology_details.devices:
LOGGER.debug('DEVICE_INFO_TOPO({:s})'.format(grpc_message_to_json_string(device)))
dlt_record_sender.add_device(topology_id, device)
......@@ -164,3 +191,4 @@ class DLTRecorder(threading.Thread):
dlt_record_sender.add_link(topology_id, link)
else:
LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment