Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-lines
import copy, grpc, logging, os, pytest, requests, time, urllib
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum
from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name, get_service_baseurl_http, get_service_port_grpc, get_service_port_http
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
validate_topology_ids)
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId,
DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId,
ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
from context.service.database.Tools import (
FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
from context.service.rest_server.RestServer import RestServer
from context.service.rest_server.Resources import RESOURCES
from .Objects import (
CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, TOPOLOGY, TOPOLOGY_ID)
# Module-level logger; DEBUG so the database dumps emitted by the tests are visible.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Endpoint on which the Context service under test is exposed.
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.CONTEXT) # avoid privileged ports
HTTP_PORT = 10000 + get_service_port_http(ServiceNameEnum.CONTEXT) # avoid privileged ports

# Export host and ports under the variable names computed by get_env_var_name.
# NOTE(review): presumably read back by the settings helpers / client - confirm.
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT.value, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)

# Fallback Redis settings, used when the corresponding environment variable is not set.
DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0

# Settings passed as keyword arguments to the Redis database / message broker backends.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ),
}

# Each scenario is (name, database backend, database settings, message broker backend, message broker settings).
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
    ('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG),
]
@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Session fixture yielding a (Database, MessageBroker) pair for one scenario.

    The fixture is parametrized over SCENARIOS; ``request.param`` is the
    scenario tuple (name, database backend, database settings, message broker
    backend, message broker settings). The message broker is terminated once
    the session no longer needs the fixture.
    """
    scenario_name, db_kind, db_kwargs, broker_kind, broker_kwargs = request.param

    LOGGER.info(
        'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'.format(
            str(scenario_name), str(db_kind.value), str(db_kwargs), str(broker_kind.value), str(broker_kwargs)))

    # Build each backend from its enum value and keyword settings, then wrap it.
    db_backend_instance = get_database_backend(backend=db_kind, **db_kwargs)
    database = Database(db_backend_instance)
    broker_backend_instance = get_messagebroker_backend(backend=broker_kind, **broker_kwargs)
    message_broker = MessageBroker(broker_backend_instance)

    yield database, message_broker

    # Teardown: only the message broker exposes an explicit shutdown call here.
    message_broker.terminate()
@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs the gRPC ContextService for the duration of the tests.

    The service is built from the database and message broker provided by
    ``context_db_mb``, started before being yielded, and stopped on teardown.
    The fixture decorator is required: ``context_client_grpc`` requests this
    function by name as a fixture.
    """
    database, message_broker = context_db_mb[0], context_db_mb[1]
    _service = ContextService(database, message_broker)
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs the REST server exposing the Context resources.

    Every (endpoint name, resource class, URL) entry of RESOURCES is registered
    with the database as its only constructor argument. The server is started
    before being yielded, then shut down and joined on teardown.
    """
    database = context_db_mb[0]
    # The server instance must exist before resources can be registered on it.
    # NOTE(review): assumes RestServer takes no mandatory constructor arguments - confirm.
    _rest_server = RestServer()
    for endpoint_name, resource_class, resource_url in RESOURCES:
        _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
    _rest_server.start()
    time.sleep(1) # give the server time to start
    yield _rest_server
    _rest_server.shutdown()
    _rest_server.join()
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture yielding a ContextClient for the gRPC tests.

    Requesting ``context_service_grpc`` ensures the service is running before
    the client is created.
    """
    # NOTE(review): assumes ContextClient() resolves host/port on its own (e.g. from the
    # environment variables set at module level) and exposes close() - confirm.
    _client = ContextClient()
    yield _client
    _client.close()

def do_rest_request(url : str):
    """Issue a GET against the Context REST API and return the decoded JSON reply.

    ``url`` is appended to the service base URL. The request and reply are
    logged, and the call asserts that the HTTP status code is 200.

    NOTE(review): the helper name is reconstructed; verify it against callers.
    """
    base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
    request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url)
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    reply = requests.get(request_url)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------
def test_grpc_context(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListContextIds(Empty())
assert len(response.context_ids) == 0
response = context_client_grpc.ListContexts(Empty())
assert len(response.contexts) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Create the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
with pytest.raises(grpc.RpcError) as e:
WRONG_TOPOLOGY_ID = copy.deepcopy(TOPOLOGY_ID)
WRONG_TOPOLOGY_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
WRONG_CONTEXT = copy.deepcopy(CONTEXT)
WRONG_CONTEXT['topology_ids'].append(WRONG_TOPOLOGY_ID)
context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.topology_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
'should be == request.context_id.context_uuid.uuid(admin)'
assert e.value.details() == msg
with pytest.raises(grpc.RpcError) as e:
WRONG_SERVICE_ID = copy.deepcopy(SERVICE_R1_R2_ID)
WRONG_SERVICE_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
WRONG_CONTEXT = copy.deepcopy(CONTEXT)
WRONG_CONTEXT['service_ids'].append(WRONG_SERVICE_ID)
context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.service_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
'should be == request.context_id.context_uuid.uuid(admin)'
assert e.value.details() == msg
# ----- Check create event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, ContextEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, ContextEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
Loading
Loading full blame…