InMemoryObjectDatabase.py 3.15 KB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from typing import Any, Dict, List, Set

LOGGER = logging.getLogger(__name__)

class InMemoryObjectDatabase:
    def __init__(self) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        self._database : Dict[str, Dict[str, Any]] = dict()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    def _get_container(self, container_name : str) -> Dict[str, Any]:
        return self._database.setdefault(container_name, {})

    def get_entries(self, container_name : str) -> List[Any]:
        container = self._get_container(container_name)
        return [container[entry_uuid] for entry_uuid in sorted(container.keys())]

    def has_entry(self, container_name : str, entry_uuid : str) -> Any:
        LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(self._database)))
        container = self._get_container(container_name)
        return entry_uuid in container

    def get_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> Any:
        LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(self._database)))
        container = self._get_container(container_name)
        if entry_uuid not in container:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            MSG = '{:s}({:s}) not found; available({:s})'
            msg = str(MSG.format(container_name, entry_uuid, str(container.keys())))
            context.abort(grpc.StatusCode.NOT_FOUND, msg)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        return container[entry_uuid]

    def set_entry(self, container_name : str, entry_uuid : str, entry : Any) -> Any:
        container = self._get_container(container_name)
        LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(self._database)))
        container[entry_uuid] = entry
        LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(self._database)))
        return entry

    def del_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> None:
        container = self._get_container(container_name)
        LOGGER.debug('[del_entry] BEFORE database={:s}'.format(str(self._database)))
        if entry_uuid not in container:
            context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid)))
        del container[entry_uuid]
        LOGGER.debug('[del_entry] AFTER database={:s}'.format(str(self._database)))

    def select_entries(self, container_name : str, entry_uuids : Set[str]) -> List[Any]:
        if len(entry_uuids) == 0: return self.get_entries(container_name)
        container = self._get_container(container_name)
        return [
            container[entry_uuid]
            for entry_uuid in sorted(container.keys())
            if entry_uuid in entry_uuids
        ]