# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os, grpc, logging

from queue import Queue
from typing import Iterator, Optional

from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.logger import getJSONLogger
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
    KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_string_to_float

from monitoring.service import ManagementDBTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient

from prometheus_client import Counter, Summary

from monitoring.service.SubscriptionManager import SubscriptionManager

LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('INFO')

MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
    'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
# NOTE(review): this counter is declared but never incremented in this module.
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')

# Connection settings of the metrics database, taken from the environment (None when unset).
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")

# Location of the device service, overridable through settings.
DEVICESERVICE_SERVICE_HOST = get_setting(
    'DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE))
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting(
    'DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE))


class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    """gRPC servicer of the monitoring service.

    KPI descriptors and subscriptions are stored through ``self.management_db``;
    KPI samples are written through ``self.metrics_db``. Every RPC handler logs
    failures and terminates the call through ``grpc_context.abort``: a
    ``ServiceException`` is reported with its own code/details, any other
    exception is reported as ``grpc.StatusCode.INTERNAL``.
    """

    def __init__(self):
        """Create the management DB, the device client, the metrics DB and the subscription manager."""
        LOGGER.info('Init monitoringService')

        # Init sqlite monitoring db
        self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
        # Client used by MonitorKpi to reach the device service.
        self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC)
        self.metrics_db = MetricsDBTools.MetricsDB(
            METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
        self.subs_manager = SubscriptionManager(self.metrics_db)
        LOGGER.info('MetricsDB initialized')

    def _find_kpi_descriptor(self, kpi_id: KpiId) -> Optional[KpiDescriptor]:
        """Load the descriptor of ``kpi_id`` from the management DB.

        Performs no gRPC error handling, so it can be shared by several RPC
        handlers without nesting ``abort`` calls.

        Returns:
            The populated ``KpiDescriptor``, or ``None`` when the management DB
            has no row for that id.

        Raises:
            ValueError: if the uuid of ``kpi_id`` cannot be converted to ``int``.
        """
        LOGGER.info('getting Kpi by KpiID')
        kpi_row = self.management_db.get_KPI(int(kpi_id.kpi_id.uuid))
        if kpi_row is None:
            return None

        # Row layout as consumed here: index 1 = description, 2 = sample type,
        # 3 = device id, 4 = endpoint id, 5 = service id.
        descriptor = KpiDescriptor()
        descriptor.kpi_description = kpi_row[1]
        descriptor.kpi_sample_type = kpi_row[2]
        descriptor.device_id.device_uuid.uuid = str(kpi_row[3])
        descriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_row[4])
        descriptor.service_id.service_uuid.uuid = str(kpi_row[5])
        return descriptor

    # SetKpi (SetKpiRequest) returns (KpiId) {}
    def SetKpi(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext) -> KpiId:
        """Store a KPI descriptor in the management DB and return its identifier.

        The value returned by ``management_db.insert_KPI`` is converted to ``str``
        and used as the uuid of the returned ``KpiId``.
        """
        LOGGER.info('SetKpi')
        try:
            kpi_id = KpiId()

            kpi_description = request.kpi_description
            kpi_sample_type = request.kpi_sample_type
            kpi_device_id = request.device_id.device_uuid.uuid
            kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
            kpi_service_id = request.service_id.service_uuid.uuid

            data = self.management_db.insert_KPI(
                kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)

            kpi_id.kpi_id.uuid = str(data)
            return kpi_id
        except ServiceException as e:
            LOGGER.exception('SetKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty:
        """Placeholder (TBC): currently returns ``Empty`` without deleting anything."""
        LOGGER.info('DeleteKpi')
        try:
            # TBC
            return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
        """Return the descriptor stored for ``request``.

        Returns ``None`` when the KPI is not present in the management DB.
        NOTE(review): ``None`` is not a valid gRPC response message; kept to
        preserve the existing contract - confirm whether NOT_FOUND (or an empty
        descriptor) should be returned to remote callers instead.
        """
        try:
            return self._find_kpi_descriptor(request)
        except ServiceException as e:
            LOGGER.exception('GetKpiDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptor exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList:
        """Placeholder (TBC): currently returns an empty ``KpiDescriptorList``."""
        LOGGER.info('GetKpiDescriptorList')
        try:
            # TBC
            return KpiDescriptorList()
        except ServiceException as e:
            LOGGER.exception('GetKpiDescriptorList exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptorList exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
    def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty:
        """Write one KPI sample to the metrics DB.

        The sample is tagged with the device, endpoint and service ids taken
        from the KPI's stored descriptor. The call is aborted with ``INTERNAL``
        when the KPI is unknown or the sample cannot be written.
        """
        LOGGER.info('IncludeKpi')
        try:
            kpi_descriptor = self._find_kpi_descriptor(request.kpi_id)
            if kpi_descriptor is None:
                raise LookupError(
                    'IncludeKpi exception: Sample with KPIId({:s}): not found in database'.format(
                        str(request.kpi_id)))

            # Enum value name, upper-cased and without the 'KPISAMPLETYPE_' prefix.
            sample_type_name = KpiSampleType.Name(kpi_descriptor.kpi_sample_type)
            kpi_sample_type = sample_type_name.upper().replace('KPISAMPLETYPE_', '')

            kpi_id = request.kpi_id.kpi_id.uuid
            device_id = kpi_descriptor.device_id.device_uuid.uuid
            endpoint_id = kpi_descriptor.endpoint_id.endpoint_uuid.uuid
            service_id = kpi_descriptor.service_id.service_uuid.uuid
            time_stamp = request.timestamp.timestamp
            # Read whichever member of the 'value' oneof is populated.
            kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))

            # Build the structure to be included as point in the MetricsDB
            self.metrics_db.write_KPI(
                time_stamp, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value)

            return Empty()
        except ServiceException as e:
            LOGGER.exception('IncludeKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('IncludeKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
    def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty:
        """Ask the device service to start sampling the KPI named in ``request``.

        Builds a ``MonitoringSettings`` message from the stored descriptor and
        the requested window/rate, then forwards it with ``MonitorDeviceKpi``.
        """
        LOGGER.info('MonitorKpi')
        try:
            kpi_descriptor = self._find_kpi_descriptor(request.kpi_id)
            if kpi_descriptor is None:
                raise LookupError(
                    'MonitorKpi exception: KpiId({:s}): not found in database'.format(str(request.kpi_id)))

            # Sets the request to send to the device service
            monitor_device_request = MonitoringSettings()
            monitor_device_request.kpi_descriptor.CopyFrom(kpi_descriptor)
            monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
            monitor_device_request.sampling_duration_s = request.monitoring_window_s
            monitor_device_request.sampling_interval_s = request.sampling_rate_s

            # Reuse the client configured in __init__ instead of creating (and
            # never releasing) a new one for every request.
            self.deviceClient.MonitorDeviceKpi(monitor_device_request)
        except ServiceException as e:
            LOGGER.exception('MonitorKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('MonitorKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

        return Empty()

    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList:
        """Placeholder (TBC): currently returns an empty ``KpiList``."""
        LOGGER.info('QueryKpiData')
        try:
            # TBC
            return KpiList()
        except ServiceException as e:
            LOGGER.exception('QueryKpiData exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('QueryKpiData exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def SetKpiSubscription(
        self, request: SubsDescriptor, grpc_context: grpc.ServicerContext
    ) -> Iterator[SubsResponse]:
        """Register a KPI subscription and yield one response with the samples queued so far.

        The subscription is recorded in the management DB and handed to the
        subscription manager together with a fresh queue. Whatever batches are
        already in that queue are converted to ``Kpi`` messages and returned in
        a single ``SubsResponse`` carrying the new subscription id.
        """
        LOGGER.info('SubscribeKpi')
        try:
            subs_queue = Queue()
            subs_response = SubsResponse()

            kpi_id = request.kpi_id.kpi_id.uuid
            sampling_duration_s = request.sampling_duration_s
            sampling_interval_s = request.sampling_interval_s
            start_timestamp = request.start_timestamp.timestamp
            end_timestamp = request.end_timestamp.timestamp

            subscriber = "localhost"  # Investigate how to get info from the requester

            subs_id = self.management_db.insert_subscription(
                kpi_id, subscriber, sampling_duration_s, sampling_interval_s, start_timestamp, end_timestamp)
            self.subs_manager.create_subscription(
                subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s,
                start_timestamp, end_timestamp)

            # parse queue to append kpis into the list
            while not subs_queue.empty():
                samples = subs_queue.get_nowait()
                for item in samples:
                    # item is indexed as (kpi uuid, timestamp string, value).
                    kpi = Kpi()
                    kpi.kpi_id.kpi_id.uuid = item[0]
                    kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
                    kpi.kpi_value.floatVal = item[2]  # This must be improved
                    subs_response.kpi_list.kpi.append(kpi)

            # Same id-to-string conversion as SetKpi applies to its DB identifier.
            subs_response.subs_id.subs_id.uuid = str(subs_id)

            yield subs_response
        except ServiceException as e:
            LOGGER.exception('SubscribeKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SubscribeKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor:
        """Placeholder (TBC): currently returns an empty ``SubsDescriptor``."""
        LOGGER.info('GetSubsDescriptor')
        try:
            # TBC
            return SubsDescriptor()
        except ServiceException as e:
            LOGGER.exception('GetSubsDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubsDescriptor exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList:
        """Placeholder (TBC): currently returns an empty ``SubsList``."""
        LOGGER.info('GetSubscriptions')
        try:
            # TBC
            return SubsList()
        except ServiceException as e:
            LOGGER.exception('GetSubscriptions exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubscriptions exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty:
        """Placeholder (TBC): currently returns ``Empty`` without deleting anything."""
        LOGGER.info('DeleteSubscription')
        try:
            # TBC
            return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteSubscription exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteSubscription exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse:
        """Placeholder (TBC): currently returns an empty ``AlarmResponse``."""
        LOGGER.info('SetKpiAlarm')
        try:
            # TBC
            return AlarmResponse()
        except ServiceException as e:
            LOGGER.exception('SetKpiAlarm exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpiAlarm exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList:
        """Placeholder (TBC): currently returns an empty ``AlarmList``."""
        LOGGER.info('GetAlarms')
        try:
            # TBC
            return AlarmList()
        except ServiceException as e:
            LOGGER.exception('GetAlarms exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarms exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor:
        """Placeholder (TBC): currently returns an empty ``AlarmDescriptor``."""
        LOGGER.info('GetAlarmDescriptor')
        try:
            # TBC
            return AlarmDescriptor()
        except ServiceException as e:
            LOGGER.exception('GetAlarmDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarmDescriptor exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetAlarmResponseStream(
        self, request: AlarmSubscription, grpc_context: grpc.ServicerContext
    ) -> Iterator[AlarmResponse]:
        """Placeholder (TBC): currently yields a single empty ``AlarmResponse``."""
        LOGGER.info('GetAlarmResponseStream')
        try:
            # TBC
            yield AlarmResponse()
        except ServiceException as e:
            LOGGER.exception('GetAlarmResponseStream exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarmResponseStream exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty:
        """Placeholder (TBC): currently returns ``Empty`` without deleting anything."""
        LOGGER.info('DeleteAlarm')
        try:
            # TBC
            return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteAlarm exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteAlarm exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]:
        """Placeholder: currently yields a single empty ``Kpi``."""
        LOGGER.info('GetStreamKpi')
        yield Kpi()

    @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiList:
        """Placeholder: currently returns an empty ``KpiList``; the call duration is recorded in the summary metric."""
        LOGGER.info('GetInstantKpi')
        return KpiList()