MockServicerImpl_Context.py 17.4 KB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from typing import Any, Dict, Iterator, List
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
    Context, ContextEvent, ContextId, ContextIdList, ContextList,
    Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
    Empty,
    Link, LinkEvent, LinkId, LinkIdList, LinkList,
    Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
    Slice, SliceEvent, SliceId, SliceIdList, SliceList,
    Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string

LOGGER = logging.getLogger(__name__)

def get_container(database : Dict[str, Dict[str, Any]], container_name : str) -> Dict[str, Any]:
    return database.setdefault(container_name, {})

def get_entries(database : Dict[str, Dict[str, Any]], container_name : str) -> List[Any]:
    container = get_container(database, container_name)
    return [container[entry_uuid] for entry_uuid in sorted(container.keys())]

def get_entry(
    context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str
) -> Any:
    LOGGER.debug('[get_entry] AFTER database={:s}'.format(str(database)))
    container = get_container(database, container_name)
    if entry_uuid not in container:
        context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid)))
    return container[entry_uuid]

def set_entry(database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str, entry : Any) -> Any:
    container = get_container(database, container_name)
    LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(database)))
    container[entry_uuid] = entry
    LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(database)))
    return entry

def del_entry(
    context : grpc.ServicerContext, database : Dict[str, Dict[str, Any]], container_name : str, entry_uuid : str
) -> Any:
    container = get_container(database, container_name)
    if entry_uuid not in container:
        context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid)))
    del container[entry_uuid]
    return Empty()

class MockServicerImpl_Context(ContextServiceServicer):
    def __init__(self):
        LOGGER.info('[__init__] Creating Servicer...')
        self.database : Dict[str, Any] = {}
        LOGGER.info('[__init__] Servicer Created')

    # ----- Context ----------------------------------------------------------------------------------------------------

    def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
        LOGGER.info('[ListContextIds] request={:s}'.format(grpc_message_to_json_string(request)))
        return ContextIdList(context_ids=[context.context_id for context in get_entries(self.database, 'context')])

    def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
        LOGGER.info('[ListContexts] request={:s}'.format(grpc_message_to_json_string(request)))
        return ContextList(contexts=get_entries(self.database, 'context'))

    def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
        LOGGER.info('[GetContext] request={:s}'.format(grpc_message_to_json_string(request)))
        return get_entry(context, self.database, 'context', request.context_uuid.uuid)

    def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
        LOGGER.info('[SetContext] request={:s}'.format(grpc_message_to_json_string(request)))
        return set_entry(self.database, 'context', request.context_id.context_uuid.uuid, request).context_id

    def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[RemoveContext] request={:s}'.format(grpc_message_to_json_string(request)))
        return del_entry(context, self.database, 'context', request.context_uuid.uuid)

    def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
        LOGGER.info('[GetContextEvents] request={:s}'.format(grpc_message_to_json_string(request)))


    # ----- Topology ---------------------------------------------------------------------------------------------------

    def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
        LOGGER.info('[ListTopologyIds] request={:s}'.format(grpc_message_to_json_string(request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid)))
        return TopologyIdList(topology_ids=[topology.topology_id for topology in topologies])

    def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
        LOGGER.info('[ListTopologies] request={:s}'.format(grpc_message_to_json_string(request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        topologies = get_entries(self.database, 'topology[{:s}]'.format(str(request.context_uuid.uuid)))
        return TopologyList(topologies=[topology for topology in topologies])

    def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
        LOGGER.info('[GetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        return get_entry(context, self.database, container_name, request.topology_uuid.uuid)

    def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
        LOGGER.info('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid))
        return set_entry(self.database, container_name, request.topology_id.topology_uuid.uuid, request).topology_id

    def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        return del_entry(context, self.database, container_name, request.topology_uuid.uuid)

    def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
        LOGGER.info('[GetTopologyEvents] request={:s}'.format(grpc_message_to_json_string(request)))


    # ----- Device -----------------------------------------------------------------------------------------------------

    def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
        LOGGER.info('[ListDeviceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        return DeviceIdList(device_ids=[device.device_id for device in get_entries(self.database, 'device')])

    def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
        LOGGER.info('[ListDevices] request={:s}'.format(grpc_message_to_json_string(request)))
        return DeviceList(devices=get_entries(self.database, 'device'))

    def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
        LOGGER.info('[GetDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        return get_entry(context, self.database, 'device', request.device_uuid.uuid)

    def SetDevice(self, request: Context, context : grpc.ServicerContext) -> DeviceId:
        LOGGER.info('[SetDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        return set_entry(self.database, 'device', request.device_id.device_uuid.uuid, request).device_id

    def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[RemoveDevice] request={:s}'.format(grpc_message_to_json_string(request)))
        return del_entry(context, self.database, 'device', request.device_uuid.uuid)

    def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
        LOGGER.info('[GetDeviceEvents] request={:s}'.format(grpc_message_to_json_string(request)))


    # ----- Link -------------------------------------------------------------------------------------------------------

    def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
        LOGGER.info('[ListLinkIds] request={:s}'.format(grpc_message_to_json_string(request)))
        return LinkIdList(link_ids=[link.link_id for link in get_entries(self.database, 'link')])

    def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
        LOGGER.info('[ListLinks] request={:s}'.format(grpc_message_to_json_string(request)))
        return LinkList(links=get_entries(self.database, 'link'))

    def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
        LOGGER.info('[GetLink] request={:s}'.format(grpc_message_to_json_string(request)))
        return get_entry(context, self.database, 'link', request.link_uuid.uuid)

    def SetLink(self, request: Context, context : grpc.ServicerContext) -> LinkId:
        LOGGER.info('[SetLink] request={:s}'.format(grpc_message_to_json_string(request)))
        return set_entry(self.database, 'link', request.link_id.link_uuid.uuid, request).link_id

    def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[RemoveLink] request={:s}'.format(grpc_message_to_json_string(request)))
        return del_entry(context, self.database, 'link', request.link_uuid.uuid)

    def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
        LOGGER.info('[GetLinkEvents] request={:s}'.format(grpc_message_to_json_string(request)))


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Slice ------------------------------------------------------------------------------------------------------

    def ListSliceIds(self, request: ContextId, context : grpc.ServicerContext) -> SliceIdList:
        LOGGER.info('[ListSliceIds] request={:s}'.format(grpc_message_to_json_string(request)))
        slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid)))
        return SliceIdList(slice_ids=[slice.slice_id for slice in slices])

    def ListSlices(self, request: ContextId, context : grpc.ServicerContext) -> SliceList:
        LOGGER.info('[ListSlices] request={:s}'.format(grpc_message_to_json_string(request)))
        slices = get_entries(self.database, 'slice[{:s}]'.format(str(request.context_uuid.uuid)))
        return SliceList(slices=[slice for slice in slices])

    def GetSlice(self, request: SliceId, context : grpc.ServicerContext) -> Slice:
        LOGGER.info('[GetSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        return get_entry(context, self.database, container_name, request.slice_uuid.uuid)

    def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId:
        LOGGER.info('[SetSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        return set_entry(
            self.database, 'slice[{:s}]'.format(str(request.slice_id.context_id.context_uuid.uuid)),
            request.slice_id.slice_uuid.uuid, request).slice_id

    def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[RemoveSlice] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        return del_entry(context, self.database, container_name, request.slice_uuid.uuid)

    def GetSliceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[SliceEvent]:
        LOGGER.info('[GetSliceEvents] request={:s}'.format(grpc_message_to_json_string(request)))


    # ----- Service ----------------------------------------------------------------------------------------------------

    def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
        LOGGER.info('[ListServiceIds] request={:s}'.format(grpc_message_to_json_string(request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid)))
        return ServiceIdList(service_ids=[service.service_id for service in services])

    def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
        LOGGER.info('[ListServices] request={:s}'.format(grpc_message_to_json_string(request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        services = get_entries(self.database, 'service[{:s}]'.format(str(request.context_uuid.uuid)))
        return ServiceList(services=[service for service in services])

    def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
        LOGGER.info('[GetService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        return get_entry(context, self.database, container_name, request.service_uuid.uuid)

    def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
        LOGGER.info('[SetService] request={:s}'.format(grpc_message_to_json_string(request)))
        return set_entry(
            self.database, 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid)),
            request.service_id.service_uuid.uuid, request).service_id

    def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid))
        return del_entry(context, self.database, container_name, request.service_uuid.uuid)

    def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
        LOGGER.info('[GetServiceEvents] request={:s}'.format(grpc_message_to_json_string(request)))


    # ----- Connection -------------------------------------------------------------------------------------------------

    def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
        LOGGER.info('[ListConnectionIds] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service_connections[{:s}/{:s}]'.format(
            str(request.context_id.context_uuid.uuid), str(request.service_uuid.uuid))
        return ConnectionIdList(connection_ids=[c.connection_id for c in get_entries(self.database, container_name)])

    def ListConnections(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionList:
        LOGGER.info('[ListConnections] request={:s}'.format(grpc_message_to_json_string(request)))
        container_name = 'service_connections[{:s}/{:s}]'.format(
            str(request.context_id.context_uuid.uuid), str(request.service_uuid.uuid))
        return ConnectionList(connections=get_entries(self.database, container_name))

    def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection:
        LOGGER.info('[GetConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        return get_entry(context, self.database, 'connection', request.connection_uuid.uuid)

    def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
        LOGGER.info('[SetConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        service_connection__container_name = 'service_connection[{:s}/{:s}]'.format(
            str(request.service_id.context_id.context_uuid.uuid), str(request.service_id.service_uuid.uuid))
        set_entry(
            self.database, service_connection__container_name, request.connection_id.connection_uuid.uuid, request)
        return set_entry(
            self.database, 'connection', request.connection_id.connection_uuid.uuid, request).connection_id

    def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
        LOGGER.info('[RemoveConnection] request={:s}'.format(grpc_message_to_json_string(request)))
        connection = get_entry(context, self.database, 'connection', request.connection_uuid.uuid)
        service_id = connection.service_id
        service_connection__container_name = 'service_connection[{:s}/{:s}]'.format(
            str(service_id.context_id.context_uuid.uuid), str(service_id.service_uuid.uuid))
        del_entry(context, self.database, service_connection__container_name, request.connection_uuid.uuid)
        return del_entry(context, self.database, 'connection', request.connection_uuid.uuid)

    def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
        LOGGER.info('[GetConnectionEvents] request={:s}'.format(grpc_message_to_json_string(request)))