Commit ca74ba7d authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Monitoring Component:

- Updated to new metrics export framework
parent 53da4ec3
Loading
Loading
Loading
Loading
+340 −514
Original line number Diff line number Diff line
@@ -12,12 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os, grpc
import logging, os, grpc
from queue import Queue

from typing import Iterator

from common.logger import getJSONLogger
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
@@ -25,30 +23,22 @@ from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
    KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
    MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse, RawKpiTable, RawKpi, RawKpiList
from common.method_wrappers.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float

from monitoring.service import ManagementDBTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient

from prometheus_client import Counter, Summary

from monitoring.service import ManagementDBTools, MetricsDBTools
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.NameMapping import NameMapping
from monitoring.service.SubscriptionManager import SubscriptionManager

LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')

MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
    'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
LOGGER = logging.getLogger(__name__)

METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE_MONITORING_KPIS = os.environ.get("METRICSDB_TABLE_MONITORING_KPIS")

METRICS_POOL = MetricsPool('Monitoring', 'RPC')

class MonitoringServiceServicerImpl(MonitoringServiceServicer):
    def __init__(self, name_mapping : NameMapping):
        LOGGER.info('Init monitoringService')
@@ -63,15 +53,11 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        LOGGER.info('MetricsDB initialized')

    # SetKpi (SetKpiRequest) returns (KpiId) {}
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetKpi(
            self, request: KpiDescriptor, grpc_context: grpc.ServicerContext
    ) -> KpiId:
        # CREATEKPI_COUNTER_STARTED.inc()
        LOGGER.info('SetKpi')
        try:
            # Here the code to create a sqlite query to crete a KPI and return a KpiID
        response = KpiId()

        kpi_description = request.kpi_description
        kpi_sample_type = request.kpi_sample_type
        kpi_device_id = request.device_id.device_uuid.uuid
