Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, grpc
from queue import Queue
Javi Moreno
committed
from typing import Iterator
from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.logger import getJSONLogger
from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmList, SubsList, KpiId, \
KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
MonitorKpiRequest, Kpi, AlarmSubscription, SubsResponse
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
Francisco-Javier Moreno-Muro
committed
from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float
from monitoring.service import ManagementDBTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient
from prometheus_client import Counter, Summary
from monitoring.service.AlarmManager import AlarmManager
from monitoring.service.SubscriptionManager import SubscriptionManager
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE))
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC',
default=get_service_port_grpc(ServiceNameEnum.DEVICE))
class MonitoringServiceServicerImpl(MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
self.management_db = ManagementDBTools.ManagementDB('monitoring.db')
self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST,
port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client
self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT,
METRICSDB_TABLE)
self.subs_manager = SubscriptionManager(self.metrics_db)
self.alarm_manager = AlarmManager(self.metrics_db)
LOGGER.info('MetricsDB initialized')
# SetKpi (SetKpiRequest) returns (KpiId) {}
def SetKpi(
self, request: KpiDescriptor, grpc_context: grpc.ServicerContext
# CREATEKPI_COUNTER_STARTED.inc()
LOGGER.info('SetKpi')
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
response = KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
kpi_slice_id = request.slice_id.slice_uuid.uuid
kpi_connection_id = request.connection_id.connection_uuid.uuid
if request.kpi_id.kpi_id.uuid is not "":
response.kpi_id.uuid = request.kpi_id.kpi_id.uuid
# Here the code to modify an existing kpi
else:
data = self.management_db.insert_KPI(
kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id, kpi_slice_id, kpi_connection_id)
response.kpi_id.uuid = str(data)
return response
except ServiceException as e:
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
def DeleteKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('DeleteKpi')
try:
LOGGER.debug(f'DeleteKpi with KpiID: {request.kpi_id.uuid}')
kpi_id = int(request.kpi_id.uuid)
kpi = self.management_db.get_KPI(kpi_id)
if kpi:
self.management_db.delete_KPI(kpi_id)
else:
LOGGER.info('DeleteKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('DeleteKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteKpi exception')
def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiDescriptor:
LOGGER.info('getting Kpi by KpiID')
francisco.moreno.external@atos.net
committed
try:
kpi_id = request.kpi_id.uuid
kpi_db = self.management_db.get_KPI(int(kpi_id))
kpiDescriptor = KpiDescriptor()
if kpi_db is None:
LOGGER.info('GetKpiDescriptor error: KpiID({:s}): not found in database'.format(str(kpi_id)))
else:
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
kpiDescriptor.slice_id.slice_uuid.uuid = str(kpi_db[6])
kpiDescriptor.connection_id.connection_uuid.uuid = str(kpi_db[7])
return kpiDescriptor
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
francisco.moreno.external@atos.net
committed
grpc_context.abort(e.code, e.details)
except Exception: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
francisco.moreno.external@atos.net
committed
def GetKpiDescriptorList(self, request: Empty, grpc_context: grpc.ServicerContext) -> KpiDescriptorList:
LOGGER.info('GetKpiDescriptorList')
kpi_descriptor_list = KpiDescriptorList()
data = self.management_db.get_KPIS()
LOGGER.debug(f"data: {data}")
for item in data:
kpi_descriptor = KpiDescriptor()
kpi_descriptor.kpi_id.kpi_id.uuid = str(item[0])
kpi_descriptor.kpi_description = item[1]
kpi_descriptor.kpi_sample_type = item[2]
kpi_descriptor.device_id.device_uuid.uuid = str(item[3])
kpi_descriptor.endpoint_id.endpoint_uuid.uuid = str(item[4])
kpi_descriptor.service_id.service_uuid.uuid = str(item[5])
kpi_descriptor.slice_id.slice_uuid.uuid = str(item[6])
kpi_descriptor.connection_id.connection_uuid.uuid = str(item[7])
kpi_descriptor_list.kpi_descriptor_list.append(kpi_descriptor)
return kpi_descriptor_list
except ServiceException as e:
LOGGER.exception('GetKpiDescriptorList exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptorList exception')
def IncludeKpi(self, request: Kpi, grpc_context: grpc.ServicerContext) -> Empty:
kpi_id = request.kpi_id.kpi_id.uuid
if kpiDescriptor is None:
LOGGER.info('IncludeKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
else:
kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
kpiId = kpi_id
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
sliceId = kpiDescriptor.slice_id.slice_uuid.uuid
connectionId = kpiDescriptor.connection_id.connection_uuid.uuid
time_stamp = request.timestamp.timestamp
kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
# Build the structure to be included as point in the MetricsDB
self.metrics_db.write_KPI(time_stamp, kpiId, kpiSampleType, deviceId, endpointId, serviceId, sliceId, connectionId, kpi_value)
return Empty()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
def MonitorKpi(self, request: MonitorKpiRequest, grpc_context: grpc.ServicerContext) -> Empty:
LOGGER.info('MonitorKpi')
kpi_id = int(request.kpi_id.kpi_id.uuid)
kpi = self.management_db.get_KPI(kpi_id)
response = Empty()
if kpi:
# Sets the request to send to the device service
monitor_device_request = MonitoringSettings()
kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.monitoring_window_s
monitor_device_request.sampling_interval_s = request.sampling_rate_s
device_client = DeviceClient()
device_client.MonitorDeviceKpi(monitor_device_request)
else:
LOGGER.info('MonitorKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
return response
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# CREATEKPI_COUNTER_FAILED.inc()
def QueryKpiData(self, request: KpiQuery, grpc_context: grpc.ServicerContext) -> KpiList:
francisco.moreno.external@atos.net
committed
LOGGER.info('QueryKpiData')
try:
# TBC
return KpiList()
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('QueryKpiData exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('QueryKpiData exception')
def SetKpiSubscription(self, request: SubsDescriptor, grpc_context: grpc.ServicerContext) -> SubsResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('SubscribeKpi')
try:
subs_queue = Queue()
kpi_id = request.kpi_id.kpi_id.uuid
sampling_duration_s = request.sampling_duration_s
sampling_interval_s = request.sampling_interval_s
start_timestamp = request.start_timestamp.timestamp
end_timestamp = request.end_timestamp.timestamp
subscriber = "localhost" # Investigate how to get info from the requester
subs_id = self.management_db.insert_subscription(kpi_id, subscriber, sampling_duration_s,
sampling_interval_s, start_timestamp, end_timestamp)
self.subs_manager.create_subscription(subs_queue, subs_id, kpi_id, sampling_interval_s, sampling_duration_s,
start_timestamp, end_timestamp)
# parse queue to append kpis into the list
Francisco-Javier Moreno-Muro
committed
while True:
while not subs_queue.empty():
subs_response = SubsResponse()
Francisco-Javier Moreno-Muro
committed
list = subs_queue.get_nowait()
for item in list:
kpi = Kpi()
kpi.kpi_id.kpi_id.uuid = str(item[0])
kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
kpi.kpi_value.floatVal = item[2] # This must be improved
subs_response.kpi_list.kpi.append(kpi)
subs_response.subs_id.subs_id.uuid = str(subs_id)
yield subs_response
Francisco-Javier Moreno-Muro
committed
if timestamp_utcnow_to_float() > end_timestamp:
break
# yield subs_response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('SubscribeKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SubscribeKpi exception')
def GetSubsDescriptor(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> SubsDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubsDescriptor')
try:
subs_id = request.subs_id.uuid
subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
response = SubsDescriptor()
if subs_db is None:
LOGGER.info('GetSubsDescriptor error: SubsID({:s}): not found in database'.format(str(subs_id)))
else:
LOGGER.debug(subs_db)
response.subs_id.subs_id.uuid = str(subs_db[0])
response.kpi_id.kpi_id.uuid = str(subs_db[1])
response.sampling_duration_s = subs_db[3]
response.sampling_interval_s = subs_db[4]
response.start_timestamp.timestamp = subs_db[5]
response.end_timestamp.timestamp = subs_db[6]
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubsDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubsDescriptor exception')
def GetSubscriptions(self, request: Empty, grpc_context: grpc.ServicerContext) -> SubsList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubscriptions')
try:
response = SubsList()
data = self.management_db.get_subscriptions()
for subs_db in data:
subs_descriptor = SubsDescriptor()
subs_descriptor.subs_id.subs_id.uuid = str(subs_db[0])
subs_descriptor.kpi_id.kpi_id.uuid = str(subs_db[1])
subs_descriptor.sampling_duration_s = subs_db[3]
subs_descriptor.sampling_interval_s = subs_db[4]
subs_descriptor.start_timestamp.timestamp = subs_db[5]
subs_descriptor.end_timestamp.timestamp = subs_db[6]
response.subs_descriptor.append(subs_descriptor)
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetSubscriptions exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubscriptions exception')
def DeleteSubscription(self, request: SubscriptionID, grpc_context: grpc.ServicerContext) -> Empty:
LOGGER.info('DeleteSubscription')
try:
LOGGER.debug(f'DeleteSubscription with SubsID: {request.subs_id.uuid}')
subs_id = int(request.subs_id.uuid)
subs_db = self.management_db.get_subscription(int(request.subs_id.uuid))
if subs_db:
self.management_db.delete_subscription(subs_id)
else:
LOGGER.info('DeleteSubscription error: SubsID({:s}): not found in database'.format(str(subs_id)))
return Empty()
except ServiceException as e:
LOGGER.exception('DeleteSubscription exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteSubscription exception')
def SetKpiAlarm(self, request: AlarmDescriptor, grpc_context: grpc.ServicerContext) -> AlarmResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('SetKpiAlarm')
francisco.moreno.external@atos.net
committed
try:
response = AlarmID()
alarm_description = request.alarm_description
alarm_name = request.name
kpi_id = request.kpi_id.kpi_id.uuid
kpi_min_value = request.kpi_value_range.kpiMinValue.floatVal
kpi_max_value = request.kpi_value_range.kpiMaxValue.floatVal
in_range = request.kpi_value_range.inRange
include_min_value = request.kpi_value_range.includeMinValue
include_max_value = request.kpi_value_range.includeMaxValue
timestamp = request.timestamp.timestamp
LOGGER.debug(f"request.AlarmID: {request.alarm_id.alarm_id.uuid}")
if request.alarm_id.alarm_id.uuid is not "":
alarm_id = request.alarm_id.alarm_id.uuid
# Here the code to modify an existing alarm
else:
alarm_id = self.management_db.insert_alarm(alarm_description, alarm_name, kpi_id, kpi_min_value,
kpi_max_value,
in_range, include_min_value, include_max_value)
LOGGER.debug(f"AlarmID: {alarm_id}")
response.alarm_id.uuid = str(alarm_id)
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('SetKpiAlarm exception')
francisco.moreno.external@atos.net
committed
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpiAlarm exception')
francisco.moreno.external@atos.net
committed
def GetAlarms(self, request: Empty, grpc_context: grpc.ServicerContext) -> AlarmList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarms')
try:
response = AlarmList()
data = self.management_db.get_alarms()
for alarm in data:
alarm_descriptor = AlarmDescriptor()
alarm_descriptor.alarm_id.alarm_id.uuid = str(alarm[0])
alarm_descriptor.alarm_description = alarm[1]
alarm_descriptor.name = alarm[2]
alarm_descriptor.kpi_id.kpi_id.uuid = str(alarm[3])
alarm_descriptor.kpi_value_range.kpiMinValue.floatVal = alarm[4]
alarm_descriptor.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
alarm_descriptor.kpi_value_range.inRange = bool(alarm[6])
alarm_descriptor.kpi_value_range.includeMinValue = bool(alarm[7])
alarm_descriptor.kpi_value_range.includeMaxValue = bool(alarm[8])
response.alarm_descriptor.append(alarm_descriptor)
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarms exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarms exception')
def GetAlarmDescriptor(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> AlarmDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarmDescriptor')
try:
alarm_id = request.alarm_id.uuid
Francisco-Javier Moreno-Muro
committed
LOGGER.debug(alarm_id)
alarm = self.management_db.get_alarm(alarm_id)
response = AlarmDescriptor()
if alarm:
LOGGER.debug(f"{alarm}")
response.alarm_id.alarm_id.uuid = str(alarm_id)
response.alarm_description = alarm[1]
response.name = alarm[2]
response.kpi_id.kpi_id.uuid = str(alarm[3])
response.kpi_value_range.kpiMinValue.floatVal = alarm[4]
response.kpi_value_range.kpiMaxValue.floatVal = alarm[5]
response.kpi_value_range.inRange = bool(alarm[6])
response.kpi_value_range.includeMinValue = bool(alarm[7])
response.kpi_value_range.includeMaxValue = bool(alarm[8])
else:
LOGGER.info('GetAlarmDescriptor error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
response.alarm_id.alarm_id.uuid = "NoID"
return response
francisco.moreno.external@atos.net
committed
except ServiceException as e:
LOGGER.exception('GetAlarmDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmDescriptor exception')
def GetAlarmResponseStream(self, request: AlarmSubscription, grpc_context: grpc.ServicerContext) -> Iterator[
AlarmResponse]:
LOGGER.info('GetAlarmResponseStream')
try:
alarm_id = request.alarm_id.alarm_id.uuid
Francisco-Javier Moreno-Muro
committed
alarm_data = self.management_db.get_alarm(alarm_id)
real_start_time = timestamp_utcnow_to_float()
Francisco-Javier Moreno-Muro
committed
if alarm_data:
LOGGER.debug(f"{alarm_data}")
alarm_queue = Queue()
alarm_id = request.alarm_id.alarm_id.uuid
kpi_id = alarm_data[3]
kpiMinValue = alarm_data[4]
kpiMaxValue = alarm_data[5]
inRange = alarm_data[6]
includeMinValue = alarm_data[7]
includeMaxValue = alarm_data[8]
subscription_frequency_ms = request.subscription_frequency_ms
subscription_timeout_s = request.subscription_timeout_s
end_timestamp = real_start_time + subscription_timeout_s
self.alarm_manager.create_alarm(alarm_queue, alarm_id, kpi_id, kpiMinValue, kpiMaxValue, inRange,
includeMinValue, includeMaxValue, subscription_frequency_ms,
subscription_timeout_s)
while True:
while not alarm_queue.empty():
alarm_response = AlarmResponse()
list = alarm_queue.get_nowait()
size = len(list)
for item in list:
kpi = Kpi()
kpi.kpi_id.kpi_id.uuid = str(item[0])
kpi.timestamp.timestamp = timestamp_string_to_float(item[1])
kpi.kpi_value.floatVal = item[2] # This must be improved
alarm_response.kpi_list.kpi.append(kpi)
alarm_response.alarm_id.alarm_id.uuid = alarm_id
yield alarm_response
if timestamp_utcnow_to_float() > end_timestamp:
break
else:
LOGGER.info('GetAlarmResponseStream error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
alarm_response = AlarmResponse()
alarm_response.alarm_id.alarm_id.uuid = "NoID"
return alarm_response
except ServiceException as e:
LOGGER.exception('GetAlarmResponseStream exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmResponseStream exception')
def DeleteAlarm(self, request: AlarmID, grpc_context: grpc.ServicerContext) -> Empty:
LOGGER.info('DeleteAlarm')
try:
LOGGER.debug(f'DeleteAlarm with AlarmID: {request.alarm_id.uuid}')
alarm_id = int(request.alarm_id.uuid)
alarm = self.management_db.get_alarm(alarm_id)
response = Empty()
if alarm:
self.alarm_manager.delete_alarm(alarm_id)
self.management_db.delete_alarm(alarm_id)
else:
LOGGER.info('DeleteAlarm error: AlarmID({:s}): not found in database'.format(str(alarm_id)))
return response
except ServiceException as e:
LOGGER.exception('DeleteAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteAlarm exception')
def GetStreamKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Iterator[Kpi]:
LOGGER.info('GetStreamKpi')
kpi_id = request.kpi_id.uuid
kpi_db = self.management_db.get_KPI(int(kpi_id))
response = Kpi()
if kpi_db is None:
LOGGER.info('GetStreamKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
response.kpi_id.kpi_id.uuid = "NoID"
return response
else:
yield response
@MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
def GetInstantKpi(self, request: KpiId, grpc_context: grpc.ServicerContext) -> Kpi:
LOGGER.info('GetInstantKpi')
try:
kpi_id = request.kpi_id.uuid
response = Kpi()
if kpi_id is "":
LOGGER.info('GetInstantKpi error: KpiID({:s}): not found in database'.format(str(kpi_id)))
response.kpi_id.kpi_id.uuid = "NoID"
else:
query = f"SELECT kpi_id, timestamp, kpi_value FROM monitoring WHERE kpi_id = '{kpi_id}' " \
f"LATEST ON timestamp PARTITION BY kpi_id"
data = self.metrics_db.run_query(query)[0]
LOGGER.debug(data)
response.kpi_id.kpi_id.uuid = str(data[0])
response.timestamp.timestamp = timestamp_string_to_float(data[1])
response.kpi_value.floatVal = data[2] # This must be improved
return response
except ServiceException as e:
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SetKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))