Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-lines
import copy, grpc, logging, os, pytest, requests, time, urllib
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
get_service_baseurl_http, get_service_port_grpc, get_service_port_http)
from context.service.Database import Database
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId,
DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId,
ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId)
validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids,
validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids,
validate_links, validate_service, validate_service_ids, validate_services, validate_topologies, validate_topology,
validate_topology_ids)
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from context.service.database.Tools import (
FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
from context.service.rest_server.RestServer import RestServer
from context.service.rest_server.Resources import RESOURCES
from requests import Session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from context.service.database.ContextModel import ContextModel
from context.service.database.TopologyModel import TopologyModel
from context.service.database.Base import Base
from .Objects import (
CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID,
DEVICE_R1_UUID, DEVICE_R2, DEVICE_R2_ID, DEVICE_R2_UUID, DEVICE_R3, DEVICE_R3_ID, DEVICE_R3_UUID, LINK_R1_R2,
LINK_R1_R2_ID, LINK_R1_R2_UUID, SERVICE_R1_R2, SERVICE_R1_R2_ID, SERVICE_R1_R2_UUID, SERVICE_R1_R3,
SERVICE_R1_R3_ID, SERVICE_R1_R3_UUID, SERVICE_R2_R3, SERVICE_R2_R3_ID, SERVICE_R2_R3_UUID, TOPOLOGY, TOPOLOGY_ID)
# Module-level logger for these tests; DEBUG level so that debug/info/warning traces below are all emitted.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# Ports for the locally started Context service: the configured gRPC/HTTP ports shifted up by 10000.
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT)) # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT)) # avoid privileged ports
# Publish host and ports through the environment, under the variable names computed by get_env_var_name().
# NOTE(review): presumably read back by the Context client/service when resolving the endpoint - confirm.
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
# Fallback values used below when the corresponding REDIS_* environment variables are not defined.
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0
'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ),
('all_sqlalchemy', {}, MessageBrokerBackendEnum.INMEMORY, {} ),
@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_s_mb(request) -> Tuple[Session, MessageBroker]:
    """Yield the SQLAlchemy session factory and the message broker shared by the test session.

    The fixture is parametrized over SCENARIOS; each scenario is unpacked as
    (name, db_session, mb_backend, mb_settings).

    The database URI is read from the CRDB_URI environment variable when it is set and otherwise falls
    back to the previously hard-coded CockroachDB endpoint, so the suite can target another database
    without editing this file.

    Yields:
        A 2-tuple (session factory returned by sessionmaker, MessageBroker instance).

    Raises:
        Exception: any error raised by create_engine is logged and re-raised. Returning a value from a
            yield-style fixture would only make pytest report that the fixture did not yield, which
            hides the real cause of the failure.
    """
    name, db_session, mb_backend, mb_settings = request.param
    msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...'
    LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings)))

    # Optional override through the environment; the default keeps the original endpoint.
    db_uri = os.environ.get('CRDB_URI', 'cockroachdb://root@10.152.183.111:26257/defaultdb?sslmode=disable')
    LOGGER.debug('Connecting to DB: {}'.format(db_uri))

    try:
        engine = create_engine(db_uri)
    except Exception:
        # Keep the traceback in the log and let pytest report the genuine setup error.
        LOGGER.exception("Failed to connect to database.")
        raise

    # Create the tables declared on Base.metadata before any test touches the database.
    Base.metadata.create_all(engine)
    _session = sessionmaker(bind=engine, expire_on_commit=False)

    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield _session, _message_broker

    # Teardown: runs once the last test using this session-scoped fixture has finished.
    _message_broker.terminate()
