Commit 608e8ab6 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common - Tests:

- Implemented InMemoryObjectDatabase
- Implemented InMemoryTimeSeriesDatabase
- Extended Mock Monitoring with KPIDescriptor management and retrieval of timeseries data from pandas DataFrame
parent 46ce0819
Loading
Loading
Loading
Loading
+66 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from typing import Any, Dict, List, Set

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

class InMemoryObjectDatabase:
    def __init__(self) -> None:
        self._database : Dict[str, Dict[str, Any]] = {}

    def _get_container(self, container_name : str) -> Dict[str, Any]:
        return self._database.setdefault(container_name, {})

    def get_entries(self, container_name : str) -> List[Any]:
        container = self._get_container(container_name)
        return [container[entry_uuid] for entry_uuid in sorted(container.keys())]

    def has_entry(self, container_name : str, entry_uuid : str) -> Any:
        LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(self._database)))
        container = self._get_container(container_name)
        return entry_uuid in container

    def get_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> Any:
        LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(self._database)))
        container = self._get_container(container_name)
        if entry_uuid not in container:
            context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid)))
        return container[entry_uuid]

    def set_entry(self, container_name : str, entry_uuid : str, entry : Any) -> Any:
        container = self._get_container(container_name)
        LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(self._database)))
        container[entry_uuid] = entry
        LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(self._database)))
        return entry

    def del_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> None:
        container = self._get_container(container_name)
        LOGGER.debug('[del_entry] BEFORE database={:s}'.format(str(self._database)))
        if entry_uuid not in container:
            context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid)))
        del container[entry_uuid]
        LOGGER.debug('[del_entry] AFTER database={:s}'.format(str(self._database)))

    def select_entries(self, container_name : str, entry_uuids : Set[str]) -> List[Any]:
        if len(entry_uuids) == 0: return self.get_entries(container_name)
        container = self._get_container(container_name)
        return [
            container[entry_uuid]
            for entry_uuid in sorted(container.keys())
            if entry_uuid in entry_uuids
        ]
+33 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, pandas
from typing import List, Optional

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

class InMemoryTimeSeriesDatabase:
    def __init__(self) -> None:
        self._data = pandas.DataFrame(columns=['timestamp', 'kpi_uuid', 'value'])

    def filter(
        self, kpi_uuids : List[str] = [], start_timestamp : Optional[float] = None,
        end_timestamp : Optional[float] = None
    ) -> pandas.DataFrame:
        data = self._data
        if len(kpi_uuids) > 0: data = data[data.kpi_uuid in kpi_uuids]
        if start_timestamp is not None: data = data[data.timestamp >= pandas.to_datetime(start_timestamp)]
        if end_timestamp   is not None: data = data[data.timestamp <= pandas.to_datetime(end_timestamp  )]
        return data
+86 −5
Original line number Diff line number Diff line
@@ -12,25 +12,106 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import grpc, logging
import enum, grpc, logging, pandas
from queue import Queue
from typing import Optional
from common.proto.context_pb2 import Empty
from common.proto.monitoring_pb2 import Kpi
from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiDescriptorList, KpiId, KpiQuery, RawKpiTable
from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string
from .InMemoryObjectDatabase import InMemoryObjectDatabase
from .InMemoryTimeSeriesDatabase import InMemoryTimeSeriesDatabase

LOGGER = logging.getLogger(__name__)

class IMDB_ContainersEnum(enum.Enum):
    KPI_DESCRIPTORS = 'kpi_descriptor'