@@ -79,31 +65,18 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        kpi_service_id = request.service_id.service_uuid.uuid
        kpi_slice_id = request.slice_id.slice_uuid.uuid
        kpi_connection_id = request.connection_id.connection_uuid.uuid


        if request.kpi_id.kpi_id.uuid != "":
            response.kpi_id.uuid = request.kpi_id.kpi_id.uuid
            # Here the code to modify an existing kpi
        else:
            data = self.management_db.insert_KPI(
                    kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, kpi_connection_id)
                kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id,
                kpi_connection_id)
            response.kpi_id.uuid = str(data)

        return response
        except ServiceException as e:
            LOGGER.exception('SetKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty:

        LOGGER.info('DeleteKpi')
        try:
            LOGGER.debug(f'DeleteKpi with KpiID: {request.kpi_id.uuid}')
        kpi_id = int(request.kpi_id.uuid)
        kpi = self.management_db.get_KPI(kpi_id)
        if kpi:
@@ -111,15 +84,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        else:
            LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
        return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteKpi exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
        LOGGER.info('getting Kpi by KpiID')
        try:
        kpi_id = request.kpi_id.uuid
        kpi_db = self.management_db.get_KPI(int(kpi_id))
        kpiDescriptor = KpiDescriptor()
@@ -134,21 +101,12 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            kpiDescriptor.slice_id.slice_uuid.uuid              = str(kpi_db[6])
            kpiDescriptor.connection_id.connection_uuid.uuid    = str(kpi_db[7])
        return kpiDescriptor
        except ServiceException as e:
            LOGGER.exception('GetKpiDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptor exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList:

        LOGGER.info('GetKpiDescriptorList')
        try:
        kpi_descriptor_list = KpiDescriptorList()

        data = self.management_db.get_KPIS()
        LOGGER.debug(f"data: {data}")

        for item in data:
            kpi_descriptor = KpiDescriptor()
            kpi_descriptor.kpi_id.kpi_id.uuid                   = str(item[0])
@@ -159,21 +117,11 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            kpi_descriptor.service_id.service_uuid.uuid         = str(item[5])
            kpi_descriptor.slice_id.slice_uuid.uuid             = str(item[6])
            kpi_descriptor.connection_id.connection_uuid.uuid   = str(item[7])

            kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor)

        return kpi_descriptor_list
        except ServiceException as e:
            LOGGER.exception('GetKpiDescriptorList exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetKpiDescriptorList exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty:

        LOGGER.info('IncludeKpi')

        try:
        kpi_id = request.kpi_id.kpi_id.uuid
        kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)

@@ -192,35 +140,21 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):

            # Build the structure to be included as point in the MetricsDB
            self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value)

        return Empty()
        except ServiceException as e:
            LOGGER.exception('IncludeKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception:  # pragma: no cover
            LOGGER.exception('IncludeKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty:

        LOGGER.info('MonitorKpi')
        try:
        kpi_id = int(request.kpi_id.kpi_id.uuid)
        kpi = self.management_db.get_KPI(kpi_id)
        response = Empty()

        if kpi:
            # Sets the request to send to the device service
            monitor_device_request = MonitoringSettings()

            kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)

            monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
            monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
            monitor_device_request.sampling_duration_s = request.monitoring_window_s
            monitor_device_request.sampling_interval_s = request.sampling_rate_s

            if not self.management_db.check_monitoring_flag(kpi_id):
                device_client = DeviceClient()
                device_client.MonitorDeviceKpi(monitor_device_request)
@@ -231,23 +165,10 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        else:
            LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
        return response
        except ServiceException as e:
            LOGGER.exception('MonitorKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('MonitorKpi exception')
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
            # CREATEKPI_COUNTER_FAILED.inc()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> RawKpiTable:

        LOGGER.info('QueryKpiData')
        try:
        raw_kpi_table = RawKpiTable()

            LOGGER.debug(str(request))

        kpi_id_list             = request.kpi_ids
        monitoring_window_s     = request.monitoring_window_s
        last_n_samples          = request.last_n_samples
@@ -257,7 +178,6 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        # Check if all the Kpi_ids exist
        for item in kpi_id_list:
            kpi_id = item.kpi_id.uuid

            kpiDescriptor = self.GetKpiDescriptor(item, grpc_context)
            if kpiDescriptor is None:
                LOGGER.info('QueryKpiData error: KpiID({:s}): not found in database'.format(str(kpi_id)))
@@ -280,18 +200,10 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
                        raw_kpi_list.raw_kpis.append(raw_kpi)

                raw_kpi_table.raw_kpi_lists.append(raw_kpi_list)

        return raw_kpi_table
        except ServiceException as e:
            LOGGER.exception('QueryKpiData exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('QueryKpiData exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetKpiSubscription(self, request: SubsDescriptor, grpc_context: grpc.ServicerContext) -> SubsResponse:

        LOGGER.info('SubscribeKpi')
        try:
        subs_queue = Queue()

        kpi_id = request.kpi_id.kpi_id.uuid
@@ -323,16 +235,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            if timestamp_utcnow_to_float() > end_timestamp:
                break
        # yield subs_response
        except ServiceException as e:
            LOGGER.exception('SubscribeKpi exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SubscribeKpi exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor:

        LOGGER.info('GetSubsDescriptor')
        try:
        subs_id = request.subs_id.uuid
        subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
        response = SubsDescriptor()
@@ -346,45 +251,25 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            response.sampling_interval_s = subs_db[4]
            response.start_timestamp.timestamp = subs_db[5]
            response.end_timestamp.timestamp = subs_db[6]

        return response
        except ServiceException as e:
            LOGGER.exception('GetSubsDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubsDescriptor exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList:

        LOGGER.info('GetSubscriptions')
        try:
        response = SubsList()
        data = self.management_db.get_subscriptions()

        for subs_db in data:
            subs_descriptor = SubsDescriptor()

            subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0])
            subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1])
            subs_descriptor.sampling_duration_s = subs_db[3]
            subs_descriptor.sampling_interval_s = subs_db[4]
            subs_descriptor.start_timestamp.timestamp = subs_db[5]
            subs_descriptor.end_timestamp.timestamp = subs_db[6]

            response.subs_descriptor.append(subs_descriptor)

        return response
        except ServiceException as e:
            LOGGER.exception('GetSubscriptions exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetSubscriptions exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty:

        LOGGER.info('DeleteSubscription')
        try:
            LOGGER.debug(f'DeleteSubscription with SubsID: {request.subs_id.uuid}')
        subs_id = int(request.subs_id.uuid)
        subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
        if subs_db:
@@ -392,18 +277,10 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        else:
            LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id)))
        return Empty()
        except ServiceException as e:
            LOGGER.exception('DeleteSubscription exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteSubscription exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse:

        LOGGER.info('SetKpiAlarm')
        try:
        response = AlarmID()

        alarm_description = request.alarm_description
        alarm_name = request.name
        kpi_id = request.kpi_id.kpi_id.uuid
@@ -413,9 +290,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        include_min_value = request.kpi_value_range.includeMinValue
        include_max_value = request.kpi_value_range.includeMaxValue
        timestamp = request.timestamp.timestamp

        LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}")

        if request.alarm_id.alarm_id.uuid != "":
            alarm_id = request.alarm_id.alarm_id.uuid
            # Here the code to modify an existing alarm
@@ -425,18 +300,10 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
                                                        in_range, include_min_value, include_max_value)
            LOGGER.debug(f"AlarmID: {alarm_id}")
        response.alarm_id.uuid = str(alarm_id)

        return response
        except ServiceException as e:
            LOGGER.exception('SetKpiAlarm exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('SetKpiAlarm exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList:

        LOGGER.info('GetAlarms')
        try:
        response = AlarmList()
        data = self.management_db.get_alarms()

@@ -456,16 +323,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            response.alarm_descriptor.append(alarm_descriptor)

        return response
        except ServiceException as e:
            LOGGER.exception('GetAlarms exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarms exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor:

        LOGGER.info('GetAlarmDescriptor')
        try:
        alarm_id = request.alarm_id.uuid
        LOGGER.debug(alarm_id)
        alarm = self.management_db.get_alarm(alarm_id)
@@ -486,17 +346,11 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
            response.alarm_id.alarm_id.uuid = "NoID"
        return response
        except ServiceException as e:
            LOGGER.exception('GetAlarmDescriptor exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarmDescriptor exception')

    def GetAlarmResponseStream(self, request: AlarmSubscription, grpc_context: grpc.ServicerContext) -> Iterator[
        AlarmResponse]:

        LOGGER.info('GetAlarmResponseStream')
        try:
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetAlarmResponseStream(
        self, request: AlarmSubscription, grpc_context: grpc.ServicerContext
    ) -> Iterator[AlarmResponse]:
        alarm_id = request.alarm_id.alarm_id.uuid
        alarm_data = self.management_db.get_alarm(alarm_id)
        real_start_time = timestamp_utcnow_to_float()
@@ -541,17 +395,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            alarm_response = AlarmResponse()
            alarm_response.alarm_id.alarm_id.uuid = "NoID"
            return alarm_response
        except ServiceException as e:
            LOGGER.exception('GetAlarmResponseStream exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetAlarmResponseStream exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty:

        LOGGER.info('DeleteAlarm')
        try:
            LOGGER.debug(f'DeleteAlarm with AlarmID: {request.alarm_id.uuid}')
        alarm_id = int(request.alarm_id.uuid)
        alarm = self.management_db.get_alarm(alarm_id)
        response = Empty()
@@ -561,16 +407,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        else:
            LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
        return response
        except ServiceException as e:
            LOGGER.exception('DeleteAlarm exception')
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('DeleteAlarm exception')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]:

        LOGGER.info('GetStreamKpi')

        kpi_id = request.kpi_id.uuid
        kpi_db = self.management_db.get_KPI(int(kpi_id))
        response = Kpi()
@@ -581,11 +420,8 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        else:
            yield response

    @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi:

        LOGGER.info('GetInstantKpi')
        try:
        kpi_id = request.kpi_id.uuid
        response = Kpi()
        if kpi_id == "":
@@ -603,14 +439,4 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
                response.kpi_id.kpi_id.uuid = str(_data[0])
                response.timestamp.timestamp = timestamp_string_to_float(_data[1])
                response.kpi_value.floatVal = _data[2]

        return response
        except ServiceException as e:
            LOGGER.exception('GetInstantKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:  # pragma: no cover
            LOGGER.exception('GetInstantKpi exception')
            # CREATEKPI_COUNTER_FAILED.inc()
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
+0 −1
Original line number Diff line number Diff line
@@ -25,7 +25,6 @@ from grpc._channel import _MultiThreadedRendezvous
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
#from common.logger import getJSONLogger
from common.proto.context_pb2 import DeviceOperationalStatusEnum, EventTypeEnum, DeviceEvent, Device, Empty
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_sample_types_pb2 import KpiSampleType