# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import grpc, logging from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import retry, delay_exponential #from common.tools.grpc.Tools import grpc_message_to_json_string #from slice.proto.context_pb2 import Empty, Slice, SliceId from dlt.connector.proto.dlt_pb2_grpc import DltServiceStub LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class DltClient: def __init__(self, host=None, port=None): if not host: host = get_service_host(ServiceNameEnum.DLT) if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None self.connect() LOGGER.debug('Channel created') def connect(self): self.channel = grpc.insecure_channel(self.endpoint) self.stub = DltServiceStub(self.channel) def close(self): if self.channel is not None: self.channel.close() self.channel = None self.stub = None # @RETRY_DECORATOR # def CreateSlice(self, request : Slice) -> SliceId: # LOGGER.debug('CreateSlice request: {:s}'.format(grpc_message_to_json_string(request))) # response = self.stub.CreateSlice(request) # LOGGER.debug('CreateSlice result: {:s}'.format(grpc_message_to_json_string(response))) # return response # @RETRY_DECORATOR # def UpdateSlice(self, request : Slice) -> SliceId: # LOGGER.debug('UpdateSlice request: {:s}'.format(grpc_message_to_json_string(request))) # response = self.stub.UpdateSlice(request) # LOGGER.debug('UpdateSlice result: {:s}'.format(grpc_message_to_json_string(response))) # return response # @RETRY_DECORATOR # def DeleteSlice(self, request : SliceId) -> Empty: # LOGGER.debug('DeleteSlice request: {:s}'.format(grpc_message_to_json_string(request))) # response = self.stub.DeleteSlice(request) # LOGGER.debug('DeleteSlice result: {:s}'.format(grpc_message_to_json_string(response))) # return response // Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. syntax = "proto3"; package dlt; import "context.proto"; service DltService { rpc RecordToDlt (DltRecord ) returns ( DltRecordStatus ) {} rpc GetFromDlt (DltRecordId ) returns ( DltRecord ) {} rpc SubscribeToDlt(DltRecordSubscription ) returns (stream DltRecordEvent ) {} rpc GetDltStatus (context.TeraFlowController) returns ( DltPeerStatus ) {} // NEC is checkig if it is possible rpc GetDltPeers (context.Empty ) returns ( DltPeerStatusList) {} // NEC is checkig if it is possible } enum DltRecordTypeEnum { DLTRECORDTYPE_UNDEFINED = 0; DLTRECORDTYPE_CONTEXT = 1; DLTRECORDTYPE_TOPOLOGY = 2; DLTRECORDTYPE_DEVICE = 3; DLTRECORDTYPE_LINK = 4; DLTRECORDTYPE_SERVICE = 5; DLTRECORDTYPE_SLICE = 6; } enum DltRecordOperationEnum { DLTRECORDOPERATION_UNDEFINED = 0; DLTRECORDOPERATION_ADD = 1; DLTRECORDOPERATION_UPDATE = 2; DLTRECORDOPERATION_DELETE = 3; } enum DltRecordStatusEnum { DLTRECORDSTATUS_UNDEFINED = 0; DLTRECORDSTATUS_SUCCEEDED = 1; DLTRECORDSTATUS_FAILED = 2; } enum DltStatusEnum { DLTSTATUS_UNDEFINED = 0; DLTSTATUS_NOTAVAILABLE = 1; DLTSTATUS_INITIALIZED = 2; DLTSTATUS_AVAILABLE = 3; DLTSTATUS_DEINIT = 4; } message DltRecordId { context.Uuid domain_uuid = 1; // unique identifier of domain owning the record DltRecordTypeEnum type = 2; // type of record context.Uuid record_uuid = 3; // unique identifier of the record within the domain context_uuid/topology_uuid } message DltRecord { DltRecordId record_id = 1; // record identifier DltRecordOperationEnum operation = 2; // operation to be performed over the record string data_json = 3; // record content: JSON-encoded record content } message DltRecordSubscription { // retrieved events have to match ALL conditions. // i.e., type in types requested, AND operation in operations requested // TODO: consider adding a more sophisticated filtering repeated DltRecordTypeEnum type = 1; // selected event types, empty=all repeated DltRecordOperationEnum operation = 2; // selected event operations, empty=all } message DltRecordEvent { context.Event event = 1; // common event data (timestamp & event_type) DltRecordId record_id = 2; // record identifier associated with this event } message DltRecordStatus { DltRecordId record_id = 1; // identifier of the associated record DltRecordStatusEnum status = 2; // status of the record string error_message = 3; // error message in case of failure, empty otherwise } message DltPeerStatus { context.TeraFlowController controller = 1; // Identifier of the TeraFlow controller instance DltStatusEnum status = 2; // Status of the TeraFlow controller instance } message DltPeerStatusList { repeated DltPeerStatus peers = 1; // List of peers and their status }