"src/analytics/database/Analyzer_DB.py" did not exist on "d8687c1cdcbe4289c74bd814c0f510cbbe22547a"
Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, grpc
from queue import Queue
Javi Moreno
committed
from typing import Iterator
from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.logger import getJSONLogger
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_string_to_float
from monitoring.service import ManagementDBTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient
from prometheus_client import Counter, Summary
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.SubscriptionManager import SubscriptionManager
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE))
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC',
default=get_service_port_grpc(ServiceNameEnum.DEVICE))
class MonitoringServiceServicerImpl(MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST,
port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client
self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT,
METRICSDB_TABLE)
self.subs_manager = SubscriptionManager(self.metrics_db)
self.alarm_manager = AlarmManager(self.metrics_db)
LOGGER.info('MetricsDB initialized')
# SetKpi (SetKpiRequest) returns (KpiId) {}
def SetKpi(
self, request: KpiDescriptor, grpc_context: grpc.ServicerContext
# CREATEKPI_COUNTER_STARTED.inc()
LOGGER.info('SetKpi')
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
response = KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
kpi_connection_id = request.connection_id.connection_uuid.uuid
LOGGER.debug(kpi_connection_id)
if request.kpi_id.kpi_id.uuid is not "":
response.kpi_id.uuid = request.kpi_id.kpi_id.uuid
# Here the code to modify an existing kpi
else:
data = self.management_db.insert_KPI(
kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_connection_id)
response.kpi_id.uuid = str(data)
return response
except ServiceException as e:
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('DeleteKpi')
try:
LOGGER.debug(f'DeleteKpi with KpiID: {request.kpi_id.uuid}')
kpi_id = int(request.kpi_id.uuid)
kpi = self.management_db.get_KPI(kpi_id)
if kpi:
self.management_db.delete_KPI(kpi_id)
else:
LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('DeleteKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteKpi exception')
def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
LOGGER.info('getting Kpi by KpiID')
francisco.moreno.external@atos.net
committed
try:
kpi_id = request.kpi_id.uuid
kpi_db = self.management_db.get_KPI(int(kpi_id))
kpiDescriptor = KpiDescriptor()
if kpi_db is None:
LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id)))
else:
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[6])
return kpiDescriptor
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
francisco.moreno.external@atos.net
committed
grpc_context.abort(e.code, e.details)
except Exception: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
francisco.moreno.external@atos.net
committed
def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList:
LOGGER.info('GetKpiDescriptorList')
kpi_descriptor_list = KpiDescriptorList()
data = self.management_db.get_KPIS()
LOGGER.debug(f"data: {data}")
for item in data:
kpi_descriptor = KpiDescriptor()
kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0])
kpi_descriptor.kpi_description = item[1]
kpi_descriptor.kpi_sample_type = item[2]
kpi_descriptor.device_id.device_uuid.uuid = str(item[3])
kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4])
kpi_descriptor.service_id.service_uuid.uuid = str(item[5])
kpi_descriptor.connection_id.connection_uuid.uuid = str(item[6])
kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor)
return kpi_descriptor_list
except ServiceException as e:
LOGGER.exception('GetKpiDescriptorList exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptorList exception')
def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty:
kpi_id = request.kpi_id.kpi_id.uuid
if kpiDescriptor is None:
LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
else:
kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
kpiId = kpi_id
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
connectionId = kpiDescriptor.connection_id.connection_uuid.uuid
time_stamp = request.timestamp.timestamp
kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
# Build the structure to be included as point in the MetricsDB
self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, connectionId, kpi_value)
return Empty()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty:
LOGGER.info('MonitorKpi')
kpi_id = int(request.kpi_id.kpi_id.uuid)
kpi = self.management_db.get_KPI(kpi_id)
response = Empty()
if kpi:
# Sets the request to send to the device service
monitor_device_request = MonitoringSettings()
kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.monitoring_window_s
monitor_device_request.sampling_interval_s = request.sampling_rate_s
device_client = DeviceClient()
device_client.MonitorDeviceKpi(monitor_device_request)
else:
LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
return response
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# CREATEKPI_COUNTER_FAILED.inc()
def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList:
francisco.moreno.external@atos.net
committed
LOGGER.info('QueryKpiData')
try:
# TBC
return KpiList()
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('QueryKpiData exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('QueryKpiData exception')
def SetKpiSubscription(self, request: SubsDescriptor, grpc_context: grpc.ServicerContext) -> SubsResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('SubscribeKpi')
try:
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
subs_queue = Queue()
subs_response = SubsResponse()
kpi_id = request.kpi_id.kpi_id.uuid
sampling_duration_s = request.sampling_duration_s
sampling_interval_s = request.sampling_interval_s
start_timestamp = request.start_timestamp.timestamp
end_timestamp = request.end_timestamp.timestamp
subscriber = "localhost" # Investigate how to get info from the requester
subs_id = self.management_db.insert_subscription(kpi_id, subscriber, sampling_duration_s,
sampling_interval_s, start_timestamp, end_timestamp)
self.subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s,
start_timestamp, end_timestamp)
# parse queue to append kpis into the list
while not subs_queue.empty():
list = subs_queue.get_nowait()
for item in list:
kpi = Kpi()
kpi.kpi_id.kpi_id.uuid = str(item[0])
kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
kpi.kpi_value.floatVal = item[2] # This must be improved
subs_response.kpi_list.kpi.append(kpi)
subs_response.subs_id.subs_id.uuid = str(subs_id)
yield subs_response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('SubscribeKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SubscribeKpi exception')
def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubsDescriptor')
try:
subs_id = request.subs_id.uuid
subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
response = SubsDescriptor()
if subs_db is None:
LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id)))
else:
LOGGER.debug(subs_db)
response.subs_id.subs_id.uuid = str(subs_db[0])
response.kpi_id.kpi_id.uuid = str(subs_db[1])
response.sampling_duration_s = subs_db[3]
response.sampling_interval_s = subs_db[4]
response.start_timestamp.timestamp = subs_db[5]
response.end_timestamp.timestamp = subs_db[6]
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubsDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubsDescriptor exception')
def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubscriptions')
try:
response = SubsList()
data = self.management_db.get_subscriptions()
for subs_db in data:
subs_descriptor = SubsDescriptor()
subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0])
subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1])
subs_descriptor.sampling_duration_s = subs_db[3]
subs_descriptor.sampling_interval_s = subs_db[4]
subs_descriptor.start_timestamp.timestamp = subs_db[5]
subs_descriptor.end_timestamp.timestamp = subs_db[6]
response.subs_descriptor.append(subs_descriptor)
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubscriptions exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubscriptions exception')
def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty:
LOGGER.info('DeleteSubscription')
try:
LOGGER.debug(f'DeleteSubscription with SubsID: {request.subs_id.uuid}')
subs_id = int(request.subs_id.uuid)
subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
if subs_db:
self.management_db.delete_subscription(subs_id)
else:
LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id)))
return Empty()
except ServiceException as e:
LOGGER.exception('DeleteSubscription exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteSubscription exception')
def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('SetKpiAlarm')
francisco.moreno.external@atos.net
committed
try:
response = AlarmID()
alarm_description = request.alarm_description
alarm_name = request.name
kpi_id = request.kpi_id.kpi_id.uuid
kpi_min_value = request.kpi_value_range.kpiMinValue.floatVal
kpi_max_value = request.kpi_value_range.kpiMaxValue.floatVal
in_range = request.kpi_value_range.inRange
include_min_value = request.kpi_value_range.includeMinValue
include_max_value = request.kpi_value_range.includeMaxValue
timestamp = request.timestamp.timestamp
LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}")
if request.alarm_id.alarm_id.uuid is not "":
alarm_id = request.alarm_id.alarm_id.uuid
# Here the code to modify an existing alarm
else:
alarm_id = self.management_db.insert_alarm(alarm_description, alarm_name, kpi_id, kpi_min_value,
kpi_max_value,
in_range, include_min_value, include_max_value)
LOGGER.debug(f"AlarmID: {alarm_id}")
response.alarm_id.uuid = str(alarm_id)
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('SetKpiAlarm exception')
francisco.moreno.external@atos.net
committed
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpiAlarm exception')
francisco.moreno.external@atos.net
committed
def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarms')
try:
response = AlarmList()
data = self.management_db.get_alarms()
for alarm in data:
alarm_descriptor = AlarmDescriptor()
alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0])
alarm_descriptor.alarm_description = alarm[1]
alarm_descriptor.name = alarm[2]
alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3])
alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4]
alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
alarm_descriptor.kpi_value_range.inRange = bool(alarm[6])
alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7])
alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8])
response.alarm_descriptor.append(alarm_descriptor)
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarms exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarms exception')
def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarmDescriptor')
try:
alarm_id = request.alarm_id.uuid
alarm = self.management_db.get_alarm(alarm_id)
response = AlarmDescriptor()
if alarm:
LOGGER.debug(f"{alarm}")
response.alarm_id.alarm_id.uuid = str(alarm_id)
response.alarm_description = alarm[1]
response.name = alarm[2]
response.kpi_id.kpi_id.uuid = str(alarm[3])
response.kpi_value_range.kpiMinValue.floatVal = alarm[4]
response.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
response.kpi_value_range.inRange = bool(alarm[6])
response.kpi_value_range.includeMinValue = bool(alarm[7])
response.kpi_value_range.includeMaxValue = bool(alarm[8])
else:
LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
response.alarm_id.alarm_id.uuid = "NoID"
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarmDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmDescriptor exception')
def GetAlarmResponseStream(self, request: AlarmSubscription, grpc_context: grpc.ServicerContext) -> Iterator[
AlarmResponse]:
LOGGER.info('GetAlarmResponseStream')
try:
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
alarm_id = request.alarm_id.alarm_id.uuid
alarm = self.management_db.get_alarm(alarm_id)
alarm_response = AlarmResponse()
if alarm:
alarm_queue = Queue()
alarm_data = self.management_db.get_alarm(alarm)
alarm_id = request.alarm_id.alarm_id.uuid
kpi_id = alarm_data[3]
kpiMinValue = alarm_data[4]
kpiMaxValue = alarm_data[5]
inRange = alarm_data[6]
includeMinValue = alarm_data[7]
includeMaxValue = alarm_data[8]
subscription_frequency_ms = request.subscription_frequency_ms
subscription_timeout_s = request.subscription_timeout_s
self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange,
includeMinValue, includeMaxValue, subscription_frequency_ms,
subscription_timeout_s)
while not alarm_queue.empty():
list = alarm_queue.get_nowait()
for item in list:
kpi = Kpi()
kpi.kpi_id.kpi_id.uuid = str(item[0])
kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
kpi.kpi_value.floatVal = item[2] # This must be improved
alarm_response.kpi_list.kpi.append(kpi)
alarm_response.alarm_id.alarm_id.uuid = alarm_id
yield alarm_response
else:
LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
alarm_response.alarm_id.alarm_id.uuid = "NoID"
return alarm_response
except ServiceException as e:
LOGGER.exception('GetAlarmResponseStream exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmResponseStream exception')
def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty:
LOGGER.info('DeleteAlarm')
try:
LOGGER.debug(f'DeleteAlarm with AlarmID: {request.alarm_id.uuid}')
alarm_id = int(request.alarm_id.uuid)
alarm = self.management_db.get_alarm(alarm_id)
response = Empty()
if alarm:
self.alarm_manager.delete_alarm(alarm_id)
self.management_db.delete_alarm(alarm_id)
else:
LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
return response
except ServiceException as e:
LOGGER.exception('DeleteAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteAlarm exception')
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]:
LOGGER.info('GetStreamKpi')
kpi_id = request.kpi_id.uuid
kpi_db = self.management_db.get_KPI(int(kpi_id))
response = Kpi()
if kpi_db is None:
LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
response.kpi_id.kpi_id.uuid = "NoID"
return response
else:
yield response
@MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi:
LOGGER.info('GetInstantKpi')
try:
kpi_id = request.kpi_id.uuid
response = Kpi()
if kpi_id is "":
LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
response.kpi_id.kpi_id.uuid = "NoID"
else:
query = f"SELECT kpi_id, timestamp, kpi_value FROM monitoring WHERE kpi_id = '{kpi_id}' " \
f"LATEST ON timestamp PARTITION BY kpi_id"
data = self.metrics_db.run_query(query)[0]
LOGGER.debug(data)
response.kpi_id.kpi_id.uuid = str(data[0])
response.timestamp.timestamp = timestamp_string_to_float(data[1])
response.kpi_value.floatVal = data[2] # This must be improved
return response
except ServiceException as e:
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))