# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from common.proto.context_pb2 import DeviceId, Empty, LinkId, ServiceId, SliceId, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer
from common.proto.dlt_gateway_pb2 import DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordTypeEnum
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from .tools.Checkers import record_exists

LOGGER = logging.getLogger(__name__)

SERVICE_NAME = 'DltConnector'
METHOD_NAMES = [
    'RecordAll',
    'RecordAllDevices', 'RecordDevice',
    'RecordAllLinks', 'RecordLink',
    'RecordAllServices', 'RecordService',
    'RecordAllSlices', 'RecordSlice',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)


def _record_entity(rpc_name : str, domain_uuid : str, record_type : int, record_uuid : str, entity) -> None:
    """Store `entity` in the DLT, adding it or updating it as appropriate.

    The DLT gateway is first queried for the record identified by
    (`domain_uuid`, `record_type`, `record_uuid`). If `record_exists` reports
    that the returned record exists, the entity is sent with operation UPDATE;
    otherwise it is sent with operation ADD. The payload of the record is the
    JSON string produced by `grpc_message_to_json_string(entity)`.

    Args:
        rpc_name: name of the calling RPC method; only used as the tag in log lines.
        domain_uuid: value stored in the record identifier's `domain_uuid.uuid`.
        record_type: a `DltRecordTypeEnum` value stored in the record identifier's `type`.
        record_uuid: value stored in the record identifier's `record_uuid.uuid`.
        entity: message serialized to JSON and stored in the record's `data_json`.
    """
    # NOTE(review): a new DltGatewayClient is created on every call and never
    # explicitly released. If the client exposes a close/disconnect method
    # (TODO confirm), consider releasing it here or sharing a single instance.
    dltgateway_client = DltGatewayClient()

    record_id = DltRecordId()
    record_id.domain_uuid.uuid = domain_uuid
    record_id.type = record_type
    record_id.record_uuid.uuid = record_uuid

    LOGGER.info('[{:s}] sent dlt_record_id = {:s}'.format(rpc_name, grpc_message_to_json_string(record_id)))
    current_record = dltgateway_client.GetFromDlt(record_id)
    LOGGER.info('[{:s}] recv dlt_record = {:s}'.format(rpc_name, grpc_message_to_json_string(current_record)))

    exists = record_exists(current_record)
    LOGGER.info('[{:s}] exists = {:s}'.format(rpc_name, str(exists)))

    new_record = DltRecord()
    new_record.record_id.CopyFrom(record_id)
    new_record.operation = (
        DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE
        if exists else
        DltRecordOperationEnum.DLTRECORDOPERATION_ADD
    )
    new_record.data_json = grpc_message_to_json_string(entity)

    LOGGER.info('[{:s}] sent dlt_record = {:s}'.format(rpc_name, grpc_message_to_json_string(new_record)))
    record_status = dltgateway_client.RecordToDlt(new_record)
    LOGGER.info('[{:s}] recv dlt_record_status = {:s}'.format(rpc_name, grpc_message_to_json_string(record_status)))


class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
    """Implementation of the DltConnector gRPC service.

    Only `RecordDevice` and `RecordLink` perform work; the remaining RPC
    methods currently return an `Empty` reply without taking any action.
    """

    def __init__(self):
        LOGGER.debug('Creating Servicer...')
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: returns `Empty` without performing any action."""
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: returns `Empty` without performing any action."""
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty:
        """Retrieve a device from the Context component and record it in the DLT.

        The device is fetched through `ContextClient.GetDevice` using
        `request.device_id`. The DLT record uses `request.topology_id` as its
        domain and the UUID of the retrieved device as its record UUID.
        """
        # NOTE(review): the ContextClient is created per call and never
        # explicitly released -- confirm whether it should be closed or shared.
        context_client = ContextClient()
        device = context_client.GetDevice(request.device_id)

        _record_entity(
            'RecordDevice',
            request.topology_id.topology_uuid.uuid,
            DltRecordTypeEnum.DLTRECORDTYPE_DEVICE,
            device.device_id.device_uuid.uuid,
            device,
        )
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordAllLinks(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: returns `Empty` without performing any action."""
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty:
        """Retrieve a link from the Context component and record it in the DLT.

        The link is fetched through `ContextClient.GetLink` using
        `request.link_id`. The DLT record uses `request.topology_id` as its
        domain and the UUID of the retrieved link as its record UUID.
        """
        # NOTE(review): the ContextClient is created per call and never
        # explicitly released -- confirm whether it should be closed or shared.
        context_client = ContextClient()
        link = context_client.GetLink(request.link_id)

        _record_entity(
            'RecordLink',
            request.topology_id.topology_uuid.uuid,
            DltRecordTypeEnum.DLTRECORDTYPE_LINK,
            link.link_id.link_uuid.uuid,
            link,
        )
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: returns `Empty` without performing any action."""
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: returns `Empty` without performing any action."""
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: returns `Empty` without performing any action."""
        return Empty()

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty:
        """Placeholder: returns `Empty` without performing any action."""
        return Empty()