Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.proto.context_pb2 import Empty
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiDescriptorList, KpiId, KpiQuery, RawKpiTable
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string
from .InMemoryObjectDatabase import InMemoryObjectDatabase
from .InMemoryTimeSeriesDatabase import InMemoryTimeSeriesDatabase
class IMDB_ContainersEnum(enum.Enum):
KPI_DESCRIPTORS = 'kpi_descriptor'
class MockServicerImpl_Monitoring(MonitoringServiceServicer):
def __init__(
self, queue_samples : Optional[Queue] = None
) -> None:
self.obj_db = InMemoryObjectDatabase()
self.ts_db = InMemoryTimeSeriesDatabase()
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# ----- Common -----------------------------------------------------------------------------------------------------
def _set(self, container_name, entry_uuid, entry_id_field_name, entry):
entry = self.obj_db.set_entry(container_name, entry_uuid, entry)
return getattr(entry, entry_id_field_name)
def _del(self, container_name, entry_uuid, grpc_context):
self.obj_db.del_entry(container_name, entry_uuid, grpc_context)
return Empty()
# ----- KPI Descriptor ---------------------------------------------------------------------------------------------
def GetKpiDescriptorList(self, request : Empty, context : grpc.ServicerContext) -> KpiDescriptorList:
LOGGER.info('[GetKpiDescriptorList] request={:s}'.format(grpc_message_to_json_string(request)))
kpi_descriptor_list = self.obj_db.get_entries(IMDB_ContainersEnum.KPI_DESCRIPTORS.value)
reply = KpiDescriptorList(kpi_descriptor_list=kpi_descriptor_list)
LOGGER.info('[GetKpiDescriptorList] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply
def GetKpiDescriptor(self, request : KpiId, context : grpc.ServicerContext) -> KpiDescriptor:
LOGGER.info('[GetKpiDescriptor] request={:s}'.format(grpc_message_to_json_string(request)))
reply = self.obj_db.get_entry(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.uuid, context)
LOGGER.info('[GetKpiDescriptor] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply
def SetKpi(self, request : KpiDescriptor, context : grpc.ServicerContext) -> KpiId:
LOGGER.info('[SetKpi] request={:s}'.format(grpc_message_to_json_string(request)))
reply = self._set(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.kpi_id.uuid, 'kpi_id', request)
LOGGER.info('[SetKpi] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply
def DeleteKpi(self, request : KpiId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[DeleteKpi] request={:s}'.format(grpc_message_to_json_string(request)))
reply = self._del(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.kpi_id.uuid, context)
LOGGER.info('[DeleteKpi] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply
# ----- KPI Sample -------------------------------------------------------------------------------------------------
def IncludeKpi(self, request : Kpi, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[IncludeKpi] request={:s}'.format(grpc_message_to_json_string(request)))
self.queue_samples.put(request)
reply = Empty()
LOGGER.info('[IncludeKpi] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply
def QueryKpiData(self, request : KpiQuery, context : grpc.ServicerContext) -> RawKpiTable:
LOGGER.info('[QueryKpiData] request={:s}'.format(grpc_message_to_json_string(request)))
# TODO: add filters for request.monitoring_window_s
# TODO: add filters for request.last_n_samples
kpi_uuids = [kpi_id.kpi_id.uuid for kpi_id in request.kpi_ids]
start_timestamp = request.start_timestamp.timestamp
if start_timestamp <= 0: start_timestamp = None
end_timestamp = request.end_timestamp.timestamp
if end_timestamp <= 0: end_timestamp = None
df_samples = self.ts_db.filter(kpi_uuids, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
LOGGER.info('[QueryKpiData] df_samples={:s}'.format(df_samples.to_string()))
reply = RawKpiTable()
kpi_uuid__to__raw_kpi_list = dict()
for df_sample in df_samples.itertuples():
kpi_uuid = df_sample.kpi_uuid
if kpi_uuid in kpi_uuid__to__raw_kpi_list:
raw_kpi_list = kpi_uuid__to__raw_kpi_list[kpi_uuid]
else:
raw_kpi_list = reply.raw_kpi_lists.add() # pylint: disable=no-member
raw_kpi_list.kpi_id.kpi_id.uuid = kpi_uuid
kpi_uuid__to__raw_kpi_list[kpi_uuid] = raw_kpi_list
raw_kpi = raw_kpi_list.raw_kpis.add()
raw_kpi.timestamp.timestamp = df_sample.timestamp.timestamp()
raw_kpi.kpi_value.floatVal = df_sample.value
LOGGER.info('[QueryKpiData] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply