Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Javi Moreno
committed
from typing import Iterator
from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDList, SubsIDList, KpiId, \
KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, EditedKpiDescriptor, KpiDescriptorList, \
BundleKpiDescriptor, MonitorKpiRequest, Kpi, AlarmSubscription
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_float_to_string
from monitoring.service import SqliteTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient
from prometheus_client import Counter, Summary
LOGGER = logging.getLogger(__name__)
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) )
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE))
class MonitoringServiceServicerImpl(MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
Javi Moreno
committed
self.sql_db = SqliteTools.SQLite('monitoring.db')
self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client
# Create metrics_db client
self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE)
# CreateKpi (CreateKpiRequest) returns (KpiId) {}
self, request : KpiDescriptor, grpc_context : grpc.ServicerContext
) -> KpiId:
# CREATEKPI_COUNTER_STARTED.inc()
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
data = self.sql_db.insert_KPI(
kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
def EditKpiDescriptor ( self, request : EditedKpiDescriptor, grpc_context : grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('EditKpiDescriptor')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('EditKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiDescriptor exception')
def DeleteKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('DeleteKpi')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('DeleteKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteKpi exception')
def GetKpiDescriptorList ( self, request : Empty, grpc_context : grpc.ServicerContext) -> KpiDescriptorList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetKpiDescriptorList')
try:
# TBC
return KpiDescriptorList()
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetKpiDescriptorList exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptorList exception')
def CreateBundleKpi ( self, request : BundleKpiDescriptor, grpc_context : grpc.ServicerContext) -> KpiId:
francisco.moreno.external@atos.net
committed
LOGGER.info('CreateBundleKpi')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('CreateBundleKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateBundleKpi exception')
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> Empty:
try:
# Creates the request to send to the device service
monitor_device_request = MonitoringSettings()
Javi Moreno
committed
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.monitoring_window_s
monitor_device_request.sampling_interval_s = request.sampling_rate_s
device_client = DeviceClient()
device_client.MonitorDeviceKpi(monitor_device_request)
except ServiceException as e:
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# CREATEKPI_COUNTER_FAILED.inc()
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : Kpi, grpc_context : grpc.ServicerContext) -> Empty:
if kpiDescriptor is None:
LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id)))
kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = timestamp_float_to_string(request.timestamp.timestamp)
kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
# Build the structure to be included as point in the MetricsDB
self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
#self.influx_db.read_KPI_points()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
francisco.moreno.external@atos.net
committed
# def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
#
# LOGGER.info('GetStreamKpi')
# yield monitoring_pb2.Kpi()
#
# @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
# def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
#
# LOGGER.info('GetInstantKpi')
# return monitoring_pb2.Kpi()
def GetKpiDescriptor(self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiDescriptor:
#LOGGER.info('sql_db.get_KPIS={:s}'.format(str(self.sql_db.get_KPIS())))
#LOGGER.info('kpi_db={:s}'.format(str(kpi_db)))
if kpi_db is None: return None
kpiDescriptor = KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
francisco.moreno.external@atos.net
committed
def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList:
francisco.moreno.external@atos.net
committed
LOGGER.info('QueryKpiData')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('QueryKpiData exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('QueryKpiData exception')
def SubscribeKpi ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> KpiList:
francisco.moreno.external@atos.net
committed
LOGGER.info('SubscribeKpi')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('SubscribeKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SubscribeKpi exception')
def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubsDescriptor')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubsDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubsDescriptor exception')
def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsIDList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubscriptions')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubscriptions exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubscriptions exception')
def EditKpiSubscription ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('EditKpiSubscription')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('EditKpiSubscription exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiSubscription exception')
def CreateKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('CreateKpiAlarm')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('CreateKpiAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpiAlarm exception')
def EditKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('EditKpiAlarm')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('EditKpiAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiAlarm exception')
def GetAlarms ( self, request : Empty, grpc_context : grpc.ServicerContext) -> AlarmIDList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarms')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarms exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarms exception')
def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarmDescriptor')
try:
# TBC
return AlarmDescriptor()
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarmDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmDescriptor exception')
def GetAlarmResponseStream(self, request : AlarmSubscription, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]:
LOGGER.info('GetAlarmResponseStream')
try:
# TBC
except ServiceException as e:
LOGGER.exception('GetAlarmResponseStream exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmResponseStream exception')