Skip to content
test_unitary.py 70.2 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-lines
import copy, grpc, logging, os, pytest, requests, time, urllib
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Tuple
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.orm.Database import Database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.type_checkers.Assertions import (
    validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
    validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
    validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
    validate_topology_ids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.Config import (
    GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId,
    DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId,
    ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Tools import (
    FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.Populate import populate
from context.service.rest_server.Server import Server as RestServer
from context.service.rest_server.Resources import RESOURCES
from .Objects import (
    CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
    DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
    LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
    SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, TOPOLOGY, TOPOLOGY_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# The test servers listen on the configured service ports shifted by 10000.
GRPC_PORT    = 10000 + GRPC_SERVICE_PORT    # avoid privileged ports
RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports

# Fallback Redis connection settings, used when the matching environment variable is not set.
DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID  = 0

# Keyword settings handed to the Redis-backed database / message broker factories.
# NOTE(review): values taken from the environment are strings while the port and database-id
# defaults are ints -- confirm the Redis backends accept both.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
}

# Each scenario is: (name, database backend, database settings, message broker backend, message broker settings).
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
    ('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
]

@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Build the Database and MessageBroker for the scenario selected by pytest.

    The fixture is parametrised over SCENARIOS; ``request.param`` is unpacked into the scenario
    name, the database backend and its settings, and the message broker backend and its settings.
    It yields the ``(database, message_broker)`` pair and terminates the message broker on teardown.
    """
    scenario_name, db_kind, db_kwargs, mb_kind, mb_kwargs = request.param

    # Everything is stringified before formatting because the template uses '{:s}' placeholders.
    log_fields = (scenario_name, db_kind.value, db_kwargs, mb_kind.value, mb_kwargs)
    LOGGER.info(
        'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'.format(
            *map(str, log_fields)))

    database = Database(get_database_backend(backend=db_kind, **db_kwargs))
    message_broker = MessageBroker(get_messagebroker_backend(backend=mb_kind, **mb_kwargs))
    yield database, message_broker
    message_broker.terminate()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start a ContextService for the test session and stop it on teardown.

    The service is built from the database and message broker provided by the ``context_db_mb``
    fixture and is configured with GRPC_PORT, GRPC_MAX_WORKERS and GRPC_GRACE_PERIOD.

    Yields:
        The started ContextService instance.
    """
    _service = ContextService(
        context_db_mb[0], context_db_mb[1], port=GRPC_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start the REST server for the test session and shut it down on teardown.

    Every entry of RESOURCES is registered on the server, with the database from the
    ``context_db_mb`` fixture passed as the resource class argument.

    Yields:
        The started RestServer instance.
    """
    database = context_db_mb[0]
    _rest_server = RestServer(port=RESTAPI_PORT, base_url=RESTAPI_BASE_URL)
    for endpoint_name, resource_class, resource_url in RESOURCES:
        _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
    _rest_server.start()
    time.sleep(1) # give the server time to start before tests issue requests
    yield _rest_server
    _rest_server.shutdown()
    _rest_server.join()

@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    """Provide a ContextClient connected to 127.0.0.1:GRPC_PORT and close it on teardown.

    ``context_service_grpc`` is not used in the body; requesting it makes pytest set up the
    gRPC service fixture before this client is created.

    Yields:
        The ContextClient instance.
    """
    _client = ContextClient(address='127.0.0.1', port=GRPC_PORT)
    yield _client
    _client.close()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def do_rest_request(url : str, timeout : float = 10.0):
    """Issue a GET request against the local REST API and return the decoded JSON reply.

    Args:
        url: Path appended after RESTAPI_BASE_URL to build the request URL.
        timeout: Seconds to wait for the server before giving up; passed to ``requests.get``.

    Returns:
        The reply body parsed as JSON.

    Raises:
        AssertionError: If the reply status code is not 200.
        requests.exceptions.Timeout: If the server does not answer within ``timeout`` seconds.
    """
    request_url = 'http://127.0.0.1:{:s}{:s}{:s}'.format(str(RESTAPI_PORT), str(RESTAPI_BASE_URL), url)
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    # Without a timeout, requests waits indefinitely on an unresponsive server and hangs the test run.
    reply = requests.get(request_url, timeout=timeout)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------

def test_grpc_context(
    context_client_grpc : ContextClient,                # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_db_mb : Tuple[Database, MessageBroker]):    # pylint: disable=redefined-outer-name
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_database.clear_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsCollector(context_client_grpc)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector.start()

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    with pytest.raises(grpc.RpcError) as e:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)

    # ----- List when the object does not exist ------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContextIds(Empty())
    assert len(response.context_ids) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContexts(Empty())
    assert len(response.contexts) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Create the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.SetContext(Context(**CONTEXT))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    with pytest.raises(grpc.RpcError) as e:
        WRONG_TOPOLOGY_ID = copy.deepcopy(TOPOLOGY_ID)
        WRONG_TOPOLOGY_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
        WRONG_CONTEXT['topology_ids'].append(WRONG_TOPOLOGY_ID)
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.topology_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
          'should be == request.context_id.context_uuid.uuid(admin)'
    assert e.value.details() == msg

    with pytest.raises(grpc.RpcError) as e:
        WRONG_SERVICE_ID = copy.deepcopy(SERVICE_R1_R2_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        WRONG_SERVICE_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
        WRONG_CONTEXT['service_ids'].append(WRONG_SERVICE_ID)
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.service_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
          'should be == request.context_id.context_uuid.uuid(admin)'
    assert e.value.details() == msg

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Check create event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, ContextEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Update the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.SetContext(Context(**CONTEXT))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Check update event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, ContextEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Dump state of database after create/update the object ------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 2

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Get when the object exists ---------------------------------------------------------------------------------
Loading
Loading full blame…