Commit c4453f5d authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Factorization of code between services and minor unitary testing improvements....

Factorization of code between services and minor unitary testing improvements. (Not finished, it is just a backup)
parent 91f36fbb
Loading
Loading
Loading
Loading
+0 −0

Empty file added.

+50 −0
Original line number Diff line number Diff line
import grpc
from common.database.api.Database import Database
from common.database.api.context.topology.device.Endpoint import Endpoint
from common.exceptions.ServiceException import ServiceException

def check_device_exists(database : Database, context_id : str, topology_id : str, device_id : str):
    db_context = database.context(context_id).create()
    db_topology = db_context.topology(topology_id).create()
    if db_topology.devices.contains(device_id): return
    msg = 'Context({})/Topology({})/Device({}) does not exist in the database.'
    msg = msg.format(context_id, topology_id, device_id)
    raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

def check_device_not_exists(database : Database, context_id : str, topology_id : str, device_id : str):
    db_context = database.context(context_id).create()
    db_topology = db_context.topology(topology_id).create()
    if not db_topology.devices.contains(device_id): return
    msg = 'Context({})/Topology({})/Device({}) already exists in the database.'
    msg = msg.format(context_id, topology_id, device_id)
    raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

def check_device_endpoint_exists(
    database : Database, parent_name : str,
    context_id : str, topology_id : str, device_id : str, port_id : str) -> Endpoint:

    # Implicit validation: parent.context == endpoint.context, and parent.context created automatically
    if not database.contexts.contains(context_id):          # pragma: no cover
        msg = 'Context({}) in {} does not exist in the database.'
        msg = msg.format(context_id, parent_name)
        raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
    db_context = database.context(context_id)

    if not db_context.topologies.contains(topology_id):
        msg = 'Context({})/Topology({}) in {} does not exist in the database.'
        msg = msg.format(context_id, topology_id, parent_name)
        raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
    db_topology = db_context.topology(topology_id)

    if not db_topology.devices.contains(device_id):
        msg = 'Context({})/Topology({})/Device({}) in {} does not exist in the database.'
        msg = msg.format(context_id, topology_id, device_id, parent_name)
        raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
    db_device = db_topology.device(device_id)

    if not db_device.endpoints.contains(port_id):
        msg = 'Context({})/Topology({})/Device({})/Port({}) in {} does not exist in the database.'
        msg = msg.format(context_id, topology_id, device_id, port_id, parent_name)
        raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

    return db_device.endpoint(port_id)
+73 −0
Original line number Diff line number Diff line
import grpc, logging
from typing import Dict, Set, Tuple, Union
from common.Checkers import chk_string
from common.exceptions.ServiceException import ServiceException
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID

def check_endpoint_id(
    logger : logging.Logger, endpoint_number : int, parent_name : str, endpoint_id : 'EndpointId',
    add_topology_devices_endpoints : Dict[str, Dict[str, Set[str]]],
    default_device_if_empty : Union[str, None] = None,
    predefined_context_id : str = DEFAULT_CONTEXT_ID, acceptable_context_ids : Set[str] = set([DEFAULT_CONTEXT_ID]),
    predefined_topology_id : str = DEFAULT_TOPOLOGY_ID, acceptable_topology_ids : Set[str] = set([DEFAULT_TOPOLOGY_ID])
    ) -> Tuple[str, str, str]:

    try:
        ep_context_id  = chk_string('endpoint_id[#{}].topoId.contextId.contextUuid.uuid'.format(endpoint_number),
                                    endpoint_id.topoId.contextId.contextUuid.uuid,
                                    allow_empty=True)
        ep_topology_id = chk_string('endpoint_id[#{}].topoId.topoId.uuid'.format(endpoint_number),
                                    endpoint_id.topoId.topoId.uuid,
                                    allow_empty=True)
        ep_device_id   = chk_string('endpoint_id[#{}].dev_id.device_id.uuid'.format(endpoint_number),
                                    endpoint_id.dev_id.device_id.uuid,
                                    allow_empty=(default_device_if_empty is not None))
        ep_port_id     = chk_string('endpoint_id[#{}].port_id.uuid'.format(endpoint_number),
                                    endpoint_id.port_id.uuid,
                                    allow_empty=False)
    except Exception as e:
        logger.exception('Invalid arguments:')
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

    if len(ep_context_id) == 0:
        # Assumption: if no context is specified for an endpoint_id, use parent context
        ep_context_id = predefined_context_id
    elif ep_context_id not in acceptable_context_ids:
        # Assumption: parent and endpoints should belong to the same context
        msg = ' '.join([
            'Context({}) in {} mismatches acceptable Contexts({}).',
            'Optionally, leave field empty to use predefined Context({}).',
        ])
        msg = msg.format(ep_context_id, parent_name, str(acceptable_context_ids), predefined_context_id)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    if len(ep_topology_id) == 0:
        # Assumption: if no topology is specified for an endpoint_id, use parent topology
        ep_topology_id = predefined_topology_id
    elif ep_topology_id not in acceptable_topology_ids:
        msg = ' '.join([
            'Topology({}) in {} mismatches acceptable Topologies({}).',
            'Optionally, leave field empty to use predefined Topology({}).',
        ])
        msg = msg.format(ep_topology_id, parent_name, str(acceptable_topology_ids), predefined_topology_id)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    if (len(ep_device_id) == 0) and (default_device_if_empty is not None):
        ep_device_id = default_device_if_empty

    add_devices = add_topology_devices_endpoints.setdefault(ep_topology_id, dict())
    if ep_device_id in add_devices:
        msg = 'Duplicated Context({})/Topology({})/Device({}) in {}.'
        msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, parent_name)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    add_device_and_endpoints = add_devices.setdefault(ep_device_id, set())

    # Implicit validation: same device cannot appear 2 times in the list of endpoints
    if ep_port_id in add_device_and_endpoints:                                # pragma: no cover
        msg = 'Duplicated Context({})/Topology({})/Device({})/Port({}) in {}.'
        msg = msg.format(ep_context_id, ep_topology_id, ep_device_id, ep_port_id, parent_name)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    add_device_and_endpoints.add(ep_port_id)
    return ep_topology_id, ep_device_id, ep_port_id
+24 −0
Original line number Diff line number Diff line
import grpc
from enum import Enum
from common.exceptions.ServiceException import ServiceException

def check_enum(enum_name, method_name, value, to_enum_method, accepted_values_dict) -> Enum:
    _value = to_enum_method(value)
    if _value is None:                          # pragma: no cover (gRPC prevents unsupported values)
        msg = 'Unsupported {}({}).'
        msg = msg.format(enum_name, value)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    accepted_values = accepted_values_dict.get(method_name)
    if accepted_values is None:                 # pragma: no cover (test requires malforming the code)
        msg = '{} acceptable values not specified for Method({}).'
        msg = msg.format(enum_name, method_name)
        raise ServiceException(grpc.StatusCode.INTERNAL, msg)

    if len(accepted_values) == 0: return _value
    if _value in accepted_values: return _value

    msg = 'Method({}) does not accept {}({}). Permitted values for Method({}) are {}({}).'
    accepted_values_list = sorted(map(lambda v: v.name, accepted_values))
    msg = msg.format(method_name, enum_name, _value.name, method_name, enum_name, accepted_values_list)
    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
Loading