class MockServicerImpl_Monitoring(MonitoringServiceServicer):
    def __init__(self, queue_samples : Optional[Queue] = None):
    def __init__(
        self, queue_samples : Optional[Queue] = None
    ) -> None:
        LOGGER.info('[__init__] Creating Servicer...')
        if queue_samples is None: queue_samples = Queue()
        self.queue_samples = queue_samples
        self.obj_db = InMemoryObjectDatabase()
        self.ts_db  = InMemoryTimeSeriesDatabase()
        LOGGER.info('[__init__] Servicer Created')

    # ----- Common -----------------------------------------------------------------------------------------------------

    def _set(self, container_name, entry_uuid, entry_id_field_name, entry):
        entry = self.obj_db.set_entry(container_name, entry_uuid, entry)
        return getattr(entry, entry_id_field_name)

    def _del(self, container_name, entry_uuid, grpc_context):
        self.obj_db.del_entry(container_name, entry_uuid, grpc_context)
        return Empty()

    # ----- KPI Descriptor ---------------------------------------------------------------------------------------------

    def GetKpiDescriptorList(self, request : Empty, context : grpc.ServicerContext) -> KpiDescriptorList:
        LOGGER.info('[GetKpiDescriptorList] request={:s}'.format(grpc_message_to_json_string(request)))
        kpi_descriptor_list = self.obj_db.get_entries(IMDB_ContainersEnum.KPI_DESCRIPTORS.value)
        reply = KpiDescriptorList(kpi_descriptor_list=kpi_descriptor_list)
        LOGGER.info('[GetKpiDescriptorList] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def GetKpiDescriptor(self, request : KpiId, context : grpc.ServicerContext) -> KpiDescriptor:
        LOGGER.info('[GetKpiDescriptor] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self.obj_db.get_entry(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.uuid, context)
        LOGGER.info('[GetKpiDescriptor] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def SetKpi(self, request : KpiDescriptor, context : grpc.ServicerContext) -> KpiId:
        LOGGER.info('[SetKpi] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._set(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.kpi_id.uuid, 'kpi_id', request)
        LOGGER.info('[SetKpi] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def DeleteKpi(self, request : KpiId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[DeleteKpi] request={:s}'.format(grpc_message_to_json_string(request)))
        reply = self._del(IMDB_ContainersEnum.KPI_DESCRIPTORS.value, request.kpi_id.kpi_id.uuid, context)
        LOGGER.info('[DeleteKpi] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    # ----- KPI Sample -------------------------------------------------------------------------------------------------

    def IncludeKpi(self, request : Kpi, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[IncludeKpi] request={:s}'.format(grpc_message_to_json_string(request)))
        self.queue_samples.put(request)
        return Empty()
        reply = Empty()
        LOGGER.info('[IncludeKpi] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    def QueryKpiData(self, request : KpiQuery, context : grpc.ServicerContext) -> RawKpiTable:
        LOGGER.info('[QueryKpiData] request={:s}'.format(grpc_message_to_json_string(request)))
        # TODO: add filters for request.monitoring_window_s
        # TODO: add filters for request.last_n_samples
        kpi_uuids = [kpi_id.kpi_id.uuid for kpi_id in request.kpi_ids]

        start_timestamp = request.start_timestamp.timestamp
        if start_timestamp <= 0: start_timestamp = None

        end_timestamp = request.end_timestamp.timestamp
        if end_timestamp <= 0: end_timestamp = None

        df_samples = self.ts_db.filter(kpi_uuids, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
        reply = RawKpiTable()
        kpi_uuid__to__raw_kpi_list = dict()

        for df_sample in df_samples.itertuples():
            kpi_uuid  = df_sample.kpi_uuid
            if kpi_uuid in kpi_uuid__to__raw_kpi_list:
                raw_kpi_list = kpi_uuid__to__raw_kpi_list[kpi_uuid]
            else:
                raw_kpi_list = reply.raw_kpi_lists.add()    # pylint: disable=no-member
                raw_kpi_list.kpi_id.kpi_id.uuid = kpi_uuid
                kpi_uuid__to__raw_kpi_list[kpi_uuid] = raw_kpi_list

            raw_kpi = raw_kpi_list.raw_kpis.add()
            raw_kpi.timestamp.timestamp = df_sample.timestamp
            raw_kpi.kpi_value.floatVal  = df_sample.value

        LOGGER.info('[QueryKpiData] reply={:s}'.format(grpc_message_to_json_string(reply)))
        return reply