@pytest.fixture(scope='session')
def context_service_grpc(context_s_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start the Context gRPC service for the test session and stop it on teardown.

    Registered as a session-scoped fixture because the session-scoped `context_client_grpc` fixture
    requests it by name; without the decorator pytest cannot resolve that dependency.

    Args:
        context_s_mb: 2-tuple provided by the `context_s_mb` fixture; both elements are handed to
            ContextService in order.

    Yields:
        The started ContextService instance.
    """
    _service = ContextService(context_s_mb[0], context_s_mb[1])
    _service.start()
    yield _service
    # Teardown: stop the service after the last dependent test.
    _service.stop()
@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start a REST server exposing every entry of RESOURCES and shut it down on teardown.

    Args:
        context_db_mb: tuple whose first element is the database handle passed to each resource.
            NOTE(review): the provider fixture visible in this file is named `context_s_mb`;
            confirm that a `context_db_mb` fixture exists.

    Yields:
        The started REST server.
    """
    database = context_db_mb[0]
    # The server was previously used without ever being created (NameError on first use).
    # NOTE(review): default construction assumed - confirm RestServer's constructor signature.
    _rest_server = RestServer()
    for endpoint_name, resource_class, resource_url in RESOURCES:
        _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,))
    _rest_server.start()
    time.sleep(1) # bring time for the server to start
    yield _rest_server
    # Teardown: request shutdown, then wait for the server to finish.
    _rest_server.shutdown()
    _rest_server.join()
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
# NOTE(review): the statements below perform an HTTP GET against the REST endpoint and use a name `url`
# that is neither a parameter of this function nor assigned in it, while the tests annotate the value of
# this fixture as ContextClient. They look like the body of a separate REST helper - confirm against the
# complete file before relying on this fixture.
# Build the absolute URL from the local host, the test HTTP port, the service base URL and `url`.
base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT)
request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url)
LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
reply = requests.get(request_url)
LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
# Any status other than 200 fails the caller with the received code in the message.
assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
# Return the reply body decoded from JSON.
return reply.json()
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------
def test_grpc_context(
    context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
    context_s_mb : Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name
    """Exercise the Context gRPC API for Context objects: get, list, create, update and remove.

    The test starts from an empty database, checks the NOT_FOUND and INVALID_ARGUMENT error paths,
    checks the create/update events delivered through an EventsCollector, and finally verifies that no
    ContextModel rows are left after removal.

    Args:
        context_client_grpc: gRPC client connected to the Context service under test.
        context_s_mb: fixture tuple; its first element is wrapped in Database for direct inspection.
    """
    # Lower-case local name so that the imported `Session` class is not shadowed inside the test.
    session = context_s_mb[0]
    # Same construction as in the sibling tests of this module; `database` was previously undefined here.
    database = Database(session)

    # ----- Clean the database -----------------------------------------------------------------------------------------
    database.clear()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsCollector(context_client_grpc)
    events_collector.start()

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    with pytest.raises(grpc.RpcError) as e:
        context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)

    # ----- List when the object does not exist ------------------------------------------------------------------------
    response = context_client_grpc.ListContextIds(Empty())
    assert len(response.context_ids) == 0

    response = context_client_grpc.ListContexts(Empty())
    assert len(response.contexts) == 0

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = database.get_all(ContextModel)
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

    # ----- Create the object ------------------------------------------------------------------------------------------
    response = context_client_grpc.SetContext(Context(**CONTEXT))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    wrong_uuid = 'c97c4185-e1d1-4ea7-b6b9-afbf76cb61f4'

    # A topology id that points to another context must be rejected.
    with pytest.raises(grpc.RpcError) as e:
        WRONG_TOPOLOGY_ID = copy.deepcopy(TOPOLOGY_ID)
        WRONG_TOPOLOGY_ID['context_id']['context_uuid']['uuid'] = wrong_uuid
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
        WRONG_CONTEXT['topology_ids'].append(WRONG_TOPOLOGY_ID)
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.topology_ids[0].context_id.context_uuid.uuid({}) is invalid; '\
          'should be == request.context_id.context_uuid.uuid({})'.format(wrong_uuid, DEFAULT_CONTEXT_UUID)
    assert e.value.details() == msg

    # A service id that points to another context must be rejected as well.
    with pytest.raises(grpc.RpcError) as e:
        WRONG_SERVICE_ID = copy.deepcopy(SERVICE_R1_R2_ID)
        WRONG_SERVICE_ID['context_id']['context_uuid']['uuid'] = wrong_uuid
        WRONG_CONTEXT = copy.deepcopy(CONTEXT)
        WRONG_CONTEXT['service_ids'].append(WRONG_SERVICE_ID)
        context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.service_ids[0].context_id.context_uuid.uuid({}) is invalid; '\
          'should be == request.context_id.context_uuid.uuid({})'.format(wrong_uuid, DEFAULT_CONTEXT_UUID)
    # The expected message was built but never compared; check it like in the topology case above.
    assert e.value.details() == msg

    # ----- Check create event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, ContextEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Update the object ------------------------------------------------------------------------------------------
    response = context_client_grpc.SetContext(Context(**CONTEXT))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Check update event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, ContextEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Dump state of database after create/update the object ------------------------------------------------------
    db_entries = database.get_all(ContextModel)
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    # for db_entry in db_entries:
    # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')

    # ----- Get when the object exists ---------------------------------------------------------------------------------
    response = context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
    assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert len(response.topology_ids) == 0
    assert len(response.service_ids) == 0

    # ----- List when the object exists --------------------------------------------------------------------------------
    response = context_client_grpc.ListContextIds(Empty())
    assert len(response.context_ids) == 1
    assert response.context_ids[0].context_uuid.uuid == DEFAULT_CONTEXT_UUID

    response = context_client_grpc.ListContexts(Empty())
    assert len(response.contexts) == 1
    assert response.contexts[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert len(response.contexts[0].topology_ids) == 0
    assert len(response.contexts[0].service_ids) == 0

    # ----- Remove the object ------------------------------------------------------------------------------------------
    context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))

    # ----- Check remove event -----------------------------------------------------------------------------------------
    # event = events_collector.get_event(block=True)
    # assert isinstance(event, ContextEvent)
    # assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    # assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
    events_collector.stop()

    # ----- Dump state of database after remove the object -------------------------------------------------------------
    db_entries = database.get_all(ContextModel)
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    # for db_entry in db_entries:
    # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0
