Skip to content
test_unitary.py 70.6 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-lines
import copy, grpc, logging, os, pytest, requests, time, urllib
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Tuple
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name, get_service_baseurl_http, get_service_port_grpc, get_service_port_http
from common.orm.Database import Database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.type_checkers.Assertions import (
    validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
    validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
    validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
    validate_topology_ids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId,
    DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId,
    ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Tools import (
    FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.Populate import populate
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.rest_server.RestServer import RestServer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.rest_server.Resources import RESOURCES
from .Objects import (
    CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
    DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
    LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
    SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, TOPOLOGY, TOPOLOGY_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

# Module-level logger; DEBUG level so this logger does not filter out any of the messages logged by the tests.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Endpoint used by the Context service instances started by the fixtures of this module.
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT)   # avoid privileged ports
HTTP_PORT = 10000 + get_service_port_http(ServiceNameEnum.CONTEXT)   # avoid privileged ports

# Export the test endpoint through the Context service environment variables.
# NOTE(review): presumably ContextClient() / RestServer() resolve their address from these variables when
# constructed without arguments - confirm against their implementations.
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)

# Fallback Redis connection parameters, used when the corresponding environment variable is not set.
DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID  = 0

# Settings handed to the Redis-based backends; each entry can be overridden through the environment.
# NOTE(review): values read from the environment are strings while the port / database-id defaults are ints -
# confirm the backends accept both forms.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Test scenarios used to parametrize the context_db_mb fixture. Each entry is:
#   (scenario name, database backend, database settings, message broker backend, message broker settings)
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
    ('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(
    scope='session',
    params=SCENARIOS,
    ids=[str(scenario[0]) for scenario in SCENARIOS])
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Session fixture providing the (Database, MessageBroker) pair of each scenario in SCENARIOS.

    The scenario tuple arrives in request.param; its backends and settings are used to build the
    database and the message broker. The message broker is terminated after the fixture is released.
    """
    scenario_name, db_backend, db_settings, mb_backend, mb_settings = request.param

    LOGGER.info(
        'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'.format(
            str(scenario_name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))

    # Build order kept as database first, message broker second.
    database_backend = get_database_backend(backend=db_backend, **db_settings)
    database = Database(database_backend)
    broker_backend = get_messagebroker_backend(backend=mb_backend, **mb_settings)
    message_broker = MessageBroker(broker_backend)

    yield database, message_broker

    # Teardown: only the message broker needs to be terminated explicitly.
    message_broker.terminate()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that starts a ContextService on top of the scenario's database and message broker.

    Yields the started service and stops it once the fixture is released.
    """
    database, message_broker = context_db_mb
    _service = ContextService(database, message_broker)
    _service.start()
    yield _service
    _service.stop()

@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that starts a RestServer exposing every entry of RESOURCES.

    Each resource class receives the scenario's database as its only constructor argument.
    Yields the started server; on release the server is shut down and joined.
    """
    database = context_db_mb[0]
    _rest_server = RestServer()
    for endpoint_name, resource_class, resource_url in RESOURCES:
        _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
    _rest_server.start()
    time.sleep(1) # give the server time to start before the tests issue requests
    yield _rest_server
    _rest_server.shutdown()
    _rest_server.join()

@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a ContextClient; the client is closed once the fixture is released.

    The context_service_grpc argument is not used in the body: requesting it makes pytest set up the
    service fixture before the client is created.
    """
    _client = ContextClient()
    yield _client
    _client.close()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def do_rest_request(url : str, timeout : float = 10.0):
    """Issue a GET request against the Context REST API under test and return the decoded JSON reply.

    Args:
        url: resource path, appended verbatim to the HTTP base URL of the Context service.
        timeout: seconds to wait for the server before giving up. Without a timeout `requests.get`
            may block indefinitely if the server under test stalls, hanging the whole test session.

    Returns:
        The reply body parsed as JSON.

    Raises:
        AssertionError: if the reply status code is not 200.
        requests.exceptions.Timeout: if the server does not answer within `timeout` seconds.
    """
    base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
    request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url)
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    reply = requests.get(request_url, timeout=timeout)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------

def test_grpc_context(
    context_client_grpc : ContextClient,                # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_db_mb : Tuple[Database, MessageBroker]):    # pylint: disable=redefined-outer-name
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    context_database.clear_all()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsCollector(context_client_grpc)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector.start()

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    with pytest.raises(grpc.RpcError) as e:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)

    # ----- List when the object does not exist ------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContextIds(Empty())
    assert len(response.context_ids) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContexts(Empty())
    assert len(response.contexts) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Create the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.SetContext(Context(**CONTEXT))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    with pytest.raises(grpc.RpcError) as e:
        WRONG_TOPOLOGY_ID = copy.deepcopy(TOPOLOGY_ID)
        WRONG_TOPOLOGY_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
        WRONG_CONTEXT['topology_ids'].append(WRONG_TOPOLOGY_ID)
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.topology_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
          'should be == request.context_id.context_uuid.uuid(admin)'
    assert e.value.details() == msg

    with pytest.raises(grpc.RpcError) as e:
        WRONG_SERVICE_ID = copy.deepcopy(SERVICE_R1_R2_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        WRONG_SERVICE_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
        WRONG_CONTEXT['service_ids'].append(WRONG_SERVICE_ID)
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.service_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
          'should be == request.context_id.context_uuid.uuid(admin)'
    assert e.value.details() == msg

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Check create event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, ContextEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Update the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.SetContext(Context(**CONTEXT))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Check update event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, ContextEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Dump state of database after create/update the object ------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
Loading
Loading full blame…