diff --git a/src/dlt/connector/client/DltConnectorClient.py b/src/dlt/connector/client/DltConnectorClient.py index e383217d8b6971aea6127d39b8de348fb84b35da..e71cf0bd7e7abe699fd6bcb81fcae015527de834 100644 --- a/src/dlt/connector/client/DltConnectorClient.py +++ b/src/dlt/connector/client/DltConnectorClient.py @@ -48,64 +48,64 @@ class DltConnectorClient: self.stub = None @RETRY_DECORATOR - def RecordAll(self, request : TopologyId) -> Empty: + async def RecordAll(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAll(request) + response = await self.stub.RecordAll(request) LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllDevices(self, request : TopologyId) -> Empty: + async def RecordAllDevices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllDevices(request) + response = await self.stub.RecordAllDevices(request) LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordDevice(self, request : DltDeviceId) -> Empty: + async def RecordDevice(self, request : DltDeviceId) -> Empty: LOGGER.debug('RecordDevice request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordDevice(request) + response = await self.stub.RecordDevice(request) LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllLinks(self, request : TopologyId) -> Empty: + async def RecordAllLinks(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllLinks(request) + response = await self.stub.RecordAllLinks(request) LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordLink(self, request : DltLinkId) -> Empty: + async def RecordLink(self, request : DltLinkId) -> Empty: LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordLink(request) + response = await self.stub.RecordLink(request) LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllServices(self, request : TopologyId) -> Empty: + async def RecordAllServices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllServices(request) + response = await self.stub.RecordAllServices(request) LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordService(self, request : DltServiceId) -> Empty: + async def RecordService(self, request : DltServiceId) -> Empty: LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordService(request) + response = await self.stub.RecordService(request) LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordAllSlices(self, request : TopologyId) -> Empty: + async def RecordAllSlices(self, request : TopologyId) -> Empty: LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordAllSlices(request) + response = await self.stub.RecordAllSlices(request) LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def RecordSlice(self, request : DltSliceId) -> Empty: + async def RecordSlice(self, request : DltSliceId) -> Empty: LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordSlice(request) + response = await self.stub.RecordSlice(request) LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/dlt/connector/client/DltGatewayClient.py b/src/dlt/connector/client/DltGatewayClient.py index cde278517f7a4f0db267a20d050c71a02de4ae23..b04e08d7fd5f0b83a41f36a976376be83b603551 100644 --- a/src/dlt/connector/client/DltGatewayClient.py +++ b/src/dlt/connector/client/DltGatewayClient.py @@ -49,36 +49,36 @@ class DltGatewayClient: self.stub = None @RETRY_DECORATOR - def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: + async def RecordToDlt(self, request : DltRecord) -> DltRecordStatus: LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.RecordToDlt(request) + response = await self.stub.RecordToDlt(request) LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def GetFromDlt(self, request : DltRecordId) -> DltRecord: + async def GetFromDlt(self, request : DltRecordId) -> DltRecord: LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetFromDlt(request) + response = await self.stub.GetFromDlt(request) LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def SubscribeToDlt(self, request : DltRecordSubscription) -> Iterator[DltRecordEvent]: + async def SubscribeToDlt(self, request : DltRecordSubscription) -> Iterator[DltRecordEvent]: LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.SubscribeToDlt(request) + response = await self.stub.SubscribeToDlt(request) LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: + async def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus: LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetDltStatus(request) + response = await self.stub.GetDltStatus(request) LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def GetDltPeers(self, request : Empty) -> DltPeerStatusList: + async def GetDltPeers(self, request : Empty) -> DltPeerStatusList: LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetDltPeers(request) + response = await self.stub.GetDltPeers(request) LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py index c05d46b48ed214191a6195b948986810b9c03d20..df5e8fd0871f2589bf8b1349c6f393f47482a47d 100644 --- a/src/dlt/connector/service/DltConnectorServiceServicerImpl.py +++ b/src/dlt/connector/service/DltConnectorServiceServicerImpl.py @@ -35,22 +35,22 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): LOGGER.debug('Servicer Created') @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + async def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + async def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty: + async def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() device = context_client.GetDevice(request.device_id) data_json = grpc_message_to_json_string(device) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_DEVICE, request.device_id.device_uuid.uuid, request.delete, data_json) return Empty() @@ -60,53 +60,53 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty: + async def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() link = context_client.GetLink(request.link_id) data_json = grpc_message_to_json_string(link) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_LINK, request.link_id.link_uuid.uuid, request.delete, data_json) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + async def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty: + async def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() service = context_client.GetService(request.service_id) data_json = grpc_message_to_json_string(service) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SERVICE, request.service_id.service_uuid.uuid, request.delete, data_json) return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: + async def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty: return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty: + async def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty: data_json = None if not request.delete: context_client = ContextClient() slice_ = context_client.GetSlice(request.slice_id) data_json = grpc_message_to_json_string(slice_) - self._record_entity( + await self._record_entity( request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SLICE, request.slice_id.slice_uuid.uuid, request.delete, data_json) return Empty() - def _record_entity( + async def _record_entity( self, dlt_domain_uuid : str, dlt_record_type : DltRecordTypeEnum, dlt_record_uuid : str, delete : bool, data_json : Optional[str] = None ) -> None: @@ -143,6 +143,6 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): str_dlt_record = grpc_message_to_json_string(dlt_record) LOGGER.debug('[_record_entity] sent dlt_record = {:s}'.format(str_dlt_record)) - dlt_record_status = dltgateway_client.RecordToDlt(dlt_record) + dlt_record_status = await dltgateway_client.RecordToDlt(dlt_record) str_dlt_record_status = grpc_message_to_json_string(dlt_record_status) LOGGER.debug('[_record_entity] recv dlt_record_status = {:s}'.format(str_dlt_record_status)) diff --git a/src/interdomain/service/topology_abstractor/DltRecordSender.py b/src/interdomain/service/topology_abstractor/DltRecordSender.py index cfa51c928d512d432eba7223ae7bae2d9085ad0c..c17878e0e9d1bc88df26004a3e7b8d60738ec403 100644 --- a/src/interdomain/service/topology_abstractor/DltRecordSender.py +++ b/src/interdomain/service/topology_abstractor/DltRecordSender.py @@ -62,7 +62,7 @@ class DltRecordSender: record_uuid = '{:s}:slice:{:s}/{:s}'.format(topology_uuid, context_uuid, slice_uuid) self._add_record(record_uuid, (topology_id, slice_)) - def commit(self) -> None: + async def commit(self) -> None: for dlt_record_uuid in self.dlt_record_uuids: topology_id,dlt_record = self.dlt_record_uuid_to_data[dlt_record_uuid] if isinstance(dlt_record, Device): @@ -73,7 +73,7 @@ class DltRecordSender: dlt_device_id = DltDeviceId() dlt_device_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_device_id.device_id.CopyFrom(device_id) # pylint: disable=no-member - self.dlt_connector_client.RecordDevice(dlt_device_id) + await self.dlt_connector_client.RecordDevice(dlt_device_id) elif isinstance(dlt_record, Link): #link_id = self.context_client.SetLink(dlt_record) link_id = dlt_record.link_id @@ -81,7 +81,7 @@ class DltRecordSender: dlt_link_id = DltLinkId() dlt_link_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_link_id.link_id.CopyFrom(link_id) # pylint: disable=no-member - self.dlt_connector_client.RecordLink(dlt_link_id) + await self.dlt_connector_client.RecordLink(dlt_link_id) elif isinstance(dlt_record, Service): #service_id = self.context_client.SetService(dlt_record) service_id = dlt_record.service_id @@ -89,7 +89,7 @@ class DltRecordSender: dlt_service_id = DltServiceId() dlt_service_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_service_id.service_id.CopyFrom(service_id) # pylint: disable=no-member - self.dlt_connector_client.RecordService(dlt_service_id) + await self.dlt_connector_client.RecordService(dlt_service_id) elif isinstance(dlt_record, Slice): #slice_id = self.context_client.SetSlice(dlt_record) slice_id = dlt_record.slice_id @@ -97,6 +97,6 @@ class DltRecordSender: dlt_slice_id = DltSliceId() dlt_slice_id.topology_id.CopyFrom(topology_id) # pylint: disable=no-member dlt_slice_id.slice_id.CopyFrom(slice_id) # pylint: disable=no-member - self.dlt_connector_client.RecordSlice(dlt_slice_id) + await self.dlt_connector_client.RecordSlice(dlt_slice_id) else: LOGGER.error('Unsupported Record({:s})'.format(str(dlt_record)))