diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py new file mode 100644 index 0000000000000000000000000000000000000000..d8b39b8bf8d0ae84da19fa651da00633486e6bc6 --- /dev/null +++ b/src/monitoring/client/MonitoringClient.py @@ -0,0 +1,99 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from typing import Iterator +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string +from monitoring.proto.context_pb2 import Empty +from monitoring.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest +from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceStub + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class MonitoringClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.MONITORING) + if not port: port = get_service_port_grpc(ServiceNameEnum.MONITORING) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to 
{:s}...'.format(str(self.endpoint))) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = MonitoringServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def CreateKpi(self, request : KpiDescriptor) -> KpiId: + LOGGER.debug('CreateKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.CreateKpi(request) + LOGGER.debug('CreateKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor: + LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetKpiDescriptor(request) + LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def IncludeKpi(self, request : Kpi) -> Empty: + LOGGER.debug('IncludeKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.IncludeKpi(request) + LOGGER.debug('IncludeKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def MonitorKpi(self, request : MonitorKpiRequest) -> Empty: + LOGGER.debug('MonitorKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.MonitorKpi(request) + LOGGER.debug('MonitorKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetStreamKpi(self, request : KpiId) -> Iterator[Kpi]: + LOGGER.debug('GetStreamKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetStreamKpi(request) + LOGGER.debug('GetStreamKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetInstantKpi(self, request : KpiId) -> 
Kpi: + LOGGER.debug('GetInstantKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetInstantKpi(request) + LOGGER.debug('GetInstantKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + +if __name__ == '__main__': + import sys + # get port + _port = sys.argv[1] if len(sys.argv) > 1 else '7070' + + # make call to server + client = MonitoringClient(port=_port) diff --git a/src/monitoring/client/monitoring_client.py b/src/monitoring/client/monitoring_client.py deleted file mode 100644 index 62bfb519e7649427cad5b8f9e3bc0f849a9b9a39..0000000000000000000000000000000000000000 --- a/src/monitoring/client/monitoring_client.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import sys -import grpc - -from monitoring.proto import monitoring_pb2 -from monitoring.proto import monitoring_pb2_grpc -from monitoring.proto import context_pb2 - -from common.logger import getJSONLogger -LOGGER = getJSONLogger('monitoring-client') -LOGGER.setLevel('DEBUG') - -class MonitoringClient: - - def __init__(self, server='monitoring', port='7070'): - endpoint = '{}:{}'.format(server, port) - LOGGER.info('init monitoringClient {}'.format(endpoint)) - self.channel = grpc.insecure_channel(endpoint) - self.server = monitoring_pb2_grpc.MonitoringServiceStub(self.channel) - - def CreateKpi(self, request): - LOGGER.info('CreateKpi: {}'.format(request)) - response = self.server.CreateKpi(request) - LOGGER.info('CreateKpi result: {}'.format(response)) - return response - - def MonitorKpi(self, request): - LOGGER.info('MonitorKpi: {}'.format(request)) - response = self.server.MonitorKpi(request) - LOGGER.info('MonitorKpi result: {}'.format(response)) - return response - - def IncludeKpi(self, request): - LOGGER.info('IncludeKpi: {}'.format(request)) - response = self.server.IncludeKpi(request) - LOGGER.info('IncludeKpi result: {}'.format(response)) - return response - - def GetStreamKpi(self, request): - LOGGER.info('GetStreamKpi: {}'.format(request)) - response = self.server.GetStreamKpi(request) - LOGGER.info('GetStreamKpi result: {}'.format(response)) - yield monitoring_pb2.Kpi() - - def GetInstantKpi(self, request): - LOGGER.info('GetInstantKpi: {}'.format(request)) - response = self.server.GetInstantKpi(request) - LOGGER.info('GetInstantKpi result: {}'.format(response)) - return monitoring_pb2.Kpi() - - def GetKpiDescriptor(self, request): - LOGGER.info('GetKpiDescriptor: {}'.format(request)) - response = self.server.GetKpiDescriptor(request) - LOGGER.info('GetKpiDescriptor result: {}'.format(response)) - return response - -if __name__ == '__main__': - # get port - port = sys.argv[1] if len(sys.argv) > 1 else '7070' - - # make call to server - client = 
MonitoringClient(port=port) diff --git a/src/monitoring/service/EventTools.py b/src/monitoring/service/EventTools.py index 636556425af9ac02487386d81b9d8d4e786aa560..04c06e74203efcd3fab7566b2687dcdfc7e62658 100644 --- a/src/monitoring/service/EventTools.py +++ b/src/monitoring/service/EventTools.py @@ -19,26 +19,27 @@ import grpc from common.rpc_method_wrapper.ServiceExceptions import ServiceException from context.client.ContextClient import ContextClient -from context.proto import kpi_sample_types_pb2 +#from context.proto import kpi_sample_types_pb2 from context.proto.context_pb2 import Empty, EventTypeEnum from common.logger import getJSONLogger -from monitoring.client.monitoring_client import MonitoringClient +from monitoring.client.MonitoringClient import MonitoringClient from monitoring.proto import monitoring_pb2 LOGGER = getJSONLogger('monitoringservice-server') LOGGER.setLevel('DEBUG') class EventsDeviceCollector: - def __init__(self, context_client_grpc : ContextClient, monitoring_client_grpc : MonitoringClient) -> None: # pylint: disable=redefined-outer-name + def __init__(self) -> None: # pylint: disable=redefined-outer-name self._events_queue = Queue() - self._device_stream = context_client_grpc.GetDeviceEvents(Empty()) - self._context_client = context_client_grpc - self._channel = context_client_grpc.channel - self._monitoring_client = monitoring_client_grpc + self._context_client_grpc = ContextClient() + self._device_stream = self._context_client_grpc.GetDeviceEvents(Empty()) + self._context_client = self._context_client_grpc + self._channel = self._context_client_grpc.channel + self._monitoring_client = MonitoringClient(host='127.0.0.1') - self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False) + self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream,), daemon=False) def grpc_server_on(self): try: diff --git a/src/monitoring/service/MonitoringService.py 
b/src/monitoring/service/MonitoringService.py index f1ecba3664dfee74fddb7093ff352724791b4f7d..0736eba435820344750225c28e5b1348c7f7dfbc 100644 --- a/src/monitoring/service/MonitoringService.py +++ b/src/monitoring/service/MonitoringService.py @@ -12,63 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures - -import grpc, logging - -from monitoring.service.MonitoringServiceServicerImpl import MonitoringServiceServicerImpl -from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD -from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server - -from grpc_health.v1 import health -from grpc_health.v1 import health_pb2 -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server - -from common.logger import getJSONLogger -LOGGER = getJSONLogger('monitoring-server') - -BIND_ADDRESS = '0.0.0.0' - -class MonitoringService: - def __init__(self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, - grace_period=GRPC_GRACE_PERIOD): - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.monitoring_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - # create gRPC server - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers)) # ,interceptors=(tracer_interceptor,)) - - # add monitoring servicer class to gRPC server +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService +from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server +from .MonitoringServiceServicerImpl import MonitoringServiceServicerImpl + +class MonitoringService(GenericGrpcService): + def __init__(self, cls_name: str = 
__name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.MONITORING) + super().__init__(port, cls_name=cls_name) self.monitoring_servicer = MonitoringServiceServicerImpl() - add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server) - - # add gRPC health checker servicer class to gRPC server - self.health_servicer = health.HealthServicer( - experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) - add_HealthServicer_to_server(self.health_servicer, self.server) - - # start server - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) - LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( - str(self.endpoint), str(self.max_workers))) - - self.server.add_insecure_port(self.endpoint) - self.server.start() - self.health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member - - LOGGER.debug('Service started') - - def stop(self): - LOGGER.debug('Stopping service (grace period {} seconds)...'.format(self.grace_period)) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug('Service stopped') + def install_servicers(self): + add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 88cd2d3a83357dec5c49e1894ed4243ceb1b4b6e..28c1ed12045a8d6dfbd9669e8ce2f081248cabd4 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -12,29 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import os,grpc, logging -import socket - -from prometheus_client import Summary -from prometheus_client import Counter -from common.Settings import get_setting - -from monitoring.Config import DEVICE_GRPC_SERVICE_PORT, DEVICE_SERVICE_HOST +import os, grpc, logging +from prometheus_client import Counter, Summary from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType from monitoring.service import SqliteTools, InfluxTools from monitoring.proto import monitoring_pb2 from monitoring.proto import monitoring_pb2_grpc - from common.rpc_method_wrapper.ServiceExceptions import ServiceException - from context.proto import context_pb2 - from device.client.DeviceClient import DeviceClient from device.proto import device_pb2 LOGGER = logging.getLogger(__name__) -MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') +MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( + 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") @@ -42,9 +34,6 @@ INFLUXDB_USER = os.environ.get("INFLUXDB_USER") INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") -DEVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=DEVICE_SERVICE_HOST ) -DEVICE_SERVICE_PORT = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=DEVICE_GRPC_SERVICE_PORT) - class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): def __init__(self): @@ -52,13 +41,14 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService # Init sqlite monitoring db self.sql_db = SqliteTools.SQLite('monitoring.db') - self.deviceClient = DeviceClient(address=DEVICE_SERVICE_HOST, 
port=DEVICE_GRPC_SERVICE_PORT) # instantiate the client # Create influx_db client self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) # CreateKpi (CreateKpiRequest) returns (KpiId) {} - def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId : + def CreateKpi( + self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext + ) -> monitoring_pb2.KpiId: # CREATEKPI_COUNTER_STARTED.inc() LOGGER.info('CreateKpi') try: @@ -71,7 +61,8 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid kpi_service_id = request.service_id.service_uuid.uuid - data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) + data = self.sql_db.insert_KPI( + kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) kpi_id.kpi_id.uuid = str(data) @@ -87,7 +78,9 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} - def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: + def MonitorKpi( + self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext + ) -> context_pb2.Empty: LOGGER.info('MonitorKpi') try: @@ -97,25 +90,23 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) - monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid - monitor_device_request.sampling_duration_s = request.sampling_duration_s - monitor_device_request.sampling_interval_s = request.sampling_interval_s + 
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid + monitor_device_request.sampling_duration_s = request.sampling_duration_s + monitor_device_request.sampling_interval_s = request.sampling_interval_s - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if s.connect_ex((DEVICE_SERVICE_HOST, DEVICE_GRPC_SERVICE_PORT)) == 0: - self.deviceClient.MonitorDeviceKpi(monitor_device_request) - else: - LOGGER.warning('Device service is not reachable') + device_client = DeviceClient() + device_client.MonitorDeviceKpi(monitor_device_request) - return context_pb2.Empty() except ServiceException as e: LOGGER.exception('MonitorKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('MonitorKpi exception') + grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # CREATEKPI_COUNTER_FAILED.inc() + return context_pb2.Empty() # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: @@ -145,7 +136,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover + except Exception: # pragma: no cover LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() return context_pb2.Empty() @@ -162,7 +153,9 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService return monitoring_pb2.Kpi() - def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor: + def GetKpiDescriptor( + self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext + ) -> monitoring_pb2.KpiDescriptor: LOGGER.info('getting Kpi by KpiID') try: kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid)) @@ 
-183,5 +176,5 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.exception('GetKpiDescriptor exception') grpc_context.abort(e.code, e.details) - except Exception as e: # pragma: no cover + except Exception: # pragma: no cover LOGGER.exception('GetKpiDescriptor exception') diff --git a/src/monitoring/service/__main__.py b/src/monitoring/service/__main__.py index 7835b4fc8a37f27419cebebfd0bf75b86820f38d..714046517e7b0ea591dce7411c5cb91ca3b867a6 100644 --- a/src/monitoring/service/__main__.py +++ b/src/monitoring/service/__main__.py @@ -12,44 +12,27 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, signal, sys, threading, socket - -from common.Settings import get_setting, wait_for_environment_variables -from context.client.ContextClient import ContextClient -from monitoring.Config import ( - GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT, CONTEXT_GRPC_SERVICE_PORT, - CONTEXT_SERVICE_HOST) - -from monitoring.client.monitoring_client import MonitoringClient -from monitoring.proto import monitoring_pb2 -from monitoring.service.EventTools import EventsDeviceCollector -from monitoring.service.MonitoringService import MonitoringService - +import logging, signal, sys, threading from prometheus_client import start_http_server +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, + wait_for_environment_variables) +from monitoring.proto import monitoring_pb2 +from .EventTools import EventsDeviceCollector +from .MonitoringService import MonitoringService terminate = threading.Event() LOGGER = None -LOCALHOST = '127.0.0.1' -def signal_handler(signal, frame): +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') terminate.set() def 
start_monitoring(): LOGGER.info('Start Monitoring...',) - grpc_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT ) - context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST ) - context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_GRPC_SERVICE_PORT) - - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if s.connect_ex((context_service_host, int(context_service_port))) != 0: - LOGGER.info('Context service is not reachable') - return - - context_client_grpc = ContextClient(address=context_service_host, port=context_service_port) - monitoring_client = MonitoringClient(server=LOCALHOST, port=grpc_service_port) # instantiate the client - events_collector = EventsDeviceCollector(context_client_grpc, monitoring_client) + events_collector = EventsDeviceCollector() events_collector.start() # Iterate while terminate is not set @@ -64,8 +47,7 @@ def start_monitoring(): monitor_kpi_request.kpi_id.CopyFrom(kpi_id) monitor_kpi_request.sampling_duration_s = 86400 monitor_kpi_request.sampling_interval_s = 30 - - monitoring_client.MonitorKpi(monitor_kpi_request) + events_collector._monitoring_client.MonitorKpi(monitor_kpi_request) else: # Terminate is set, looping terminates LOGGER.warning("Stopping execution...") @@ -73,31 +55,28 @@ def start_monitoring(): events_collector.start() def main(): - global LOGGER - - grpc_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) + global LOGGER # pylint: disable=global-statement + log_level = get_log_level() logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - 
'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', - 'DEVICESERVICE_SERVICE_HOST', 'DEVICESERVICE_SERVICE_PORT_GRPC' + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) LOGGER.info('Starting...') + # Start metrics server + metrics_port = get_metrics_port() start_http_server(metrics_port) # Starting monitoring service - grpc_service = MonitoringService(port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) + grpc_service = MonitoringService() grpc_service.start() start_monitoring() @@ -112,4 +91,4 @@ def main(): return 0 if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file + sys.exit(main()) diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py new file mode 100644 index 0000000000000000000000000000000000000000..dd4db01fdd6f94e0e376129b90c89b6475b48449 --- /dev/null +++ b/src/monitoring/tests/Messages.py @@ -0,0 +1,49 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from monitoring.proto import monitoring_pb2 +from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType + +def kpi(): + _kpi = monitoring_pb2.Kpi() + _kpi.kpi_id.kpi_id.uuid = 'KPIID0000' # pylint: disable=maybe-no-member + return _kpi + +def kpi_id(): + _kpi_id = monitoring_pb2.KpiId() + _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member + return _kpi_id + +def create_kpi_request(): + _create_kpi_request = monitoring_pb2.KpiDescriptor() + _create_kpi_request.kpi_description = 'KPI Description Test' + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED + _create_kpi_request.device_id.device_uuid.uuid = 'DEV1' # pylint: disable=maybe-no-member + _create_kpi_request.service_id.service_uuid.uuid = 'SERV1' # pylint: disable=maybe-no-member + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member + return _create_kpi_request + +def monitor_kpi_request(kpi_uuid, sampling_duration_s, sampling_interval_s): + _monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() + _monitor_kpi_request.kpi_id.kpi_id.uuid = kpi_uuid # pylint: disable=maybe-no-member + _monitor_kpi_request.sampling_duration_s = sampling_duration_s + _monitor_kpi_request.sampling_interval_s = sampling_interval_s + return _monitor_kpi_request + +def include_kpi_request(): + _include_kpi_request = monitoring_pb2.Kpi() + _include_kpi_request.kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member + _include_kpi_request.timestamp = "2021-10-12T13:14:42Z" + _include_kpi_request.kpi_value.intVal = 500 # pylint: disable=maybe-no-member + return _include_kpi_request diff --git a/src/monitoring/tests/Objects.py b/src/monitoring/tests/Objects.py new file mode 100644 index 0000000000000000000000000000000000000000..852da6bc98dee441a1e21e60e1b3ae7436b98ebf --- /dev/null +++ b/src/monitoring/tests/Objects.py @@ -0,0 +1,30 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the 
Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.tools.object_factory.Device import ( + json_device_emulated_connect_rules, json_device_emulated_packet_router_disabled) +from context.proto.kpi_sample_types_pb2 import KpiSampleType + +PACKET_PORT_SAMPLE_TYPES = [ + KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED, + KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED, + KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED, + KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED, +] + +DEVICE_DEV1_UUID = 'DEV1' +ENDPOINT_END1_UUID = 'END1' +DEVICE_DEV1_ENDPOINT_DEFS = [(ENDPOINT_END1_UUID, 'copper', PACKET_PORT_SAMPLE_TYPES)] +DEVICE_DEV1 = json_device_emulated_packet_router_disabled(DEVICE_DEV1_UUID) +DEVICE_DEV1_CONNECT_RULES = json_device_emulated_connect_rules(DEVICE_DEV1_ENDPOINT_DEFS) diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 0701c5ce8bc56ee34d0a90760b0d6e88fdbbee42..d3799689eee4a1a12f4d4d7998d92482c59adb16 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -12,36 +12,35 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging -import os -import socket -import pytest +import copy, logging, os, pytest from typing import Tuple - - -from monitoring.client.monitoring_client import MonitoringClient -from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, DEVICE_GRPC_SERVICE_PORT -from monitoring.proto import context_pb2, monitoring_pb2 -from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType -from monitoring.service import SqliteTools, InfluxTools -from monitoring.service.MonitoringService import MonitoringService -from monitoring.service.EventTools import EventsDeviceCollector - +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker -from context.Config import ( - GRPC_SERVICE_PORT as grpc_port_context, - GRPC_MAX_WORKERS as grpc_workers_context, - GRPC_GRACE_PERIOD as grpc_grace_context -) from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService -from context.service.Populate import populate from context.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device -from context.tests.Objects import (DEVICE_R1, DEVICE_R1_UUID) + +from device.client.DeviceClient import DeviceClient +from device.service.DeviceService import DeviceService +from device.service.driver_api.DriverFactory import DriverFactory +from device.service.driver_api.DriverInstanceCache import DriverInstanceCache +from device.service.drivers import DRIVERS + +from monitoring.client.MonitoringClient import MonitoringClient +from monitoring.proto import context_pb2, monitoring_pb2 +from 
monitoring.proto.kpi_sample_types_pb2 import KpiSampleType +from monitoring.service import SqliteTools, InfluxTools +from monitoring.service.MonitoringService import MonitoringService +from monitoring.service.EventTools import EventsDeviceCollector +from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request +from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID + LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -50,18 +49,19 @@ LOGGER.setLevel(logging.DEBUG) # Tests Setup ########################### -SERVER_ADDRESS = '127.0.0.1' -LISTEN_ADDRESS = '[::]' -GRPC_PORT_MONITORING = 7070 +LOCAL_HOST = '127.0.0.1' -GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports -DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports -MONITORING_GRPC_SERVICE_PORT = GRPC_PORT_MONITORING # avoid privileged ports +CONTEXT_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports +os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(CONTEXT_SERVICE_PORT) +DEVICE_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports +os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(DEVICE_SERVICE_PORT) -SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit - ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), -] +MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports +os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = 
str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT) INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT") @@ -69,49 +69,61 @@ INFLUXDB_USER = os.environ.get("INFLUXDB_USER") INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") -@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) -def context_db_mb(request) -> Tuple[Database, MessageBroker]: - name,db_backend,db_settings,mb_backend,mb_settings = request.param - msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...' - LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings))) - _database = Database(get_database_backend(backend=db_backend, **db_settings)) - _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) +@pytest.fixture(scope='session') +def context_db_mb() -> Tuple[Database, MessageBroker]: + _database = Database(get_database_backend(backend=DatabaseBackendEnum.INMEMORY)) + _message_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) yield _database, _message_broker _message_broker.terminate() @pytest.fixture(scope='session') -def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - database = context_db_mb[0] +def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name + database, message_broker = context_db_mb database.clear_all() - _service = ContextService( - database, context_db_mb[1], port=GRPC_PORT_CONTEXT, max_workers=grpc_workers_context, - grace_period=grpc_grace_context) + _service = ContextService(database, message_broker) _service.start() yield _service _service.stop() 
# NOTE(review): reconstructed from a newline-mangled unified diff; this is the
# post-patch state of the client/service fixtures and the first test case.

