# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.context_pb2 import DeviceId, LinkId, ServiceId, SliceId,\
                             ConnectionId, EndPointId


# ---------------------- 2nd iteration Test Messages ---------------------------------

def create_kpi_id_request() -> kpi_manager_pb2.KpiId:
    """Return a ``KpiId`` message carrying a fixed, hard-coded UUID."""
    _kpi_id = kpi_manager_pb2.KpiId()
    _kpi_id.kpi_id.uuid = "34f73604-eca6-424f-9995-18b519ad0978"
    return _kpi_id


def create_kpi_descriptor_request_a(descriptor_name: str) -> kpi_manager_pb2.KpiDescriptor:
    """Return a ``KpiDescriptor`` with a random KPI id and the given description.

    Args:
        descriptor_name: Text stored in the descriptor's ``kpi_description``.

    Returns:
        A descriptor with sample type ``KPISAMPLETYPE_PACKETS_RECEIVED`` and
        the device/service/slice/endpoint/connection/link ids set to the
        ``*1`` fixture values.
    """
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    _create_kpi_request.kpi_description = descriptor_name
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV1'          # pylint: disable=maybe-no-member
    _create_kpi_request.service_id.service_uuid.uuid = 'SERV1'       # pylint: disable=maybe-no-member
    _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1'            # pylint: disable=maybe-no-member
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1'      # pylint: disable=maybe-no-member
    _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1'  # pylint: disable=maybe-no-member
    _create_kpi_request.link_id.link_uuid.uuid = 'LNK1'              # pylint: disable=maybe-no-member
    return _create_kpi_request


def create_kpi_descriptor_request() -> kpi_manager_pb2.KpiDescriptor:
    """Return a ``KpiDescriptor`` with a random KPI id and a fixed description.

    The related-entity ids are set to the literal fixture values below
    (note that they deliberately or accidentally mix suffixes 2, 3 and 4).
    """
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    _create_kpi_request.kpi_description = 'KPI Description Test'
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV4'          # pylint: disable=maybe-no-member
    _create_kpi_request.service_id.service_uuid.uuid = 'SERV3'       # pylint: disable=maybe-no-member
    _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3'            # pylint: disable=maybe-no-member
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2'      # pylint: disable=maybe-no-member
    _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2'  # pylint: disable=maybe-no-member
    _create_kpi_request.link_id.link_uuid.uuid = 'LNK2'              # pylint: disable=maybe-no-member
    return _create_kpi_request


def create_kpi_filter_request_a() -> kpi_manager_pb2.KpiDescriptorFilter:
    """Return a ``KpiDescriptorFilter`` with one sample type and one device id."""
    _create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter()
    # NOTE(review): 102 is presumably KPISAMPLETYPE_PACKETS_RECEIVED - confirm
    # against kpi_sample_types.proto before replacing it with the enum name.
    _create_kpi_filter_request.kpi_sample_type.append(102)

    device_id_obj = DeviceId()
    # NOTE(review): a device id holding the service-style value "SERV3" looks
    # like a typo, but may be an intentional fixture - confirm with the tests
    # that consume this filter.
    device_id_obj.device_uuid.uuid = "SERV3"
    _create_kpi_filter_request.device_id.append(device_id_obj)

    # Additional filter criteria, currently disabled:
    # new_device_id                  = _create_kpi_filter_request.device_id.add()
    # new_device_id.device_uuid.uuid = 'DEV3'
    # new_service_id                 = _create_kpi_filter_request.service_id.add()
    # new_service_id.service_uuid.uuid = 'SERV1'
    # new_slice_id                   = _create_kpi_filter_request.slice_id.add()
    # new_slice_id.slice_uuid.uuid   = 'SLC1'
    # new_endpoint_id                = _create_kpi_filter_request.endpoint_id.add()
    # new_endpoint_id.endpoint_uuid.uuid = 'END1'
    # new_connection_id              = _create_kpi_filter_request.connection_id.add()
    # new_connection_id.connection_uuid.uuid = 'CON1'

    return _create_kpi_filter_request


# -------------------- Initial Test messages -------------------------------------

