Commits (7)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
...@@ -27,7 +27,7 @@ MAX_RETRIES = 15 ...@@ -27,7 +27,7 @@ MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class KpiManagerclient: class KpiManagerClient:
def __init__(self, host=None, port=None): def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.KPIMANAGER) # update enum if not host: host = get_service_host(ServiceNameEnum.KPIMANAGER) # update enum
if not port: port = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # update enum if not port: port = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # update enum
......
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
def kpi_id():
_kpi_id = kpi_manager_pb2.KpiId()
_kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member
return _kpi_id
def create_kpi_request(kpi_id_str):
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str)
_create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str)
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str)
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str)
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str)
return _create_kpi_request
def create_kpi_request_b():
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member
_create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member
return _create_kpi_request
def create_kpi_request_c():
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member
_create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member
return _create_kpi_request
def create_kpi_request_d():
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member
_create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member
return _create_kpi_request
def kpi_descriptor_list():
_kpi_descriptor_list = kpi_manager_pb2.KpiDescriptorList()
return _kpi_descriptor_list
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, pytest
import logging, json
from apscheduler.schedulers.background import BackgroundScheduler
from common.proto.context_pb2 import ConfigActionEnum, Context, ContextId, DeviceOperationalStatusEnum, EventTypeEnum, DeviceEvent, Device, Empty, Topology, TopologyId
from common.Constants import ServiceNameEnum
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
# from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, SubsDescriptor, SubsList, AlarmID, \
# AlarmDescriptor, AlarmList, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable #, Kpi, KpiList
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorList
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.DeviceService import DeviceService
from device.client.DeviceClient import DeviceClient
from kpi_manager.tests.test_messages import create_kpi_request, create_kpi_request_b, create_kpi_request_c, create_kpi_request_d
# from monitoring.service.MonitoringService import MonitoringService
from kpi_manager.service.KpiManagerService import KpiManagerService
# from monitoring.client.MonitoringClient import MonitoringClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from monitoring.service.ManagementDBTools import ManagementDB
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.NameMapping import NameMapping
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000
KPIMANAGER_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)
METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME')
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
# Mock Service implementing Context to simplify unitary tests of Monitoring
def __init__(self, bind_port: Union[str, int]) -> None:
super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')
# pylint: disable=attribute-defined-outside-init
def install_servicers(self):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
@pytest.fixture(scope='session')
def context_service():
LOGGER.info('Initializing MockContextService...')
_service = MockContextService(MOCKSERVICE_PORT)
_service.start()
LOGGER.info('Yielding MockContextService...')
yield _service
LOGGER.info('Terminating MockContextService...')
_service.context_servicer.msg_broker.terminate()
_service.stop()
LOGGER.info('Terminated MockContextService...')
@pytest.fixture(scope='session')
def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing ContextClient...')
_client = ContextClient()
LOGGER.info('Yielding ContextClient...')
yield _client
LOGGER.info('Closing ContextClient...')
_client.close()
LOGGER.info('Closed ContextClient...')
@pytest.fixture(scope='session')
def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceService...')
driver_factory = DriverFactory(DRIVERS)
driver_instance_cache = DriverInstanceCache(driver_factory)
_service = DeviceService(driver_instance_cache)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding DeviceService...')
yield _service
LOGGER.info('Terminating DeviceService...')
_service.stop()
LOGGER.info('Terminated DeviceService...')
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceClient...')
_client = DeviceClient()
LOGGER.info('Yielding DeviceClient...')
yield _client
LOGGER.info('Closing DeviceClient...')
_client.close()
LOGGER.info('Closed DeviceClient...')
@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing DeviceClient...')
_client = DeviceClient()
LOGGER.info('Yielding DeviceClient...')
yield _client
LOGGER.info('Closing DeviceClient...')
_client.close()
LOGGER.info('Closed DeviceClient...')
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def kpi_manager_service(
context_service : MockContextService, # pylint: disable=redefined-outer-name,unused-argument
device_service : DeviceService # pylint: disable=redefined-outer-name,unused-argument
):
LOGGER.info('Initializing KpiManagerService...')
name_mapping = NameMapping()
# _service = MonitoringService(name_mapping)
_service = KpiManagerService(name_mapping)
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding KpiManagerService...')
yield _service
LOGGER.info('Terminating KpiManagerService...')
_service.stop()
LOGGER.info('Terminated KpiManagerService...')
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
# def monitoring_client(monitoring_service : MonitoringService): (Add for better understanding)
@pytest.fixture(scope='session')
def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing KpiManagerClient...')
_client = KpiManagerClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding KpiManagerClient...')
yield _client
LOGGER.info('Closing KpiManagerClient...')
_client.close()
LOGGER.info('Closed KpiManagerClient...')
@pytest.fixture(scope='session')
def management_db():
_management_db = ManagementDB('monitoring.db')
return _management_db
@pytest.fixture(scope='session')
def metrics_db(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name
return monitoring_service.monitoring_servicer.metrics_db
# This function os not clear to me (Changes should me made before execution)
@pytest.fixture(scope='session')
def metrics_db(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
return monitoring_service.monitoring_servicer.metrics_db
#_metrics_db = MetricsDBTools.MetricsDB(
# METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE_MONITORING_KPIS)
#return _metrics_db
@pytest.fixture(scope='session')
def subs_scheduler():
_scheduler = BackgroundScheduler(executors={'processpool': ProcessPoolExecutor(max_workers=20)})
_scheduler.start()
return _scheduler
def ingestion_data(kpi_id_int):
# pylint: disable=redefined-outer-name,unused-argument
metrics_db = MetricsDB('localhost', '9009', '9000', 'monitoring')
kpiSampleType = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
kpiSampleType_name = KpiSampleType.Name(kpiSampleType).upper().replace('KPISAMPLETYPE_', '')
for _ in range(50):
kpiSampleType = kpiSampleType_name
kpiId = kpi_id_int
deviceId = 'DEV'+ str(kpi_id_int)
endpointId = 'END' + str(kpi_id_int)
serviceId = 'SERV' + str(kpi_id_int)
sliceId = 'SLC' + str(kpi_id_int)
connectionId = 'CON' + str(kpi_id_int)
time_stamp = timestamp_utcnow_to_float()
kpi_value = 500*random()
metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId,
kpi_value)
sleep(0.1)
##################################################
# Prepare Environment, should be the first test
##################################################
def test_prepare_environment(
context_client : ContextClient, # pylint: disable=redefined-outer-name,unused-argument
):
context_id = json_context_id(DEFAULT_CONTEXT_NAME)
context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME)))
context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id)))
###########################
# Tests Implementation
###########################
# Test case that makes use of client fixture to test server's CreateKpi method
def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('test_create_kpi requesting')
for i in range(3):
response = kpi_manager_client.SetKpi(create_kpi_request(str(i+1)))
LOGGER.debug(str(response))
assert isinstance(response, KpiId)
# Test case that makes use of client fixture to test server's DeleteKpi method
def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
# make call to server
LOGGER.warning('delete_kpi requesting')
response = kpi_manager_client.SetKpi(create_kpi_request('4'))
response = kpi_manager_client.DeleteKpi(response)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's GetKpiDescriptor method
def test_get_kpi_descriptor_list(kpi_manager_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getkpidescritor_kpi begin')
response = kpi_manager_client.GetKpiDescriptorList(Empty())
LOGGER.debug(str(response))
assert isinstance(response, KpiDescriptorList)
...@@ -17,54 +17,54 @@ from common.proto import monitoring_pb2 ...@@ -17,54 +17,54 @@ from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.timestamp.Converters import timestamp_utcnow_to_float from common.tools.timestamp.Converters import timestamp_utcnow_to_float
def kpi_id(): # def kpi_id():
_kpi_id = monitoring_pb2.KpiId() # _kpi_id = monitoring_pb2.KpiId()
_kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member # _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member
return _kpi_id # return _kpi_id
def create_kpi_request(kpi_id_str): # def create_kpi_request(kpi_id_str):
_create_kpi_request = monitoring_pb2.KpiDescriptor() # _create_kpi_request = monitoring_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str) # _create_kpi_request.device_id.device_uuid.uuid = 'DEV' + str(kpi_id_str)
_create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str) # _create_kpi_request.service_id.service_uuid.uuid = 'SERV' + str(kpi_id_str)
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str) # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC' + str(kpi_id_str)
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str) # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END' + str(kpi_id_str)
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str) # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON' + str(kpi_id_str)
return _create_kpi_request # return _create_kpi_request
def create_kpi_request_b(): # def create_kpi_request_b():
_create_kpi_request = monitoring_pb2.KpiDescriptor() # _create_kpi_request = monitoring_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member # _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' # pylint: disable=maybe-no-member
_create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' # pylint: disable=maybe-no-member
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC2' # pylint: disable=maybe-no-member
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2' # pylint: disable=maybe-no-member
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' # pylint: disable=maybe-no-member
return _create_kpi_request # return _create_kpi_request
def create_kpi_request_c(): # def create_kpi_request_c():
_create_kpi_request = monitoring_pb2.KpiDescriptor() # _create_kpi_request = monitoring_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member # _create_kpi_request.device_id.device_uuid.uuid = 'DEV3' # pylint: disable=maybe-no-member
_create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' # pylint: disable=maybe-no-member
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' # pylint: disable=maybe-no-member
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END3' # pylint: disable=maybe-no-member
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON3' # pylint: disable=maybe-no-member
return _create_kpi_request # return _create_kpi_request
def create_kpi_request_d(): # def create_kpi_request_d():
_create_kpi_request = monitoring_pb2.KpiDescriptor() # _create_kpi_request = monitoring_pb2.KpiDescriptor()
_create_kpi_request.kpi_description = 'KPI Description Test' # _create_kpi_request.kpi_description = 'KPI Description Test'
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member # _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' # pylint: disable=maybe-no-member
_create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member # _create_kpi_request.service_id.service_uuid.uuid = 'SERV4' # pylint: disable=maybe-no-member
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member # _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC4' # pylint: disable=maybe-no-member
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member # _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END4' # pylint: disable=maybe-no-member
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member # _create_kpi_request.connection_id.connection_uuid.uuid = 'CON4' # pylint: disable=maybe-no-member
return _create_kpi_request # return _create_kpi_request
def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s):
_monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() _monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
...@@ -80,10 +80,10 @@ def include_kpi_request(kpi_id): ...@@ -80,10 +80,10 @@ def include_kpi_request(kpi_id):
_include_kpi_request.kpi_value.floatVal = 500*random() # pylint: disable=maybe-no-member _include_kpi_request.kpi_value.floatVal = 500*random() # pylint: disable=maybe-no-member
return _include_kpi_request return _include_kpi_request
def kpi_descriptor_list(): # def kpi_descriptor_list():
_kpi_descriptor_list = monitoring_pb2.KpiDescriptorList() # _kpi_descriptor_list = monitoring_pb2.KpiDescriptorList()
return _kpi_descriptor_list # return _kpi_descriptor_list
def kpi_query(kpi_id_list): def kpi_query(kpi_id_list):
_kpi_query = monitoring_pb2.KpiQuery() _kpi_query = monitoring_pb2.KpiQuery()
......