@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """ContextClient bound to the session's ContextService (endpoint via env vars)."""
    _client = ContextClient()
    yield _client
    _client.close()

@pytest.fixture(scope='session')
def device_service(context_service : ContextService): # pylint: disable=redefined-outer-name
    """DeviceService with its driver cache, running for the whole test session."""
    LOGGER.info('Initializing DeviceService...')
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    _service = DeviceService(driver_instance_cache)
    _service.start()

    # yield the server; when tests finish, execution resumes to stop it
    LOGGER.info('Yielding DeviceService...')
    yield _service

    LOGGER.info('Terminating DeviceService...')
    _service.stop()

@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """DeviceClient bound to the session's DeviceService."""
    _client = DeviceClient()
    yield _client
    _client.close()

# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def monitoring_service(
    context_service : ContextService,   # pylint: disable=redefined-outer-name
    device_service : DeviceService      # pylint: disable=redefined-outer-name
    ):
    """MonitoringService under test; depends on Context and Device services."""
    LOGGER.info('Initializing MonitoringService...')
    _service = MonitoringService()
    _service.start()

    # yield the server; when tests finish, execution resumes to stop it
    LOGGER.info('Yielding MonitoringService...')
    yield _service

    LOGGER.info('Terminating MonitoringService...')
    _service.stop()

# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MonitoringService): # pylint: disable=redefined-outer-name
    """MonitoringClient bound to the session's MonitoringService."""
    LOGGER.info('Initializing MonitoringClient...')
    _client = MonitoringClient()

    LOGGER.info('Yielding MonitoringClient...')
    yield _client

    LOGGER.info('Closing MonitoringClient...')
    _client.close()

@pytest.fixture(scope='session')
def sql_db():
    """SQLite helper backed by the local 'monitoring.db' file."""
    _sql_db = SqliteTools.SQLite('monitoring.db')
    return _sql_db

@pytest.fixture(scope='session')
def influx_db():
    """InfluxDB helper configured from the INFLUXDB_* environment variables."""
    _influx_db = InfluxTools.Influx(
        INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
    return _influx_db


###########################
# Tests Implementation
###########################

# Test case that makes use of client fixture to test server's CreateKpi method
def test_create_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    # make call to server
    LOGGER.warning('test_create_kpi requesting')
    response = monitoring_client.CreateKpi(create_kpi_request())
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.KpiId)
# NOTE(review): reconstructed from a newline-mangled unified diff; this is the
# post-patch state of the remaining test cases. Fixes applied: copy-pasted log
# messages corrected in test_sqlitedb_tools_delete_kpi and test_events_tools,
# and the duplicated "GetInstantKpi" header comment corrected.

# Test case that makes use of client fixture to test server's MonitorKpi method
def test_monitor_kpi(
    context_client : ContextClient,                 # pylint: disable=redefined-outer-name
    device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    LOGGER.warning('test_monitor_kpi begin')
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Create the device the KPI will be monitored on -------------------------------------------------------------
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID

    response = monitoring_client.CreateKpi(create_kpi_request())
    _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member
    response = monitoring_client.MonitorKpi(_monitor_kpi_request)
    LOGGER.debug(str(response))
    assert isinstance(response, context_pb2.Empty)

# Test case that makes use of client fixture to test server's IncludeKpi method
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    # make call to server
    LOGGER.warning('test_include_kpi requesting')
    response = monitoring_client.IncludeKpi(include_kpi_request())
    LOGGER.debug(str(response))
    assert isinstance(response, context_pb2.Empty)

# Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_getstream_kpi begin')
    response = monitoring_client.GetStreamKpi(kpi())
    LOGGER.debug(str(response))
    #assert isinstance(response, monitoring_pb2.Kpi)

# Test case that makes use of client fixture to test server's GetInstantKpi method
def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_getinstant_kpi begin')
    response = monitoring_client.GetInstantKpi(kpi_id())
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.Kpi)

# Test case that makes use of client fixture to test server's GetKpiDescriptor method
# (name kept as-is, typo included, to preserve test discovery/reporting history)
def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_getkpidescritor_kpi begin')
    response = monitoring_client.CreateKpi(create_kpi_request())
    response = monitoring_client.GetKpiDescriptor(response)
    LOGGER.debug(str(response))
    assert isinstance(response, monitoring_pb2.KpiDescriptor)

def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_sqlitedb_tools_insert_kpi begin')
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

    response = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    assert isinstance(response, int)

def test_sqlitedb_tools_get_kpi(sql_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_sqlitedb_tools_get_kpi begin')
    _create_kpi_request = create_kpi_request()
    kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
    kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
    kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
    kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
    kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

    _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
    response = sql_db.get_KPI(_kpi_id)
    assert isinstance(response, tuple)

def test_sqlitedb_tools_get_kpis(sql_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_sqlitedb_tools_get_kpis begin')
    response = sql_db.get_KPIS()
    assert isinstance(response, list)

def test_sqlitedb_tools_delete_kpi(sql_db): # pylint: disable=redefined-outer-name
    # fixed copy-pasted log message (was 'test_sqlitedb_tools_get_kpi begin')
    LOGGER.warning('test_sqlitedb_tools_delete_kpi begin')

    response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)

    if not response:
        # nothing to delete yet: insert a KPI first, then retry the deletion
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

        sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_KPI("DEV1", KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)

    assert response

def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_sqlitedb_tools_delete_kpid_id begin')

    response = sql_db.delete_kpid_id(1)

    if not response:
        # nothing to delete yet: insert a KPI first, then retry the deletion by id
        _create_kpi_request = create_kpi_request()
        kpi_description = _create_kpi_request.kpi_description                # pylint: disable=maybe-no-member
        kpi_sample_type = _create_kpi_request.kpi_sample_type                # pylint: disable=maybe-no-member
        kpi_device_id   = _create_kpi_request.device_id.device_uuid.uuid     # pylint: disable=maybe-no-member
        kpi_endpoint_id = _create_kpi_request.endpoint_id.endpoint_uuid.uuid # pylint: disable=maybe-no-member
        kpi_service_id  = _create_kpi_request.service_id.service_uuid.uuid   # pylint: disable=maybe-no-member

        _kpi_id = sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
        response = sql_db.delete_kpid_id(_kpi_id)

    assert response

def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name
    # TODO(review): placeholder — no assertions yet
    LOGGER.warning('test_influxdb_tools_write_kpi begin')

def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name
    # TODO(review): placeholder — no assertions yet
    LOGGER.warning('test_influxdb_tools_read_kpi_points begin')

def test_events_tools(
    context_client : ContextClient,                 # pylint: disable=redefined-outer-name
    device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    # fixed copy-pasted log message (was 'test_get_device_events begin')
    LOGGER.warning('test_events_tools begin')
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Update the object ------------------------------------------------------------------------------------------
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID

    events_collector.stop()

def test_get_device_events(
    context_client : ContextClient,                 # pylint: disable=redefined-outer-name
    device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    LOGGER.warning('test_get_device_events begin')
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Check create event -----------------------------------------------------------------------------------------
    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID

    event = events_collector.get_event(block=True)
    assert isinstance(event, DeviceEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.device_id.device_uuid.uuid == DEVICE_DEV1_UUID

    events_collector.stop()

def test_listen_events(
    context_client : ContextClient,                 # pylint: disable=redefined-outer-name
    device_client : DeviceClient,                   # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient,           # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]  # pylint: disable=redefined-outer-name
    ):
    LOGGER.warning('test_listen_events begin')
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsDeviceCollector()
    events_collector.start()

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    LOGGER.info('Adding Device {:s}'.format(DEVICE_DEV1_UUID))
    device_with_connect_rules = copy.deepcopy(DEVICE_DEV1)
    device_with_connect_rules['device_config']['config_rules'].extend(DEVICE_DEV1_CONNECT_RULES)
    response = device_client.AddDevice(Device(**device_with_connect_rules))
    assert response.device_uuid.uuid == DEVICE_DEV1_UUID

    kpi_id_list = events_collector.listen_events()
    assert len(kpi_id_list) > 0