diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 2c7b98d2ca3392906e0f42896907bb887b45e80b..c7421a9b78e092884a19949200655714c941b062 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -22,9 +22,9 @@ service MonitoringService { rpc CreateKpi (KpiDescriptor ) returns (KpiId ) {} rpc EditKpiDescriptor (EditedKpiDescriptor ) returns (context.Empty ) {} rpc DeleteKpi (KpiId ) returns (context.Empty ) {} + rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList) {} rpc CreateBundleKpi (BundleKpiDescriptor ) returns (KpiId ) {} - rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} rpc IncludeKpi (Kpi ) returns (context.Empty ) {} rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} @@ -32,13 +32,13 @@ service MonitoringService { rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} rpc GetSubscriptions (context.Empty ) returns (SubsIDList ) {} rpc EditKpiSubscription (SubsDescriptor ) returns (context.Empty ) {} - rpc CreateKpiAlarm (AlarmDescriptor ) returns ( AlarmID ) {} + rpc CreateKpiAlarm (AlarmDescriptor ) returns (AlarmID ) {} rpc EditKpiAlarm (AlarmDescriptor ) returns (context.Empty ) {} rpc GetAlarms (context.Empty ) returns (AlarmIDList ) {} rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {} rpc GetAlarmResponseStream (AlarmID ) returns (stream AlarmResponse ) {} - // rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} - // rpc GetInstantKpi (KpiId ) returns (KpiList ) {} + rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} + rpc GetInstantKpi (KpiId ) returns (KpiList ) {} } message KpiDescriptor { diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py index d8b39b8bf8d0ae84da19fa651da00633486e6bc6..9b1b80e74895e3f3323f7cef8c78af720638296e 100644 --- a/src/monitoring/client/MonitoringClient.py +++ b/src/monitoring/client/MonitoringClient.py @@ -19,7 +19,9 @@ from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string from monitoring.proto.context_pb2 import Empty -from monitoring.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest +from monitoring.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, EditedKpiDescriptor, \ + KpiDescriptorList, BundleKpiDescriptor, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsIDList, \ + AlarmDescriptor, AlarmID, AlarmIDList, AlarmResponse from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceStub LOGGER = logging.getLogger(__name__) @@ -54,6 +56,20 @@ class MonitoringClient: LOGGER.debug('CreateKpi result: {:s}'.format(grpc_message_to_json_string(response))) return response + @RETRY_DECORATOR + def EditKpiDescriptor(self, request : EditedKpiDescriptor) -> Empty: + LOGGER.debug('EditKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.EditKpiDescriptor(request) + LOGGER.info('EditKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def DeleteKpi(self,request : KpiId) -> Empty: + LOGGER.debug('DeleteKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.DeleteKpi(request) + LOGGER.info('DeleteKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor: LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request))) @@ -61,6 +77,20 @@ class MonitoringClient: LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) return response + @RETRY_DECORATOR + def GetKpiDescriptorList(self, request : Empty) -> KpiDescriptorList: + LOGGER.debug('GetKpiDescriptorList: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetKpiDescriptorList(request) + LOGGER.debug('GetKpiDescriptorList result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def CreateBundleKpi(self, request : BundleKpiDescriptor) -> KpiId: + LOGGER.debug('CreateBundleKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.CreateBundleKpi(request) + LOGGER.debug('CreateBundleKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def IncludeKpi(self, request : Kpi) -> Empty: LOGGER.debug('IncludeKpi: {:s}'.format(grpc_message_to_json_string(request))) @@ -76,19 +106,87 @@ class MonitoringClient: return response @RETRY_DECORATOR - def GetStreamKpi(self, request : KpiId) -> Iterator[Kpi]: - LOGGER.debug('GetStreamKpi: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetStreamKpi(request) - LOGGER.debug('GetStreamKpi result: {:s}'.format(grpc_message_to_json_string(response))) + def QueryKpiData(self, request : KpiQuery) -> KpiList: + LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.QueryKpiData(request) + LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SubscribeKpi(self, request : SubsDescriptor) -> Iterator[KpiList]: + LOGGER.debug('SubscribeKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SubscribeKpi(request) + LOGGER.debug('SubscribeKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetSubsDescriptor(self, request : SubscriptionID) -> SubsDescriptor: + LOGGER.debug('GetSubsDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetSubsDescriptor(request) + LOGGER.debug('GetSubsDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def GetInstantKpi(self, request : KpiId) -> Kpi: - LOGGER.debug('GetInstantKpi: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetInstantKpi(request) - LOGGER.debug('GetInstantKpi result: {:s}'.format(grpc_message_to_json_string(response))) + def GetSubscriptions(self, request : Empty) -> SubsIDList: + LOGGER.debug('GetSubscriptions: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetSubscriptions(request) + LOGGER.debug('GetSubscriptions result: {:s}'.format(grpc_message_to_json_string(response))) return response + @RETRY_DECORATOR + def EditKpiSubscription(self, request : SubsDescriptor) -> Empty: + LOGGER.debug('EditKpiSubscription: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetSubscriptions(request) + LOGGER.debug('EditKpiSubscription result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def CreateKpiAlarm(self, request : AlarmDescriptor) -> AlarmID: + LOGGER.debug('CreateKpiAlarm: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.CreateKpiAlarm(request) + LOGGER.debug('CreateKpiAlarm result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def EditKpiAlarm(self, request : AlarmDescriptor) -> Empty: + LOGGER.debug('EditKpiAlarm: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.EditKpiAlarm(request) + LOGGER.debug('EditKpiAlarm result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetAlarms(self, request : Empty) -> AlarmIDList: + LOGGER.debug('GetAlarms: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetAlarms(request) + LOGGER.debug('GetAlarms result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + def GetAlarmDescriptor(self, request : AlarmID) -> AlarmDescriptor: + LOGGER.debug('GetAlarmDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetAlarmDescriptor(request) + LOGGER.debug('GetAlarmDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + def GetAlarmResponseStream(self, request : AlarmID) -> AlarmResponse: + LOGGER.debug('GetAlarmResponseStream: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetAlarmResponseStream(request) + LOGGER.debug('GetAlarmResponseStream result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + # @RETRY_DECORATOR + # def GetStreamKpi(self, request : KpiId) -> Iterator[Kpi]: + # LOGGER.debug('GetStreamKpi: {:s}'.format(grpc_message_to_json_string(request))) + # response = self.stub.GetStreamKpi(request) + # LOGGER.debug('GetStreamKpi result: {:s}'.format(grpc_message_to_json_string(response))) + # return response + # + # @RETRY_DECORATOR + # def GetInstantKpi(self, request : KpiId) -> Kpi: + # LOGGER.debug('GetInstantKpi: {:s}'.format(grpc_message_to_json_string(request))) + # response = self.stub.GetInstantKpi(request) + # LOGGER.debug('GetInstantKpi result: {:s}'.format(grpc_message_to_json_string(response))) + # return response + if __name__ == '__main__': import sys diff --git a/src/monitoring/client/monitoring_client.py b/src/monitoring/client/monitoring_client.py deleted file mode 100644 index fe7ac285d59692f2e14034abaa491a12c2c1b845..0000000000000000000000000000000000000000 --- a/src/monitoring/client/monitoring_client.py +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import grpc - -from monitoring.proto import monitoring_pb2 -from monitoring.proto import monitoring_pb2_grpc -from monitoring.proto import context_pb2 - -from common.logger import getJSONLogger -LOGGER = getJSONLogger('monitoring-client') -LOGGER.setLevel('DEBUG') - -class MonitoringClient: - - def __init__(self, server='monitoring', port='7070'): - endpoint = '{}:{}'.format(server, port) - LOGGER.info('init monitoringClient {}'.format(endpoint)) - self.channel = grpc.insecure_channel(endpoint) - self.server = monitoring_pb2_grpc.MonitoringServiceStub(self.channel) - - def CreateKpi(self, request): - LOGGER.info('CreateKpi: {}'.format(request)) - response = self.server.CreateKpi(request) - LOGGER.info('CreateKpi result: {}'.format(response)) - return response - - def EditKpiDescriptor(self, request): - LOGGER.info('EditKpiDescriptor: {}'.format(request)) - response = self.server.EditKpiDescriptor(request) - LOGGER.info('EditKpiDescriptor result: {}'.format(response)) - return response - - def DeleteKpi(self, request): - LOGGER.info('DeleteKpi: {}'.format(request)) - response = self.server.DeleteKpi(request) - LOGGER.info('DeleteKpi result: {}'.format(response)) - return response - - def GetKpiDescriptorList(self, request): - LOGGER.info('GetKpiDescriptorList: {}'.format(request)) - response = self.server.GetKpiDescriptorList(request) - LOGGER.info('GetKpiDescriptorList result: {}'.format(response)) - return response - - def CreateBundleKpi(self, request): - LOGGER.info('CreateBundleKpi: {}'.format(request)) - response = self.server.CreateBundleKpi(request) - LOGGER.info('CreateBundleKpi result: {}'.format(response)) - return response - - def GetKpiDescriptor(self, request): - LOGGER.info('GetKpiDescriptor: {}'.format(request)) - response = self.server.GetKpiDescriptor(request) - LOGGER.info('GetKpiDescriptor result: {}'.format(response)) - return response - - def MonitorKpi(self, request): - LOGGER.info('MonitorKpi: {}'.format(request)) - response = self.server.MonitorKpi(request) - LOGGER.info('MonitorKpi result: {}'.format(response)) - return response - - def IncludeKpi(self, request): - LOGGER.info('IncludeKpi: {}'.format(request)) - response = self.server.IncludeKpi(request) - LOGGER.info('IncludeKpi result: {}'.format(response)) - return response - - def QueryKpiData(self, request): - LOGGER.info('QueryKpiData: {}'.format(request)) - response = self.server.QueryKpiData(request) - LOGGER.info('QueryKpiData result: {}'.format(response)) - return response - - def SubscribeKpi(self, request): - LOGGER.info('SubscribeKpi: {}'.format(request)) - response = self.server.SubscribeKpi(request) - LOGGER.info('SubscribeKpi result: {}'.format(response)) - return response - - def GetSubsDescriptor(self, request): - LOGGER.info('GetSubsDescriptor: {}'.format(request)) - response = self.server.GetSubsDescriptor(request) - LOGGER.info('GetSubsDescriptor result: {}'.format(response)) - return response - - def EditKpiSubscription(self, request): - LOGGER.info('EditKpiSubscription: {}'.format(request)) - response = self.server.EditKpiSubscription(request) - LOGGER.info('EditKpiSubscription result: {}'.format(response)) - return response - - def CreateKpiAlarm(self, request): - LOGGER.info('CreateKpiAlarm: {}'.format(request)) - response = self.server.CreateKpiAlarm(request) - LOGGER.info('CreateKpiAlarm result: {}'.format(response)) - return response - - def EditKpiAlarm(self, request): - LOGGER.info('EditKpiAlarm: {}'.format(request)) - response = self.server.EditKpiAlarm(request) - LOGGER.info('EditKpiAlarm result: {}'.format(response)) - return response - - def GetAlarms(self, request): - LOGGER.info('GetAlarms: {}'.format(request)) - response = self.server.GetAlarms(request) - LOGGER.info('GetAlarms result: {}'.format(response)) - return response - - def GetAlarmDescriptor(self, request): - LOGGER.info('GetAlarmDescriptor: {}'.format(request)) - response = self.server.GetAlarmDescriptor(request) - LOGGER.info('GetAlarmDescriptor result: {}'.format(response)) - return response - - - # def GetStreamKpi(self, request): - # LOGGER.info('GetStreamKpi: {}'.format(request)) - # response = self.server.GetStreamKpi(request) - # LOGGER.info('GetStreamKpi result: {}'.format(response)) - # yield monitoring_pb2.Kpi() - # - # def GetInstantKpi(self, request): - # LOGGER.info('GetInstantKpi: {}'.format(request)) - # response = self.server.GetInstantKpi(request) - # LOGGER.info('GetInstantKpi result: {}'.format(response)) - # return monitoring_pb2.Kpi() - - - -if __name__ == '__main__': - # get port - port = sys.argv[1] if len(sys.argv) > 1 else '7070' - - # make call to server - client = MonitoringClient(port=port) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 21c008a06b6d53077b822a44677f28fe5d762970..e6acf0e341c150bf9fe1b8f828267f656cc5b9ce 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -14,13 +14,17 @@ import os,grpc, logging import socket +from typing import Iterator from prometheus_client import Summary from prometheus_client import Counter from common.Settings import get_setting +from context.proto.context_pb2 import Empty from monitoring.Config import DEVICE_GRPC_SERVICE_PORT, DEVICE_SERVICE_HOST from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType +from monitoring.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDList, SubsIDList, KpiId, \ + KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID from monitoring.service import SqliteTools, InfluxTools from monitoring.proto import monitoring_pb2 from monitoring.proto import monitoring_pb2_grpc @@ -42,8 +46,8 @@ INFLUXDB_USER = os.environ.get("INFLUXDB_USER") INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") -DEVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=DEVICE_SERVICE_HOST ) -DEVICE_SERVICE_PORT = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=DEVICE_GRPC_SERVICE_PORT) +DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=DEVICE_SERVICE_HOST ) +DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=DEVICE_GRPC_SERVICE_PORT) class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): @@ -52,7 +56,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService # Init sqlite monitoring db self.sql_db = SqliteTools.SQLite('monitoring.db') - self.deviceClient = DeviceClient(address=DEVICE_SERVICE_HOST, port=DEVICE_GRPC_SERVICE_PORT) # instantiate the client + self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICE_GRPC_SERVICE_PORT) # instantiate the client # Create influx_db client self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) @@ -211,7 +215,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService # return monitoring_pb2.Kpi() - def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor: + def GetKpiDescriptor(self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiDescriptor: LOGGER.info('getting Kpi by KpiID') try: kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid)) @@ -235,7 +239,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('GetKpiDescriptor exception') - def QueryKpiData ( self, request : monitoring_pb2.KpiQuery, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiList: + def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList: LOGGER.info('QueryKpiData') try: @@ -247,7 +251,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('QueryKpiData exception') - def SubscribeKpi ( self, request : monitoring_pb2.SubsDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiList: + def SubscribeKpi ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> KpiList: LOGGER.info('SubscribeKpi') try: @@ -260,7 +264,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.exception('SubscribeKpi exception') - def GetSubsDescriptor ( self, request : monitoring_pb2.SubscriptionID, grpc_context : grpc.ServicerContext) -> monitoring_pb2.SubsDescriptor: + def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor: LOGGER.info('GetSubsDescriptor') try: @@ -272,7 +276,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('GetSubsDescriptor exception') - def GetSubscriptions ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> monitoring_pb2.SubsIDList: + def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsIDList: LOGGER.info('GetSubscriptions') try: @@ -284,7 +288,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('GetSubscriptions exception') - def EditKpiSubscription ( self, request : monitoring_pb2.SubsDescriptor, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: + def EditKpiSubscription ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> Empty: LOGGER.info('EditKpiSubscription') try: @@ -296,7 +300,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('EditKpiSubscription exception') - def CreateKpiAlarm ( self, request : monitoring_pb2.AlarmDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.AlarmResponse: + def CreateKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse: LOGGER.info('CreateKpiAlarm') try: @@ -308,7 +312,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('CreateKpiAlarm exception') - def EditKpiAlarm ( self, request : monitoring_pb2.AlarmDescriptor, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: + def EditKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> Empty: LOGGER.info('EditKpiAlarm') try: @@ -320,7 +324,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('EditKpiAlarm exception') - def GetAlarms ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> monitoring_pb2.AlarmIDList: + def GetAlarms ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> AlarmIDList: LOGGER.info('GetAlarms') try: @@ -332,7 +336,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception as e: # pragma: no cover LOGGER.exception('GetAlarms exception') - def GetAlarmDescriptor ( self, request : monitoring_pb2.AlarmID, grpc_context : grpc.ServicerContext) -> monitoring_pb2.AlarmDescriptor: + def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor: LOGGER.info('GetAlarmDescriptor') try: @@ -342,4 +346,16 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService LOGGER.exception('GetAlarmDescriptor exception') grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover - LOGGER.exception('GetAlarmDescriptor exception') \ No newline at end of file + LOGGER.exception('GetAlarmDescriptor exception') + + def GetAlarmResponseStream(self, request : AlarmID, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]: + + LOGGER.info('GetAlarmResponseStream') + try: + # TBC + yield monitoring_pb2.AlarmResponse() + except ServiceException as e: + LOGGER.exception('GetAlarmResponseStream exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('GetAlarmResponseStream exception') diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 4acda419c38420eb7175cd1a8f0ff98246d85753..7b89ade4525566fef332f113b0c9f4920f0e01bc 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -19,7 +19,7 @@ import pytest from typing import Tuple -from monitoring.client.monitoring_client import MonitoringClient +from monitoring.client.MonitoringClient import MonitoringClient from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, DEVICE_GRPC_SERVICE_PORT from monitoring.proto import context_pb2, monitoring_pb2 from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType @@ -31,18 +31,19 @@ from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker +from common.Constants import ServiceNameEnum +from common.Settings import get_grpc_max_workers, get_service_port_grpc, get_grpc_grace_period -from context.Config import ( - GRPC_SERVICE_PORT as grpc_port_context, - GRPC_MAX_WORKERS as grpc_workers_context, - GRPC_GRACE_PERIOD as grpc_grace_context -) from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService from context.service.Populate import populate from context.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device from context.tests.Objects import (DEVICE_R1, DEVICE_R1_UUID) +from .PrepareTestScenario import ( # pylint: disable=unused-import + # be careful, order of symbols is important here! + mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment) + LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -52,11 +53,13 @@ LOGGER.setLevel(logging.DEBUG) SERVER_ADDRESS = '127.0.0.1' LISTEN_ADDRESS = '[::]' -GRPC_PORT_MONITORING = 7070 -GRPC_PORT_CONTEXT = 10000 + grpc_port_context # avoid privileged ports -DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports -MONITORING_GRPC_SERVICE_PORT = GRPC_PORT_MONITORING # avoid privileged ports +GRPC_PORT_CONTEXT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports +DEVICE_GRPC_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.DEVICE) # avoid privileged ports +MONITORING_GRPC_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.MONITORING) # avoid privileged ports + +grpc_workers_context = get_grpc_max_workers() +grpc_grace_context = get_grpc_grace_period() SCENARIOS = [ # comment/uncomment scenarios to activate/deactivate them in the test unit @@ -81,18 +84,14 @@ def context_db_mb(request) -> Tuple[Database, MessageBroker]: @pytest.fixture(scope='session') def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - database = context_db_mb[0] - database.clear_all() - _service = ContextService( - database, context_db_mb[1], port=GRPC_PORT_CONTEXT, max_workers=grpc_workers_context, - grace_period=grpc_grace_context) + _service = ContextService(context_db_mb[0], context_db_mb[1]) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name - _client = ContextClient(address='localhost', port=GRPC_PORT_CONTEXT) + _client = ContextClient(host='localhost', port=GRPC_PORT_CONTEXT) yield _client _client.close() @@ -122,7 +121,7 @@ def monitoring_service(): @pytest.fixture(scope='session') def monitoring_client(monitoring_service): LOGGER.warning('monitoring_client begin') - client = MonitoringClient(server=SERVER_ADDRESS, port=GRPC_PORT_MONITORING) # instantiate the client + client = MonitoringClient(host=SERVER_ADDRESS, port=MONITORING_GRPC_SERVICE_PORT) # instantiate the client LOGGER.warning('monitoring_client returning') return client