Newer
Older
import copy, grpc, logging, os, pytest, requests, threading, time, urllib
from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID
from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.type_checkers.Assertions import (
validate_context, validate_context_ids, validate_contexts, validate_device, validate_device_ids, validate_devices,
validate_link, validate_link_ids, validate_links, validate_service, validate_service_ids, validate_services,
validate_topologies, validate_topology, validate_topology_ids)
from context.Config import (
GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL)
from context.client.ContextClient import ContextClient
Context, ContextEvent, ContextId, Device, DeviceEvent, DeviceId, DeviceOperationalStatusEnum, Empty,
EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId, ServiceStatusEnum, ServiceTypeEnum,
Topology, TopologyEvent, TopologyId)
from context.service.database.Tools import (
FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher)
from context.service.grpc_server.ContextService import ContextService
from context.service.Populate import populate
from context.service.rest_server.Server import Server as RestServer
from context.service.rest_server.Resources import RESOURCES
CONTEXT, CONTEXT_ID, DEVICE1, DEVICE1_ID, DEVICE1_UUID, DEVICE2, DEVICE2_ID, DEVICE2_UUID, LINK_DEV1_DEV2,
LINK_DEV1_DEV2_ID, LINK_DEV1_DEV2_UUID, SERVICE_DEV1_DEV2, SERVICE_DEV1_DEV2_ID, SERVICE_DEV1_DEV2_UUID, TOPOLOGY,
TOPOLOGY_ID)
# Module-level logger for this test module; forced to DEBUG so every message logged by the tests is emitted.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# Ports the test servers bind to: the configured service ports shifted up by 10000.
GRPC_PORT = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
RESTAPI_PORT = 10000 + RESTAPI_SERVICE_PORT # avoid privileged ports
# Fallback Redis connection values, used when the matching REDIS_* environment variable is not set.
DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0
# Redis connection settings handed to the Redis-backed scenario. Each value is read from the environment
# when the variable is set, otherwise the DEFAULT_REDIS_* fallback is used.
# NOTE(review): values coming from the environment are strings while the fallbacks for port and database id
# are ints; presumably the backends accept both -- confirm against the backend factories.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID',  DEFAULT_REDIS_DATABASE_ID ),
}
# Backend combinations the fixtures are parametrized over. Each entry is
# (scenario name, database backend, database settings, message-broker backend, message-broker settings).
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {},           MessageBrokerBackendEnum.INMEMORY, {}          ),
    ('all_redis',    DatabaseBackendEnum.REDIS,    REDIS_CONFIG, MessageBrokerBackendEnum.REDIS,    REDIS_CONFIG),
]
@pytest.fixture(
    scope='session',
    ids=[str(scenario[0]) for scenario in SCENARIOS],
    params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Session fixture yielding the (Database, MessageBroker) pair of the current scenario.

    The fixture is parametrized over SCENARIOS; ``request.param`` carries the scenario name, the
    database backend with its settings, and the message-broker backend with its settings.
    After the session finishes, the message broker is terminated.
    """
    scenario_name, db_backend, db_settings, mb_backend, mb_settings = request.param

    msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...'
    logged_fields = (scenario_name, db_backend.value, db_settings, mb_backend.value, mb_settings)
    LOGGER.info(msg.format(*[str(field) for field in logged_fields]))

    # Build the database first, then the message broker (same order as the log message).
    database_backend = get_database_backend(backend=db_backend, **db_settings)
    database = Database(database_backend)
    messagebroker_backend = get_messagebroker_backend(backend=mb_backend, **mb_settings)
    message_broker = MessageBroker(messagebroker_backend)

    yield database, message_broker

    # Teardown: only the message broker is explicitly terminated.
    message_broker.terminate()
@pytest.fixture(scope='session')
def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs a ContextService gRPC server while the tests execute.

    The service is built from the database and message broker provided by ``context_db_mb``,
    listens on GRPC_PORT, is started before being yielded and stopped on teardown.

    NOTE(review): the ``ContextService(`` constructor call and the fixture decorator were restored here;
    the argument list is the one already present -- confirm against ContextService's signature.
    """
    _service = ContextService(
        context_db_mb[0], context_db_mb[1], port=GRPC_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    # Teardown: stop the server once the session-scoped consumers are done with it.
    _service.stop()
@pytest.fixture(scope='session')
def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Session fixture that runs the REST server exposing every entry of RESOURCES.

    Each resource is registered with the database (first element of ``context_db_mb``) as its only
    constructor argument. The server is started, given one second to come up, yielded to the tests,
    and finally shut down and joined on teardown.
    """
    database = context_db_mb[0]
    rest_server = RestServer(base_url=RESTAPI_BASE_URL, port=RESTAPI_PORT)

    for resource in RESOURCES:
        endpoint, resource_cls, url = resource
        rest_server.add_resource(resource_cls, url, endpoint=endpoint, resource_class_args=(database,))

    rest_server.start()
    time.sleep(1) # bring time for the server to start

    yield rest_server

    # Teardown: request shutdown, then wait until the server has actually finished.
    rest_server.shutdown()
    rest_server.join()
@pytest.fixture(scope='session')
def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name
    """Session fixture providing a ContextClient connected to the local gRPC test server.

    Depending on ``context_service_grpc`` guarantees the server fixture is set up before the client
    is created; the service object itself is not used here.
    """
    _client = ContextClient(address='127.0.0.1', port=GRPC_PORT)
    # Hand the client to the tests; without this the fixture value would be None.
    yield _client
    # NOTE(review): if ContextClient exposes a close/teardown method, call it here -- not visible from this file.
def do_rest_request(url : str, timeout : float = 10.0):
    """Issue a GET against the local REST test server and return the decoded JSON reply.

    Args:
        url: path appended to ``http://127.0.0.1:<RESTAPI_PORT><RESTAPI_BASE_URL>``.
        timeout: seconds to wait for the server before giving up; prevents a stuck server from
            blocking the whole test session (requests applies no timeout by default).

    Returns:
        The reply body parsed as JSON.

    Raises:
        AssertionError: if the reply status code is not 200.
        requests.exceptions.RequestException: on connection problems or when ``timeout`` expires.
    """
    request_url = 'http://127.0.0.1:{:s}{:s}{:s}'.format(str(RESTAPI_PORT), str(RESTAPI_BASE_URL), url)
    LOGGER.warning('Request: GET {:s}'.format(str(request_url)))
    reply = requests.get(request_url, timeout=timeout)
    LOGGER.warning('Reply: {:s}'.format(str(reply.text)))
    assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code)
    return reply.json()
def __init__(self, context_client_grpc : ContextClient) -> None: # pylint: disable=redefined-outer-name
    """Open the five event streams of the context service and prepare one collector thread per stream.

    Args:
        context_client_grpc: client used to subscribe to context, topology, device, link and
            service events.

    The threads are only created here; they are launched by ``start()``.
    """
    import queue # local import: keeps this block self-contained without touching the file-level imports

    # Shared queue filled by ``_collect`` and drained by ``get_event``; it must exist before any thread runs.
    self._events_queue = queue.Queue()

    self._context_stream  = context_client_grpc.GetContextEvents(Empty())
    self._topology_stream = context_client_grpc.GetTopologyEvents(Empty())
    self._device_stream   = context_client_grpc.GetDeviceEvents(Empty())
    self._link_stream     = context_client_grpc.GetLinkEvents(Empty())
    self._service_stream  = context_client_grpc.GetServiceEvents(Empty())

    # Non-daemon threads: ``stop()`` has to cancel the streams and join them for the process to exit.
    self._context_thread  = threading.Thread(target=self._collect, args=(self._context_stream ,), daemon=False)
    self._topology_thread = threading.Thread(target=self._collect, args=(self._topology_stream,), daemon=False)
    self._device_thread   = threading.Thread(target=self._collect, args=(self._device_stream  ,), daemon=False)
    self._link_thread     = threading.Thread(target=self._collect, args=(self._link_stream    ,), daemon=False)
    self._service_thread  = threading.Thread(target=self._collect, args=(self._service_stream ,), daemon=False)
def _collect(self, events_stream) -> None:
    """Drain ``events_stream`` into the shared events queue until the stream ends or is cancelled.

    A gRPC error whose status is CANCELLED is treated as the normal way to stop and is swallowed;
    any other gRPC error is re-raised.
    """
    try:
        for received_event in events_stream:
            self._events_queue.put_nowait(received_event)
    except grpc.RpcError as rpc_error:
        was_cancelled = rpc_error.code() == grpc.StatusCode.CANCELLED # pylint: disable=no-member
        if not was_cancelled:
            raise # pragma: no cover
def start(self):
    """Launch the collector threads in order: context, topology, device, link, service."""
    for entity in ('context', 'topology', 'device', 'link', 'service'):
        collector_thread = getattr(self, '_' + entity + '_thread')
        collector_thread.start()
def get_event(self, block : bool = True, timeout : float = 0.1):
    """Return the next collected event from the shared events queue.

    Args:
        block: forwarded to the queue's ``get``; whether to wait for an event.
        timeout: forwarded to the queue's ``get``; seconds to wait when blocking.

    NOTE(review): when no event arrives in time the queue's own exception propagates
    (``queue.Empty`` if the queue is a ``queue.Queue`` -- confirm against ``__init__``).
    """
    next_event = self._events_queue.get(block=block, timeout=timeout)
    return next_event
def stop(self):
    """Cancel every event stream, then wait for every collector thread to finish.

    All streams are cancelled first (context, topology, device, link, service) so that each
    thread's iteration ends; only then are the threads joined, in the same order.
    """
    entities = ('context', 'topology', 'device', 'link', 'service')

    for entity in entities:
        getattr(self, '_' + entity + '_stream').cancel()

    for entity in entities:
        getattr(self, '_' + entity + '_thread').join()
# ----- Test gRPC methods ----------------------------------------------------------------------------------------------
def test_grpc_context(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
# ----- Clean the database -----------------------------------------------------------------------------------------
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
events_collector.start()
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
context_client_grpc.GetContext(ContextId(**CONTEXT_ID))
assert e.value.code() == grpc.StatusCode.NOT_FOUND
assert e.value.details() == 'Context({:s}) not found'.format(DEFAULT_CONTEXT_UUID)
# ----- List when the object does not exist ------------------------------------------------------------------------
response = context_client_grpc.ListContextIds(Empty())
assert len(response.context_ids) == 0
response = context_client_grpc.ListContexts(Empty())
assert len(response.contexts) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
# ----- Create the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetContext(Context(**CONTEXT))
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
with pytest.raises(grpc.RpcError) as e:
WRONG_TOPOLOGY_ID = copy.deepcopy(TOPOLOGY_ID)
WRONG_TOPOLOGY_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
WRONG_CONTEXT = copy.deepcopy(CONTEXT)
WRONG_CONTEXT['topology_ids'].append(WRONG_TOPOLOGY_ID)
context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.topology_ids[0].context_id.context_uuid.uuid(wrong-context-uuid) is invalid; '\
'should be == request.context_id.context_uuid.uuid(admin)'
assert e.value.details() == msg
with pytest.raises(grpc.RpcError) as e:
WRONG_SERVICE_ID = copy.deepcopy(SERVICE_DEV1_DEV2_ID)
WRONG_SERVICE_ID['context_id']['context_uuid']['uuid'] = 'wrong-context-uuid'
WRONG_CONTEXT = copy.deepcopy(CONTEXT)
WRONG_CONTEXT['service_ids'].append(WRONG_SERVICE_ID)
context_client_grpc.SetContext(Context(**WRONG_CONTEXT))
Loading
Loading full blame…