diff --git a/src/dlt/connector/Config.py b/src/dlt/connector/Config.py index 70a33251242c51f49140e596b8208a19dd5245f7..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dlt/connector/Config.py +++ b/src/dlt/connector/Config.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/dlt/connector/client/DltClient.py b/src/dlt/connector/client/DltClient.py deleted file mode 100644 index 79d38d12d345357561dd8ea3a64de726c5a9c77e..0000000000000000000000000000000000000000 --- a/src/dlt/connector/client/DltClient.py +++ /dev/null @@ -1,171 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import grpc, logging -from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import retry, delay_exponential -#from common.tools.grpc.Tools import grpc_message_to_json_string -#from slice.proto.context_pb2 import Empty, Slice, SliceId -from dlt.connector.proto.dlt_pb2_grpc import DltServiceStub - -LOGGER = logging.getLogger(__name__) -MAX_RETRIES = 15 -DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) -RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') - -class DltClient: - def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.DLT) - if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) - self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) - LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) - self.channel = None - self.stub = None - self.connect() - LOGGER.debug('Channel created') - - def connect(self): - self.channel = grpc.insecure_channel(self.endpoint) - self.stub = DltServiceStub(self.channel) - - def close(self): - if self.channel is not None: self.channel.close() - self.channel = None - self.stub = None - -# @RETRY_DECORATOR -# def CreateSlice(self, request : Slice) -> SliceId: -# LOGGER.debug('CreateSlice request: {:s}'.format(grpc_message_to_json_string(request))) -# response = self.stub.CreateSlice(request) -# LOGGER.debug('CreateSlice result: {:s}'.format(grpc_message_to_json_string(response))) -# return response - -# @RETRY_DECORATOR -# def UpdateSlice(self, request : Slice) -> SliceId: -# LOGGER.debug('UpdateSlice request: {:s}'.format(grpc_message_to_json_string(request))) -# response = self.stub.UpdateSlice(request) -# LOGGER.debug('UpdateSlice result: {:s}'.format(grpc_message_to_json_string(response))) -# return response - -# @RETRY_DECORATOR -# def DeleteSlice(self, request : SliceId) -> Empty: -# LOGGER.debug('DeleteSlice request: {:s}'.format(grpc_message_to_json_string(request))) -# response = self.stub.DeleteSlice(request) -# LOGGER.debug('DeleteSlice result: {:s}'.format(grpc_message_to_json_string(response))) -# return response - - - - - - - -// Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package dlt; - -import "context.proto"; - -service DltService { - rpc RecordToDlt (DltRecord ) returns ( DltRecordStatus ) {} - rpc GetFromDlt (DltRecordId ) returns ( DltRecord ) {} - rpc SubscribeToDlt(DltRecordSubscription ) returns (stream DltRecordEvent ) {} - rpc GetDltStatus (context.TeraFlowController) returns ( DltPeerStatus ) {} // NEC is checkig if it is possible - rpc GetDltPeers (context.Empty ) returns ( DltPeerStatusList) {} // NEC is checkig if it is possible -} - -enum DltRecordTypeEnum { - DLTRECORDTYPE_UNDEFINED = 0; - DLTRECORDTYPE_CONTEXT = 1; - DLTRECORDTYPE_TOPOLOGY = 2; - DLTRECORDTYPE_DEVICE = 3; - DLTRECORDTYPE_LINK = 4; - DLTRECORDTYPE_SERVICE = 5; - DLTRECORDTYPE_SLICE = 6; -} - -enum DltRecordOperationEnum { - DLTRECORDOPERATION_UNDEFINED = 0; - DLTRECORDOPERATION_ADD = 1; - DLTRECORDOPERATION_UPDATE = 2; - DLTRECORDOPERATION_DELETE = 3; -} - -enum DltRecordStatusEnum { - DLTRECORDSTATUS_UNDEFINED = 0; - DLTRECORDSTATUS_SUCCEEDED = 1; - DLTRECORDSTATUS_FAILED = 2; -} - -enum DltStatusEnum { - DLTSTATUS_UNDEFINED = 0; - DLTSTATUS_NOTAVAILABLE = 1; - DLTSTATUS_INITIALIZED = 2; - DLTSTATUS_AVAILABLE = 3; - DLTSTATUS_DEINIT = 4; -} - -message DltRecordId { - context.Uuid domain_uuid = 1; // unique identifier of domain owning the record - DltRecordTypeEnum type = 2; // type of record - context.Uuid record_uuid = 3; // unique identifier of the record within the domain context_uuid/topology_uuid -} - -message DltRecord { - DltRecordId record_id = 1; // record identifier - DltRecordOperationEnum operation = 2; // operation to be performed over the record - string data_json = 3; // record content: JSON-encoded record content -} - -message DltRecordSubscription { - // retrieved events have to match ALL conditions. - // i.e., type in types requested, AND operation in operations requested - // TODO: consider adding a more sophisticated filtering - repeated DltRecordTypeEnum type = 1; // selected event types, empty=all - repeated DltRecordOperationEnum operation = 2; // selected event operations, empty=all -} - -message DltRecordEvent { - context.Event event = 1; // common event data (timestamp & event_type) - DltRecordId record_id = 2; // record identifier associated with this event -} - -message DltRecordStatus { - DltRecordId record_id = 1; // identifier of the associated record - DltRecordStatusEnum status = 2; // status of the record - string error_message = 3; // error message in case of failure, empty otherwise -} - -message DltPeerStatus { - context.TeraFlowController controller = 1; // Identifier of the TeraFlow controller instance - DltStatusEnum status = 2; // Status of the TeraFlow controller instance -} - -message DltPeerStatusList { - repeated DltPeerStatus peers = 1; // List of peers and their status -} diff --git a/src/dlt/connector/client/DltConnectorClient.py b/src/dlt/connector/client/DltConnectorClient.py new file mode 100644 index 0000000000000000000000000000000000000000..f48562996b067ca81a99b6ceb7288029be7ba1c8 --- /dev/null +++ b/src/dlt/connector/client/DltConnectorClient.py @@ -0,0 +1,95 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc +from common.proto.context_pb2 import DeviceId, Empty, ServiceId, SliceId +from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceStub +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class DltConnectorClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.DLT) + if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = DltConnectorServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def RecordAll(self, request : Empty) -> Empty: + LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAll(request) + LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordAllDevices(self, request : Empty) -> Empty: + LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAllDevices(request) + LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordDevice(self, request : DeviceId) -> Empty: + LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordDevice(request) + LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordAllServices(self, request : Empty) -> Empty: + LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAllServices(request) + LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordService(self, request : ServiceId) -> Empty: + LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordService(request) + LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordAllSlices(self, request : Empty) -> Empty: + LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordAllSlices(request) + LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RecordSlice(self, request : SliceId) -> Empty: + LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordSlice(request) + LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/dlt/connector/client/DltGatewayClient.py b/src/dlt/connector/client/DltGatewayClient.py new file mode 100644 index 0000000000000000000000000000000000000000..f1f8dec391bb836cea33422176730d250090429d --- /dev/null +++ b/src/dlt/connector/client/DltGatewayClient.py @@ -0,0 +1,84 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Iterator +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc +from common.proto.context_pb2 import Empty, TeraFlowController +from common.proto.dlt_gateway_pb2 import ( + DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription) +from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class DltGatewayClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.DLT) + if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = DltGatewayServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: + LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RecordToDlt(request) + LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetFromDlt(self, request : DltRecordId) -> DltRecord: + LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetFromDlt(request) + LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SubscribeToDlt(self, request : DltRecordSubscription) -> Iterator[DltRecordEvent]: + LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SubscribeToDlt(request) + LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: + LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetDltStatus(request) + LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetDltPeers(self, request : Empty) -> DltPeerStatusList: + LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetDltPeers(request) + LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/dlt/connector/client/__init__.py b/src/dlt/connector/client/__init__.py index 70a33251242c51f49140e596b8208a19dd5245f7..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dlt/connector/client/__init__.py +++ b/src/dlt/connector/client/__init__.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/dlt/connector/service/DltConnector.py b/src/dlt/connector/service/DltConnector.py index b8f7e5311dcff4013c76ffe370076acd34e46521..0c42d66852e8eb895a07c761f7535a0d768a9e91 100644 --- a/src/dlt/connector/service/DltConnector.py +++ b/src/dlt/connector/service/DltConnector.py @@ -16,7 +16,7 @@ import logging, threading from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector -from dlt.connector.client.DltClient import DltClient +from dlt.connector.client.DltConnectorClient import DltConnectorClient LOGGER = logging.getLogger(__name__) @@ -33,7 +33,7 @@ class DltConnector: self._thread.start() def _run_events_collector(self) -> None: - dlt_client = DltClient() + dltconnector_client = DltConnectorClient() context_client = ContextClient() events_collector = EventsCollector(context_client) events_collector.start() @@ -44,7 +44,7 @@ class DltConnector: events_collector.stop() context_client.close() - dlt_client.close() + dltconnector_client.close() def stop(self): self._terminate.set() diff --git a/src/dlt/connector/service/DltConnectorService.py b/src/dlt/connector/service/DltConnectorService.py index de696e88c3bc9b72b733e3763e0fec79de199068..40237b628776f7053092b45d036072fbde35253c 100644 --- a/src/dlt/connector/service/DltConnectorService.py +++ b/src/dlt/connector/service/DltConnectorService.py @@ -15,14 +15,14 @@ from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from common.tools.service.GenericGrpcService import GenericGrpcService -from ..proto.dlt_pb2_grpc import add_SliceServiceServicer_to_server -from .DltConnectorServiceServicerImpl import SliceServiceServicerImpl +from common.proto.dlt_connector_pb2_grpc import add_DltConnectorServiceServicer_to_server +from .DltConnectorServiceServicerImpl import DltConnectorServiceServicerImpl class DltConnectorService(GenericGrpcService): def __init__(self, cls_name: str = __name__) -> None: - port = get_service_port_grpc(ServiceNameEnum.SLICE) + port = get_service_port_grpc(ServiceNameEnum.DLT) super().__init__(port, cls_name=cls_name) - self.slice_servicer = SliceServiceServicerImpl() + self.dltconnector_servicer = DltConnectorServiceServicerImpl() def install_servicers(self): - add_SliceServiceServicer_to_server(self.slice_servicer, self.server) + add_DltConnectorServiceServicer_to_server(self.dltconnector_servicer, self.server) diff --git a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py index a19c797eadebb2fa0f03e3c33c91e62781807ccb..860e46f3ab88b097f4aa8e06508b19518055e46f 100644 --- a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py +++ b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py @@ -14,20 +14,49 @@ import grpc, logging from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from context.proto.context_pb2 import Slice, SliceId -from ..proto.dlt_pb2_grpc import DltServiceServicer +from common.proto.context_pb2 import DeviceId, Empty, ServiceId, SliceId +from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer LOGGER = logging.getLogger(__name__) SERVICE_NAME = 'DltConnector' -METHOD_NAMES = ['CreateSlice', 'UpdateSlice', 'DeleteSlice'] +METHOD_NAMES = [ + 'RecordAll', + 'RecordAllDevices', 'RecordDevice', + 'RecordAllServices', 'RecordService', + 'RecordAllSlices', 'RecordSlice', +] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) -class DltConnectorServiceServicerImpl(DltServiceServicer): +class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): def __init__(self): LOGGER.debug('Creating Servicer...') LOGGER.debug('Servicer Created') @safe_and_metered_rpc_method(METRICS, LOGGER) - def CreateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: - return SliceId() + def RecordAll(self, request : Empty, context : grpc.ServicerContext) -> Empty: + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RecordAllDevices(self, request : Empty, context : grpc.ServicerContext) -> Empty: + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RecordDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RecordAllServices(self, request : Empty, context : grpc.ServicerContext) -> Empty: + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RecordService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty: + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RecordAllSlices(self, request : Empty, context : grpc.ServicerContext) -> Empty: + return Empty() + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RecordSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty: + return Empty() diff --git a/src/dlt/connector/service/__init__.py b/src/dlt/connector/service/__init__.py index 70a33251242c51f49140e596b8208a19dd5245f7..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dlt/connector/service/__init__.py +++ b/src/dlt/connector/service/__init__.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -