diff --git a/src/common/tests/InMemoryObjectDatabase.py b/src/common/tests/InMemoryObjectDatabase.py new file mode 100644 index 0000000000000000000000000000000000000000..47623026ccd730311837d661d5c1806c7f945f79 --- /dev/null +++ b/src/common/tests/InMemoryObjectDatabase.py @@ -0,0 +1,66 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from typing import Any, Dict, List, Set + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) + +class InMemoryObjectDatabase: + def __init__(self) -> None: + self._database : Dict[str, Dict[str, Any]] = {} + + def _get_container(self, container_name : str) -> Dict[str, Any]: + return self._database.setdefault(container_name, {}) + + def get_entries(self, container_name : str) -> List[Any]: + container = self._get_container(container_name) + return [container[entry_uuid] for entry_uuid in sorted(container.keys())] + + def has_entry(self, container_name : str, entry_uuid : str) -> Any: + LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(self._database))) + container = self._get_container(container_name) + return entry_uuid in container + + def get_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> Any: + LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(self._database))) + container = self._get_container(container_name) + if entry_uuid not in container: + context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid))) + return container[entry_uuid] + + def set_entry(self, container_name : str, entry_uuid : str, entry : Any) -> Any: + container = self._get_container(container_name) + LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(self._database))) + container[entry_uuid] = entry + LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(self._database))) + return entry + + def del_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> None: + container = self._get_container(container_name) + LOGGER.debug('[del_entry] BEFORE database={:s}'.format(str(self._database))) + if entry_uuid not in container: + context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid))) + del container[entry_uuid] + LOGGER.debug('[del_entry] AFTER database={:s}'.format(str(self._database))) + + def select_entries(self, container_name : str, entry_uuids : Set[str]) -> List[Any]: + if len(entry_uuids) == 0: return self.get_entries(container_name) + container = self._get_container(container_name) + return [ + container[entry_uuid] + for entry_uuid in sorted(container.keys()) + if entry_uuid in entry_uuids + ] diff --git a/src/common/tests/InMemoryTimeSeriesDatabase.py b/src/common/tests/InMemoryTimeSeriesDatabase.py new file mode 100644 index 0000000000000000000000000000000000000000..f30b4d43ad43c4f6ba333eb89c34b6015cd1c3a3 --- /dev/null +++ b/src/common/tests/InMemoryTimeSeriesDatabase.py @@ -0,0 +1,33 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, pandas +from typing import List, Optional + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.INFO) + +class InMemoryTimeSeriesDatabase: + def __init__(self) -> None: + self._data = pandas.DataFrame(columns=['timestamp', 'kpi_uuid', 'value']) + + def filter( + self, kpi_uuids : List[str] = [], start_timestamp : Optional[float] = None, + end_timestamp : Optional[float] = None + ) -> pandas.DataFrame: + data = self._data + if len(kpi_uuids) > 0: data = data[data.kpi_uuid in kpi_uuids] + if start_timestamp is not None: data = data[data.timestamp >= pandas.to_datetime(start_timestamp)] + if end_timestamp is not None: data = data[data.timestamp <= pandas.to_datetime(end_timestamp )] + return data diff --git a/src/common/tests/MockServicerImpl_Monitoring.py b/src/common/tests/MockServicerImpl_Monitoring.py index 91e63bb5c97e90ad7cce402d8aaa3eb7ba74596d..4fdfc0a8c7315a59aef19519bc19c8f135a14777 100644 --- a/src/common/tests/MockServicerImpl_Monitoring.py +++ b/src/common/tests/MockServicerImpl_Monitoring.py @@ -12,25 +12,106 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import grpc, logging +import enum, grpc, logging, pandas from queue import Queue from typing import Optional from common.proto.context_pb2 import Empty -from common.proto.monitoring_pb2 import Kpi +from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiDescriptorList, KpiId, KpiQuery, RawKpiTable from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer from common.tools.grpc.Tools import grpc_message_to_json_string +from .InMemoryObjectDatabase import InMemoryObjectDatabase +from .InMemoryTimeSeriesDatabase import InMemoryTimeSeriesDatabase LOGGER = logging.getLogger(__name__) +class IMDB_ContainersEnum(enum.Enum): + KPI_DESCRIPTORS = 'kpi_descriptor' + class MockServicerImpl_Monitoring(MonitoringServiceServicer): - def __init__(self, queue_samples : Optional[Queue] = None): + def __init__( + self, queue_samples : Optional[Queue] = None + ) -> None: LOGGER.info('[__init__] Creating Servicer...') if queue_samples is None: queue_samples = Queue() self.queue_samples = queue_samples + self.obj_db = InMemoryObjectDatabase() + self.ts_db = InMemoryTimeSeriesDatabase() LOGGER.info('[__init__] Servicer Created') + # ----- Common ----------------------------------------------------------------------------------------------------- + + def _set(self, container_name, entry_uuid, entry_id_field_name, entry): + entry = self.obj_db.set_entry(container_name, entry_uuid, entry) + return getattr(entry, entry_id_field_name) + + def _del(self, container_name, entry_uuid, grpc_context): + self.obj_db.del_entry(container_name, entry_uuid, grpc_context) + return Empty() + + # ----- KPI Descriptor --------------------------------------------------------------------------------------------- + + def GetKpiDescriptorList(self, request : Empty, context : grpc.ServicerContext) -> KpiDescriptorList: + LOGGER.info('[GetKpiDescriptorList] request={:s}'.format(grpc_message_to_json_string(request))) + kpi_descriptor_list = self.obj_db.get_entries(IMDB_ContainersEnum.KPI_DESCRIPTORS.value) + reply = KpiDescriptorList(kpi_descriptor_list=kpi_descriptor_list) + LOGGER.info('[GetKpiDescriptorList] reply={:s}'.format(grpc_message_to_json_string(reply))) + return reply + + def GetKpiDescriptor(self, request : KpiId, context : grpc.ServicerContext) -> KpiDescriptor: + LOGGER.info('[GetKpiDescriptor] request={:s}'.format(grpc_message_to_json_string(request))) + reply = self.obj_db.get_entry(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.uuid, context) + LOGGER.info('[GetKpiDescriptor] reply={:s}'.format(grpc_message_to_json_string(reply))) + return reply + + def SetKpi(self, request : KpiDescriptor, context : grpc.ServicerContext) -> KpiId: + LOGGER.info('[SetKpi] request={:s}'.format(grpc_message_to_json_string(request))) + reply = self._set(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.kpi_id.uuid, 'kpi_id', request) + LOGGER.info('[SetKpi] reply={:s}'.format(grpc_message_to_json_string(reply))) + return reply + + def DeleteKpi(self, request : KpiId, context : grpc.ServicerContext) -> Empty: + LOGGER.info('[DeleteKpi] request={:s}'.format(grpc_message_to_json_string(request))) + reply = self._del(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.kpi_id.uuid, context) + LOGGER.info('[DeleteKpi] reply={:s}'.format(grpc_message_to_json_string(reply))) + return reply + + # ----- KPI Sample ------------------------------------------------------------------------------------------------- + def IncludeKpi(self, request : Kpi, context : grpc.ServicerContext) -> Empty: LOGGER.info('[IncludeKpi] request={:s}'.format(grpc_message_to_json_string(request))) self.queue_samples.put(request) - return Empty() + reply = Empty() + LOGGER.info('[IncludeKpi] reply={:s}'.format(grpc_message_to_json_string(reply))) + return reply + + def QueryKpiData(self, request : KpiQuery, context : grpc.ServicerContext) -> RawKpiTable: + LOGGER.info('[QueryKpiData] request={:s}'.format(grpc_message_to_json_string(request))) + # TODO: add filters for request.monitoring_window_s + # TODO: add filters for request.last_n_samples + kpi_uuids = [kpi_id.kpi_id.uuid for kpi_id in request.kpi_ids] + + start_timestamp = request.start_timestamp.timestamp + if start_timestamp <= 0: start_timestamp = None + + end_timestamp = request.end_timestamp.timestamp + if end_timestamp <= 0: end_timestamp = None + + df_samples = self.ts_db.filter(kpi_uuids, start_timestamp=start_timestamp, end_timestamp=end_timestamp) + reply = RawKpiTable() + kpi_uuid__to__raw_kpi_list = dict() + + for df_sample in df_samples.itertuples(): + kpi_uuid = df_sample.kpi_uuid + if kpi_uuid in kpi_uuid__to__raw_kpi_list: + raw_kpi_list = kpi_uuid__to__raw_kpi_list[kpi_uuid] + else: + raw_kpi_list = reply.raw_kpi_lists.add() # pylint: disable=no-member + raw_kpi_list.kpi_id.kpi_id.uuid = kpi_uuid + kpi_uuid__to__raw_kpi_list[kpi_uuid] = raw_kpi_list + + raw_kpi = raw_kpi_list.raw_kpis.add() + raw_kpi.timestamp.timestamp = df_sample.timestamp + raw_kpi.kpi_value.floatVal = df_sample.value + + LOGGER.info('[QueryKpiData] reply={:s}'.format(grpc_message_to_json_string(reply))) + return reply