Newer
Older
import copy, grpc, logging, os, pytest, requests, threading, time, urllib
from queue import Queue, Empty
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.type_checkers.Assertions import (
validate_context, validate_context_ids, validate_contexts, validate_device, validate_device_ids, validate_devices,
validate_link, validate_link_ids, validate_links, validate_service, validate_service_ids, validate_services,
validate_topologies, validate_topology, validate_topology_ids)
from context.Config import (
GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL)
from context.client.ContextClient import ContextClient
Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, DeviceOperationalStatusEnum, Empty,
EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId, ServiceStatusEnum, ServiceTypeEnum,
Topology, TopologyEvent, TopologyId)
from context.service.database.Tools import (
FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
from context.service.Populate import populate
from context.service.rest_server.Server import Server as RestServer
from context.service.rest_server.Resources import RESOURCES
CONTEXT, CONTEXT_ID, DEVICE1, DEVICE1_ID, DEVICE1_UUID, DEVICE2, DEVICE2_ID, DEVICE2_UUID, LINK_DEV1_DEV2,
LINK_DEV1_DEV2_ID, LINK_DEV1_DEV2_UUID, SERVICE_DEV1_DEV2, SERVICE_DEV1_DEV2_ID, SERVICE_DEV1_DEV2_UUID, TOPOLOGY,
TOPOLOGY_ID)
# Module-level logger, set to DEBUG.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Test servers listen on the configured service ports shifted by 10000.
GRPC_PORT = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports

DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0

# Redis settings; every entry can be overridden through the environment variable of the same name.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ),
}

# One tuple per scenario, unpacked by the context_db_mb fixture as:
# (name, database backend, database settings, message broker backend, message broker settings)
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
    ('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG),
]
@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Session fixture, parametrized over SCENARIOS, providing the storage pair used by the services.

    For the scenario in ``request.param`` it builds a Database and a MessageBroker on the selected
    backends, yields them as a ``(database, message_broker)`` tuple, and terminates the message
    broker at teardown.
    """
    scenario_name, db_backend, db_settings, mb_backend, mb_settings = request.param

    # Every field is converted with str() because the template only uses '{:s}' placeholders.
    LOGGER.info(
        'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'.format(
            *map(str, (scenario_name, db_backend.value, db_settings, mb_backend.value, mb_settings))))

    database = Database(get_database_backend(backend=db_backend, **db_settings))
    message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))

    yield database, message_broker

    message_broker.terminate()
@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs the gRPC Context service on GRPC_PORT.

    The service is built on the database and message broker provided by ``context_db_mb``,
    started before being yielded, and stopped at teardown.
    """
    _service = ContextService(
        context_db_mb[0], context_db_mb[1], port=GRPC_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs the REST server on RESTAPI_PORT under RESTAPI_BASE_URL.

    Every entry of RESOURCES is registered with the database as its only constructor argument.
    The server is started before being yielded, then shut down and joined at teardown.
    """
    db = context_db_mb[0]
    server = RestServer(port=RESTAPI_PORT, base_url=RESTAPI_BASE_URL)

    for name, res_class, url in RESOURCES:
        server.add_resource(res_class, url, endpoint=name, resource_class_args=(db,))

    server.start()
    time.sleep(1) # bring time for the server to start

    yield server

    server.shutdown()
    server.join()
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture providing a ContextClient connected to 127.0.0.1:GRPC_PORT.

    Depending on ``context_service_grpc`` guarantees the service fixture is set up before the
    client is created; the service object itself is not used here.
    """
    _client = ContextClient(address='127.0.0.1', port=GRPC_PORT)
    # The client has to be handed to the tests; without the yield the fixture value is None.
    yield _client
    # NOTE(review): assumes ContextClient exposes close() to release its channel - confirm.
    _client.close()
def do_rest_request(url : str, timeout : float = 10.0):
    """Issue a GET against the test REST server and return the decoded JSON reply.

    Args:
        url: path appended to ``http://127.0.0.1:<RESTAPI_PORT><RESTAPI_BASE_URL>``.
        timeout: seconds passed to ``requests.get`` as its timeout, so that an unresponsive
            server fails the test instead of blocking the run indefinitely.

    Returns:
        The reply body parsed as JSON.

    Raises:
        AssertionError: if the reply status code is not 200.
    """
    request_url = 'http://127.0.0.1:{:s}{:s}{:s}'.format(str(RESTAPI_PORT), str(RESTAPI_BASE_URL), url)
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    reply = requests.get(request_url, timeout=timeout)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
def __init__(self, context_client_grpc : ContextClient) -> None: # pylint: disable=redefined-outer-name
self._context_stream = context_client_grpc.GetContextEvents(Empty())
self._topology_stream = context_client_grpc.GetTopologyEvents(Empty())
self._device_stream = context_client_grpc.GetDeviceEvents(Empty())
self._link_stream = context_client_grpc.GetLinkEvents(Empty())
self._service_stream = context_client_grpc.GetServiceEvents(Empty())
self._context_thread = threading.Thread(target=self._collect, args=(self._context_stream ,), daemon=False)
self._topology_thread = threading.Thread(target=self._collect, args=(self._topology_stream,), daemon=False)
self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False)
self._link_thread = threading.Thread(target=self._collect, args=(self._link_stream ,), daemon=False)
self._service_thread = threading.Thread(target=self._collect, args=(self._service_stream ,), daemon=False)
def _collect(self, events_stream) -> None:
    """Thread body: push every event read from ``events_stream`` onto the events queue.

    A gRPC error whose status code is CANCELLED ends the collection quietly; any other gRPC
    error is re-raised.
    """
    try:
        for received in events_stream:
            self._events_queue.put_nowait(received)
    except grpc.RpcError as rpc_error:
        if rpc_error.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member
            return
        raise # pragma: no cover
