Skip to content
test_unitary.py 71.8 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-lines
import copy, grpc, logging, os, pytest, requests, time, urllib
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Tuple
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
    get_service_baseurl_http, get_service_port_grpc, get_service_port_http)
from context.service.Database import Database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId,
    DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId,
    ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.type_checkers.Assertions import (
    validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
    validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
    validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
    validate_topology_ids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Tools import (
    FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.Populate import populate
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.rest_server.RestServer import RestServer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.rest_server.Resources import RESOURCES
from requests import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from context.service.database.ContextModel import ContextModel
from context.service.database.Base import Base

from .Objects import (
    CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
    DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
    LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
    SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, TOPOLOGY, TOPOLOGY_ID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT))   # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT))   # avoid privileged ports
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID  = 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
SCENARIOS = [
    ('all_sqlalchemy', {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Session, MessageBroker]:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    name,db_backend,db_settings,mb_backend,mb_settings = request.param
    msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'
    LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings)))
    _database = Database(get_database_backend(backend=db_backend, **db_settings))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield _database, _message_broker
    _message_broker.terminate()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_s_mb(request) -> Tuple[Session, MessageBroker]:
    name,db_session,mb_backend,mb_settings = request.param
    msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...'
    LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings)))

    db_uri = 'cockroachdb://root@10.152.183.121:26257/defaultdb?sslmode=disable'
    LOGGER.debug('Connecting to DB: {}'.format(db_uri))

    try:
        engine = create_engine(db_uri)
    except Exception as e:
        LOGGER.error("Failed to connect to database.")
        LOGGER.error(f"{e}")
        return 1

    Base.metadata.create_all(engine)
    _session = sessionmaker(bind=engine)

    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield _session, _message_broker
    _message_broker.terminate()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
def context_service_grpc(context_s_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    _service = ContextService(context_s_mb[0], context_s_mb[1])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _service.start()
    yield _service
    _service.stop()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    database = context_db_mb[0]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _rest_server = RestServer()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    for endpoint_name, resource_class, resource_url in RESOURCES:
        _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
    _rest_server.start()
    time.sleep(1) # bring time for the server to start
    yield _rest_server
    _rest_server.shutdown()
    _rest_server.join()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    yield _client
    _client.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def do_rest_request(url : str):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
    request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    reply = requests.get(request_url)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------
def test_grpc_context(
    context_client_grpc : ContextClient,                # pylint: disable=redefined-outer-name
    context_s_mb : Tuple[Session, MessageBroker]):    # pylint: disable=redefined-outer-name
    Session = context_s_mb[0]
    database = Database(Session)
    # ----- Clean the database -----------------------------------------------------------------------------------------
    database.clear()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector = EventsCollector(context_client_grpc)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    events_collector.start()

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    with pytest.raises(grpc.RpcError) as e:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)

    # ----- List when the object does not exist ------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContextIds(Empty())
    assert len(response.context_ids) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContexts(Empty())
    assert len(response.contexts) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = database.query_all(ContextModel)
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Create the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.SetContext(Context(**CONTEXT))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    wrong_uuid = 'c97c4185-e1d1-4ea7-b6b9-afbf76cb61f4'
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    with pytest.raises(grpc.RpcError) as e:
        WRONG_TOPOLOGY_ID = copy.deepcopy(TOPOLOGY_ID)
        WRONG_TOPOLOGY_ID['context_id']['context_uuid']['uuid'] = wrong_uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
        WRONG_CONTEXT['topology_ids'].append(WRONG_TOPOLOGY_ID)
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.topology_ids[0].context_id.context_uuid.uuid({}) is invalid; '\
          'should be == request.context_id.context_uuid.uuid({})'.format(wrong_uuid, DEFAULT_CONTEXT_UUID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.details() == msg

    with pytest.raises(grpc.RpcError) as e:
        WRONG_SERVICE_ID = copy.deepcopy(SERVICE_R1_R2_ID)
        WRONG_SERVICE_ID['context_id']['context_uuid']['uuid'] = wrong_uuid
Loading
Loading full blame...