Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-lines
import copy, grpc, logging, os, pytest, requests, sqlalchemy, time, urllib, uuid
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
get_service_baseurl_http, get_service_port_grpc, get_service_port_http)
from context.service.Database import Database
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId,
DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId,
ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
from common.proto.policy_pb2 import (PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule)
from common.tools.object_factory.Context import json_context, json_context_id
from common.tools.object_factory.Service import json_service_id
from common.tools.object_factory.Slice import json_slice_id
from common.tools.object_factory.Topology import json_topology_id
validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
validate_topology_ids)
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from context.service.database.Tools import (
FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.ContextService import ContextService
#from context.service._old_code.Populate import populate
#from context.service.rest_server.RestServer import RestServer
#from context.service.rest_server.Resources import RESOURCES
from requests import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from context.service.database._Base import _Base
from common.Settings import get_setting
from context.service.Engine import Engine
from context.service.database._Base import rebuild_database
from .Objects import (
CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, TOPOLOGY, TOPOLOGY_ID,
POLICY_RULE, POLICY_RULE_ID, POLICY_RULE_UUID)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT)) # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT)) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
#DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST
#DEFAULT_REDIS_SERVICE_PORT = 6379
#DEFAULT_REDIS_DATABASE_ID = 0
#REDIS_CONFIG = {
# 'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
# 'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
# 'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ),
#}
#SCENARIOS = [
# ('db:cockroach_mb:inmemory', None, {}, None, {}),
# ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} )
# ('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG),
#@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
@pytest.fixture(scope='session')
def context_db_mb(request) -> Tuple[Session, MessageBroker]:
#name,db_session,mb_backend,mb_settings = request.param
#msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...'
#LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings)))
_msg_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
yield _db_engine, _msg_broker
_msg_broker.terminate()
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
_service = ContextService(context_db_mb[0], context_db_mb[1])
_service.start()
yield _service
_service.stop()
#@pytest.fixture(scope='session')
#def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
# database = context_db_mb[0]
# _rest_server = RestServer()
# for endpoint_name, resource_class, resource_url in RESOURCES:
# _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
# _rest_server.start()
# time.sleep(1) # bring time for the server to start
# yield _rest_server
# _rest_server.shutdown()
# _rest_server.join()
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
#def do_rest_request(url : str):
# base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
# request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url)
# LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
# reply = requests.get(request_url)
# LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
# assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
# return reply.json()
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------
def test_grpc_context(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[sqlalchemy.engine.Engine, MessageBroker] # pylint: disable=redefined-outer-name
) -> None:
db_engine = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
rebuild_database(db_engine, drop_if_exists=True)
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
#events_collector = EventsCollector(
# context_client_grpc, log_events_received=True,
# activate_context_collector = True, activate_topology_collector = False, activate_device_collector = False,
# activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False,
# activate_connection_collector = False)
#events_collector.start()
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListContextIds(Empty())
assert len(response.context_ids) == 0
response = context_client_grpc.ListContexts(Empty())
assert len(response.contexts) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------
#db_entries = database.dump_all()
#LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
#for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
#LOGGER.info('-----------------------------------------------------------')
#assert len(db_entries) == 0
# ----- Create the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
wrong_context_uuid = str(uuid.uuid4())
wrong_context_id = json_context_id(wrong_context_uuid)
with pytest.raises(grpc.RpcError) as e:
WRONG_CONTEXT = copy.deepcopy(CONTEXT)
WRONG_CONTEXT['topology_ids'].append(json_topology_id(str(uuid.uuid4()), context_id=wrong_context_id))
context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.topology_ids[0].context_id.context_uuid.uuid({}) is invalid; '\
'should be == request.context_id.context_uuid.uuid({})'.format(wrong_context_uuid, DEFAULT_CONTEXT_UUID)
assert e.value.details() == msg
with pytest.raises(grpc.RpcError) as e:
WRONG_CONTEXT = copy.deepcopy(CONTEXT)
WRONG_CONTEXT['service_ids'].append(json_service_id(str(uuid.uuid4()), context_id=wrong_context_id))
context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.service_ids[0].context_id.context_uuid.uuid({}) is invalid; '\
'should be == request.context_id.context_uuid.uuid({})'.format(wrong_context_uuid, DEFAULT_CONTEXT_UUID)
assert e.value.details() == msg
with pytest.raises(grpc.RpcError) as e:
Loading
Loading full blame...