Skip to content
test_unitary.py 73.8 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-lines
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import copy, grpc, logging, os, pytest, requests, sqlalchemy, time, urllib, uuid
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Tuple
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
    get_service_baseurl_http, get_service_port_grpc, get_service_port_http)
from context.service.Database import Database
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
    Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId,
    DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId,
    ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
from common.proto.policy_pb2 import (PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Service import json_service_id
from common.tools.object_factory.Slice import json_slice_id
from common.tools.object_factory.Topology import json_topology_id
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.type_checkers.Assertions import (
    validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
    validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
    validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
    validate_topology_ids)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database.Tools import (
    FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.ContextService import ContextService
#from context.service._old_code.Populate import populate
#from context.service.rest_server.RestServer import RestServer
#from context.service.rest_server.Resources import RESOURCES
from requests import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from context.service.database._Base import _Base
from common.Settings import get_setting
from context.service.Engine import Engine
from context.service.database._Base import rebuild_database
from .Objects import (
    CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
    DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
    LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
    SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, TOPOLOGY, TOPOLOGY_ID,
    POLICY_RULE, POLICY_RULE_ID, POLICY_RULE_UUID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT))   # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT))   # avoid privileged ports
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST
#DEFAULT_REDIS_SERVICE_PORT = 6379
#DEFAULT_REDIS_DATABASE_ID  = 0
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#REDIS_CONFIG = {
#    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
#    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
#    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
#}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#SCENARIOS = [
#    ('db:cockroach_mb:inmemory', None, {}, None, {}),
#    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          )
#    ('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
@pytest.fixture(scope='session')
def context_db_mb(request) -> Tuple[Session, MessageBroker]:
    #name,db_session,mb_backend,mb_settings = request.param
    #msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...'
    #LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _db_engine = Engine().get_engine()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _msg_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
    yield _db_engine, _msg_broker
    _msg_broker.terminate()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    _service = ContextService(context_db_mb[0], context_db_mb[1])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _service.start()
    yield _service
    _service.stop()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

#@pytest.fixture(scope='session')
#def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
#    database = context_db_mb[0]
#    _rest_server = RestServer()
#    for endpoint_name, resource_class, resource_url in RESOURCES:
#        _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
#    _rest_server.start()
#    time.sleep(1) # bring time for the server to start
#    yield _rest_server
#    _rest_server.shutdown()
#    _rest_server.join()

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    _client = ContextClient()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    yield _client
    _client.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#def do_rest_request(url : str):
#    base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
#    request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url)
#    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
#    reply = requests.get(request_url)
#    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
#    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
#    return reply.json()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------

def test_grpc_context(
    context_client_grpc : ContextClient,                            # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[sqlalchemy.engine.Engine, MessageBroker]  # pylint: disable=redefined-outer-name
) -> None:
    db_engine = context_db_mb[0]
    # ----- Clean the database -----------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    rebuild_database(db_engine, drop_if_exists=True)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #events_collector = EventsCollector(
    #    context_client_grpc, log_events_received=True,
    #    activate_context_collector = True, activate_topology_collector = False, activate_device_collector = False,
    #    activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False,
    #    activate_connection_collector = False)
    #events_collector.start()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    with pytest.raises(grpc.RpcError) as e:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)

    # ----- List when the object does not exist ------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContextIds(Empty())
    assert len(response.context_ids) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.ListContexts(Empty())
    assert len(response.contexts) == 0

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Dump state of database before create the object ------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    #db_entries = database.dump_all()
    #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    #for db_entry in db_entries:
    #    LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    #LOGGER.info('-----------------------------------------------------------')
    #assert len(db_entries) == 0
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # ----- Create the object ------------------------------------------------------------------------------------------
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    response = context_client_grpc.SetContext(Context(**CONTEXT))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    wrong_context_uuid = str(uuid.uuid4())
    wrong_context_id = json_context_id(wrong_context_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    with pytest.raises(grpc.RpcError) as e:
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        WRONG_CONTEXT['topology_ids'].append(json_topology_id(str(uuid.uuid4()), context_id=wrong_context_id))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.topology_ids[0].context_id.context_uuid.uuid({}) is invalid; '\
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
          'should be == request.context_id.context_uuid.uuid({})'.format(wrong_context_uuid, DEFAULT_CONTEXT_UUID)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    assert e.value.details() == msg

    with pytest.raises(grpc.RpcError) as e:
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        WRONG_CONTEXT['service_ids'].append(json_service_id(str(uuid.uuid4()), context_id=wrong_context_id))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.service_ids[0].context_id.context_uuid.uuid({}) is invalid; '\
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
          'should be == request.context_id.context_uuid.uuid({})'.format(wrong_context_uuid, DEFAULT_CONTEXT_UUID)
    assert e.value.details() == msg

    with pytest.raises(grpc.RpcError) as e:
Loading
Loading full blame...