context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
context_s_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name
session = context_s_mb[0]
database = Database(session)
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Prepare dependencies for the test and capture related events -----------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# event = events_collector.get_event(block=True)
# assert isinstance(event, ContextEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
# assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
# assert e.value.details() == 'Topology({:s}/{:s}) not found'.format(DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID)
assert e.value.details() == 'Topology({:s}) not found'.format(DEFAULT_TOPOLOGY_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID))
response = context_client_grpc.ListTopologies(ContextId(**CONTEXT_ID))
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
# ----- Create the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
CONTEXT_WITH_TOPOLOGY = copy.deepcopy(CONTEXT)
CONTEXT_WITH_TOPOLOGY['topology_ids'].append(TOPOLOGY_ID)
response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_TOPOLOGY))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Check create event -----------------------------------------------------------------------------------------
# events = events_collector.get_events(block=True, count=2)
# assert isinstance(events[0], TopologyEvent)
# assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
# assert events[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert events[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# assert isinstance(events[1], ContextEvent)
# assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
# assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
# event = events_collector.get_event(block=True)
# assert isinstance(event, TopologyEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
# assert event.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
# ----- Get when the object exists ---------------------------------------------------------------------------------
response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert len(response.device_ids) == 0
assert len(response.link_ids) == 0
# ----- List when the object exists --------------------------------------------------------------------------------
response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID))
assert len(response.topology_ids) == 1
assert response.topology_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_ids[0].topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
response = context_client_grpc.ListTopologies(ContextId(**CONTEXT_ID))
assert len(response.topologies) == 1
assert response.topologies[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topologies[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert len(response.topologies[0].device_ids) == 0
assert len(response.topologies[0].link_ids) == 0
# ----- Remove the object ------------------------------------------------------------------------------------------
context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event -----------------------------------------------------------------------------------------
# events = events_collector.get_events(block=True, count=2)
# assert isinstance(events[0], TopologyEvent)
# assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert events[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# assert isinstance(events[1], ContextEvent)
# assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
# ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
context_s_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name
session = context_s_mb[0]
database = Database(session)
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Prepare dependencies for the test and capture related events -----------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
events = events_collector.get_events(block=True, count=2)
assert isinstance(events[0], ContextEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert isinstance(events[1], TopologyEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetDevice(DeviceId(**DEVICE_R1_ID))
assert e.value.details() == 'Device({:s}) not found'.format(DEVICE_R1_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListDeviceIds(Empty())
response = context_client_grpc.ListDevices(Empty())
# ----- Dump state of database before create the object ------------------------------------------------------------
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
# ----- Create the object ------------------------------------------------------------------------------------------
WRONG_DEVICE = copy.deepcopy(DEVICE_R1)
WRONG_DEVICE_UUID = '3f03c76d-31fb-47f5-9c1d-bc6b6bfa2d08'
WRONG_DEVICE['device_endpoints'][0]['endpoint_id']['device_id']['device_uuid']['uuid'] = WRONG_DEVICE_UUID
context_client_grpc.SetDevice(Device(**WRONG_DEVICE))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.device_endpoints[0].device_id.device_uuid.uuid({}) is invalid; '\
'should be == request.device_id.device_uuid.uuid({})'.format(WRONG_DEVICE_UUID, DEVICE_R1_UUID)
response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID
# ----- Check create event -----------------------------------------------------------------------------------------
# event = events_collector.get_event(block=True)
# assert isinstance(event, DeviceEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
# assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
# event = events_collector.get_event(block=True)
# assert isinstance(event, DeviceEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
# assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
# ----- Get when the object exists ---------------------------------------------------------------------------------
response = context_client_grpc.GetDevice(DeviceId(**DEVICE_R1_ID))
assert response.device_id.device_uuid.uuid == DEVICE_R1_UUID
assert response.device_type == 'packet-router'
assert len(response.device_config.config_rules) == 3
assert response.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
assert len(response.device_drivers) == 1
# ----- List when the object exists --------------------------------------------------------------------------------
response = context_client_grpc.ListDeviceIds(Empty())
assert response.device_ids[0].device_uuid.uuid == DEVICE_R1_UUID
response = context_client_grpc.ListDevices(Empty())
assert response.devices[0].device_id.device_uuid.uuid == DEVICE_R1_UUID
assert response.devices[0].device_type == 'packet-router'
assert len(response.devices[0].device_config.config_rules) == 3
assert response.devices[0].device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
assert len(response.devices[0].device_drivers) == 1
assert len(response.devices[0].device_endpoints) == 3
# ----- Create object relation -------------------------------------------------------------------------------------
TOPOLOGY_WITH_DEVICE['device_ids'].append(DEVICE_R1_ID)
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY_WITH_DEVICE))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
# event = events_collector.get_event(block=True)
# assert isinstance(event, TopologyEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
# assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check relation was created ---------------------------------------------------------------------------------
response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert len(response.device_ids) == 1
assert response.device_ids[0].device_uuid.uuid == DEVICE_R1_UUID
# ----- Dump state of database after creating the object relation --------------------------------------------------
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
# ----- Remove the object ------------------------------------------------------------------------------------------
context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID))
context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event -----------------------------------------------------------------------------------------
# events = events_collector.get_events(block=True, count=3)
# assert isinstance(events[0], DeviceEvent)
# assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[0].device_id.device_uuid.uuid == DEVICE_R1_UUID
# assert isinstance(events[1], TopologyEvent)
# assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# assert isinstance(events[2], ContextEvent)
# assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[2].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
# ----- Dump state of database after remove the object -------------------------------------------------------------
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Prepare dependencies for the test and capture related events -----------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE_R2))
assert response.device_uuid.uuid == DEVICE_R2_UUID
events = events_collector.get_events(block=True, count=4)
assert isinstance(events[0], ContextEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert isinstance(events[1], TopologyEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[2], DeviceEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[2].device_id.device_uuid.uuid == DEVICE_R1_UUID
assert isinstance(events[3], DeviceEvent)
assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[3].device_id.device_uuid.uuid == DEVICE_R2_UUID
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetLink(LinkId(**LINK_R1_R2_ID))
assert e.value.details() == 'Link({:s}) not found'.format(LINK_R1_R2_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListLinkIds(Empty())
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 67
# ----- Create the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetLink(Link(**LINK_R1_R2))
assert response.link_uuid.uuid == LINK_R1_R2_UUID
# ----- Check create event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, LinkEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.link_id.link_uuid.uuid == LINK_R1_R2_UUID
# ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetLink(Link(**LINK_R1_R2))
assert response.link_uuid.uuid == LINK_R1_R2_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, LinkEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert event.link_id.link_uuid.uuid == LINK_R1_R2_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 75
# ----- Get when the object exists ---------------------------------------------------------------------------------
response = context_client_grpc.GetLink(LinkId(**LINK_R1_R2_ID))
assert response.link_id.link_uuid.uuid == LINK_R1_R2_UUID
# ----- List when the object exists --------------------------------------------------------------------------------
response = context_client_grpc.ListLinkIds(Empty())
assert response.link_ids[0].link_uuid.uuid == LINK_R1_R2_UUID
assert response.links[0].link_id.link_uuid.uuid == LINK_R1_R2_UUID
assert len(response.links[0].link_endpoint_ids) == 2
# ----- Create object relation -------------------------------------------------------------------------------------
TOPOLOGY_WITH_LINK['link_ids'].append(LINK_R1_R2_ID)
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY_WITH_LINK))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, TopologyEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check relation was created ---------------------------------------------------------------------------------
response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert len(response.device_ids) == 2
assert response.device_ids[0].device_uuid.uuid == DEVICE_R1_UUID
assert response.device_ids[1].device_uuid.uuid == DEVICE_R2_UUID
assert response.link_ids[0].link_uuid.uuid == LINK_R1_R2_UUID
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 75
# ----- Remove the object ------------------------------------------------------------------------------------------
context_client_grpc.RemoveLink(LinkId(**LINK_R1_R2_ID))
context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID))
context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R2_ID))
context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event -----------------------------------------------------------------------------------------
events = events_collector.get_events(block=True, count=5)
assert isinstance(events[0], LinkEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[0].link_id.link_uuid.uuid == LINK_R1_R2_UUID
assert isinstance(events[1], DeviceEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[1].device_id.device_uuid.uuid == DEVICE_R1_UUID
assert isinstance(events[2], DeviceEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[2].device_id.device_uuid.uuid == DEVICE_R2_UUID
assert isinstance(events[3], TopologyEvent)
assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[3].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[3].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[4], ContextEvent)
assert events[4].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[4].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
events_collector.stop()
# ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
def test_grpc_service(
    context_client_grpc : ContextClient,                # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]):    # pylint: disable=redefined-outer-name
    """Exercise the Service RPCs of the Context gRPC API and the events they emit.

    The test starts from an empty database, creates the Context, Topology and the two Devices the Service
    refers to, and then walks the Service through: get/list while missing, rejected creation with an
    inconsistent endpoint, creation, update, get/list while present, and removal. After every mutating
    RPC the corresponding event is read from an EventsCollector and checked.

    Args:
        context_client_grpc: client used to issue every RPC of the test.
        context_db_mb: tuple whose first element is the Database backing the service under test; only
            that element is used here (to clear and dump the database).
    """
    context_database = context_db_mb[0]

    # ----- Clean the database -----------------------------------------------------------------------------------------
    # The absolute entry counts asserted below only hold when starting from an empty database.
    context_database.clear_all()

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsCollector(context_client_grpc)
    events_collector.start()

    # ----- Prepare dependencies for the test and capture related events -----------------------------------------------
    response = context_client_grpc.SetContext(Context(**CONTEXT))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
    assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

    response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
    assert response.device_uuid.uuid == DEVICE_R1_UUID

    response = context_client_grpc.SetDevice(Device(**DEVICE_R2))
    assert response.device_uuid.uuid == DEVICE_R2_UUID

    # One CREATE event is expected per dependency, in the order the objects were created.
    events = events_collector.get_events(block=True, count=4)

    assert isinstance(events[0], ContextEvent)
    assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    assert isinstance(events[1], TopologyEvent)
    assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

    assert isinstance(events[2], DeviceEvent)
    assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[2].device_id.device_uuid.uuid == DEVICE_R1_UUID

    assert isinstance(events[3], DeviceEvent)
    assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[3].device_id.device_uuid.uuid == DEVICE_R2_UUID

    # ----- Get when the object does not exist -------------------------------------------------------------------------
    with pytest.raises(grpc.RpcError) as e:
        context_client_grpc.GetService(ServiceId(**SERVICE_R1_R2_ID))
    assert e.value.details() == 'Service({:s}/{:s}) not found'.format(DEFAULT_CONTEXT_UUID, SERVICE_R1_R2_UUID)

    # ----- List when the object does not exist ------------------------------------------------------------------------
    response = context_client_grpc.ListServiceIds(ContextId(**CONTEXT_ID))
    assert len(response.service_ids) == 0

    response = context_client_grpc.ListServices(ContextId(**CONTEXT_ID))
    assert len(response.services) == 0

    # ----- Dump state of database before create the object ------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 67

    # ----- Create the object ------------------------------------------------------------------------------------------
    # A service whose endpoint belongs to a different context than the service itself must be rejected.
    WRONG_SERVICE = copy.deepcopy(SERVICE_R1_R2)
    WRONG_SERVICE['service_endpoint_ids'][0]\
        ['topology_id']['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
    # Only the RPC sits inside the raises-block so that "e" captures this failure and nothing else.
    with pytest.raises(grpc.RpcError) as e:
        context_client_grpc.SetService(Service(**WRONG_SERVICE))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg = 'request.service_endpoint_ids[0].topology_id.context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
          'should be == request.service_id.context_id.context_uuid.uuid({:s})'.format(DEFAULT_CONTEXT_UUID)
    assert e.value.details() == msg

    response = context_client_grpc.SetService(Service(**SERVICE_R1_R2))
    assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.service_uuid.uuid == SERVICE_R1_R2_UUID

    # Work on a private copy so the shared CONTEXT descriptor is not mutated for other tests.
    CONTEXT_WITH_SERVICE = copy.deepcopy(CONTEXT)
    CONTEXT_WITH_SERVICE['service_ids'].append(SERVICE_R1_R2_ID)
    response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_SERVICE))
    assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Check create event -----------------------------------------------------------------------------------------
    # The rejected SetService above is not expected to contribute an event: only the successful
    # SetService (CREATE) and the SetContext on the already existing context (UPDATE) are read here.
    events = events_collector.get_events(block=True, count=2)

    assert isinstance(events[0], ServiceEvent)
    assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
    assert events[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert events[0].service_id.service_uuid.uuid == SERVICE_R1_R2_UUID

    assert isinstance(events[1], ContextEvent)
    assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Update the object ------------------------------------------------------------------------------------------
    # Setting the very same service again is reported as an UPDATE.
    response = context_client_grpc.SetService(Service(**SERVICE_R1_R2))
    assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.service_uuid.uuid == SERVICE_R1_R2_UUID

    # ----- Check update event -----------------------------------------------------------------------------------------
    event = events_collector.get_event(block=True)
    assert isinstance(event, ServiceEvent)
    assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
    assert event.service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert event.service_id.service_uuid.uuid == SERVICE_R1_R2_UUID

    # ----- Dump state of database after create/update the object ------------------------------------------------------
    # NOTE(review): unlike the other dumps, no entry count is asserted here; add one once the expected
    # number of entries for a stored service is confirmed.
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')

    # ----- Get when the object exists ---------------------------------------------------------------------------------
    response = context_client_grpc.GetService(ServiceId(**SERVICE_R1_R2_ID))
    assert response.service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.service_id.service_uuid.uuid == SERVICE_R1_R2_UUID
    assert response.service_type == ServiceTypeEnum.SERVICETYPE_L3NM
    assert len(response.service_endpoint_ids) == 2
    assert len(response.service_constraints) == 2
    assert response.service_status.service_status == ServiceStatusEnum.SERVICESTATUS_PLANNED
    assert len(response.service_config.config_rules) == 3

    # ----- List when the object exists --------------------------------------------------------------------------------
    response = context_client_grpc.ListServiceIds(ContextId(**CONTEXT_ID))
    assert len(response.service_ids) == 1
    assert response.service_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.service_ids[0].service_uuid.uuid == SERVICE_R1_R2_UUID

    response = context_client_grpc.ListServices(ContextId(**CONTEXT_ID))
    assert len(response.services) == 1
    assert response.services[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert response.services[0].service_id.service_uuid.uuid == SERVICE_R1_R2_UUID
    assert response.services[0].service_type == ServiceTypeEnum.SERVICETYPE_L3NM
    assert len(response.services[0].service_endpoint_ids) == 2
    assert len(response.services[0].service_constraints) == 2
    assert response.services[0].service_status.service_status == ServiceStatusEnum.SERVICESTATUS_PLANNED
    assert len(response.services[0].service_config.config_rules) == 3

    # ----- Remove the object ------------------------------------------------------------------------------------------
    # The service goes first, then the objects it depends on, and the context last.
    context_client_grpc.RemoveService(ServiceId(**SERVICE_R1_R2_ID))
    context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID))
    context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R2_ID))
    context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
    context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))

    # ----- Check remove event -----------------------------------------------------------------------------------------
    events = events_collector.get_events(block=True, count=5)

    assert isinstance(events[0], ServiceEvent)
    assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert events[0].service_id.service_uuid.uuid == SERVICE_R1_R2_UUID

    assert isinstance(events[1], DeviceEvent)
    assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[1].device_id.device_uuid.uuid == DEVICE_R1_UUID

    assert isinstance(events[2], DeviceEvent)
    assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[2].device_id.device_uuid.uuid == DEVICE_R2_UUID

    assert isinstance(events[3], TopologyEvent)
    assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[3].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    assert events[3].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

    assert isinstance(events[4], ContextEvent)
    assert events[4].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
    assert events[4].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

    # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
    events_collector.stop()

    # ----- Dump state of database after remove the object -------------------------------------------------------------
    db_entries = context_database.dump()
    LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
    for db_entry in db_entries:
        LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
    LOGGER.info('-----------------------------------------------------------')
    assert len(db_entries) == 0

