diff --git a/src/common/Constants.py b/src/common/Constants.py index a536ef60047eb1f210f8d98d207134d377adcbed..1e4550fbf55ab3d97ac7b767adea777e45c8a427 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -30,8 +30,10 @@ DEFAULT_HTTP_BIND_ADDRESS = '0.0.0.0' DEFAULT_METRICS_PORT = 9192 # Default context and topology UUIDs -DEFAULT_CONTEXT_UUID = 'admin' -DEFAULT_TOPOLOGY_UUID = 'admin' +DEFAULT_CONTEXT_UUID = 'admin' +DEFAULT_TOPOLOGY_UUID = 'admin' # contains the detailed local topology +DOMAINS_TOPOLOGY_UUID = 'domains' # contains the abstracted domains (abstracted local + abstracted remotes) +AGGREGATED_TOPOLOGY_UUID = 'aggregated' # contains the aggregated view (detailed local + abstracted remotes) # Default service names class ServiceNameEnum(Enum): @@ -50,7 +52,7 @@ class ServiceNameEnum(Enum): WEBUI = 'webui' # Used for test and debugging only - DLT_GATEWAY = 'dlt-gateway' + DLT_GATEWAY = 'dltgateway' # Default gRPC service ports DEFAULT_SERVICE_GRPC_PORTS = { diff --git a/src/dlt/connector/Config.py b/src/dlt/connector/Config.py index 9953c820575d42fa88351cc8de022d880ba96e6a..bdf9f306959e86160012541e8a72cc9aabb019c0 100644 --- a/src/dlt/connector/Config.py +++ b/src/dlt/connector/Config.py @@ -11,3 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import os + +DEFAULT_DLT_GATEWAY_HOST = '127.0.0.1' +DEFAULT_DLT_GATEWAY_PORT = '50051' + +# Find IP:port of gateway container as follows: +# - first check env vars DLT_GATEWAY_HOST & DLT_GATEWAY_PORT +# - if not set, use DEFAULT_DLT_GATEWAY_HOST & DEFAULT_DLT_GATEWAY_PORT +DLT_GATEWAY_HOST = str(os.environ.get('DLT_GATEWAY_HOST', DEFAULT_DLT_GATEWAY_HOST)) +DLT_GATEWAY_PORT = int(os.environ.get('DLT_GATEWAY_PORT', DEFAULT_DLT_GATEWAY_PORT)) diff --git a/src/dlt/connector/client/DltGatewayClient.py b/src/dlt/connector/client/DltGatewayClient.py index f1f8dec391bb836cea33422176730d250090429d..e2f5530f9a971d0a25cac042d361c52db5c16304 100644 --- a/src/dlt/connector/client/DltGatewayClient.py +++ b/src/dlt/connector/client/DltGatewayClient.py @@ -14,14 +14,13 @@ from typing import Iterator import grpc, logging -from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc from common.proto.context_pb2 import Empty, TeraFlowController from common.proto.dlt_gateway_pb2 import ( DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription) from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string +from dlt.connector.Config import DLT_GATEWAY_HOST, DLT_GATEWAY_PORT LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -30,8 +29,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, class DltGatewayClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.DLT) - if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) + if not host: host = DLT_GATEWAY_HOST + if not port: port = DLT_GATEWAY_PORT self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None diff --git a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py index 860e46f3ab88b097f4aa8e06508b19518055e46f..2ed26b8b2aaa3f7ab24be08d4d43f8d45155d5fc 100644 --- a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py +++ b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py @@ -12,10 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import copy, grpc, logging +from typing import Optional +from common.Constants import DEFAULT_CONTEXT_UUID +from common.proto.dlt_gateway_pb2 import DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordTypeEnum from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.proto.context_pb2 import DeviceId, Empty, ServiceId, SliceId from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from dlt.connector.client.DltGatewayClient import DltGatewayClient LOGGER = logging.getLogger(__name__) @@ -31,8 +37,26 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): def __init__(self): LOGGER.debug('Creating Servicer...') + self._own_domain_uuid : Optional[str] = None LOGGER.debug('Servicer Created') + @property + def own_domain_id(self) -> str: + if self._own_domain_uuid is not None: return self._own_domain_uuid + + context_client = ContextClient() + existing_context_ids = context_client.ListContextIds(Empty()) + existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids} + + # Detect local context name (will be used as abstracted device name); exclude DEFAULT_CONTEXT_UUID + existing_non_admin_context_uuids = copy.deepcopy(existing_context_uuids) + existing_non_admin_context_uuids.discard(DEFAULT_CONTEXT_UUID) + if len(existing_non_admin_context_uuids) != 1: + MSG = 'Unable to identify own domain name. Existing Contexts({:s})' + raise Exception(MSG.format(str(existing_context_uuids))) + self._own_domain_uuid = existing_non_admin_context_uuids.pop() + return self._own_domain_uuid + @safe_and_metered_rpc_method(METRICS, LOGGER) def RecordAll(self, request : Empty, context : grpc.ServicerContext) -> Empty: return Empty() @@ -43,6 +67,33 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): @safe_and_metered_rpc_method(METRICS, LOGGER) def RecordDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: + context_client = ContextClient() + device = context_client.GetDevice(request) + + dltgateway_client = DltGatewayClient() + + dlt_record_id = DltRecordId() + dlt_record_id.domain_uuid.uuid = self.own_domain_id + dlt_record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE + dlt_record_id.record_uuid.uuid = device.device_id.device_uuid.uuid + + LOGGER.info('[RecordDevice] sent dlt_record_id = {:s}'.format(grpc_message_to_json_string(dlt_record_id))) + dlt_record = dltgateway_client.GetFromDlt(dlt_record_id) + LOGGER.info('[RecordDevice] recv dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record))) + + exists = False + + dlt_record = DltRecord() + dlt_record.record_id.CopyFrom(dlt_record_id) + dlt_record.operation = \ + DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE \ + if exists else \ + DltRecordOperationEnum.DLTRECORDOPERATION_ADD + + dlt_record.data_json = grpc_message_to_json_string(device) + LOGGER.info('[RecordDevice] sent dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record))) + dlt_record_status = dltgateway_client.RecordToDlt(dlt_record) + LOGGER.info('[RecordDevice] recv dlt_record_status = {:s}'.format(grpc_message_to_json_string(dlt_record_status))) return Empty() @safe_and_metered_rpc_method(METRICS, LOGGER) diff --git a/src/dlt/connector/service/__main__.py b/src/dlt/connector/service/__main__.py index 435a93f61bf934a17d9c044756648176e9cb2d2d..e7c3841e920d7a1ea3e2ea4554f4fcf7e1504a3d 100644 --- a/src/dlt/connector/service/__main__.py +++ b/src/dlt/connector/service/__main__.py @@ -18,6 +18,8 @@ from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables) +from dlt.connector.client.DltEventsCollector import DltEventsCollector +from dlt.connector.client.DltGatewayClient import DltGatewayClient from .DltConnectorService import DltConnectorService terminate = threading.Event() @@ -48,6 +50,10 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) + dlt_gateway_client = DltGatewayClient() + dlt_events_collector = DltEventsCollector(dlt_gateway_client, log_events_received=True) + dlt_events_collector.start() + # Starting DLT connector service grpc_service = DltConnectorService() grpc_service.start() @@ -56,6 +62,7 @@ def main(): while not terminate.wait(timeout=0.1): pass LOGGER.info('Terminating...') + dlt_events_collector.stop() grpc_service.stop() LOGGER.info('Bye')