Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os,grpc, logging
import socket
Javi Moreno
committed
from prometheus_client import Summary
from prometheus_client import Counter
from common.Settings import get_setting
Javi Moreno
committed
from monitoring.Config import DEVICE_GRPC_SERVICE_PORT, DEVICE_SERVICE_HOST
from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType
Javi Moreno
committed
from monitoring.service import SqliteTools, InfluxTools
from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
Javi Moreno
committed
from context.proto import context_pb2
from device.client.DeviceClient import DeviceClient
from device.proto import device_pb2
LOGGER = logging.getLogger(__name__)
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
DEVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=DEVICE_SERVICE_HOST )
DEVICE_SERVICE_PORT = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=DEVICE_GRPC_SERVICE_PORT)
class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
Javi Moreno
committed
self.sql_db = SqliteTools.SQLite('monitoring.db')
self.deviceClient = DeviceClient(address=DEVICE_SERVICE_HOST, port=DEVICE_GRPC_SERVICE_PORT) # instantiate the client
Javi Moreno
committed
self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
# CreateKpi (CreateKpiRequest) returns (KpiId) {}
def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
# CREATEKPI_COUNTER_STARTED.inc()
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_id = monitoring_pb2.KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
francisco.moreno.external@atos.net
committed
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def EditKpiDescriptor ( self, request : monitoring_pb2.EditedKpiDescriptor, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('EditKpiDescriptor')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('EditKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiDescriptor exception')
def DeleteKpi ( self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('DeleteKpi')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('DeleteKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteKpi exception')
def GetKpiDescriptorList ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptorList:
LOGGER.info('GetKpiDescriptorList')
try:
# TBC
return monitoring_pb2.KpiDescriptorList()
except ServiceException as e:
LOGGER.exception('GetKpiDescriptorList exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptorList exception')
def CreateBundleKpi ( self, request : monitoring_pb2.BundleKpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId:
LOGGER.info('CreateBundleKpi')
try:
# TBC
return monitoring_pb2.KpiId()
except ServiceException as e:
LOGGER.exception('CreateBundleKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateBundleKpi exception')
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
try:
# Creates the request to send to the device service
monitor_device_request = device_pb2.MonitoringSettings()
Javi Moreno
committed
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.sampling_duration_s
monitor_device_request.sampling_interval_s = request.sampling_interval_s
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if s.connect_ex((DEVICE_SERVICE_HOST, DEVICE_GRPC_SERVICE_PORT)) == 0:
self.deviceClient.MonitorDeviceKpi(monitor_device_request)
else:
LOGGER.warning('Device service is not reachable')
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
if kpiDescriptor is None:
LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id)))
return context_pb2.Empty()
kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = request.timestamp
kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
# Build the structure to be included as point in the influxDB
self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
#self.influx_db.read_KPI_points()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
return context_pb2.Empty()
francisco.moreno.external@atos.net
committed
# def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
#
# LOGGER.info('GetStreamKpi')
# yield monitoring_pb2.Kpi()
#
# @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
# def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
#
# LOGGER.info('GetInstantKpi')
# return monitoring_pb2.Kpi()
def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor:
#LOGGER.info('sql_db.get_KPIS={:s}'.format(str(self.sql_db.get_KPIS())))
#LOGGER.info('kpi_db={:s}'.format(str(kpi_db)))
if kpi_db is None: return None
kpiDescriptor = monitoring_pb2.KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
francisco.moreno.external@atos.net
committed
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def QueryKpiData ( self, request : monitoring_pb2.KpiQuery, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiList:
LOGGER.info('QueryKpiData')
try:
# TBC
return monitoring_pb2.KpiQuery()
except ServiceException as e:
LOGGER.exception('QueryKpiData exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('QueryKpiData exception')
def SubscribeKpi ( self, request : monitoring_pb2.SubsDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiList:
LOGGER.info('SubscribeKpi')
try:
# TBC
yield monitoring_pb2.KpiList()
except ServiceException as e:
LOGGER.exception('SubscribeKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SubscribeKpi exception')
def GetSubsDescriptor ( self, request : monitoring_pb2.SubscriptionID, grpc_context : grpc.ServicerContext) -> monitoring_pb2.SubsDescriptor:
LOGGER.info('GetSubsDescriptor')
try:
# TBC
return monitoring_pb2.SubsDescriptor()
except ServiceException as e:
LOGGER.exception('GetSubsDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubsDescriptor exception')
def GetSubscriptions ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> monitoring_pb2.SubsIDList:
LOGGER.info('GetSubscriptions')
try:
# TBC
return monitoring_pb2.SubsIDList()
except ServiceException as e:
LOGGER.exception('GetSubscriptions exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubscriptions exception')
def EditKpiSubscription ( self, request : monitoring_pb2.SubsDescriptor, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('EditKpiSubscription')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('EditKpiSubscription exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiSubscription exception')
def CreateKpiAlarm ( self, request : monitoring_pb2.AlarmDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.AlarmResponse:
LOGGER.info('CreateKpiAlarm')
try:
# TBC
return monitoring_pb2.AlarmResponse()
except ServiceException as e:
LOGGER.exception('CreateKpiAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpiAlarm exception')
def EditKpiAlarm ( self, request : monitoring_pb2.AlarmDescriptor, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('EditKpiAlarm')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('EditKpiAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiAlarm exception')
def GetAlarms ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> monitoring_pb2.AlarmIDList:
LOGGER.info('GetAlarms')
try:
# TBC
return monitoring_pb2.AlarmIDList()
except ServiceException as e:
LOGGER.exception('GetAlarms exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarms exception')
def GetAlarmDescriptor ( self, request : monitoring_pb2.AlarmID, grpc_context : grpc.ServicerContext) -> monitoring_pb2.AlarmDescriptor:
LOGGER.info('GetAlarmDescriptor')
try:
# TBC
return monitoring_pb2.AlarmDescriptor()
except ServiceException as e:
LOGGER.exception('GetAlarmDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmDescriptor exception')