Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Javi Moreno
committed
from typing import Iterator
from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.logger import getJSONLogger
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_string_to_float
from monitoring.service import ManagementDBTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient
from prometheus_client import Counter, Summary
from monitoring.service.SubscriptionManager import SubscriptionManager
LOGGER = getJSONLogger('monitoringservice-server')
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) )
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE))
class MonitoringServiceServicerImpl(MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client
self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE)
self.subs_manager = SubscriptionManager(self.metrics_db)
LOGGER.info('MetricsDB initialized')
# SetKpi (SetKpiRequest) returns (KpiId) {}
def SetKpi(
self, request : KpiDescriptor, grpc_context : grpc.ServicerContext
) -> KpiId:
# CREATEKPI_COUNTER_STARTED.inc()
LOGGER.info('SetKpi')
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
data = self.management_db.insert_KPI(
kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
def DeleteKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('DeleteKpi')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('DeleteKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteKpi exception')
def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
LOGGER.info('getting Kpi by KpiID')
try:
kpi_db = self.management_db.get_KPI(int(request.kpi_id.uuid))
# LOGGER.info('sql_db.get_KPIS={:s}'.format(str(self.sql_db.get_KPIS())))
# LOGGER.info('kpi_db={:s}'.format(str(kpi_db)))
if kpi_db is None: return None
kpiDescriptor = KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
def GetKpiDescriptorList ( self, request : Empty, grpc_context : grpc.ServicerContext) -> KpiDescriptorList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetKpiDescriptorList')
try:
# TBC
return KpiDescriptorList()
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetKpiDescriptorList exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptorList exception')
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : Kpi, grpc_context : grpc.ServicerContext) -> Empty:
if kpiDescriptor is None:
raise Exception(LOGGER.exception('IncludeKpi exception: Sample with KPIId({:s}): not found in database'.format(str(request.kpi_id))))
else:
kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = request.timestamp.timestamp
kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
# Build the structure to be included as point in the MetricsDB
self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
#self.influx_db.read_KPI_points()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> Empty:
LOGGER.info('MonitorKpi')
# Sets the request to send to the device service
monitor_device_request = MonitoringSettings()
kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.monitoring_window_s
monitor_device_request.sampling_interval_s = request.sampling_rate_s
device_client = DeviceClient()
device_client.MonitorDeviceKpi(monitor_device_request)
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# CREATEKPI_COUNTER_FAILED.inc()
return Empty()
francisco.moreno.external@atos.net
committed
def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList:
francisco.moreno.external@atos.net
committed
LOGGER.info('QueryKpiData')
try:
# TBC
return KpiList()
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('QueryKpiData exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('QueryKpiData exception')
def SetKpiSubscription ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> SubsResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('SubscribeKpi')
try:
subs_queue = Queue()
subs_response = SubsResponse()
kpi_id = request.kpi_id.kpi_id.uuid
sampling_duration_s = request.sampling_duration_s
sampling_interval_s = request.sampling_interval_s
start_timestamp = request.start_timestamp.timestamp
end_timestamp = request.end_timestamp.timestamp
subscriber = "localhost" # Investigate how to get info from the requester
subs_id = self.management_db.insert_subscription(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp,end_timestamp)
self.subs_manager.create_subscription(subs_queue,subs_id,kpi_id,sampling_interval_s,sampling_duration_s,start_timestamp,end_timestamp)
# parse queue to append kpis into the list
while not subs_queue.empty():
list = subs_queue.get_nowait()
for item in list:
kpi = kpi.kpi_id.kpi_id.uuid = item[0]
kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
kpi.kpi_value.floatVal = item[2] # This must be improved
subs_response.kpi_list.kpi.append(kpi)
subs_response.subs_id.subs_id.uuid = subs_id
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('SubscribeKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SubscribeKpi exception')
def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubsDescriptor')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubsDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubsDescriptor exception')
def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubscriptions')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubscriptions exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubscriptions exception')
def DeleteSubscription ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> Empty:
LOGGER.info('DeleteSubscription')
try:
# TBC
return Empty()
except ServiceException as e:
LOGGER.exception('DeleteSubscription exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteSubscription exception')
def SetKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('SetKpiAlarm')
francisco.moreno.external@atos.net
committed
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('SetKpiAlarm exception')
francisco.moreno.external@atos.net
committed
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpiAlarm exception')
francisco.moreno.external@atos.net
committed
def GetAlarms ( self, request : Empty, grpc_context : grpc.ServicerContext) -> AlarmList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarms')
try:
# TBC
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarms exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarms exception')
def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarmDescriptor')
try:
# TBC
return AlarmDescriptor()
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarmDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmDescriptor exception')
def GetAlarmResponseStream(self, request : AlarmSubscription, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]:
LOGGER.info('GetAlarmResponseStream')
try:
# TBC
except ServiceException as e:
LOGGER.exception('GetAlarmResponseStream exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmResponseStream exception')
def DeleteAlarm ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> Empty:
LOGGER.info('DeleteAlarm')
try:
# TBC
return Empty()
except ServiceException as e:
LOGGER.exception('DeleteAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteAlarm exception')
def GetStreamKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Iterator[Kpi]:
LOGGER.info('GetStreamKpi')
yield Kpi()
@MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
def GetInstantKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiList:
LOGGER.info('GetInstantKpi')
return KpiList()