def test_grpc_connection(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Prepare dependencies for the test and capture related events -----------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE_R2))
assert response.device_uuid.uuid == DEVICE_R2_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE_R3))
assert response.device_uuid.uuid == DEVICE_R3_UUID
response = context_client_grpc.SetService(Service(**SERVICE_R1_R2))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.service_uuid.uuid == SERVICE_R1_R2_UUID
CONTEXT_WITH_SERVICE = copy.deepcopy(CONTEXT)
CONTEXT_WITH_SERVICE['service_ids'].append(SERVICE_R1_R2_ID)
response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_SERVICE))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
response = context_client_grpc.SetService(Service(**SERVICE_R2_R3))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.service_uuid.uuid == SERVICE_R2_R3_UUID
CONTEXT_WITH_SERVICE = copy.deepcopy(CONTEXT)
CONTEXT_WITH_SERVICE['service_ids'].append(SERVICE_R2_R3_ID)
response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_SERVICE))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
response = context_client_grpc.SetService(Service(**SERVICE_R1_R3))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.service_uuid.uuid == SERVICE_R1_R3_UUID
CONTEXT_WITH_SERVICE = copy.deepcopy(CONTEXT)
CONTEXT_WITH_SERVICE['service_ids'].append(SERVICE_R1_R3_ID)
response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_SERVICE))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
events = events_collector.get_events(block=True, count=11)
assert isinstance(events[0], ContextEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID