Newer
Older
import copy, grpc, logging, os, pytest, requests, threading, time, urllib
from queue import Queue, Empty
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.type_checkers.Assertions import (
validate_context, validate_context_ids, validate_contexts, validate_device, validate_device_ids, validate_devices,
validate_link, validate_link_ids, validate_links, validate_service, validate_service_ids, validate_services,
validate_topologies, validate_topology, validate_topology_ids)
from context.Config import (
GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL)
from context.client.ContextClient import ContextClient
Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, DeviceOperationalStatusEnum, Empty,
EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId, ServiceStatusEnum, ServiceTypeEnum,
Topology, TopologyEvent, TopologyId)
from context.service.database.Tools import (
FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
from context.service.Populate import populate
from context.service.rest_server.Server import Server as RestServer
from context.service.rest_server.Resources import RESOURCES
CONTEXT, CONTEXT_ID, DEVICE1, DEVICE1_ID, DEVICE1_UUID, DEVICE2, DEVICE2_ID, DEVICE2_UUID, LINK_DEV1_DEV2,
LINK_DEV1_DEV2_ID, LINK_DEV1_DEV2_UUID, SERVICE_DEV1_DEV2, SERVICE_DEV1_DEV2_ID, SERVICE_DEV1_DEV2_UUID, TOPOLOGY,
TOPOLOGY_ID)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# The test servers listen on the configured service ports shifted by 10000.
GRPC_PORT = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports

# Fallback Redis connection parameters, used when the matching environment variable is not set.
DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0

# Settings expanded as keyword arguments into the backend factories for the Redis scenario.
# Values read from the environment are strings, while the fallbacks keep their native type.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ),
}

# One entry per backend combination exercised by the session fixtures:
# (scenario name, database backend, database settings, message broker backend, message broker settings)
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
    ('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG),
]
@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Provide the (Database, MessageBroker) pair for the scenario selected by pytest.

    The fixture is parametrized over SCENARIOS; each scenario names the database and message broker
    backends together with the settings forwarded to their factories. The message broker is terminated
    once the session is over.
    """
    scenario_name, database_backend, database_settings, broker_backend, broker_settings = request.param

    template = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'
    LOGGER.info(template.format(
        str(scenario_name), str(database_backend.value), str(database_settings),
        str(broker_backend.value), str(broker_settings)))

    database = Database(get_database_backend(backend=database_backend, **database_settings))
    message_broker = MessageBroker(get_messagebroker_backend(backend=broker_backend, **broker_settings))

    yield database, message_broker

    # Teardown: executed after the last test using this fixture has finished.
    message_broker.terminate()
@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start a ContextService gRPC server on GRPC_PORT for the session and stop it afterwards.

    The service is built on the database and message broker supplied by the `context_db_mb` fixture.
    """
    _service = ContextService(
        context_db_mb[0], context_db_mb[1], port=GRPC_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Run the REST server on RESTAPI_PORT for the session, exposing every entry of RESOURCES.

    Each resource class receives the scenario database as its only constructor argument.
    """
    rest_server = RestServer(port=RESTAPI_PORT, base_url=RESTAPI_BASE_URL)

    # Every resource is handed the same database instance.
    resource_args = (context_db_mb[0],)
    for resource in RESOURCES:
        endpoint_name, resource_class, resource_url = resource
        rest_server.add_resource(
            resource_class, resource_url, endpoint=endpoint_name, resource_class_args=resource_args)

    rest_server.start()
    time.sleep(1) # bring time for the server to start

    yield rest_server

    # Teardown: request shutdown, then wait for the server to finish.
    rest_server.shutdown()
    rest_server.join()
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    """Provide a ContextClient connected to the local gRPC test server.

    Requesting `context_service_grpc` guarantees the server has been started before the client is built.
    """
    _client = ContextClient(address='127.0.0.1', port=GRPC_PORT)
    # Hand the client to the tests; without this the fixture value would be None.
    yield _client
    # NOTE(review): if ContextClient exposes a close/shutdown method, call it here to release the channel.
def do_rest_request(url : str, timeout : float = 10.0):
    """Issue a GET against the local REST test server and return the decoded JSON reply.

    Parameters:
        url: path appended to 'http://127.0.0.1:<RESTAPI_PORT><RESTAPI_BASE_URL>'.
        timeout: seconds to wait for the server before `requests` raises a timeout error, so that an
            unresponsive server fails the test instead of blocking the run indefinitely.

    Raises:
        AssertionError: if the reply status code is not 200.
    """
    request_url = 'http://127.0.0.1:{:s}{:s}{:s}'.format(str(RESTAPI_PORT), str(RESTAPI_BASE_URL), url)
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    reply = requests.get(request_url, timeout=timeout)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
class EventsCollector:
    """Gather the events of the five Context event streams into a single queue.

    On construction the collector opens the context, topology, device, link and service event streams
    through the given client. `start()` launches one thread per stream; each thread copies the events it
    receives into a shared queue that the tests read with `get_event()` / `get_events()`. `stop()` cancels
    the streams and joins the threads.
    """

    def __init__(self, context_client_grpc : 'ContextClient') -> None: # pylint: disable=redefined-outer-name
        # Shared buffer filled by the collector threads and drained by get_event()/get_events().
        self._events_queue = Queue()

        self._context_stream = context_client_grpc.GetContextEvents(Empty())
        self._topology_stream = context_client_grpc.GetTopologyEvents(Empty())
        self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
        self._link_stream = context_client_grpc.GetLinkEvents(Empty())
        self._service_stream = context_client_grpc.GetServiceEvents(Empty())

        self._context_thread = threading.Thread(target=self._collect, args=(self._context_stream ,), daemon=False)
        self._topology_thread = threading.Thread(target=self._collect, args=(self._topology_stream,), daemon=False)
        self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False)
        self._link_thread = threading.Thread(target=self._collect, args=(self._link_stream ,), daemon=False)
        self._service_thread = threading.Thread(target=self._collect, args=(self._service_stream ,), daemon=False)

    def _collect(self, events_stream) -> None:
        """Thread body: copy every event of `events_stream` into the shared queue.

        A CANCELLED RpcError is the expected way out (see `stop()`); any other RpcError is re-raised.
        """
        try:
            for event in events_stream:
                self._events_queue.put_nowait(event)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                raise # pragma: no cover

    def start(self):
        """Start the five collector threads."""
        self._context_thread.start()
        self._topology_thread.start()
        self._device_thread.start()
        self._link_thread.start()
        self._service_thread.start()

    def get_event(self, block : bool = True, timeout : float = 0.1):
        """Return the next collected event.

        Raises:
            queue.Empty: if no event becomes available (within `timeout` seconds when blocking).
        """
        return self._events_queue.get(block=block, timeout=timeout)

    def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None):
        """Return collected events sorted by `event.timestamp`.

        With `count=None` the queue is drained until a read finds it empty. Otherwise exactly `count`
        reads are attempted and the reads that find the queue empty are skipped, so fewer than `count`
        events may be returned.
        """
        # The module-level name `Empty` is the request message passed to the Get*Events calls above, not
        # the queue exception; catching it would raise TypeError as soon as the queue runs dry. Import the
        # real exception under an unambiguous name.
        from queue import Empty as QueueEmpty # pylint: disable=import-outside-toplevel

        events = []
        if count is None:
            while True:
                try:
                    events.append(self.get_event(block=block, timeout=timeout))
                except QueueEmpty:
                    break
        else:
            for _ in range(count):
                try:
                    events.append(self.get_event(block=block, timeout=timeout))
                except QueueEmpty:
                    pass
        return sorted(events, key=lambda e: e.event.timestamp)

    def stop(self):
        """Cancel the five event streams and wait for their collector threads to finish."""
        self._context_stream.cancel()
        self._topology_stream.cancel()
        self._device_stream.cancel()
        self._link_stream.cancel()
        self._service_stream.cancel()

        self._context_thread.join()
        self._topology_thread.join()
        self._device_thread.join()
        self._link_thread.join()
        self._service_thread.join()
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------
def test_grpc_context(
    context_client_grpc : ContextClient,             # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Exercise the Context gRPC methods: get/list when missing, create, update, get/list, remove.

    After each create/update/remove the matching ContextEvent is expected from the events collector, and
    the number of database entries is checked before creation, after creation/update and after removal.
    """
    context_database = context_db_mb[0]

    def dump_database():
        """Log every entry of the database dump and return the dumped entries."""
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        return db_entries

    # ----- Clean the database -----------------------------------------------------------------------------------------
    # NOTE(review): no statement follows this heading, yet the checks below expect an empty database;
    # confirm the database is emptied before this test runs.

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsCollector(context_client_grpc)
    events_collector.start()

    # The collector runs non-daemon threads on open streams: stop it even when an assertion fails so the
    # streams are cancelled and the threads joined instead of outliving the test.
    try:
        # ----- Get when the object does not exist ---------------------------------------------------------------------
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)

        # ----- List when the object does not exist --------------------------------------------------------------------
        response = context_client_grpc.ListContextIds(Empty())
        assert len(response.context_ids) == 0

        response = context_client_grpc.ListContexts(Empty())
        assert len(response.contexts) == 0

        # ----- Dump state of database before create the object --------------------------------------------------------
        assert len(dump_database()) == 0

        # ----- Create the object --------------------------------------------------------------------------------------
        response = context_client_grpc.SetContext(Context(**CONTEXT))
        assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # A topology id that belongs to another context must be rejected.
        wrong_topology_id = copy.deepcopy(TOPOLOGY_ID)
        wrong_topology_id['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
        wrong_context = copy.deepcopy(CONTEXT)
        wrong_context['topology_ids'].append(wrong_topology_id)
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.SetContext(Context(**wrong_context))
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        msg = 'request.topology_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
              'should be == request.context_id.context_uuid.uuid(admin)'
        assert e.value.details() == msg

        # A service id that belongs to another context must be rejected.
        wrong_service_id = copy.deepcopy(SERVICE_DEV1_DEV2_ID)
        wrong_service_id['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
        wrong_context = copy.deepcopy(CONTEXT)
        wrong_context['service_ids'].append(wrong_service_id)
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.SetContext(Context(**wrong_context))
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        msg = 'request.service_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
              'should be == request.context_id.context_uuid.uuid(admin)'
        assert e.value.details() == msg

        # ----- Check create event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, ContextEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Update the object --------------------------------------------------------------------------------------
        response = context_client_grpc.SetContext(Context(**CONTEXT))
        assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Check update event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, ContextEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
        assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Dump state of database after create/update the object --------------------------------------------------
        assert len(dump_database()) == 2

        # ----- Get when the object exists -----------------------------------------------------------------------------
        response = context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert len(response.topology_ids) == 0
        assert len(response.service_ids) == 0

        # ----- List when the object exists ----------------------------------------------------------------------------
        response = context_client_grpc.ListContextIds(Empty())
        assert len(response.context_ids) == 1
        assert response.context_ids[0].context_uuid.uuid == DEFAULT_CONTEXT_UUID

        response = context_client_grpc.ListContexts(Empty())
        assert len(response.contexts) == 1
        assert response.contexts[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert len(response.contexts[0].topology_ids) == 0
        assert len(response.contexts[0].service_ids) == 0

        # ----- Remove the object --------------------------------------------------------------------------------------
        context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))

        # ----- Check remove event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, ContextEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    finally:
        # ----- Stop the EventsCollector -------------------------------------------------------------------------------
        events_collector.stop()

    # ----- Dump state of database after remove the object -------------------------------------------------------------
    assert len(dump_database()) == 0
def test_grpc_topology(
    context_client_grpc : ContextClient,             # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Exercise the Topology gRPC methods: get/list when missing, create, update, get/list, remove.

    A context is created first as a dependency. The events produced by each step are read from the events
    collector and checked, and the number of database entries is checked after creation/update and after
    removal.
    """
    context_database = context_db_mb[0]

    def dump_database():
        """Log every entry of the database dump and return the dumped entries."""
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        return db_entries

    # ----- Clean the database -----------------------------------------------------------------------------------------
    # NOTE(review): no statement follows this heading, yet the final check expects an empty database;
    # confirm the database is emptied before this test runs.

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsCollector(context_client_grpc)
    events_collector.start()

    # The collector runs non-daemon threads on open streams: stop it even when an assertion fails so the
    # streams are cancelled and the threads joined instead of outliving the test.
    try:
        # ----- Prepare dependencies for the test and capture related events -------------------------------------------
        response = context_client_grpc.SetContext(Context(**CONTEXT))
        assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        event = events_collector.get_event(block=True)
        assert isinstance(event, ContextEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Get when the object does not exist ---------------------------------------------------------------------
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == 'Topology({:s}/{:s}) not found'.format(DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID)

        # ----- List when the object does not exist --------------------------------------------------------------------
        # NOTE(review): the two list replies and the dump below are not asserted on here; confirm whether
        # emptiness / entry-count checks are intended.
        response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID))
        response = context_client_grpc.ListTopologies(ContextId(**CONTEXT_ID))

        # ----- Dump state of database before create the object --------------------------------------------------------
        dump_database()

        # ----- Create the object --------------------------------------------------------------------------------------
        response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        context_with_topology = copy.deepcopy(CONTEXT)
        context_with_topology['topology_ids'].append(TOPOLOGY_ID)
        response = context_client_grpc.SetContext(Context(**context_with_topology))
        assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Check create event -------------------------------------------------------------------------------------
        events = events_collector.get_events(block=True, count=2)

        assert isinstance(events[0], TopologyEvent)
        assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert events[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert events[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        assert isinstance(events[1], ContextEvent)
        assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
        assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        # ----- Update the object --------------------------------------------------------------------------------------
        response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        # ----- Check update event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, TopologyEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
        assert event.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        # ----- Dump state of database after create/update the object --------------------------------------------------
        assert len(dump_database()) == 5

        # ----- Get when the object exists -----------------------------------------------------------------------------
        response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
        assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
        assert len(response.device_ids) == 0
        assert len(response.link_ids) == 0

        # ----- List when the object exists ----------------------------------------------------------------------------
        response = context_client_grpc.ListTopologyIds(ContextId(**CONTEXT_ID))
        assert len(response.topology_ids) == 1
        assert response.topology_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_ids[0].topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        response = context_client_grpc.ListTopologies(ContextId(**CONTEXT_ID))
        assert len(response.topologies) == 1
        assert response.topologies[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topologies[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
        assert len(response.topologies[0].device_ids) == 0
        assert len(response.topologies[0].link_ids) == 0

        # ----- Remove the object --------------------------------------------------------------------------------------
        context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
        context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))

        # ----- Check remove event -------------------------------------------------------------------------------------
        events = events_collector.get_events(block=True, count=2)

        assert isinstance(events[0], TopologyEvent)
        assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[0].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert events[0].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        assert isinstance(events[1], ContextEvent)
        assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    finally:
        # ----- Stop the EventsCollector -------------------------------------------------------------------------------
        events_collector.stop()

    # ----- Dump state of database after remove the object -------------------------------------------------------------
    assert len(dump_database()) == 0
def test_grpc_device(
    context_client_grpc : ContextClient,             # pylint: disable=redefined-outer-name
    context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Exercise the Device gRPC methods: get/list when missing, create, update, get/list, relate, remove.

    A context and a topology are created first as dependencies. The device is later attached to the
    topology to check the relation. The events produced by each step are read from the events collector
    and checked, together with the number of database entries at several points.
    """
    context_database = context_db_mb[0]

    def dump_database():
        """Log every entry of the database dump and return the dumped entries."""
        db_entries = context_database.dump()
        LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
        for db_entry in db_entries:
            LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
        LOGGER.info('-----------------------------------------------------------')
        return db_entries

    # ----- Clean the database -----------------------------------------------------------------------------------------
    # NOTE(review): no statement follows this heading, yet the entry counts below assume the database
    # starts empty; confirm the database is emptied before this test runs.

    # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
    events_collector = EventsCollector(context_client_grpc)
    events_collector.start()

    # The collector runs non-daemon threads on open streams: stop it even when an assertion fails so the
    # streams are cancelled and the threads joined instead of outliving the test.
    try:
        # ----- Prepare dependencies for the test and capture related events -------------------------------------------
        response = context_client_grpc.SetContext(Context(**CONTEXT))
        assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        events = events_collector.get_events(block=True, count=2)

        assert isinstance(events[0], ContextEvent)
        assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID

        assert isinstance(events[1], TopologyEvent)
        assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        # ----- Get when the object does not exist ---------------------------------------------------------------------
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.GetDevice(DeviceId(**DEVICE1_ID))
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == 'Device({:s}) not found'.format(DEVICE1_UUID)

        # ----- List when the object does not exist --------------------------------------------------------------------
        # NOTE(review): the two list replies are not asserted on here; confirm whether emptiness checks
        # are intended.
        response = context_client_grpc.ListDeviceIds(Empty())
        response = context_client_grpc.ListDevices(Empty())

        # ----- Dump state of database before create the object --------------------------------------------------------
        assert len(dump_database()) == 5

        # ----- Create the object --------------------------------------------------------------------------------------
        # An endpoint that references another device must be rejected.
        wrong_device = copy.deepcopy(DEVICE1)
        wrong_device['device_endpoints'][0]['endpoint_id']['device_id']['device_uuid']['uuid'] = 'wrong-device-uuid'
        with pytest.raises(grpc.RpcError) as e:
            context_client_grpc.SetDevice(Device(**wrong_device))
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        msg = 'request.device_endpoints[0].device_id.device_uuid.uuid(wrong-device-uuid) is invalid; '\
              'should be == request.device_id.device_uuid.uuid(DEV1)'
        assert e.value.details() == msg

        response = context_client_grpc.SetDevice(Device(**DEVICE1))

        # ----- Check create event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, DeviceEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
        assert event.device_id.device_uuid.uuid == DEVICE1_UUID

        # ----- Update the object --------------------------------------------------------------------------------------
        response = context_client_grpc.SetDevice(Device(**DEVICE1))
        assert response.device_uuid.uuid == DEVICE1_UUID

        # ----- Check update event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, DeviceEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
        assert event.device_id.device_uuid.uuid == DEVICE1_UUID

        # ----- Dump state of database after create/update the object --------------------------------------------------
        assert len(dump_database()) == 41

        # ----- Get when the object exists -----------------------------------------------------------------------------
        response = context_client_grpc.GetDevice(DeviceId(**DEVICE1_ID))
        assert response.device_id.device_uuid.uuid == DEVICE1_UUID
        assert response.device_type == 'packet-router'
        assert len(response.device_config.config_rules) == 3
        assert response.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
        assert len(response.device_drivers) == 2
        assert len(response.device_endpoints) == 3

        # ----- List when the object exists ----------------------------------------------------------------------------
        response = context_client_grpc.ListDeviceIds(Empty())
        assert len(response.device_ids) == 1
        assert response.device_ids[0].device_uuid.uuid == DEVICE1_UUID

        response = context_client_grpc.ListDevices(Empty())
        assert len(response.devices) == 1
        assert response.devices[0].device_id.device_uuid.uuid == DEVICE1_UUID
        assert response.devices[0].device_type == 'packet-router'
        assert len(response.devices[0].device_config.config_rules) == 3
        assert response.devices[0].device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
        assert len(response.devices[0].device_drivers) == 2
        assert len(response.devices[0].device_endpoints) == 3

        # ----- Create object relation ---------------------------------------------------------------------------------
        topology_with_device = copy.deepcopy(TOPOLOGY)
        topology_with_device['device_ids'].append(DEVICE1_ID)
        response = context_client_grpc.SetTopology(Topology(**topology_with_device))
        assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        # ----- Check update event -------------------------------------------------------------------------------------
        event = events_collector.get_event(block=True)
        assert isinstance(event, TopologyEvent)
        assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
        # Verify the identifiers carried by the event itself (the SetTopology reply was checked above).
        assert event.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        # ----- Check relation was created -----------------------------------------------------------------------------
        response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
        assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
        assert len(response.device_ids) == 1
        assert response.device_ids[0].device_uuid.uuid == DEVICE1_UUID
        assert len(response.link_ids) == 0

        # ----- Dump state of database after creating the object relation ----------------------------------------------
        assert len(dump_database()) == 41

        # ----- Remove the object --------------------------------------------------------------------------------------
        context_client_grpc.RemoveDevice(DeviceId(**DEVICE1_ID))
        context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
        context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))

        # ----- Check remove event -------------------------------------------------------------------------------------
        events = events_collector.get_events(block=True, count=3)

        assert isinstance(events[0], DeviceEvent)
        assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[0].device_id.device_uuid.uuid == DEVICE1_UUID

        assert isinstance(events[1], TopologyEvent)
        assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
        assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID

        assert isinstance(events[2], ContextEvent)
        assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
        assert events[2].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
    finally:
        # ----- Stop the EventsCollector -------------------------------------------------------------------------------
        events_collector.stop()

    # ----- Dump state of database after remove the object -------------------------------------------------------------
    assert len(dump_database()) == 0
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Prepare dependencies for the test and capture related events -----------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE1))
assert response.device_uuid.uuid == DEVICE1_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE2))
assert response.device_uuid.uuid == DEVICE2_UUID
events = events_collector.get_events(block=True, count=4)
assert isinstance(events[0], ContextEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert isinstance(events[1], TopologyEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[2], DeviceEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[2].device_id.device_uuid.uuid == DEVICE1_UUID
assert isinstance(events[3], DeviceEvent)
assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[3].device_id.device_uuid.uuid == DEVICE2_UUID
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetLink(LinkId(**LINK_DEV1_DEV2_ID))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
assert e.value.details() == 'Link({:s}) not found'.format(LINK_DEV1_DEV2_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListLinkIds(Empty())
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 69
# ----- Create the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetLink(Link(**LINK_DEV1_DEV2))
assert response.link_uuid.uuid == LINK_DEV1_DEV2_UUID
# ----- Check create event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, LinkEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.link_id.link_uuid.uuid == LINK_DEV1_DEV2_UUID
# ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetLink(Link(**LINK_DEV1_DEV2))
assert response.link_uuid.uuid == LINK_DEV1_DEV2_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, LinkEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert event.link_id.link_uuid.uuid == LINK_DEV1_DEV2_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 77
# ----- Get when the object exists ---------------------------------------------------------------------------------
response = context_client_grpc.GetLink(LinkId(**LINK_DEV1_DEV2_ID))
assert response.link_id.link_uuid.uuid == LINK_DEV1_DEV2_UUID
# ----- List when the object exists --------------------------------------------------------------------------------
response = context_client_grpc.ListLinkIds(Empty())
assert response.link_ids[0].link_uuid.uuid == LINK_DEV1_DEV2_UUID
assert response.links[0].link_id.link_uuid.uuid == LINK_DEV1_DEV2_UUID
assert len(response.links[0].link_endpoint_ids) == 2
# ----- Create object relation -------------------------------------------------------------------------------------
TOPOLOGY_WITH_LINK['link_ids'].append(LINK_DEV1_DEV2_ID)
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY_WITH_LINK))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, TopologyEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check relation was created ---------------------------------------------------------------------------------
response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
assert response.topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert len(response.device_ids) == 2
assert response.device_ids[0].device_uuid.uuid == DEVICE1_UUID
assert response.device_ids[1].device_uuid.uuid == DEVICE2_UUID
assert len(response.link_ids) == 1
assert response.link_ids[0].link_uuid.uuid == LINK_DEV1_DEV2_UUID
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 77
# ----- Remove the object ------------------------------------------------------------------------------------------
context_client_grpc.RemoveLink(LinkId(**LINK_DEV1_DEV2_ID))
context_client_grpc.RemoveDevice(DeviceId(**DEVICE1_ID))
context_client_grpc.RemoveDevice(DeviceId(**DEVICE2_ID))
context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event -----------------------------------------------------------------------------------------
events = events_collector.get_events(block=True, count=5)
assert isinstance(events[0], LinkEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[0].link_id.link_uuid.uuid == LINK_DEV1_DEV2_UUID
assert isinstance(events[1], DeviceEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[1].device_id.device_uuid.uuid == DEVICE1_UUID
assert isinstance(events[2], DeviceEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[2].device_id.device_uuid.uuid == DEVICE2_UUID
assert isinstance(events[3], TopologyEvent)
assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[3].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[3].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[4], ContextEvent)
assert events[4].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[4].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
events_collector.stop()
# ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Prepare dependencies for the test and capture related events -----------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
response = context_client_grpc.SetTopology(Topology(**TOPOLOGY))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE1))
assert response.device_uuid.uuid == DEVICE1_UUID
response = context_client_grpc.SetDevice(Device(**DEVICE2))
assert response.device_uuid.uuid == DEVICE2_UUID
events = events_collector.get_events(block=True, count=4)
assert isinstance(events[0], ContextEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert isinstance(events[1], TopologyEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[2], DeviceEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[2].device_id.device_uuid.uuid == DEVICE1_UUID
assert isinstance(events[3], DeviceEvent)
assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[3].device_id.device_uuid.uuid == DEVICE2_UUID
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetService(ServiceId(**SERVICE_DEV1_DEV2_ID))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
assert e.value.details() == 'Service({:s}/{:s}) not found'.format(DEFAULT_CONTEXT_UUID, SERVICE_DEV1_DEV2_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListServiceIds(ContextId(**CONTEXT_ID))
response = context_client_grpc.ListServices(ContextId(**CONTEXT_ID))
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 69
# ----- Create the object ------------------------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
WRONG_SERVICE = copy.deepcopy(SERVICE_DEV1_DEV2)
WRONG_SERVICE['service_endpoint_ids'][0]\
['topology_id']['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
context_client_grpc.SetService(Service(**WRONG_SERVICE))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.service_endpoint_ids[0].topology_id.context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
'should be == request.service_id.context_id.context_uuid.uuid(admin)'
assert e.value.details() == msg
response = context_client_grpc.SetService(Service(**SERVICE_DEV1_DEV2))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
CONTEXT_WITH_SERVICE = copy.deepcopy(CONTEXT)
CONTEXT_WITH_SERVICE['service_ids'].append(SERVICE_DEV1_DEV2_ID)
response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_SERVICE))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Check create event -----------------------------------------------------------------------------------------
events = events_collector.get_events(block=True, count=2)
assert isinstance(events[0], ServiceEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert events[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[0].service_id.service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
assert isinstance(events[1], ContextEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert events[1].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetService(Service(**SERVICE_DEV1_DEV2))
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, ServiceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert event.service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert event.service_id.service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 86
# ----- Get when the object exists ---------------------------------------------------------------------------------
response = context_client_grpc.GetService(ServiceId(**SERVICE_DEV1_DEV2_ID))
assert response.service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.service_id.service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
assert response.service_type == ServiceTypeEnum.SERVICETYPE_L3NM
assert len(response.service_endpoint_ids) == 2
assert len(response.service_constraints) == 2
assert response.service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE
assert len(response.service_config.config_rules) == 3
# ----- List when the object exists --------------------------------------------------------------------------------
response = context_client_grpc.ListServiceIds(ContextId(**CONTEXT_ID))
assert len(response.service_ids) == 1
assert response.service_ids[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.service_ids[0].service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
response = context_client_grpc.ListServices(ContextId(**CONTEXT_ID))
assert len(response.services) == 1
assert response.services[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.services[0].service_id.service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
assert response.services[0].service_type == ServiceTypeEnum.SERVICETYPE_L3NM
assert len(response.services[0].service_endpoint_ids) == 2
assert len(response.services[0].service_constraints) == 2
assert response.services[0].service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE
assert len(response.services[0].service_config.config_rules) == 3
# ----- Remove the object ------------------------------------------------------------------------------------------
context_client_grpc.RemoveService(ServiceId(**SERVICE_DEV1_DEV2_ID))
context_client_grpc.RemoveDevice(DeviceId(**DEVICE1_ID))
context_client_grpc.RemoveDevice(DeviceId(**DEVICE2_ID))
context_client_grpc.RemoveTopology(TopologyId(**TOPOLOGY_ID))
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event -----------------------------------------------------------------------------------------
events = events_collector.get_events(block=True, count=5)
assert isinstance(events[0], ServiceEvent)
assert events[0].service_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[0].service_id.service_uuid.uuid == SERVICE_DEV1_DEV2_UUID
assert isinstance(events[1], DeviceEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[1].device_id.device_uuid.uuid == DEVICE1_UUID
assert isinstance(events[2], DeviceEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[2].device_id.device_uuid.uuid == DEVICE2_UUID
assert isinstance(events[3], TopologyEvent)
assert events[3].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[3].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[3].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[4], ContextEvent)
assert events[4].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[4].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
events_collector.stop()
# ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# ----- Test REST API methods ------------------------------------------------------------------------------------------
def test_rest_populate_database(
    context_db_mb : Tuple[Database, MessageBroker], # pylint: disable=redefined-outer-name
    context_service_grpc : ContextService # pylint: disable=redefined-outer-name
    ):
    """Wipe the database, then call populate() against the local gRPC port.

    `context_service_grpc` is not referenced in the body; presumably it is
    requested only so the gRPC service fixture is active while populate()
    runs -- TODO confirm against the fixture definition.
    """
    # Element 0 of the fixture tuple is the Database; start from an empty store.
    context_db_mb[0].clear_all()
    populate('127.0.0.1', GRPC_PORT)
def test_rest_get_context_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    """Request '/context_ids' from the REST API and hand the reply to validate_context_ids."""
    validate_context_ids(do_rest_request('/context_ids'))
def test_rest_get_contexts(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    """Request '/contexts' from the REST API and hand the reply to validate_contexts."""
    validate_contexts(do_rest_request('/contexts'))
def test_rest_get_context(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    """Request the 'admin' context from the REST API and hand the reply to validate_context."""
    # URL-quote the identifier before placing it in the request path.
    quoted_context = urllib.parse.quote('admin')
    url = '/context/{:s}'.format(quoted_context)
    validate_context(do_rest_request(url))
def test_rest_get_topology_ids(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    """Request the topology ids of the 'admin' context and hand the reply to validate_topology_ids."""
    # URL-quote the identifier before placing it in the request path.
    quoted_context = urllib.parse.quote('admin')
    url = '/context/{:s}/topology_ids'.format(quoted_context)
    validate_topology_ids(do_rest_request(url))
def test_rest_get_topologies(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    """Request the topologies of the 'admin' context and hand the reply to validate_topologies."""
    # URL-quote the identifier before placing it in the request path.
    quoted_context = urllib.parse.quote('admin')
    url = '/context/{:s}/topologies'.format(quoted_context)
    validate_topologies(do_rest_request(url))
def test_rest_get_topology(context_service_rest : RestServer): # pylint: disable=redefined-outer-name
    """Request topology 'admin' of context 'admin' and hand the reply to validate_topology.

    The validator is called with num_devices=3 and num_links=3.
    """
    # Quote the context segment first, then the topology segment.
    path_segments = [urllib.parse.quote(segment) for segment in ('admin', 'admin')]
    url = '/context/{:s}/topology/{:s}'.format(*path_segments)
    validate_topology(do_rest_request(url), num_devices=3, num_links=3)