def start(self):
    """Start the collector threads in order: context, topology, device, link, service."""
    thread_attributes = (
        '_context_thread', '_topology_thread', '_device_thread', '_link_thread', '_service_thread',
    )
    for attribute in thread_attributes:
        getattr(self, attribute).start()
def get_event(self, block : bool = True, timeout : float = 0.1):
    """Pop and return the next collected event.

    ``block`` and ``timeout`` are forwarded unchanged to the queue's ``get``; whatever that call
    raises when no event is available propagates to the caller.
    """
    events_queue = self._events_queue
    return events_queue.get(block=block, timeout=timeout)
def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None):
    """Collect queued events and return them sorted by ``event.timestamp``.

    Args:
        block: forwarded to ``get_event`` for every read.
        timeout: forwarded to ``get_event`` for every read.
        count: when ``None``, read until the queue reports empty; otherwise perform exactly
            ``count`` reads, ignoring the ones that find the queue empty.

    Returns:
        A list with the events obtained (possibly fewer than ``count``), in ascending
        timestamp order.
    """
    # Imported locally under an alias: in this module the bare name `Empty` is also the request
    # message passed to the Get*Events calls, so `except Empty` would not match queue.Empty and
    # fails with TypeError when the handler target is not an exception class.
    from queue import Empty as QueueEmpty # pylint: disable=import-outside-toplevel

    events = []
    if count is None:
        while True:
            try:
                events.append(self.get_event(block=block, timeout=timeout))
            except QueueEmpty:
                break
    else:
        for _ in range(count):
            try:
                events.append(self.get_event(block=block, timeout=timeout))
            except QueueEmpty:
                pass # best effort: a read that finds nothing is skipped, the next one still runs
    return sorted(events, key=lambda e: e.event.timestamp)
def stop(self):
    """Cancel the five event streams, then join the five collector threads.

    All streams are cancelled before the first join; both passes go in the order
    context, topology, device, link, service.
    """
    kinds = ('context', 'topology', 'device', 'link', 'service')

    for kind in kinds:
        getattr(self, '_{:s}_stream'.format(kind)).cancel()

    for kind in kinds:
        getattr(self, '_{:s}_thread'.format(kind)).join()
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------
def test_grpc_context(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListContextIds(Empty())
assert len(response.context_ids) == 0
response = context_client_grpc.ListContexts(Empty())
assert len(response.contexts) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Create the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
Loading
Loading full blame…