def create_kpi_descriptor_request(description: str = "Test Description"):
    """Build a ``KpiDescriptor`` test message with a freshly generated KPI id.

    Args:
        description: Text stored in the descriptor's ``kpi_description`` field.

    Returns:
        A ``kpi_manager_pb2.KpiDescriptor`` whose KPI id is a new random UUID4
        string, whose sample type is ``KPISAMPLETYPE_PACKETS_RECEIVED``, and
        whose device/service/slice/endpoint/connection/link UUIDs are set to
        the fixed test values listed below.
    """
    descriptor = kpi_manager_pb2.KpiDescriptor()

    # Scalar fields of the descriptor itself.
    descriptor.kpi_description = description
    descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    descriptor.kpi_id.kpi_id.uuid = str(uuid.uuid4())

    # (message field, nested uuid holder, fixed test value) for each entity
    # the descriptor refers to.
    fixed_entity_uuids = (
        ("device_id", "device_uuid", 'DEV4'),
        ("service_id", "service_uuid", 'SERV3'),
        ("slice_id", "slice_uuid", 'SLC3'),
        ("endpoint_id", "endpoint_uuid", 'END2'),
        ("connection_id", "connection_uuid", 'CON2'),
        ("link_id", "link_uuid", 'LNK2'),
    )
    for id_field, uuid_field, value in fixed_entity_uuids:
        entity_id = getattr(descriptor, id_field)
        getattr(entity_id, uuid_field).uuid = value

    return descriptor
"34f73604-eca6-424f-9995-18b519ad0978" # return _kpi_id # def create_kpi_descriptor_request_a(descriptor_name: str = "Test_name"): # _create_kpi_request = kpi_manager_pb2.KpiDescriptor() # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_kpi_request.kpi_description = descriptor_name # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' # pylint: disable=maybe-no-member # _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' # pylint: disable=maybe-no-member # return _create_kpi_request # def create_kpi_descriptor_request(): # _create_kpi_request = kpi_manager_pb2.KpiDescriptor() # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member # _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' # pylint: disable=maybe-no-member # return _create_kpi_request # def create_kpi_filter_request_a(): # _create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter() # 
_create_kpi_filter_request.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) # device_id_obj = DeviceId() # endpoint_id_obj = EndPointId() # service_id_obj = ServiceId() # slice_id_obj = SliceId() # connection_id_obj = ConnectionId() # link_id_obj = LinkId() # device_id_obj.device_uuid.uuid = "DEV1" # endpoint_id_obj.endpoint_uuid.uuid = "END1" # service_id_obj.service_uuid.uuid = "SERV1" # slice_id_obj.slice_uuid.uuid = "SLC1" # connection_id_obj.connection_uuid.uuid = "CON1" # link_id_obj.link_uuid.uuid = "LNK1" # _create_kpi_filter_request.device_id.append(device_id_obj) # _create_kpi_filter_request.endpoint_id.append(endpoint_id_obj) # _create_kpi_filter_request.service_id.append(service_id_obj) # _create_kpi_filter_request.slice_id.append(slice_id_obj) # _create_kpi_filter_request.connection_id.append(connection_id_obj) # _create_kpi_filter_request.link_id.append(link_id_obj) # return _create_kpi_filter_request # -------------------- Initial Test messages ------------------------------------- # def create_kpi_request(kpi_id_str): # _create_kpi_request = kpi_manager_pb2.KpiDescriptor() # _create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) # _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) # return _create_kpi_request # def create_kpi_request_b(): # _create_kpi_request = kpi_manager_pb2.KpiDescriptor() # _create_kpi_request = str(uuid.uuid4()) # _create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # 
_create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member # return _create_kpi_request # def create_kpi_request_c(): # _create_kpi_request = kpi_manager_pb2.KpiDescriptor() # _create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member # return _create_kpi_request # def create_kpi_request_d(): # _create_kpi_request = kpi_manager_pb2.KpiDescriptor() # _create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member # return _create_kpi_request # def 
kpi_descriptor_list(): # _kpi_descriptor_list = kpi_manager_pb2.KpiDescriptorList() # return _kpi_descriptor_list # def create_kpi_filter_request(): # _create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter() # _create_kpi_filter_request.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED) # new_device_id = _create_kpi_filter_request.device_id.add() # new_device_id.device_uuid.uuid = 'DEV1' # new_service_id = _create_kpi_filter_request.service_id.add() # new_service_id.service_uuid.uuid = 'SERV1' # new_slice_id = _create_kpi_filter_request.slice_id.add() # new_slice_id.slice_uuid.uuid = 'SLC1' # new_endpoint_id = _create_kpi_filter_request.endpoint_id.add() # new_endpoint_id.endpoint_uuid.uuid = 'END1' # new_connection_id = _create_kpi_filter_request.connection_id.add() # new_connection_id.connection_uuid.uuid = 'CON1' # return _create_kpi_filter_request