diff --git a/src/common/message_broker/Factory.py b/src/common/message_broker/Factory.py index a2ea36435c717835bf4b7c89c2522878e67074c9..34e030b0e2bd878e75cee8af7ea1b56bc095eb23 100644 --- a/src/common/message_broker/Factory.py +++ b/src/common/message_broker/Factory.py @@ -3,13 +3,13 @@ from typing import Optional, Union from .backend._Backend import _Backend from .backend.BackendEnum import BackendEnum from .backend.inmemory.InMemoryBackend import InMemoryBackend -from .backend.redis.RedisBackend import RedisBackend +# from .backend.redis.RedisBackend import RedisBackend LOGGER = logging.getLogger(__name__) BACKENDS = { BackendEnum.INMEMORY.value: InMemoryBackend, - BackendEnum.REDIS.value: RedisBackend, + # BackendEnum.REDIS.value: RedisBackend, #BackendEnum.KAFKA.value: KafkaBackend, #BackendEnum.RABBITMQ.value: RabbitMQBackend, #BackendEnum.ZEROMQ.value: ZeroMQBackend, diff --git a/src/common/orm/Factory.py b/src/common/orm/Factory.py index 6ef0e11ccdd7b2f0f9e3fde62903fef522cb9f7a..df1df56eec900ea8d6dffce5c1dd1132e0e88eee 100644 --- a/src/common/orm/Factory.py +++ b/src/common/orm/Factory.py @@ -3,13 +3,13 @@ from typing import Optional, Union from .backend._Backend import _Backend from .backend.BackendEnum import BackendEnum from .backend.inmemory.InMemoryBackend import InMemoryBackend -from .backend.redis.RedisBackend import RedisBackend +# from .backend.redis.RedisBackend import RedisBackend LOGGER = logging.getLogger(__name__) BACKENDS = { BackendEnum.INMEMORY.value: InMemoryBackend, - BackendEnum.REDIS.value: RedisBackend, + # BackendEnum.REDIS.value: RedisBackend, #BackendEnum.MONGODB.value: MongoDBBackend, #BackendEnum.RETHINKDB.value: RethinkDBBackend, #BackendEnum.ETCD.value: EtcdBackend, diff --git a/src/context/tests/example_objects.py b/src/context/tests/example_objects.py index 81339c04e1fe77667bd41179f3fa0813c5fc69df..226119f9d29213a6f1857a476b37b32c9941a813 100644 --- a/src/context/tests/example_objects.py +++ b/src/context/tests/example_objects.py @@ -2,12 +2,13 @@ from copy import deepcopy from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from context.proto.context_pb2 import ( ConfigActionEnum, DeviceDriverEnum, DeviceOperationalStatusEnum, ServiceStatusEnum, ServiceTypeEnum) +from context.proto.kpi_sample_types_pb2 import KpiSampleType # Some example objects to be used by the tests # Helper methods -def config_rule(action, resource_key, resource_value): - return {'action': action, 'resource_key': resource_key, 'resource_value': resource_value} +def config_rule(action, resource_key, resource_value, kpi_sample_type): + return {'action': action, 'resource_key': resource_key, 'resource_value': resource_value, 'kpi_sample_type': kpi_sample_type} def endpoint_id(topology_id, device_id, endpoint_uuid): return {'topology_id': deepcopy(topology_id), 'device_id': deepcopy(device_id), @@ -40,9 +41,10 @@ DEVICE1 = { 'device_id': deepcopy(DEVICE1_ID), 'device_type': 'packet-router', 'device_config': {'config_rules': [ - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value1'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value2'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value3'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value1', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value2', KpiSampleType.PACKETS_RECEIVED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value3', KpiSampleType.BYTES_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc4/value', 'value4', KpiSampleType.BYTES_RECEIVED), ]}, 'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, 'device_drivers': [DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, DeviceDriverEnum.DEVICEDRIVER_P4], @@ -59,9 +61,9 @@ DEVICE2 = { 'device_id': deepcopy(DEVICE2_ID), 'device_type': 'packet-router', 'device_config': {'config_rules': [ - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value4'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value5'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value6'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value4', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value5', KpiSampleType.PACKETS_RECEIVED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value6', KpiSampleType.BYTES_TRANSMITTED), ]}, 'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, 'device_drivers': [DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, DeviceDriverEnum.DEVICEDRIVER_P4], @@ -78,9 +80,9 @@ DEVICE3 = { 'device_id': deepcopy(DEVICE3_ID), 'device_type': 'packet-router', 'device_config': {'config_rules': [ - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value4'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value5'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value6'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value4', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value5', KpiSampleType.PACKETS_RECEIVED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value6', KpiSampleType.BYTES_TRANSMITTED), ]}, 'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, 'device_drivers': [DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, DeviceDriverEnum.DEVICEDRIVER_P4], @@ -139,9 +141,9 @@ SERVICE_DEV1_DEV2 = { ], 'service_status': {'service_status': ServiceStatusEnum.SERVICESTATUS_ACTIVE}, 'service_config': {'config_rules': [ - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc1/value', 'value7'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc2/value', 'value8'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc3/value', 'value9'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc1/value', 'value7', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc2/value', 'value8', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc3/value', 'value9', KpiSampleType.PACKETS_TRANSMITTED), ]}, } @@ -163,9 +165,9 @@ SERVICE_DEV1_DEV3 = { ], 'service_status': {'service_status': ServiceStatusEnum.SERVICESTATUS_ACTIVE}, 'service_config': {'config_rules': [ - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc1/value', 'value7'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc2/value', 'value8'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc3/value', 'value9'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc1/value', 'value7', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc2/value', 'value8', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc3/value', 'value9', KpiSampleType.PACKETS_TRANSMITTED), ]}, } @@ -187,8 +189,8 @@ SERVICE_DEV2_DEV3 = { ], 'service_status': {'service_status': ServiceStatusEnum.SERVICESTATUS_ACTIVE}, 'service_config': {'config_rules': [ - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc1/value', 'value7'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc2/value', 'value8'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc3/value', 'value9'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc1/value', 'value7', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc2/value', 'value8', KpiSampleType.PACKETS_TRANSMITTED), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'svc/rsrc3/value', 'value9', KpiSampleType.PACKETS_TRANSMITTED), ]}, } diff --git a/src/monitoring/client/monitoring_client.py b/src/monitoring/client/monitoring_client.py index bebd629fc4dcef8fc980ca05875bcbedbb6d796d..dc21275482ac039c79ace8b956df24223afc7a23 100644 --- a/src/monitoring/client/monitoring_client.py +++ b/src/monitoring/client/monitoring_client.py @@ -53,16 +53,15 @@ class MonitoringClient: LOGGER.info('GetKpiDescriptor result: {}'.format(response)) return monitoring_pb2.KpiDescriptor() + def ListenEvents(self, ): + LOGGER.info('ListenEvents: {}'.format()) + response = self.server.ListenEvents() + LOGGER.info('ListenEvents result: {}'.format(response)) + return monitoring_pb2.KpiDescriptor() + if __name__ == '__main__': # get port port = sys.argv[1] if len(sys.argv) > 1 else '7070' - # form request - kpi_request = monitoring_pb2.KpiRequest() - kpi_request.device_id.device_id = 'KPIID0000' # pylint: disable=maybe-no-member - kpi_request.kpiDescription = 'KPI Description' - kpi_request.kpi_sample_type = monitoring_pb2.KpiSampleType.PACKETS_TRANSMITTED - # make call to server client = MonitoringClient(port=port) - response=client.IncludeKpi(kpi_request) diff --git a/src/monitoring/service/EventTools.py b/src/monitoring/service/EventTools.py new file mode 100644 index 0000000000000000000000000000000000000000..4bf5bffa9c93bd1b0a8fb5ac4a7a273d32ad9eaa --- /dev/null +++ b/src/monitoring/service/EventTools.py @@ -0,0 +1,78 @@ +import threading +from queue import Queue + +import grpc + +from common.rpc_method_wrapper.ServiceExceptions import ServiceException +from context.client.ContextClient import ContextClient +from context.proto import kpi_sample_types_pb2 +from context.proto.context_pb2 import Empty, EventTypeEnum + +from common.logger import getJSONLogger +from monitoring.client.monitoring_client import MonitoringClient +from monitoring.proto import monitoring_pb2 + +LOGGER = getJSONLogger('monitoringservice-server') +LOGGER.setLevel('DEBUG') + +class EventsDeviceCollector: + def __init__(self, context_client_grpc : ContextClient, monitoring_client_grpc : MonitoringClient) -> None: # pylint: disable=redefined-outer-name + self._events_queue = Queue() + + self._device_stream = context_client_grpc.GetDeviceEvents(Empty()) + self._context_client = context_client_grpc + self._monitoring_client = monitoring_client_grpc + + self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False) + + def _collect(self, events_stream) -> None: + try: + for event in events_stream: + self._events_queue.put_nowait(event) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member + raise # pragma: no cover + + def start(self): + self._device_thread.start() + + def get_event(self, block : bool = True, timeout : float = 0.1): + return self._events_queue.get(block=block, timeout=timeout) + + def stop(self): + + self._device_stream.cancel() + + self._device_thread.join() + + def listen_events(self): + LOGGER.info('getting Kpi by KpiID') + + try: + kpi_id_list = [] + for i in range(self._events_queue.qsize()): + event = self.get_event(block=True) + + if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: + device = self._context_client.GetDevice(event.device_id) + + for j,end_point in enumerate(device.device_endpoints): + # for k,rule in enumerate(device.device_config.config_rules): + kpi_descriptor = monitoring_pb2.KpiDescriptor() + + kpi_descriptor.kpi_description = device.device_type + kpi_descriptor.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED + kpi_descriptor.device_id.CopyFrom(device.device_id) + kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id) + kpi_descriptor.service_id.service_uuid.uuid = "SERV"+str(i+1) + + kpi_id = self._monitoring_client.CreateKpi(kpi_descriptor) + kpi_id_list.append(kpi_id) + + return kpi_id_list + + except ServiceException as e: + LOGGER.exception('ListenEvents exception') + + except Exception as e: # pragma: no cover + LOGGER.exception('ListenEvents exception') diff --git a/src/monitoring/service/influx_tools.py b/src/monitoring/service/InfluxTools.py similarity index 100% rename from src/monitoring/service/influx_tools.py rename to src/monitoring/service/InfluxTools.py diff --git a/src/monitoring/service/MonitoringService.py b/src/monitoring/service/MonitoringService.py index 1b5b5c37aa111a1e3c6a8c169efae8e3d0bc7533..665ce44f7e87ca73a92dfd6123ed3aa8935f0e92 100644 --- a/src/monitoring/service/MonitoringService.py +++ b/src/monitoring/service/MonitoringService.py @@ -4,11 +4,11 @@ import grpc from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD +from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server from grpc_health.v1 import health from grpc_health.v1 import health_pb2 from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server -from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server from common.logger import getJSONLogger LOGGER = getJSONLogger('monitoringservice-server') diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 6858da44485647f2fcee5b18241415d8b1aa8e70..727365e893f85bdfa70809324ec62c73f48cdc31 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -1,58 +1,28 @@ -import os,grpc,threading +import os,grpc + +from prometheus_client import Summary +from prometheus_client import Counter + +from monitoring.service import SqliteTools, InfluxTools +from monitoring.proto import monitoring_pb2 +from monitoring.proto import monitoring_pb2_grpc from common.rpc_method_wrapper.ServiceExceptions import ServiceException from common.logger import getJSONLogger -from context.client.ContextClient import ContextClient -from monitoring.proto.context_pb2 import Empty +from context.proto import context_pb2 + from device.Config import GRPC_SERVICE_PORT from device.client.DeviceClient import DeviceClient from device.proto import device_pb2 -from monitoring.proto import context_pb2 -from monitoring.service import sqlite_tools, influx_tools -from monitoring.proto import monitoring_pb2 -from monitoring.proto import monitoring_pb2_grpc - -from queue import Queue - LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') -from prometheus_client import Summary, Histogram -from prometheus_client import Counter - MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') -# CREATEKPI_COUNTER_STARTED = Counter ('monitoring_createkpi_counter_started', -# 'Monitoring:CreateKpi counter of requests started' ) -# CREATEKPI_COUNTER_COMPLETED = Counter ('monitoring_createkpi counter_completed', -# 'Monitoring:CreateKpi counter of requests completed') -# CREATEKPI_COUNTER_FAILED = Counter ('monitoring_createkpi_counter_failed', -# 'Monitoring:CreateKpi counter of requests failed' ) -# CREATEKPI_HISTOGRAM_DURATION = Histogram('monitoring_createkpi_histogram_duration', -# 'Monitoring:CreateKpi histogram of request duration') -# -# MONITORKPI_COUNTER_STARTED = Counter ('monitoring_monitorkpi_counter_started', -# 'Monitoring:MonitorKpi counter of requests started' ) -# MONITORKPI_COUNTER_COMPLETED = Counter ('monitoring_monitorkpi counter_completed', -# 'Monitoring:MonitorKpi counter of requests completed') -# MONITORKPI_COUNTER_FAILED = Counter ('monitoring_monitorkpi_counter_failed', -# 'Monitoring:MonitorKpi counter of requests failed' ) -# MONITORKPI_HISTOGRAM_DURATION = Histogram('monitoring_monitorkpi_histogram_duration', -# 'Monitoring:MonitorKpi histogram of request duration') -# -# INCLUDEKPI_COUNTER_STARTED = Counter ('monitoring_includekpi_counter_started', -# 'Monitoring:IncludeKpi counter of requests started' ) -# INCLUDEKPI_COUNTER_COMPLETED = Counter ('monitoring_includekpi counter_completed', -# 'Monitoring:IncludeKpi counter of requests completed') -# INCLUDEKPI_COUNTER_FAILED = Counter ('monitoring_includekpi_counter_failed', -# 'Monitoring:IncludeKpi counter of requests failed' ) -# INCLUDEKPI_HISTOGRAM_DURATION = Histogram('monitoring_includekpi_histogram_duration', -# 'Monitoring:IncludeKpi histogram of request duration') - INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") INFLUXDB_USER = os.environ.get("INFLUXDB_USER") INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") @@ -64,10 +34,10 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.info('Init monitoringService') # Init sqlite monitoring db - self.sql_db = sqlite_tools.SQLite('monitoring.db') + self.sql_db = SqliteTools.SQLite('monitoring.db') # Create influx_db client - self.influx_db = influx_tools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) + self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId : @@ -107,12 +77,8 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) + monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid - monitor_device_request.kpi_descriptor.kpi_description = kpiDescriptor.kpi_description - monitor_device_request.kpi_descriptor.kpi_sample_type = kpiDescriptor.kpi_sample_type - monitor_device_request.kpi_descriptor.device_id.device_uuid.uuid = kpiDescriptor.device_id.device_uuid.uuid - monitor_device_request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid = kpiDescriptor.endpoint_id.endpoint_uuid.uuid - monitor_device_request.kpi_descriptor.service_id.service_uuid.uuid = kpiDescriptor.service_id.service_uuid.uuid monitor_device_request.sampling_duration_s = request.sampling_duration_s monitor_device_request.sampling_interval_s = request.sampling_interval_s @@ -173,10 +139,8 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor: LOGGER.info('getting Kpi by KpiID') - try: kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid)) - print(kpi_db) kpiDescriptor = monitoring_pb2.KpiDescriptor() @@ -195,30 +159,3 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.exception('GetKpiDescriptor exception') -class EventsCollector: - def __init__(self, context_client_grpc : ContextClient) -> None: # pylint: disable=redefined-outer-name - self._events_queue = Queue() - - self._device_stream = context_client_grpc.GetDeviceEvents(Empty()) - - self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False) - - def _collect(self, events_stream) -> None: - try: - for event in events_stream: - self._events_queue.put_nowait(event) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member - raise # pragma: no cover - - def start(self): - self._device_thread.start() - - def get_event(self, block : bool = True, timeout : float = 0.1): - return self._events_queue.get(block=block, timeout=timeout) - - def stop(self): - - self._device_stream.cancel() - - self._device_thread.join() diff --git a/src/monitoring/service/sqlite_tools.py b/src/monitoring/service/SqliteTools.py similarity index 100% rename from src/monitoring/service/sqlite_tools.py rename to src/monitoring/service/SqliteTools.py diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 3dd0bef52dbeef582bde3814509a05e1bba4d830..145e2c718c12c4798c7955ade023267ffceaca4d 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -1,31 +1,79 @@ -import logging, os +import logging, grpc import pytest +from typing import Tuple from monitoring.proto import context_pb2, kpi_sample_types_pb2 from monitoring.proto import monitoring_pb2 from monitoring.client.monitoring_client import MonitoringClient -from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT +from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD from monitoring.service.MonitoringService import MonitoringService +from monitoring.service.EventTools import EventsDeviceCollector + +from common.orm.Database import Database +from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum +from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum +from common.message_broker.MessageBroker import MessageBroker + +from context.Config import GRPC_SERVICE_PORT as grpc_port_context, GRPC_MAX_WORKERS as grpc_workers_context, GRPC_GRACE_PERIOD as grpc_grace_context +from context.client.ContextClient import ContextClient +from context.service.grpc_server.ContextService import ContextService +from context.service.Populate import populate +from context.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device +from context.tests.example_objects import (DEVICE1, DEVICE1_UUID) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -SERVER_ADDRESS = '127.0.0.1' -LISTEN_ADDRESS = '[::]' -PORT = 7070 - ########################### # Tests Setup ########################### +SERVER_ADDRESS = '127.0.0.1' +LISTEN_ADDRESS = '[::]' +GRPC_PORT_MONITORING = 7070 + +GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports + +SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit + ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), +] + +@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) +def context_db_mb(request) -> Tuple[Database, MessageBroker]: + name,db_backend,db_settings,mb_backend,mb_settings = request.param + msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...' + LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings))) + _database = Database(get_database_backend(backend=db_backend, **db_settings)) + _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) + yield _database, _message_broker + _message_broker.terminate() + +@pytest.fixture(scope='session') +def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name + database = context_db_mb[0] + database.clear_all() + _service = ContextService( + database, context_db_mb[1], port=GRPC_PORT_CONTEXT, max_workers=grpc_workers_context, + grace_period=grpc_grace_context) + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name + _client = ContextClient(address='localhost', port=GRPC_PORT_CONTEXT) + yield _client + _client.close() + + # This fixture will be requested by test cases and last during testing session @pytest.fixture(scope='session') def monitoring_service(): LOGGER.warning('monitoring_service begin') - service_port = GRPC_SERVICE_PORT - max_workers = GRPC_MAX_WORKERS - grace_period = GRPC_GRACE_PERIOD + service_port = GRPC_SERVICE_PORT + max_workers = GRPC_MAX_WORKERS + grace_period = GRPC_GRACE_PERIOD LOGGER.info('Initializing MonitoringService...') grpc_service = MonitoringService(port=service_port, max_workers=max_workers, grace_period=grace_period) @@ -43,7 +91,7 @@ def monitoring_service(): @pytest.fixture(scope='session') def monitoring_client(monitoring_service): LOGGER.warning('monitoring_client begin') - client = MonitoringClient(server=SERVER_ADDRESS, port=PORT) # instantiate the client + client = MonitoringClient(server=SERVER_ADDRESS, port=GRPC_PORT_MONITORING) # instantiate the client LOGGER.warning('monitoring_client returning') return client @@ -52,9 +100,9 @@ def monitoring_client(monitoring_service): def kpi(): LOGGER.warning('test_include_kpi begin') # form request - kpi = monitoring_pb2.Kpi() - kpi.kpi_id.kpi_id.uuid = 'KPIID0000' - kpi.kpiDescription = 'KPI Desc' + kpi = monitoring_pb2.Kpi() + kpi.kpi_id.kpi_id.uuid = 'KPIID0000' + kpi.kpiDescription = 'KPI Desc' return kpi @pytest.fixture(scope='session') @@ -62,8 +110,8 @@ def kpi_id(): LOGGER.warning('test_include_kpi begin') # form request - kpi_id = monitoring_pb2.KpiId() - kpi_id.kpi_id.uuid = str(1) + kpi_id = monitoring_pb2.KpiId() + kpi_id.kpi_id.uuid = str(1) return kpi_id @@ -71,12 +119,12 @@ def kpi_id(): def create_kpi_request(): LOGGER.warning('test_include_kpi begin') - create_kpi_request = monitoring_pb2.KpiDescriptor() - create_kpi_request.kpi_description = 'KPI Description Test' - create_kpi_request.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED - create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member - create_kpi_request.service_id.service_uuid.uuid = "SERV1" - create_kpi_request.endpoint_id.endpoint_uuid.uuid = "END1" + create_kpi_request = monitoring_pb2.KpiDescriptor() + create_kpi_request.kpi_description = 'KPI Description Test' + create_kpi_request.kpi_sample_type = kpi_sample_types_pb2.KpiSampleType.PACKETS_TRANSMITTED + create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member + create_kpi_request.service_id.service_uuid.uuid = "SERV1" + create_kpi_request.endpoint_id.endpoint_uuid.uuid = "END1" return create_kpi_request @@ -84,8 +132,8 @@ def create_kpi_request(): def monitor_kpi_request(): LOGGER.warning('test_monitor_kpi begin') - monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() - monitor_kpi_request.kpi_id.kpi_id.uuid = str(1) + monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() + monitor_kpi_request.kpi_id.kpi_id.uuid = str(1) monitor_kpi_request.sampling_duration_s = 120 monitor_kpi_request.sampling_interval_s = 5 @@ -96,10 +144,10 @@ def monitor_kpi_request(): def include_kpi_request(): LOGGER.warning('test_include_kpi begin') - include_kpi_request = monitoring_pb2.Kpi() - include_kpi_request.kpi_id.kpi_id.uuid = str(1) - include_kpi_request.timestamp = "2021-10-12T13:14:42Z" - include_kpi_request.kpi_value.intVal = 500 + include_kpi_request = monitoring_pb2.Kpi() + include_kpi_request.kpi_id.kpi_id.uuid = str(1) + include_kpi_request.timestamp = "2021-10-12T13:14:42Z" + include_kpi_request.kpi_value.intVal = 500 return include_kpi_request @@ -132,22 +180,87 @@ def test_include_kpi(monitoring_client,include_kpi_request): assert isinstance(response, context_pb2.Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method -def test_getstream_kpi(monitoring_client,include_kpi_request): +def test_get_stream_kpi(monitoring_client,include_kpi_request): LOGGER.warning('test_getstream_kpi begin') response = monitoring_client.GetStreamKpi(kpi) LOGGER.debug(str(response)) #assert isinstance(response, monitoring_pb2.Kpi) # Test case that makes use of client fixture to test server's GetInstantKpi method -def test_getinstant_kpi(monitoring_client,kpi_id): +def test_get_instant_kpi(monitoring_client,kpi_id): LOGGER.warning('test_getinstant_kpi begin') response = monitoring_client.GetInstantKpi(kpi_id) LOGGER.debug(str(response)) assert isinstance(response, monitoring_pb2.Kpi) # Test case that makes use of client fixture to test server's GetInstantKpi method -def test_getkpidescritor_kpi(monitoring_client,kpi_id): +def test_get_kpidescritor_kpi(monitoring_client,kpi_id): LOGGER.warning('test_getkpidescritor_kpi begin') response = monitoring_client.GetKpiDescriptor(kpi_id) LOGGER.debug(str(response)) - assert isinstance(response, monitoring_pb2.KpiDescriptor) \ No newline at end of file + assert isinstance(response, monitoring_pb2.KpiDescriptor) + +def test_get_device_events(context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name + monitoring_client : MonitoringClient, + context_db_mb: Tuple[Database, MessageBroker]): + + LOGGER.warning('test_getkpidescritor_kpi begin') + + context_database = context_db_mb[0] + + # ----- Clean the database ----------------------------------------------------------------------------------------- + context_database.clear_all() + + # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- + events_collector = EventsDeviceCollector(context_client_grpc,monitoring_client) + events_collector.start() + + # # ----- Dump state of database before create the object ------------------------------------------------------------ + db_entries = context_database.dump() + LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) + for db_entry in db_entries: + LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover + LOGGER.info('-----------------------------------------------------------') + assert len(db_entries) == 0 + + populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests + + # ----- Check create event ----------------------------------------------------------------------------------------- + event = events_collector.get_event(block=True) + + assert isinstance(event, DeviceEvent) + assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE + assert event.device_id.device_uuid.uuid == DEVICE1_UUID + + # ----- Update the object ------------------------------------------------------------------------------------------ + response = context_client_grpc.SetDevice(Device(**DEVICE1)) + assert response.device_uuid.uuid == DEVICE1_UUID + + events_collector.stop() + +def test_listen_events(monitoring_client: MonitoringClient, + context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name + context_db_mb: Tuple[Database, MessageBroker]): + + LOGGER.warning('test_listen_events begin') + + context_database = context_db_mb[0] + + # ----- Clean the database ----------------------------------------------------------------------------------------- + context_database.clear_all() + + # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- + events_collector = EventsDeviceCollector(context_client_grpc,monitoring_client) + events_collector.start() + + # # ----- Dump state of database before create the object ------------------------------------------------------------ + db_entries = context_database.dump() + LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) + for db_entry in db_entries: + LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover + LOGGER.info('-----------------------------------------------------------') + assert len(db_entries) == 0 + + populate('localhost', GRPC_PORT_CONTEXT) # place this call in the appropriate line, according to your tests + + events_collector.listen_events()