def create_kpi_request(kpi_id_str) -> kpi_manager_pb2.KpiDescriptor:
    """Return a ``KpiDescriptor`` whose entity ids are suffixed with ``kpi_id_str``.

    Args:
        kpi_id_str: Value converted with ``str()`` and appended to the
            ``DEV``/``SERV``/``SLC``/``END``/``CON`` prefixes.

    Returns:
        A descriptor without an explicit KPI id or link id.
    """
    suffix = str(kpi_id_str)
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_description = 'KPI Description Test'
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = f'DEV{suffix}'
    _create_kpi_request.service_id.service_uuid.uuid = f'SERV{suffix}'
    _create_kpi_request.slice_id.slice_uuid.uuid = f'SLC{suffix}'
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = f'END{suffix}'
    _create_kpi_request.connection_id.connection_uuid.uuid = f'CON{suffix}'
    return _create_kpi_request


def create_kpi_request_b() -> kpi_manager_pb2.KpiDescriptor:
    """Return a ``KpiDescriptor`` with a random KPI id and the ``*2`` fixture ids."""
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    # Store the generated UUID in the descriptor's KPI id. Rebinding the local
    # name to the UUID string (as this function previously did) discards the
    # message and makes every following attribute assignment fail.
    _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    _create_kpi_request.kpi_description = 'KPI Description Test'
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV2'          # pylint: disable=maybe-no-member
    _create_kpi_request.service_id.service_uuid.uuid = 'SERV2'       # pylint: disable=maybe-no-member
    _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2'            # pylint: disable=maybe-no-member
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2'      # pylint: disable=maybe-no-member
    _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2'  # pylint: disable=maybe-no-member
    return _create_kpi_request


def create_kpi_request_c() -> kpi_manager_pb2.KpiDescriptor:
    """Return a ``KpiDescriptor`` with the ``*3`` fixture ids and no KPI id."""
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_description = 'KPI Description Test'
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV3'          # pylint: disable=maybe-no-member
    _create_kpi_request.service_id.service_uuid.uuid = 'SERV3'       # pylint: disable=maybe-no-member
    _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3'            # pylint: disable=maybe-no-member
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3'      # pylint: disable=maybe-no-member
    _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3'  # pylint: disable=maybe-no-member
    return _create_kpi_request


def create_kpi_request_d() -> kpi_manager_pb2.KpiDescriptor:
    """Return a ``KpiDescriptor`` with the ``*4`` fixture ids and no KPI id."""
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_description = 'KPI Description Test'
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV4'          # pylint: disable=maybe-no-member
    _create_kpi_request.service_id.service_uuid.uuid = 'SERV4'       # pylint: disable=maybe-no-member
    _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4'            # pylint: disable=maybe-no-member
    _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4'      # pylint: disable=maybe-no-member
    _create_kpi_request.connection_id.connection_uuid.uuid = 'CON4'  # pylint: disable=maybe-no-member
    return _create_kpi_request


def kpi_descriptor_list() -> kpi_manager_pb2.KpiDescriptorList:
    """Return a newly constructed ``KpiDescriptorList`` with no fields set."""
    _kpi_descriptor_list = kpi_manager_pb2.KpiDescriptorList()
    return _kpi_descriptor_list


def create_kpi_filter_request() -> kpi_manager_pb2.KpiDescriptorFilter:
    """Return a ``KpiDescriptorFilter`` with one entry per filter criterion.

    The filter holds the ``KPISAMPLETYPE_PACKETS_RECEIVED`` sample type plus
    one device, service, slice, endpoint and connection id (``*1`` fixtures).
    """
    _create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter()
    _create_kpi_filter_request.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED)

    # ``add()`` appends a new element to the repeated field and returns it,
    # so each id is filled in place.
    new_device_id = _create_kpi_filter_request.device_id.add()
    new_device_id.device_uuid.uuid = 'DEV1'
    new_service_id = _create_kpi_filter_request.service_id.add()
    new_service_id.service_uuid.uuid = 'SERV1'
    new_slice_id = _create_kpi_filter_request.slice_id.add()
    new_slice_id.slice_uuid.uuid = 'SLC1'
    new_endpoint_id = _create_kpi_filter_request.endpoint_id.add()
    new_endpoint_id.endpoint_uuid.uuid = 'END1'
    new_connection_id = _create_kpi_filter_request.connection_id.add()
    new_connection_id.connection_uuid.uuid = 'CON1'

    return _create_kpi_filter_request