Skip to content
Snippets Groups Projects
test_kpi_manager.py 11.5 KiB
Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# import sys
# sys.path.append('.')
import os, pytest
import logging, json
# from apscheduler.schedulers.background import BackgroundScheduler

from common.proto.context_pb2 import ConfigActionEnum, Context, ContextId, DeviceOperationalStatusEnum, EventTypeEnum, DeviceEvent, Device, Empty, Topology, TopologyId
from common.Constants import ServiceNameEnum
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Topology import json_topology, json_topology_id
# from common.proto.monitoring_pb2 import KpiId, KpiDescriptor, SubsDescriptor, SubsList, AlarmID, \
#     AlarmDescriptor, AlarmList, KpiDescriptorList, SubsResponse, AlarmResponse, RawKpiTable #, Kpi, KpiList
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
from common.tools.service.GenericGrpcService import GenericGrpcService
from context.client.ContextClient import ContextClient


from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.DeviceService import DeviceService
from device.client.DeviceClient import DeviceClient

from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_id_request, \
                                        create_kpi_filter_request_a, create_kpi_descriptor_request_a
# from monitoring.service.MonitoringService import MonitoringService
from kpi_manager.service.KpiManagerService import KpiManagerService
# from monitoring.client.MonitoringClient import MonitoringClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient

from kpi_manager.service.KpiManagerServiceServicerImpl import KpiManagerServiceServicerImpl

from monitoring.service.ManagementDBTools import ManagementDB
from monitoring.service.MetricsDBTools import MetricsDB
from monitoring.service.NameMapping import NameMapping

os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
from device.service.drivers import DRIVERS

###########################
# Tests Setup
###########################

LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000

KPIMANAGER_SERVICE_PORT = MOCKSERVICE_PORT + get_service_port_grpc(ServiceNameEnum.KPIMANAGER)  # type: ignore
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)

METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME')

LOGGER = logging.getLogger(__name__)

class MockContextService(GenericGrpcService):
    # Mock Service implementing Context to simplify unitary tests of Monitoring

    def __init__(self, bind_port: Union[str, int]) -> None:
        super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')

    # pylint: disable=attribute-defined-outside-init
    def install_servicers(self):
        self.context_servicer = MockServicerImpl_Context()
        add_ContextServiceServicer_to_server(self.context_servicer, self.server)

@pytest.fixture(scope='session')
def context_service():
    LOGGER.info('Initializing MockContextService...')
    _service = MockContextService(MOCKSERVICE_PORT)
    _service.start()
    
    LOGGER.info('Yielding MockContextService...')
    yield _service

    LOGGER.info('Terminating MockContextService...')
    _service.context_servicer.msg_broker.terminate()
    _service.stop()

    LOGGER.info('Terminated MockContextService...')

@pytest.fixture(scope='session')
def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
    LOGGER.info('Initializing ContextClient...')
    _client = ContextClient()
    
    LOGGER.info('Yielding ContextClient...')
    yield _client

    LOGGER.info('Closing ContextClient...')
    _client.close()

    LOGGER.info('Closed ContextClient...')

@pytest.fixture(scope='session')
def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
    LOGGER.info('Initializing DeviceService...')
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(driver_instance_cache)
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding DeviceService...')
    yield _service

    LOGGER.info('Terminating DeviceService...')
    _service.stop()

    LOGGER.info('Terminated DeviceService...')

# @pytest.fixture(scope='session')
# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
#     LOGGER.info('Initializing DeviceClient...')
#     _client = DeviceClient()
#     LOGGER.info('Yielding DeviceClient...')
#     yield _client
#     LOGGER.info('Closing DeviceClient...')
#     _client.close()
#     LOGGER.info('Closed DeviceClient...')

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
    LOGGER.info('Initializing DeviceClient...')
    _client = DeviceClient()

    LOGGER.info('Yielding DeviceClient...')
    yield _client

    LOGGER.info('Closing DeviceClient...')
    _client.close()

    LOGGER.info('Closed DeviceClient...')

# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def kpi_manager_service(
        context_service : MockContextService,  # pylint: disable=redefined-outer-name,unused-argument
        device_service : DeviceService     # pylint: disable=redefined-outer-name,unused-argument
    ):
    LOGGER.info('Initializing KpiManagerService...')
    name_mapping = NameMapping()
    # _service = MonitoringService(name_mapping)
    _service = KpiManagerService(name_mapping)
    _service.start()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding KpiManagerService...')
    yield _service

    LOGGER.info('Terminating KpiManagerService...')
    _service.stop()

    LOGGER.info('Terminated KpiManagerService...')

# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
# def monitoring_client(monitoring_service : MonitoringService): (Add for better understanding)
@pytest.fixture(scope='session')
def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument
    LOGGER.info('Initializing KpiManagerClient...')
    _client = KpiManagerClient()

    # yield the server, when test finishes, execution will resume to stop it
    LOGGER.info('Yielding KpiManagerClient...')
    yield _client

    LOGGER.info('Closing KpiManagerClient...')
    _client.close()

    LOGGER.info('Closed KpiManagerClient...')

##################################################
# Prepare Environment, should be the first test
##################################################

# # ERROR on this test --- 
# def test_prepare_environment(
#     context_client : ContextClient,                 # pylint: disable=redefined-outer-name,unused-argument
# ):
#     context_id = json_context_id(DEFAULT_CONTEXT_NAME)
#     context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME)))
#     context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id)))

###########################
# Tests Implementation of Kpi Manager
###########################

# ---------- 2nd Iteration Tests -----------------
def test_SetKpiDescriptor(kpi_manager_client):
    LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
    with open("kpi_manager/tests/KPI_configs.json", 'r') as file:
        data = json.load(file)
        _descriptors = data.get('KPIs', [])
    for _descritor_name in _descriptors:
        response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(_descritor_name))
        LOGGER.info("Response gRPC message object: {:}".format(response))
    assert isinstance(response, KpiId)

# def test_GetKpiDescriptor(kpi_manager_client):
#     LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
#     response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
#     LOGGER.info("Response gRPC message object: {:}".format(response))
#     assert isinstance(response, KpiDescriptor)

# def test_DeleteKpiDescriptor(kpi_manager_client):
#     LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
#     response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
#     del_response = kpi_manager_client.DeleteKpiDescriptor(response)
#     kpi_manager_client.GetKpiDescriptor(response)
#     LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
#     assert isinstance(del_response, Empty)

# def test_SelectKpiDescriptor(kpi_manager_client):
#     LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
#     kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a())
#     response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request_a())
#     LOGGER.info("Response gRPC message object: {:}".format(response))
#     assert isinstance(response, KpiDescriptorList)

# ------------- INITIAL TESTs ----------------
# Test case that makes use of client fixture to test server's CreateKpi method
# def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
#     # make call to server
#     LOGGER.warning('test_create_kpi requesting')
#     for i in range(3):
#         response = kpi_manager_client.SetKpiDescriptor(create_kpi_request(str(i+1)))
#         LOGGER.debug(str(response))
#         assert isinstance(response, KpiId)

# # Test case that makes use of client fixture to test server's DeleteKpi method
# def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
#     # make call to server
#     LOGGER.warning('delete_kpi requesting')
#     response = kpi_manager_client.SetKpiDescriptor(create_kpi_request('4'))
#     response = kpi_manager_client.DeleteKpiDescriptor(response)
#     LOGGER.debug(str(response))
#     assert isinstance(response, Empty)

# # Test case that makes use of client fixture to test server's GetKpiDescriptor method
# def test_select_kpi_descriptor(kpi_manager_client): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_selectkpidescritor begin')
#     response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
#     LOGGER.debug(str(response))
#     assert isinstance(response, KpiDescriptorList)