Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os,grpc, logging
import socket
from typing import Iterator
Javi Moreno
committed
from prometheus_client import Summary
from prometheus_client import Counter
from common.Settings import get_setting
from context.proto.context_pb2 import Empty
Javi Moreno
committed
from monitoring.Config import DEVICE_GRPC_SERVICE_PORT, DEVICE_SERVICE_HOST
from monitoring.proto.kpi_sample_types_pb2 import KpiSampleType
from monitoring.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDList, SubsIDList, KpiId, \
KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID
Javi Moreno
committed
from monitoring.service import SqliteTools, InfluxTools
from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
Javi Moreno
committed
from context.proto import context_pb2
from device.client.DeviceClient import DeviceClient
from device.proto import device_pb2
LOGGER = logging.getLogger(__name__)
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=DEVICE_SERVICE_HOST )
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=DEVICE_GRPC_SERVICE_PORT)
class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
Javi Moreno
committed
self.sql_db = SqliteTools.SQLite('monitoring.db')
self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICE_GRPC_SERVICE_PORT) # instantiate the client
Javi Moreno
committed
self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
# CreateKpi (CreateKpiRequest) returns (KpiId) {}
def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
# CREATEKPI_COUNTER_STARTED.inc()
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_id = monitoring_pb2.KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
francisco.moreno.external@atos.net
committed
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def EditKpiDescriptor ( self, request : monitoring_pb2.EditedKpiDescriptor, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('EditKpiDescriptor')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('EditKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiDescriptor exception')
def DeleteKpi ( self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('DeleteKpi')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('DeleteKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteKpi exception')
def GetKpiDescriptorList ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptorList:
LOGGER.info('GetKpiDescriptorList')
try:
# TBC
return monitoring_pb2.KpiDescriptorList()
except ServiceException as e:
LOGGER.exception('GetKpiDescriptorList exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptorList exception')
def CreateBundleKpi ( self, request : monitoring_pb2.BundleKpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId:
LOGGER.info('CreateBundleKpi')
try:
# TBC
return monitoring_pb2.KpiId()
except ServiceException as e:
LOGGER.exception('CreateBundleKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateBundleKpi exception')
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
try:
# Creates the request to send to the device service
monitor_device_request = device_pb2.MonitoringSettings()
Javi Moreno
committed
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.sampling_duration_s
monitor_device_request.sampling_interval_s = request.sampling_interval_s
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if s.connect_ex((DEVICE_SERVICE_HOST, DEVICE_GRPC_SERVICE_PORT)) == 0:
self.deviceClient.MonitorDeviceKpi(monitor_device_request)
else:
LOGGER.warning('Device service is not reachable')
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
if kpiDescriptor is None:
LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id)))
return context_pb2.Empty()
kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = request.timestamp
kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
# Build the structure to be included as point in the influxDB
self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
#self.influx_db.read_KPI_points()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
return context_pb2.Empty()
francisco.moreno.external@atos.net
committed
# def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
#
# LOGGER.info('GetStreamKpi')
# yield monitoring_pb2.Kpi()
#
# @MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
# def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
#
# LOGGER.info('GetInstantKpi')
# return monitoring_pb2.Kpi()
def GetKpiDescriptor(self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiDescriptor:
#LOGGER.info('sql_db.get_KPIS={:s}'.format(str(self.sql_db.get_KPIS())))
#LOGGER.info('kpi_db={:s}'.format(str(kpi_db)))
if kpi_db is None: return None
kpiDescriptor = monitoring_pb2.KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
francisco.moreno.external@atos.net
committed
def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList:
francisco.moreno.external@atos.net
committed
LOGGER.info('QueryKpiData')
try:
# TBC
return monitoring_pb2.KpiQuery()
except ServiceException as e:
LOGGER.exception('QueryKpiData exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('QueryKpiData exception')
def SubscribeKpi ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> KpiList:
francisco.moreno.external@atos.net
committed
LOGGER.info('SubscribeKpi')
try:
# TBC
yield monitoring_pb2.KpiList()
except ServiceException as e:
LOGGER.exception('SubscribeKpi exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('SubscribeKpi exception')
def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubsDescriptor')
try:
# TBC
return monitoring_pb2.SubsDescriptor()
except ServiceException as e:
LOGGER.exception('GetSubsDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubsDescriptor exception')
def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsIDList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetSubscriptions')
try:
# TBC
return monitoring_pb2.SubsIDList()
except ServiceException as e:
LOGGER.exception('GetSubscriptions exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetSubscriptions exception')
def EditKpiSubscription ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('EditKpiSubscription')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('EditKpiSubscription exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiSubscription exception')
def CreateKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse:
francisco.moreno.external@atos.net
committed
LOGGER.info('CreateKpiAlarm')
try:
# TBC
return monitoring_pb2.AlarmResponse()
except ServiceException as e:
LOGGER.exception('CreateKpiAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpiAlarm exception')
def EditKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> Empty:
francisco.moreno.external@atos.net
committed
LOGGER.info('EditKpiAlarm')
try:
# TBC
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('EditKpiAlarm exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('EditKpiAlarm exception')
def GetAlarms ( self, request : context_pb2.Empty, grpc_context : grpc.ServicerContext) -> AlarmIDList:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarms')
try:
# TBC
return monitoring_pb2.AlarmIDList()
except ServiceException as e:
LOGGER.exception('GetAlarms exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarms exception')
def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor:
francisco.moreno.external@atos.net
committed
LOGGER.info('GetAlarmDescriptor')
try:
# TBC
return monitoring_pb2.AlarmDescriptor()
except ServiceException as e:
LOGGER.exception('GetAlarmDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmDescriptor exception')
def GetAlarmResponseStream(self, request : AlarmID, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]:
LOGGER.info('GetAlarmResponseStream')
try:
# TBC
yield monitoring_pb2.AlarmResponse()
except ServiceException as e:
LOGGER.exception('GetAlarmResponseStream exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetAlarmResponseStream exception')