Skip to content
test_unitary.py 57.3 KiB
Newer Older
import copy, grpc, logging, os, pytest, requests, threading, time, urllib
from queue import Queue, Empty
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.orm.Database import Database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.type_checkers.Assertions import (
    validate_context, validate_context_ids, validate_contexts, validate_device, validate_device_ids, validate_devices,
    validate_link, validate_link_ids, validate_links, validate_service, validate_service_ids, validate_services,
    validate_topologies, validate_topology, validate_topology_ids)
from context.Config import (
    GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.proto.context_pb2 import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, DeviceOperationalStatusEnum, Empty,
    EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId, ServiceStatusEnum, ServiceTypeEnum,
    Topology, TopologyEvent, TopologyId)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Tools import (
    FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.Populate import populate
from context.service.rest_server.Server import Server as RestServer
from context.service.rest_server.Resources import RESOURCES
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .example_objects import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    CONTEXT, CONTEXT_ID, DEVICE1, DEVICE1_ID, DEVICE1_UUID, DEVICE2, DEVICE2_ID, DEVICE2_UUID, LINK_DEV1_DEV2,
    LINK_DEV1_DEV2_ID, LINK_DEV1_DEV2_UUID, SERVICE_DEV1_DEV2, SERVICE_DEV1_DEV2_ID, SERVICE_DEV1_DEV2_UUID, TOPOLOGY,
    TOPOLOGY_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

# Module-level logger used by the fixtures, helpers and tests of this file. Its own level is set to DEBUG so the
# logger itself does not discard any record of the standard severities.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Ports the test gRPC and REST servers listen on (and the test clients connect to): the configured service ports
# shifted up by 10000.
GRPC_PORT    = 10000 + GRPC_SERVICE_PORT    # avoid privileged ports
RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

# Fallback Redis connection parameters, used when the matching environment variable is not defined.
DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID  = 0

# Settings handed to the Redis-backed database / message broker factories. Each entry is read from the environment
# variable of the same name and falls back to the default declared above.
# NOTE(review): values coming from the environment are strings whereas the defaults are ints; confirm the Redis
# backends accept both forms.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
}

# Backend combinations the whole test session is repeated for. Each entry is a tuple of
# (scenario name, database backend, database settings, message broker backend, message broker settings);
# the settings dictionaries are expanded as keyword arguments of the corresponding backend factory.
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
    ('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
]

