diff --git a/src/common/method_wrappers/Decorator.py b/src/common/method_wrappers/Decorator.py index 71b3999bf6e42c3cd9130747af2cdcbe2d9a570e..bfda31ec972af79828065f449e587a39f3f1a365 100644 --- a/src/common/method_wrappers/Decorator.py +++ b/src/common/method_wrappers/Decorator.py @@ -12,6 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import grpc, json, logging, threading from enum import Enum from prettytable import PrettyTable @@ -235,3 +249,35 @@ def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Log grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) return inner_wrapper return outer_wrapper + +def safe_and_metered_rpc_method_async(metrics_pool: MetricsPool, logger: logging.Logger): + def outer_wrapper(func): + method_name = func.__name__ + metrics = metrics_pool.get_metrics(method_name) + histogram_duration, counter_started, counter_completed, counter_failed = metrics + + async def inner_wrapper(self, request, grpc_context: grpc.aio.ServicerContext): + counter_started.inc() + try: + logger.debug('{:s} request: {:s}'.format(method_name, grpc_message_to_json_string(request))) + reply = await func(self, request, grpc_context) + logger.debug('{:s} reply: {:s}'.format(method_name, grpc_message_to_json_string(reply))) + counter_completed.inc() + return reply + except ServiceException as e: # pragma: no cover (ServiceException not thrown) + if e.code not in [grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS]: + # Assume not found or already exists is just a condition, not an error + logger.exception('{:s} exception'.format(method_name)) + counter_failed.inc() + else: + counter_completed.inc() + await grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover, pylint: disable=broad-except + logger.exception('{:s} exception'.format(method_name)) + counter_failed.inc() + await grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) + + return inner_wrapper + + return outer_wrapper + diff --git a/src/common/tools/service/GenericGrpcServiceAsync.py b/src/common/tools/service/GenericGrpcServiceAsync.py new file mode 100644 index 0000000000000000000000000000000000000000..488d861777ee7200fc4331449f21dded6b2f6dac --- /dev/null +++ b/src/common/tools/service/GenericGrpcServiceAsync.py @@ -0,0 +1,72 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Union +import grpc +import logging +from concurrent import futures +from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH +from grpc_health.v1.health_pb2 import HealthCheckResponse +from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers + +class GenericGrpcServiceAsync: + def __init__( + self, bind_port: Union[str, int], bind_address: Optional[str] = None, max_workers: Optional[int] = None, + grace_period: Optional[int] = None, enable_health_servicer: bool = True, cls_name: str = __name__ + ) -> None: + self.logger = logging.getLogger(cls_name) + self.bind_port = bind_port + self.bind_address = get_grpc_bind_address() if bind_address is None else bind_address + self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers + self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period + self.enable_health_servicer = enable_health_servicer + self.endpoint = None + self.health_servicer = None + self.pool = None + self.server = None + + async def install_servicers(self): + pass + + async def start(self): + self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) + self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( + str(self.endpoint), str(self.max_workers))) + + self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) + self.server = grpc.aio.server(self.pool) + + await self.install_servicers() # Ensure this is awaited + + if self.enable_health_servicer: + self.health_servicer = HealthServicer( + experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) + add_HealthServicer_to_server(self.health_servicer, self.server) + + self.bind_port = self.server.add_insecure_port(self.endpoint) + self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) + self.logger.info('Listening on {:s}...'.format(str(self.endpoint))) + await self.server.start() + if self.enable_health_servicer: + self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) + + self.logger.debug('Service started') + + async def stop(self): + self.logger.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) + if self.enable_health_servicer: + self.health_servicer.enter_graceful_shutdown() + await self.server.stop(self.grace_period) + self.logger.debug('Service stopped') diff --git a/src/dlt/connector/client/DltConnectorClient.py b/src/dlt/connector/client/DltConnectorClient.py index e383217d8b6971aea6127d39b8de348fb84b35da..a2224dd3278bc41251eb4fa13e811c80f5a747ae 100644 --- a/src/dlt/connector/client/DltConnectorClient.py +++ b/src/dlt/connector/client/DltConnectorClient.py @@ -12,7 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +# DltConnectorClient.py + +import grpc +import logging +import asyncio + from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.proto.context_pb2 import Empty, TopologyId @@ -35,77 +40,90 @@ class DltConnectorClient: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None - self.connect() - LOGGER.debug('Channel created') + #self.connect() + #LOGGER.debug('Channel created') - def connect(self): - self.channel = grpc.insecure_channel(self.endpoint) + async def connect(self): + self.channel = grpc.aio.insecure_channel(self.endpoint) self.stub = DltConnectorServiceStub(self.channel) + LOGGER.debug('Channel created') - def close(self): - if self.channel is not None: self.channel.close() + async def close(self): + if self.channel is not None: + await self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR - def RecordAll(self, request : TopologyId) -> Empty: + async def RecordAll(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAll(request) + response = await self.stub.RecordAll(request) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllDevices(self, request : TopologyId) -> Empty: + async def RecordAllDevices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllDevices(request) + response = await self.stub.RecordAllDevices(request) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordDevice(self, request : DltDeviceId) -> Empty: - LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordDevice(request) + # async def RecordDevice(self, request: DltDeviceId) -> Empty: + # LOGGER.debug('RECORD_DEVICE request received: {:s}'.format(grpc_message_to_json_string(request))) + + # Simulate some asynchronous processing delay + # await asyncio.sleep(2) # Simulates processing time + + # Create a dummy response (Empty message) + # response = Empty() + + # LOGGER.debug('RECORD_DEVICE processing complete for request: {:s}'.format(grpc_message_to_json_string(request))) + # return response + async def RecordDevice(self, request: DltDeviceId) -> Empty: + LOGGER.debug('RECORD_DEVICE request: {:s}'.format(grpc_message_to_json_string(request))) + response = await self.stub.RecordDevice(request) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllLinks(self, request : TopologyId) -> Empty: + async def RecordAllLinks(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllLinks(request) + response = await self.stub.RecordAllLinks(request) LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordLink(self, request : DltLinkId) -> Empty: + async def RecordLink(self, request: DltLinkId) -> Empty: LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordLink(request) + response = await self.stub.RecordLink(request) LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllServices(self, request : TopologyId) -> Empty: + async def RecordAllServices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllServices(request) + response = await self.stub.RecordAllServices(request) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordService(self, request : DltServiceId) -> Empty: + async def RecordService(self, request: DltServiceId) -> Empty: LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordService(request) + response = await self.stub.RecordService(request) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllSlices(self, request : TopologyId) -> Empty: + async def RecordAllSlices(self, request: TopologyId) -> Empty: LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllSlices(request) + response = await self.stub.RecordAllSlices(request) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordSlice(self, request : DltSliceId) -> Empty: + async def RecordSlice(self, request: DltSliceId) -> Empty: LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordSlice(request) + response = await self.stub.RecordSlice(request) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/dlt/connector/client/DltConnectorClientSync.py b/src/dlt/connector/client/DltConnectorClientSync.py new file mode 100644 index 0000000000000000000000000000000000000000..a633e89bd3622c81cafd7293e77444ce9f707f1a --- /dev/null +++ b/src/dlt/connector/client/DltConnectorClientSync.py @@ -0,0 +1,111 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc +from common.proto.context_pb2 import Empty, TopologyId +from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId +from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceStub +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class DltConnectorClientSync: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.DLT) + if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = DltConnectorServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def RecordAll(self, request : TopologyId) -> Empty: + LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAll(request) + LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordAllDevices(self, request : TopologyId) -> Empty: + LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAllDevices(request) + LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordDevice(self, request : DltDeviceId) -> Empty: + LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordDevice(request) + LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordAllLinks(self, request : TopologyId) -> Empty: + LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAllLinks(request) + LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordLink(self, request : DltLinkId) -> Empty: + LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordLink(request) + LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordAllServices(self, request : TopologyId) -> Empty: + LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAllServices(request) + LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordService(self, request : DltServiceId) -> Empty: + LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordService(request) + LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordAllSlices(self, request : TopologyId) -> Empty: + LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAllSlices(request) + LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordSlice(self, request : DltSliceId) -> Empty: + LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordSlice(request) + LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/dlt/connector/client/DltEventsCollector.py b/src/dlt/connector/client/DltEventsCollector.py index e59784a4d2902459d7bc88925e5b83a698770012..2e38d0445ed872d887c3c50e7ee9ec77548ffbfd 100644 --- a/src/dlt/connector/client/DltEventsCollector.py +++ b/src/dlt/connector/client/DltEventsCollector.py @@ -16,7 +16,7 @@ from typing import Callable, Optional import grpc, logging, queue, threading, time from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.tools.grpc.Tools import grpc_message_to_json_string -from dlt.connector.client.DltGatewayClient import DltGatewayClient +from dlt.connector.client.DltGatewayClientEvent import DltGatewayClientEvent LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -31,7 +31,7 @@ LOGGER.setLevel(logging.DEBUG) class DltEventsCollector(threading.Thread): def __init__( - self, dltgateway_client : DltGatewayClient, + self, dltgateway_client : DltGatewayClientEvent, log_events_received : bool = False, event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, ) -> None: diff --git a/src/dlt/connector/client/DltGatewayClient.py b/src/dlt/connector/client/DltGatewayClient.py index cde278517f7a4f0db267a20d050c71a02de4ae23..2690dfb661fda4a76be8cc1d9318f1af183e2f64 100644 --- a/src/dlt/connector/client/DltGatewayClient.py +++ b/src/dlt/connector/client/DltGatewayClient.py @@ -12,8 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterator -import grpc, logging + + +import grpc +import logging +import asyncio +from typing import Iterator, List from common.proto.context_pb2 import Empty, TeraFlowController from common.proto.dlt_gateway_pb2 import ( DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription) @@ -36,29 +40,50 @@ class DltGatewayClient: LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None - self.connect() - LOGGER.debug('Channel created') + #self.connect() + self.message_queue: List[DltRecord] = [] + #LOGGER.debug('Channel created') + - def connect(self): - self.channel = grpc.insecure_channel(self.endpoint) + async def connect(self): + self.channel = grpc.aio.insecure_channel(self.endpoint) self.stub = DltGatewayServiceStub(self.channel) + LOGGER.debug('Channel created') - def close(self): - if self.channel is not None: self.channel.close() + async def close(self): + if self.channel is not None: + await self.channel.close() self.channel = None self.stub = None + # async def dummy_process(self, request: DltRecord): + # # Simulate processing delay + # await asyncio.sleep(2) + # return DltRecordStatus(status="DLTRECORDSTATUS_SUCCEEDED", record_id=request.record_id) + + @RETRY_DECORATOR - def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: + async def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordToDlt(request) + response = await self.stub.RecordToDlt(request) LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response + + # @RETRY_DECORATOR + # async def RecordToDlt(self, request: DltRecord) -> DltRecordStatus: + # self.message_queue.append(request) + # LOGGER.debug(f'RecordToDlt request: {grpc_message_to_json_string(request)}') + # LOGGER.debug(f'Queue length before processing: {len(self.message_queue)}') + # response = await self.dummy_process(request) + # LOGGER.debug(f'RecordToDlt result: {grpc_message_to_json_string(response)}') + # self.message_queue.remove(request) + # LOGGER.debug(f'Queue length after processing: {len(self.message_queue)}') + # return response @RETRY_DECORATOR - def GetFromDlt(self, request : DltRecordId) -> DltRecord: + async def GetFromDlt(self, request : DltRecordId) -> DltRecord: LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetFromDlt(request) + response = await self.stub.GetFromDlt(request) LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @@ -70,15 +95,15 @@ class DltGatewayClient: return response @RETRY_DECORATOR - def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: + async def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetDltStatus(request) + response = await self.stub.GetDltStatus(request) LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def GetDltPeers(self, request : Empty) -> DltPeerStatusList: + async def GetDltPeers(self, request : Empty) -> DltPeerStatusList: LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetDltPeers(request) + response = await self.stub.GetDltPeers(request) LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/dlt/connector/client/DltGatewayClientEvent.py b/src/dlt/connector/client/DltGatewayClientEvent.py new file mode 100644 index 0000000000000000000000000000000000000000..6cbaf1a273de70c909b8c56e6cf60e5707c97dbe --- /dev/null +++ b/src/dlt/connector/client/DltGatewayClientEvent.py @@ -0,0 +1,57 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import grpc +import logging +from typing import Iterator +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription +from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string +from dlt.connector.Config import DLT_GATEWAY_HOST, DLT_GATEWAY_PORT + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class DltGatewayClientEvent: + def __init__(self, host=None, port=None): + if not host: host = DLT_GATEWAY_HOST + if not port: port = DLT_GATEWAY_PORT + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = DltGatewayServiceStub(self.channel) + + def close(self): + if self.channel is not None: + self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def SubscribeToDlt(self, request: DltRecordSubscription) -> Iterator[DltRecordEvent]: + LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SubscribeToDlt(request) + LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/dlt/connector/service/DltConnectorService.py b/src/dlt/connector/service/DltConnectorService.py index 7e99cb8f85e519ec875a1decc7dc0ad1e030a6f4..601d3e70d298d8ae2a098339e71fc49d66fb79ad 100644 --- a/src/dlt/connector/service/DltConnectorService.py +++ b/src/dlt/connector/service/DltConnectorService.py @@ -14,15 +14,16 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc -from common.tools.service.GenericGrpcService import GenericGrpcService +from common.tools.service.GenericGrpcServiceAsync import GenericGrpcServiceAsync from common.proto.dlt_connector_pb2_grpc import add_DltConnectorServiceServicer_to_server from .DltConnectorServiceServicerImpl import DltConnectorServiceServicerImpl -class DltConnectorService(GenericGrpcService): +class DltConnectorService(GenericGrpcServiceAsync): def __init__(self, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.DLT) super().__init__(port, cls_name=cls_name) self.dltconnector_servicer = DltConnectorServiceServicerImpl() - def install_servicers(self): - add_DltConnectorServiceServicer_to_server(self.dltconnector_servicer, self.server) + async def install_servicers(self): + await self.dltconnector_servicer.initialize() + add_DltConnectorServiceServicer_to_server(self.dltconnector_servicer, self.server) \ No newline at end of file diff --git a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py index c05d46b48ed214191a6195b948986810b9c03d20..46c58e0635b0f46d2f87a46134990f91f85f051f 100644 --- a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py +++ b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import grpc +import asyncio +from grpc.aio import ServicerContext +import logging from typing import Optional -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method_async from common.proto.context_pb2 import Empty, TopologyId from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer @@ -32,94 +35,109 @@ METRICS_POOL = MetricsPool('DltConnector', 'RPC') class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): def __init__(self): LOGGER.debug('Creating Servicer...') + self.dltgateway_client = DltGatewayClient() LOGGER.debug('Servicer Created') - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + async def initialize(self): + await self.dltgateway_client.connect() + + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordAll(self, request: TopologyId, context: ServicerContext) -> Empty: return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordAllDevices(self, request: TopologyId, context: ServicerContext) -> Empty: return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + # async def RecordDevice(self, request: DltDeviceId, context: ServicerContext) -> Empty: + # LOGGER.debug('Received RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) + # try: + # if not request.delete: + # LOGGER.debug('Processing RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) + # # Perform any dummy operation or simply log the request + # LOGGER.debug('Processed RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) + # except Exception as e: + # LOGGER.error(f"Error processing RecordDevice: {e}") + # return Empty() + async def RecordDevice(self, request: DltDeviceId, context: ServicerContext) -> Empty: data_json = None - if not request.delete: + LOGGER.debug('RECORD_DEVICE = {:s}'.format(grpc_message_to_json_string(request))) + if not request.delete: context_client = ContextClient() device = context_client.GetDevice(request.device_id) data_json = grpc_message_to_json_string(device) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_DEVICE, request.device_id.device_uuid.uuid, request.delete, data_json) return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAllLinks(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordAllLinks(self, request: TopologyId, context: ServicerContext) -> Empty: return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordLink(self, request: DltLinkId, context: ServicerContext) -> Empty: data_json = None + LOGGER.debug('RECORD_LINK = {:s}'.format(grpc_message_to_json_string(request))) + if not request.delete: context_client = ContextClient() link = context_client.GetLink(request.link_id) data_json = grpc_message_to_json_string(link) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_LINK, request.link_id.link_uuid.uuid, request.delete, data_json) return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordAllServices(self, request: TopologyId, context: ServicerContext) -> Empty: return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordService(self, request: DltServiceId, context: ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() service = context_client.GetService(request.service_id) data_json = grpc_message_to_json_string(service) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SERVICE, request.service_id.service_uuid.uuid, request.delete, data_json) return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordAllSlices(self, request: TopologyId, context: ServicerContext) -> Empty: return Empty() - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty: + @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER) + async def RecordSlice(self, request: DltSliceId, context: ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() slice_ = context_client.GetSlice(request.slice_id) data_json = grpc_message_to_json_string(slice_) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SLICE, request.slice_id.slice_uuid.uuid, request.delete, data_json) return Empty() - def _record_entity( - self, dlt_domain_uuid : str, dlt_record_type : DltRecordTypeEnum, dlt_record_uuid : str, delete : bool, - data_json : Optional[str] = None + async def _record_entity( + self, dlt_domain_uuid: str, dlt_record_type: DltRecordTypeEnum, dlt_record_uuid: str, delete: bool, + data_json: Optional[str] = None ) -> None: - dltgateway_client = DltGatewayClient() - dlt_record_id = DltRecordId() - dlt_record_id.domain_uuid.uuid = dlt_domain_uuid # pylint: disable=no-member - dlt_record_id.type = dlt_record_type - dlt_record_id.record_uuid.uuid = dlt_record_uuid # pylint: disable=no-member + dlt_record_id.domain_uuid.uuid = dlt_domain_uuid # pylint: disable=no-member + dlt_record_id.type = dlt_record_type + dlt_record_id.record_uuid.uuid = dlt_record_uuid # pylint: disable=no-member str_dlt_record_id = grpc_message_to_json_string(dlt_record_id) LOGGER.debug('[_record_entity] sent dlt_record_id = {:s}'.format(str_dlt_record_id)) - dlt_record = dltgateway_client.GetFromDlt(dlt_record_id) + dlt_record = await self.dltgateway_client.GetFromDlt(dlt_record_id) str_dlt_record = grpc_message_to_json_string(dlt_record) LOGGER.debug('[_record_entity] recv dlt_record = {:s}'.format(str_dlt_record)) @@ -127,22 +145,24 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): LOGGER.debug('[_record_entity] exists = {:s}'.format(str(exists))) dlt_record = DltRecord() - dlt_record.record_id.CopyFrom(dlt_record_id) # pylint: disable=no-member + dlt_record.record_id.CopyFrom(dlt_record_id) # pylint: disable=no-member if delete and exists: dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_DELETE elif not delete and exists: dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE - if data_json is None: raise Exception('data_json must be provided when updating') + if data_json is None: + raise Exception('data_json must be provided when updating') dlt_record.data_json = data_json elif not delete and not exists: dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_ADD - if data_json is None: raise Exception('data_json must be provided when adding') + if data_json is None: + raise Exception('data_json must be provided when adding') dlt_record.data_json = data_json else: return str_dlt_record = grpc_message_to_json_string(dlt_record) LOGGER.debug('[_record_entity] sent dlt_record = {:s}'.format(str_dlt_record)) - dlt_record_status = dltgateway_client.RecordToDlt(dlt_record) + dlt_record_status = await self.dltgateway_client.RecordToDlt(dlt_record) str_dlt_record_status = grpc_message_to_json_string(dlt_record_status) LOGGER.debug('[_record_entity] recv dlt_record_status = {:s}'.format(str_dlt_record_status)) diff --git a/src/dlt/connector/service/__main__.py b/src/dlt/connector/service/__main__.py index b13f4257b77f6b4596b0397e62d7a736f09c4ce8..09f525ed4fa850581fcdef62542b930b34b8bd62 100644 --- a/src/dlt/connector/service/__main__.py +++ b/src/dlt/connector/service/__main__.py @@ -12,7 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, signal, sys, threading + +import logging +import signal +import sys +import threading +import asyncio from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( @@ -22,25 +27,25 @@ from .event_dispatcher.DltEventDispatcher import DltEventDispatcher from .DltConnectorService import DltConnectorService terminate = threading.Event() -LOGGER : logging.Logger = None +LOGGER: logging.Logger = None -def signal_handler(signal, frame): # pylint: disable=redefined-outer-name +def signal_handler(signal, frame): LOGGER.warning('Terminate signal received') terminate.set() -def main(): - global LOGGER # pylint: disable=global-statement +async def main(): + global LOGGER log_level = get_log_level() logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) - signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) LOGGER.info('Starting...') @@ -53,17 +58,16 @@ def main(): event_dispatcher = DltEventDispatcher() event_dispatcher.start() - # Context Event dispatcher - # Starting DLT connector service grpc_service = DltConnectorService() - grpc_service.start() + await grpc_service.start() # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=1.0): pass + while not terminate.wait(timeout=1.0): + await asyncio.sleep(1.0) LOGGER.info('Terminating...') - grpc_service.stop() + await grpc_service.stop() event_dispatcher.stop() event_dispatcher.join() @@ -71,4 +75,4 @@ def main(): return 0 if __name__ == '__main__': - sys.exit(main()) + asyncio.run(main()) diff --git a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py index 779bae9c19f583ec47584673da51b536cc1dc8b3..29e8c096d5f2b842cae866e907483416e343445e 100644 --- a/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py +++ b/src/dlt/connector/service/event_dispatcher/DltEventDispatcher.py @@ -26,9 +26,9 @@ from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient -from dlt.connector.client.DltConnectorClient import DltConnectorClient +from dlt.connector.client.DltConnectorClientSync import DltConnectorClientSync from dlt.connector.client.DltEventsCollector import DltEventsCollector -from dlt.connector.client.DltGatewayClient import DltGatewayClient +from dlt.connector.client.DltGatewayClientEvent import DltGatewayClientEvent from interdomain.client.InterdomainClient import InterdomainClient LOGGER = logging.getLogger(__name__) @@ -40,8 +40,8 @@ ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) class Clients: def __init__(self) -> None: self.context_client = ContextClient() - self.dlt_connector_client = DltConnectorClient() - self.dlt_gateway_client = DltGatewayClient() + self.dlt_connector_client = DltConnectorClientSync() + self.dlt_gateway_client = DltGatewayClientEvent() self.interdomain_client = InterdomainClient() def close(self) -> None: diff --git a/src/interdomain/service/topology_abstractor/DltRecordSender.py b/src/interdomain/service/topology_abstractor/DltRecordSender.py index cfa51c928d512d432eba7223ae7bae2d9085ad0c..d6d5ef0f6ada706484dd9f627caf21863bea9a15 100644 --- a/src/interdomain/service/topology_abstractor/DltRecordSender.py +++ b/src/interdomain/service/topology_abstractor/DltRecordSender.py @@ -12,91 +12,108 @@ # See the License for the specific language governing permissions and # limitations under the License. +# DltRecordSender.py + import logging +import asyncio + + from typing import Dict, List, Optional, Tuple from common.proto.context_pb2 import Device, Link, Service, Slice, TopologyId from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient from dlt.connector.client.DltConnectorClient import DltConnectorClient -from .Types import DltRecordTypes LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) class DltRecordSender: - def __init__(self, context_client : ContextClient, dlt_connector_client : Optional[DltConnectorClient]) -> None: + def __init__(self, context_client: ContextClient) -> None: self.context_client = context_client - self.dlt_connector_client = dlt_connector_client - self.dlt_record_uuids : List[str] = list() - self.dlt_record_uuid_to_data : Dict[str, Tuple[TopologyId, DltRecordTypes]] = dict() + LOGGER.debug('Creating Servicer...') + self.dlt_connector_client = DltConnectorClient() + LOGGER.debug('Servicer Created') + self.dlt_record_uuids: List[str] = list() + self.dlt_record_uuid_to_data: Dict[str, Tuple[TopologyId, object]] = dict() + + async def initialize(self): + await self.dlt_connector_client.connect() - def _add_record(self, record_uuid : str, data : Tuple[TopologyId, DltRecordTypes]) -> None: + def _add_record(self, record_uuid: str, data: Tuple[TopologyId, object]) -> None: if record_uuid in self.dlt_record_uuid_to_data: return self.dlt_record_uuid_to_data[record_uuid] = data self.dlt_record_uuids.append(record_uuid) - def add_device(self, topology_id : TopologyId, device : Device) -> None: + def add_device(self, topology_id: TopologyId, device: Device) -> None: topology_uuid = topology_id.topology_uuid.uuid device_uuid = device.device_id.device_uuid.uuid - record_uuid = '{:s}:device:{:s}'.format(topology_uuid, device_uuid) + record_uuid = f'{topology_uuid}:device:{device_uuid}' self._add_record(record_uuid, (topology_id, device)) - def add_link(self, topology_id : TopologyId, link : Link) -> None: + def add_link(self, topology_id: TopologyId, link: Link) -> None: topology_uuid = topology_id.topology_uuid.uuid link_uuid = link.link_id.link_uuid.uuid - record_uuid = '{:s}:link:{:s}'.format(topology_uuid, link_uuid) + record_uuid = f'{topology_uuid}:link:{link_uuid}' self._add_record(record_uuid, (topology_id, link)) - def add_service(self, topology_id : TopologyId, service : Service) -> None: + def add_service(self, topology_id: TopologyId, service: Service) -> None: topology_uuid = topology_id.topology_uuid.uuid context_uuid = service.service_id.context_id.context_uuid.uuid service_uuid = service.service_id.service_uuid.uuid - record_uuid = '{:s}:service:{:s}/{:s}'.format(topology_uuid, context_uuid, service_uuid) + record_uuid = f'{topology_uuid}:service:{context_uuid}/{service_uuid}' self._add_record(record_uuid, (topology_id, service)) - def add_slice(self, topology_id : TopologyId, slice_ : Slice) -> None: + def add_slice(self, topology_id: TopologyId, slice_: Slice) -> None: topology_uuid = topology_id.topology_uuid.uuid context_uuid = slice_.slice_id.context_id.context_uuid.uuid slice_uuid = slice_.slice_id.slice_uuid.uuid - record_uuid = '{:s}:slice:{:s}/{:s}'.format(topology_uuid, context_uuid, slice_uuid) + record_uuid = f'{topology_uuid}:slice:{context_uuid}/{slice_uuid}' self._add_record(record_uuid, (topology_id, slice_)) - def commit(self) -> None: + async def commit(self) -> None: + if not self.dlt_connector_client: + LOGGER.error('DLT Connector Client is None, cannot commit records.') + return + tasks = [] # List to hold all the async tasks + for dlt_record_uuid in self.dlt_record_uuids: - topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid] + topology_id, dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid] if isinstance(dlt_record, Device): - #device_id = self.context_client.SetDevice(dlt_record) # This causes events to be triggered infinitely. device_id = dlt_record.device_id - #LOGGER.debug('DEVICE_ID: ({:s})'.format(grpc_message_to_json_string(device_id))) if self.dlt_connector_client is None: continue dlt_device_id = DltDeviceId() - dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member - dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member - self.dlt_connector_client.RecordDevice(dlt_device_id) + dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member + tasks.append(self.dlt_connector_client.RecordDevice(dlt_device_id)) +# await self.dlt_connector_client.RecordDevice(dlt_device_id) elif isinstance(dlt_record, Link): - #link_id = self.context_client.SetLink(dlt_record) link_id = dlt_record.link_id if self.dlt_connector_client is None: continue dlt_link_id = DltLinkId() - dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member - dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member - self.dlt_connector_client.RecordLink(dlt_link_id) + dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member + tasks.append(self.dlt_connector_client.RecordLink(dlt_link_id)) + #await self.dlt_connector_client.RecordLink(dlt_link_id) elif isinstance(dlt_record, Service): - #service_id = self.context_client.SetService(dlt_record) service_id = dlt_record.service_id if self.dlt_connector_client is None: continue dlt_service_id = DltServiceId() - dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member - dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member - self.dlt_connector_client.RecordService(dlt_service_id) + dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member + tasks.append(self.dlt_connector_client.RecordService(dlt_service_id)) + #await self.dlt_connector_client.RecordService(dlt_service_id) elif isinstance(dlt_record, Slice): - #slice_id = self.context_client.SetSlice(dlt_record) slice_id = dlt_record.slice_id if self.dlt_connector_client is None: continue dlt_slice_id = DltSliceId() - dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member - dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member - self.dlt_connector_client.RecordSlice(dlt_slice_id) + dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member + dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member + tasks.append(self.dlt_connector_client.RecordSlice(dlt_slice_id)) + #await self.dlt_connector_client.RecordSlice(dlt_slice_id) else: - LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record))) + LOGGER.error(f'Unsupported Record({str(dlt_record)})') + + if tasks: + await asyncio.gather(*tasks) # Run all the tasks concurrently + diff --git a/src/interdomain/service/topology_abstractor/DltRecorder.py b/src/interdomain/service/topology_abstractor/DltRecorder.py index 0e94159c4c9f2103941c6bdad4cdfd18e7cc2419..c5660e43d55bf2fe52a859c75acd5b09d6bdaa0f 100644 --- a/src/interdomain/service/topology_abstractor/DltRecorder.py +++ b/src/interdomain/service/topology_abstractor/DltRecorder.py @@ -1,5 +1,20 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging import threading +import asyncio from typing import Dict, Optional from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME, ServiceNameEnum @@ -14,7 +29,6 @@ from common.tools.object_factory.Device import json_device_id from common.tools.object_factory.Topology import json_topology_id from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector -from dlt.connector.client.DltConnectorClient import DltConnectorClient from .DltRecordSender import DltRecordSender from .Types import EventTypes @@ -37,43 +51,54 @@ class DLTRecorder(threading.Thread): self.terminate.set() def run(self) -> None: + asyncio.run(self._run()) + + async def _run(self) -> None: self.context_client.connect() create_context(self.context_client, DEFAULT_CONTEXT_NAME) - self.create_topologies() + #self.create_topologies() self.context_event_collector.start() + + tasks = [] + batch_timeout = 1 # Time in seconds to wait before processing whatever tasks are available while not self.terminate.is_set(): event = self.context_event_collector.get_event(timeout=0.1) if event is None: continue LOGGER.info('Processing Event({:s})...'.format(grpc_message_to_json_string(event))) - self.update_record(event) + task = asyncio.create_task(self.update_record(event)) + tasks.append(task) + LOGGER.debug('Task for event scheduled.') + # Limit the number of concurrent tasks + # If we have enough tasks or it's time to process them + if len(tasks) >= 10 or (tasks and len(tasks) > 0 and await asyncio.sleep(batch_timeout)): + try: + await asyncio.gather(*tasks) + except Exception as e: + LOGGER.error(f"Error while processing tasks: {e}") + finally: + tasks = [] # Clear the list after processing + await asyncio.gather(*tasks) + tasks = [] # Clear the list after processing + # Process any remaining tasks when stopping + if tasks: + try: + await asyncio.gather(*tasks) + except Exception as e: + LOGGER.error(f"Error while processing remaining tasks: {e}") self.context_event_collector.stop() self.context_client.close() - def create_topologies(self): - topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] - create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) - - def get_dlt_connector_client(self) -> Optional[DltConnectorClient]: - # Always enable DLT for testing - dlt_connector_client = DltConnectorClient() - dlt_connector_client.connect() - return dlt_connector_client - # env_vars = find_environment_variables([ - # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_HOST), - # get_env_var_name(ServiceNameEnum.DLT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - # ]) - # if len(env_vars) == 2: - # dlt_connector_client = DltConnectorClient() - # dlt_connector_client.connect() - # return dlt_connector_client - # return None - - def update_record(self, event: EventTypes) -> None: - dlt_connector_client = self.get_dlt_connector_client() - dlt_record_sender = DltRecordSender(self.context_client, dlt_connector_client) + #def create_topologies(self): + #topology_uuids = [DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME] + #create_missing_topologies(self.context_client, ADMIN_CONTEXT_ID, topology_uuids) + + async def update_record(self, event: EventTypes) -> None: + dlt_record_sender = DltRecordSender(self.context_client) + await dlt_record_sender.initialize() # Ensure DltRecordSender is initialized asynchronously + LOGGER.debug('STARTING processing event: {:s}'.format(grpc_message_to_json_string(event))) if isinstance(event, ContextEvent): LOGGER.debug('Processing ContextEvent({:s})'.format(grpc_message_to_json_string(event))) @@ -84,7 +109,7 @@ class DLTRecorder(threading.Thread): self.process_topology_event(event, dlt_record_sender) elif isinstance(event, DeviceEvent): - LOGGER.debug('Processing DeviceEvent({:s})'.format(grpc_message_to_json_string(event))) + LOGGER.debug('Processing DeviceEvent ASYNC({:s})'.format(grpc_message_to_json_string(event))) self.process_device_event(event, dlt_record_sender) elif isinstance(event, LinkEvent): @@ -94,9 +119,10 @@ class DLTRecorder(threading.Thread): else: LOGGER.warning('Unsupported Event({:s})'.format(grpc_message_to_json_string(event))) - dlt_record_sender.commit() - if dlt_connector_client is not None: - dlt_connector_client.close() + await dlt_record_sender.commit() + #await asyncio.sleep(2) # Simulates processing time + LOGGER.debug('Finished processing event: {:s}'.format(grpc_message_to_json_string(event))) + def process_topology_event(self, event: TopologyEvent, dlt_record_sender: DltRecordSender) -> None: topology_id = event.topology_id @@ -117,6 +143,7 @@ class DLTRecorder(threading.Thread): if ((context_uuid == DEFAULT_CONTEXT_NAME) or (context_name == DEFAULT_CONTEXT_NAME)) and \ (topology_uuid not in topology_uuids) and (topology_name not in topology_uuids): + LOGGER.debug('DEVICES({:s})'.format(grpc_message_to_json_string(topology_details.devices))) for device in topology_details.devices: LOGGER.debug('DEVICE_INFO_TOPO({:s})'.format(grpc_message_to_json_string(device))) dlt_record_sender.add_device(topology_id, device) @@ -164,3 +191,4 @@ class DLTRecorder(threading.Thread): dlt_record_sender.add_link(topology_id, link) else: LOGGER.warning(f"Topology not found for link {link_id.link_uuid.uuid}") +