import logging, grpc
import pytest
from typing import Tuple

from monitoring.proto import context_pb2, kpi_sample_types_pb2
from monitoring.proto import monitoring_pb2
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector

from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker

from context.Config import (
    GRPC_SERVICE_PORT as grpc_port_context, GRPC_MAX_WORKERS as grpc_workers_context,
    GRPC_GRACE_PERIOD as grpc_grace_context)
from context.client.ContextClient import ContextClient
from context.service.grpc_server.ContextService import ContextService
from context.service.Populate import populate
from context.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device
from context.tests.example_objects import DEVICE1, DEVICE1_UUID

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

###########################
# Tests Setup
###########################

SERVER_ADDRESS = '127.0.0.1'
LISTEN_ADDRESS = '[::]'
GRPC_PORT_MONITORING = 7070

GRPC_PORT_CONTEXT = 10000 + grpc_port_context  # avoid privileged ports

SCENARIOS = [  # comment/uncomment scenarios to activate/deactivate them in the test unit
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {}),
]

@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    name, db_backend, db_settings, mb_backend, mb_settings = request.param
    msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'
    LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))
    _database = Database(get_database_backend(backend=db_backend, **db_settings))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield _database, _message_broker
    _message_broker.terminate()

@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    database = context_db_mb[0]
    database.clear_all()
    _service = ContextService(
        database, context_db_mb[1], port=GRPC_PORT_CONTEXT, max_workers=grpc_workers_context,
        grace_period=grpc_grace_context)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    _client = ContextClient(address='localhost', port=GRPC_PORT_CONTEXT)
    yield _client
    _client.close()

# This fixture will be requested by test cases and will last for the whole testing session
@pytest.fixture(scope='session')
def monitoring_service():
    LOGGER.warning('monitoring_service begin')

    service_port = GRPC_SERVICE_PORT
    max_workers  = GRPC_MAX_WORKERS
    grace_period = GRPC_GRACE_PERIOD

    LOGGER.info('Initializing MonitoringService...')
    grpc_service = MonitoringService(port=service_port, max_workers=max_workers, grace_period=grace_period)
    server = grpc_service.start()

    # yield the server; when the tests finish, execution resumes here to stop it
    LOGGER.warning('monitoring_service yielding')
    yield server

    LOGGER.info('Terminating MonitoringService...')
    grpc_service.stop()

# This fixture will be requested by test cases and will last for the whole testing session.
# The client requires the server, so the client fixture depends on the server fixture.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service): # pylint: disable=redefined-outer-name
    LOGGER.warning('monitoring_client begin')
    client = MonitoringClient(server=SERVER_ADDRESS, port=GRPC_PORT_MONITORING) # instantiate the client
    LOGGER.warning('monitoring_client returning')
    return client

# The following fixtures build the request objects used by the test cases; they also last for the whole testing session.
@pytest.fixture(scope='session')
def kpi():
    LOGGER.warning('kpi fixture begin')
    # form request
    kpi = monitoring_pb2.Kpi()
    kpi.kpi_id.kpi_id.uuid = 'KPIID0000'
    kpi.kpiDescription = 'KPI Desc'
    return kpi

@pytest.fixture(scope='session')
def kpi_id():
    LOGGER.warning('kpi_id fixture begin')
    # form request
    kpi_id = monitoring_pb2.KpiId()
    kpi_id.kpi_id.uuid = str(1)
    return kpi_id

@pytest.fixture(scope='session')
def create_kpi_request():
    LOGGER.warning('create_kpi_request fixture begin')
    create_kpi_request = monitoring_pb2.KpiDescriptor()
    create_kpi_request.kpi_description = 'KPI Description Test'
    create_kpi_request.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED
    create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member
    create_kpi_request.service_id.service_uuid.uuid = 'SERV1'
    create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1'
    return create_kpi_request

@pytest.fixture(scope='session')
def monitor_kpi_request():
    LOGGER.warning('monitor_kpi_request fixture begin')
    monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
    monitor_kpi_request.kpi_id.kpi_id.uuid = str(1)
    monitor_kpi_request.sampling_duration_s = 120
    monitor_kpi_request.sampling_interval_s = 5
    return monitor_kpi_request

@pytest.fixture(scope='session')
def include_kpi_request():
    LOGGER.warning('include_kpi_request fixture begin')
    include_kpi_request = monitoring_pb2.Kpi()
    include_kpi_request.kpi_id.kpi_id.uuid = str(1)
    include_kpi_request.timestamp = '2021-10-12T13:14:42Z'
    include_kpi_request.kpi_value.intVal = 500
    return include_kpi_request

###########################
# Tests Implementation
###########################

# Test case that makes use of the client fixture to test the server's CreateKpi method
def test_create_kpi(monitoring_client, create_kpi_request):
    # make call to server
    LOGGER.warning('test_create_kpi requesting')
    response = monitoring_client.CreateKpi(create_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.KpiId)

# Test case that makes use of the client fixture to test the server's MonitorKpi method
def test_monitor_kpi(monitoring_client, monitor_kpi_request):
    LOGGER.warning('test_monitor_kpi begin')
    response = monitoring_client.MonitorKpi(monitor_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, context_pb2.Empty)

# Test case that makes use of the client fixture to test the server's IncludeKpi method
def test_include_kpi(monitoring_client, include_kpi_request):
    # make call to server
    LOGGER.warning('test_include_kpi requesting')
    response = monitoring_client.IncludeKpi(include_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, context_pb2.Empty)

# Test case that makes use of the client fixture to test the server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client, include_kpi_request):
    LOGGER.warning('test_getstream_kpi begin')
    response = monitoring_client.GetStreamKpi(include_kpi_request)
    LOGGER.debug(str(response))
    #assert isinstance(response, monitoring_pb2.Kpi)

# Test case that makes use of the client fixture to test the server's GetInstantKpi method
def test_get_instant_kpi(monitoring_client, kpi_id):
    LOGGER.warning('test_getinstant_kpi begin')
    response = monitoring_client.GetInstantKpi(kpi_id)
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.Kpi)

# Test case that makes use of the client fixture to test the server's GetKpiDescriptor method
def test_get_kpidescritor_kpi(monitoring_client, kpi_id):
    LOGGER.warning('test_getkpidescritor_kpi begin')
    response = monitoring_client.GetKpiDescriptor(kpi_id)
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.KpiDescriptor)
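
# The following test cases exercise the EventsDeviceCollector against a local Context service instance,
# using the context_db_mb, context_service_grpc and context_client_grpc fixtures defined above.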

def test_get_device_events(context_client_grpc : ContextClient,    # pylint: disable=redefined-outer-name
                           monitoring_client : MonitoringClient,
                           context_db_mb : Tuple[Database, MessageBroker]):
    LOGGER.warning('test_get_device_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client)
    events_collector.start()

    # ----- Dump state of database before creating the object ----------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('   [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests

    # ----- Check create event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, DeviceEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.device_id.device_uuid.uuid == DEVICE1_UUID

    # ----- Update the object ------------------------------------------------------------------------------------------
    response = context_client_grpc.SetDevice(Device(**DEVICE1))
    assert response.device_uuid.uuid == DEVICE1_UUID

    events_collector.stop()

def test_listen_events(monitoring_client : MonitoringClient,       # pylint: disable=redefined-outer-name
                       context_client_grpc : ContextClient,
                       context_db_mb : Tuple[Database, MessageBroker]):
    LOGGER.warning('test_listen_events begin')

    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client)
    events_collector.start()

    # ----- Dump state of database before creating the object ----------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('   [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests

    kpi_id_list = events_collector.listen_events()

    assert bool(kpi_id_list) == True