@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Session fixture providing the (Database, MessageBroker) pair of the scenario being run.

    The fixture is parametrized with SCENARIOS: for each scenario it logs the selected backends, instantiates the
    database and the message broker through their factories, and yields both as a tuple. Once the session is over
    the message broker is terminated.
    """
    scenario_name, db_backend, db_settings, mb_backend, mb_settings = request.param
    LOGGER.info(
        'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'.format(
            str(scenario_name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))
    database = Database(get_database_backend(backend=db_backend, **db_settings))
    message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield database, message_broker
    # Teardown: only the message broker is explicitly shut down.
    message_broker.terminate()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs the Context gRPC service on GRPC_PORT for the duration of the tests.

    The service is built on top of the database and message broker supplied by `context_db_mb`, started before
    being yielded, and stopped on teardown.
    """
    database, message_broker = context_db_mb
    grpc_service = ContextService(
        database, message_broker, port=GRPC_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
    grpc_service.start()
    yield grpc_service
    grpc_service.stop()

@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs the Context REST server on RESTAPI_PORT for the duration of the tests.

    Every (endpoint name, resource class, URL) entry of RESOURCES is registered on the server, with the database
    taken from `context_db_mb` passed as the resource class argument. The server is started before being yielded,
    and shut down and joined on teardown.
    """
    rest_server = RestServer(port=RESTAPI_PORT, base_url=RESTAPI_BASE_URL)
    resource_args = (context_db_mb[0],)
    for endpoint, resource, url in RESOURCES:
        rest_server.add_resource(resource, url, endpoint=endpoint, resource_class_args=resource_args)
    rest_server.start()
    time.sleep(1) # give the server time to start
    yield rest_server
    rest_server.shutdown()
    rest_server.join()

@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a ContextClient connected to 127.0.0.1:GRPC_PORT.

    Requesting `context_service_grpc` makes pytest set up the gRPC service fixture before the client is created.
    The client is closed on teardown.
    """
    grpc_client = ContextClient(address='127.0.0.1', port=GRPC_PORT)
    yield grpc_client
    grpc_client.close()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def do_rest_request(url : str, timeout : float = 10.0):
    """Issue a GET request against the REST API under test and return the decoded JSON reply.

    Args:
        url: path of the resource, appended to 'http://127.0.0.1:<RESTAPI_PORT><RESTAPI_BASE_URL>'.
        timeout: seconds to wait for the server before giving up; forwarded to `requests.get`.

    Returns:
        The reply body parsed as JSON.

    Raises:
        AssertionError: if the reply status code is not 200.
        requests.exceptions.RequestException: if the request cannot be completed (e.g. it times out).
    """
    request_url = 'http://127.0.0.1:{:s}{:s}{:s}'.format(str(RESTAPI_PORT), str(RESTAPI_BASE_URL), url)
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    # requests applies no timeout unless one is given; bound the wait so a stalled server fails the test instead
    # of hanging the whole session.
    reply = requests.get(request_url, timeout=timeout)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
class EventsCollector:
    """Collect the events emitted by the Context service into a single queue.

    One event stream is opened per entity type (context, topology, device, link and service). After `start()`, one
    thread per stream copies every received event into a shared queue, from which events are read back with
    `get_event()` / `get_events()`. `stop()` cancels the streams and waits for the threads to finish.
    """

    def __init__(self, context_client_grpc : 'ContextClient') -> None: # pylint: disable=redefined-outer-name
        """Open the five event streams on `context_client_grpc` and prepare (without starting) their threads."""
        self._events_queue = Queue()

        # NOTE: `Empty` below is the protobuf message used as request argument, not the `queue.Empty` exception.
        self._context_stream  = context_client_grpc.GetContextEvents(Empty())
        self._topology_stream = context_client_grpc.GetTopologyEvents(Empty())
        self._device_stream   = context_client_grpc.GetDeviceEvents(Empty())
        self._link_stream     = context_client_grpc.GetLinkEvents(Empty())
        self._service_stream  = context_client_grpc.GetServiceEvents(Empty())

        self._context_thread  = threading.Thread(target=self._collect, args=(self._context_stream ,), daemon=False)
        self._topology_thread = threading.Thread(target=self._collect, args=(self._topology_stream,), daemon=False)
        self._device_thread   = threading.Thread(target=self._collect, args=(self._device_stream  ,), daemon=False)
        self._link_thread     = threading.Thread(target=self._collect, args=(self._link_stream    ,), daemon=False)
        self._service_thread  = threading.Thread(target=self._collect, args=(self._service_stream ,), daemon=False)

    def _collect(self, events_stream) -> None:
        """Thread body: push every event read from `events_stream` into the shared queue.

        A stream ending with status CANCELLED (what `stop()` provokes) terminates the thread silently; any other
        gRPC error is re-raised.
        """
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        """Start the five collector threads."""
        self._context_thread.start()
        self._topology_thread.start()
        self._device_thread.start()
        self._link_thread.start()
        self._service_thread.start()

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Return the next collected event.

        Args:
            block: whether to wait for an event when none is queued.
            timeout: maximum time, in seconds, to wait when `block` is True.

        Raises:
            queue.Empty: if no event is available (within `timeout` when blocking).
        """
        return self._events_queue.get(block=block, timeout=timeout)

    def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None):
        """Return collected events sorted by their `event.timestamp`.

        Args:
            block: forwarded to `get_event()` for every retrieval.
            timeout: forwarded to `get_event()` for every retrieval.
            count: when None, events are retrieved until the queue is found empty; otherwise exactly `count`
                retrieval attempts are made and the attempts that find the queue empty are skipped, so fewer
                than `count` events may be returned.

        Returns:
            The list of retrieved events, ordered by ascending `event.timestamp`.
        """
        # Imported locally under an alias: at module level the name `Empty` is bound to the protobuf `Empty`
        # message (imported after `queue.Empty`), which is not an exception class. Catching it makes Python
        # raise TypeError as soon as the queue runs dry instead of ending the collection.
        from queue import Empty as QueueEmpty # pylint: disable=import-outside-toplevel

        events = []
        if count is None:
            while True:
                try:
                    events.append(self.get_event(block=block, timeout=timeout))
                except QueueEmpty:
                    break
        else:
            for _ in range(count):
                try:
                    events.append(self.get_event(block=block, timeout=timeout))
                except QueueEmpty:
                    # Keep trying for the remaining attempts: pending events may still arrive.
                    pass
        return sorted(events, key=lambda e: e.event.timestamp)

    def stop(self):
        """Cancel the five event streams and wait for the collector threads to terminate."""
        self._context_stream.cancel()
        self._topology_stream.cancel()
        self._device_stream.cancel()
        self._link_stream.cancel()
        self._service_stream.cancel()

        self._context_thread.join()
        self._topology_thread.join()
        self._device_thread.join()
        self._link_thread.join()
        self._service_thread.join()


Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------

def test_grpc_context(
    context_client_grpc : ContextClient,                # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_db_mb : Tuple[Database, MessageBroker]):    # pylint: disable=redefined-outer-name
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_database.clear_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsCollector(context_client_grpc)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector.start()

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    with pytest.raises(grpc.RpcError) as e:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)

    # ----- List when the object does not exist ------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContextIds(Empty())
    assert len(response.context_ids) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContexts(Empty())
    assert len(response.contexts) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Create the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.SetContext(Context(**CONTEXT))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    with pytest.raises(grpc.RpcError) as e:
Loading
Loading full blame…