Newer
Older
import grpc
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from device.Config import GRPC_SERVICE_PORT
from device.client.DeviceClient import DeviceClient
from device.proto import device_pb2
from monitoring.service import sqlite_tools, influx_tools
from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc
from common.logger import getJSONLogger
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
from prometheus_client import Summary, Histogram
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
# CREATEKPI_COUNTER_STARTED = Counter ('monitoring_createkpi_counter_started',
# 'Monitoring:CreateKpi counter of requests started' )
# CREATEKPI_COUNTER_COMPLETED = Counter ('monitoring_createkpi counter_completed',
# 'Monitoring:CreateKpi counter of requests completed')
# CREATEKPI_COUNTER_FAILED = Counter ('monitoring_createkpi_counter_failed',
# 'Monitoring:CreateKpi counter of requests failed' )
# CREATEKPI_HISTOGRAM_DURATION = Histogram('monitoring_createkpi_histogram_duration',
# 'Monitoring:CreateKpi histogram of request duration')
#
# MONITORKPI_COUNTER_STARTED = Counter ('monitoring_monitorkpi_counter_started',
# 'Monitoring:MonitorKpi counter of requests started' )
# MONITORKPI_COUNTER_COMPLETED = Counter ('monitoring_monitorkpi counter_completed',
# 'Monitoring:MonitorKpi counter of requests completed')
# MONITORKPI_COUNTER_FAILED = Counter ('monitoring_monitorkpi_counter_failed',
# 'Monitoring:MonitorKpi counter of requests failed' )
# MONITORKPI_HISTOGRAM_DURATION = Histogram('monitoring_monitorkpi_histogram_duration',
# 'Monitoring:MonitorKpi histogram of request duration')
#
# INCLUDEKPI_COUNTER_STARTED = Counter ('monitoring_includekpi_counter_started',
# 'Monitoring:IncludeKpi counter of requests started' )
# INCLUDEKPI_COUNTER_COMPLETED = Counter ('monitoring_includekpi counter_completed',
# 'Monitoring:IncludeKpi counter of requests completed')
# INCLUDEKPI_COUNTER_FAILED = Counter ('monitoring_includekpi_counter_failed',
# 'Monitoring:IncludeKpi counter of requests failed' )
# INCLUDEKPI_HISTOGRAM_DURATION = Histogram('monitoring_includekpi_histogram_duration',
# 'Monitoring:IncludeKpi histogram of request duration')
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
self.influx_db = influx_tools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
# CreateKpi (CreateKpiRequest) returns (KpiId) {}
def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
# CREATEKPI_COUNTER_STARTED.inc()
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_id = monitoring_pb2.KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
try:
# Creates the request to send to the device service
monitor_device_request = device_pb2.MonitoringSettings()
kpiDescriptor = self.get_KpiDescriptor(request.kpi_id)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.kpi_descriptor.kpi_description = kpiDescriptor.kpi_description
monitor_device_request.kpi_descriptor.kpi_sample_type = kpiDescriptor.kpi_sample_type
monitor_device_request.kpi_descriptor.device_id.device_uuid.uuid = kpiDescriptor.device_id.device_uuid.uuid
monitor_device_request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
monitor_device_request.kpi_descriptor.service_id.service_uuid.uuid = kpiDescriptor.service_id.service_uuid.uuid
monitor_device_request.sampling_duration_s = request.sampling_duration_s
monitor_device_request.sampling_interval_s = request.sampling_interval_s
deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client
# deviceClient.MonitorDeviceKpi(monitor_device_request)
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
try:
kpiDescriptor = self.get_KpiDescriptor(request.kpi_id)
kpiSampleType = kpiDescriptor.kpi_sample_type
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = request.timestamp
kpi_value = request.kpi_value.intVal
# Build the structure to be included as point in the influxDB
self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
self.influx_db.read_KPI_points()
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns stream monitoring.Kpi
LOGGER.info('GetStreamKpi')
yield monitoring_pb2.Kpi()
@MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns monitoring.Kpi
LOGGER.info('GetInstantKpi')
return monitoring_pb2.Kpi()
def get_KpiDescriptor(self, kpiId):
kpi_db = self.sql_db.get_KPI(int(kpiId.kpi_id.uuid))
kpiDescriptor = monitoring_pb2.KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor