Newer
Older
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.logger import getJSONLogger
from context.client.ContextClient import ContextClient
from monitoring.proto.context_pb2 import Empty
from device.Config import GRPC_SERVICE_PORT
from device.client.DeviceClient import DeviceClient
from device.proto import device_pb2
from monitoring.service import sqlite_tools, influx_tools
from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
from prometheus_client import Summary, Histogram
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
# CREATEKPI_COUNTER_STARTED = Counter ('monitoring_createkpi_counter_started',
# 'Monitoring:CreateKpi counter of requests started' )
# CREATEKPI_COUNTER_COMPLETED = Counter ('monitoring_createkpi counter_completed',
# 'Monitoring:CreateKpi counter of requests completed')
# CREATEKPI_COUNTER_FAILED = Counter ('monitoring_createkpi_counter_failed',
# 'Monitoring:CreateKpi counter of requests failed' )
# CREATEKPI_HISTOGRAM_DURATION = Histogram('monitoring_createkpi_histogram_duration',
# 'Monitoring:CreateKpi histogram of request duration')
#
# MONITORKPI_COUNTER_STARTED = Counter ('monitoring_monitorkpi_counter_started',
# 'Monitoring:MonitorKpi counter of requests started' )
# MONITORKPI_COUNTER_COMPLETED = Counter ('monitoring_monitorkpi counter_completed',
# 'Monitoring:MonitorKpi counter of requests completed')
# MONITORKPI_COUNTER_FAILED = Counter ('monitoring_monitorkpi_counter_failed',
# 'Monitoring:MonitorKpi counter of requests failed' )
# MONITORKPI_HISTOGRAM_DURATION = Histogram('monitoring_monitorkpi_histogram_duration',
# 'Monitoring:MonitorKpi histogram of request duration')
#
# INCLUDEKPI_COUNTER_STARTED = Counter ('monitoring_includekpi_counter_started',
# 'Monitoring:IncludeKpi counter of requests started' )
# INCLUDEKPI_COUNTER_COMPLETED = Counter ('monitoring_includekpi counter_completed',
# 'Monitoring:IncludeKpi counter of requests completed')
# INCLUDEKPI_COUNTER_FAILED = Counter ('monitoring_includekpi_counter_failed',
# 'Monitoring:IncludeKpi counter of requests failed' )
# INCLUDEKPI_HISTOGRAM_DURATION = Histogram('monitoring_includekpi_histogram_duration',
# 'Monitoring:IncludeKpi histogram of request duration')
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
self.influx_db = influx_tools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
# CreateKpi (CreateKpiRequest) returns (KpiId) {}
def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
# CREATEKPI_COUNTER_STARTED.inc()
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_id = monitoring_pb2.KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
try:
# Creates the request to send to the device service
monitor_device_request = device_pb2.MonitoringSettings()
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.kpi_descriptor.kpi_description = kpiDescriptor.kpi_description
monitor_device_request.kpi_descriptor.kpi_sample_type = kpiDescriptor.kpi_sample_type
monitor_device_request.kpi_descriptor.device_id.device_uuid.uuid = kpiDescriptor.device_id.device_uuid.uuid
monitor_device_request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
monitor_device_request.kpi_descriptor.service_id.service_uuid.uuid = kpiDescriptor.service_id.service_uuid.uuid
monitor_device_request.sampling_duration_s = request.sampling_duration_s
monitor_device_request.sampling_interval_s = request.sampling_interval_s
deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client
# deviceClient.MonitorDeviceKpi(monitor_device_request)
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
kpiSampleType = kpiDescriptor.kpi_sample_type
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = request.timestamp
kpi_value = request.kpi_value.intVal
# Build the structure to be included as point in the influxDB
self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
self.influx_db.read_KPI_points()
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns stream monitoring.Kpi
LOGGER.info('GetStreamKpi')
yield monitoring_pb2.Kpi()
@MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns monitoring.Kpi
LOGGER.info('GetInstantKpi')
return monitoring_pb2.Kpi()
def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor:
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
try:
kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid))
print(kpi_db)
kpiDescriptor = monitoring_pb2.KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
class EventsCollector:
def __init__(self, context_client_grpc : ContextClient) -> None: # pylint: disable=redefined-outer-name
self._events_queue = Queue()
self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False)
def _collect(self, events_stream) -> None:
try:
for event in events_stream:
self._events_queue.put_nowait(event)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
raise # pragma: no cover
def start(self):
self._device_thread.start()
def get_event(self, block : bool = True, timeout : float = 0.1):
return self._events_queue.get(block=block, timeout=timeout)