import copy, grpc, logging, os, pytest from typing import Tuple from google.protobuf.json_format import MessageToDict from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker from common.tests.PytestGenerateTests import pytest_generate_tests # (required) pylint: disable=unused-import from context.Config import ( GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD) from context.client.ContextClient import ContextClient from context.proto.context_pb2 import Context, DeviceId, Topology, Device from context.service.grpc_server.ContextService import ContextService from device.Config import ( GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD) from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from device.service.drivers import DRIVERS from device.tests.MockMonitoringService import MockMonitoringService from monitoring.Config import ( GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD) from monitoring.client.monitoring_client import MonitoringClient from service.Config import ( GRPC_SERVICE_PORT as SERVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as SERVICE_GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD as SERVICE_GRPC_GRACE_PERIOD) from service.client.ServiceClient import ServiceClient from service.proto.context_pb2 import Service, ServiceId from service.service.ServiceService import ServiceService from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from service.service.service_handlers import SERVICE_HANDLERS from .CommonObjects import CONTEXT, TOPOLOGY LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports SERVICE_GRPC_SERVICE_PORT = 10000 + SERVICE_GRPC_SERVICE_PORT # avoid privileged ports MONITORING_GRPC_SERVICE_PORT = 10000 + MONITORING_GRPC_SERVICE_PORT # avoid privileged ports DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1' DEFAULT_REDIS_SERVICE_PORT = 6379 DEFAULT_REDIS_DATABASE_ID = 0 REDIS_CONFIG = { 'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST), 'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT), 'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ), } SCENARIOS = [ ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ), #('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG), ] @pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) def context_db_mb(request) -> Tuple[Database, MessageBroker]: name,db_backend,db_settings,mb_backend,mb_settings = request.param msg = 'Running scenario {:s} db_backend={:s}, db_settings={:s}, mb_backend={:s}, mb_settings={:s}...' LOGGER.info(msg.format(str(name), str(db_backend.value), str(db_settings), str(mb_backend.value), str(mb_settings))) _database = Database(get_database_backend(backend=db_backend, **db_settings)) _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) yield _database, _message_broker _message_broker.terminate() @pytest.fixture(scope='session') def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name _service = ContextService( context_db_mb[0], context_db_mb[1], port=CONTEXT_GRPC_SERVICE_PORT, max_workers=CONTEXT_GRPC_MAX_WORKERS, grace_period=CONTEXT_GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT) yield _client _client.close() @pytest.fixture(scope='session') def monitoring_service(): _service = MockMonitoringService(port=MONITORING_GRPC_SERVICE_PORT, max_workers=MONITORING_GRPC_MAX_WORKERS, grace_period=MONITORING_GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def monitoring_client(monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name _client = MonitoringClient(server='127.0.0.1', port=MONITORING_GRPC_SERVICE_PORT) #yield _client #_client.close() return _client @pytest.fixture(scope='session') def device_service( context_client : ContextClient, # pylint: disable=redefined-outer-name monitoring_client : MonitoringClient): # pylint: disable=redefined-outer-name _driver_factory = DriverFactory(DRIVERS) _driver_instance_cache = DriverInstanceCache(_driver_factory) _service = DeviceService( context_client, monitoring_client, _driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name _client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT) yield _client _client.close() @pytest.fixture(scope='session') def service_service( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient): # pylint: disable=redefined-outer-name _service_handler_factory = ServiceHandlerFactory(SERVICE_HANDLERS) _service = ServiceService( context_client, device_client, _service_handler_factory, port=SERVICE_GRPC_SERVICE_PORT, max_workers=SERVICE_GRPC_MAX_WORKERS, grace_period=SERVICE_GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def service_client(service_service : ServiceService): # pylint: disable=redefined-outer-name _client = ServiceClient(address='127.0.0.1', port=SERVICE_GRPC_SERVICE_PORT) yield _client _client.close() def grpc_message_to_json_string(message): return str(MessageToDict( message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) try: from .ServiceHandlersToTest import SERVICE_HANDLERS_TO_TEST except ImportError: LOGGER.exception('Unable to load service handlers, nothing will be tested.') SERVICE_HANDLERS_TO_TEST = [] class TestServiceHandlers: scenarios = SERVICE_HANDLERS_TO_TEST def test_prepare_environment( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, service_device_uuids, get_device_descriptor, get_connect_rules, context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient): # pylint: disable=redefined-outer-name context_client.SetContext(Context(**CONTEXT)) context_client.SetTopology(Topology(**TOPOLOGY)) for device_uuid in service_device_uuids: device_with_connect_rules = copy.deepcopy(get_device_descriptor(device_uuid)) device_with_connect_rules['device_config']['config_rules'].extend(get_connect_rules(device_uuid)) device_client.AddDevice(Device(**device_with_connect_rules)) def test_service_create_error_cases( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, service_device_uuids, get_device_descriptor, get_connect_rules, service_client : ServiceClient): # pylint: disable=redefined-outer-name with pytest.raises(grpc.RpcError) as e: service_with_endpoints = copy.deepcopy(service_descriptor) service_with_endpoints['service_endpoint_ids'].extend(service_endpoint_ids) service_client.CreateService(Service(**service_with_endpoints)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg_head = 'service.service_endpoint_ids([' msg_tail = ']) is invalid; RPC method CreateService does not accept Endpoints. '\ 'Endpoints should be configured after creating the service.' except_msg = str(e.value.details()) assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) with pytest.raises(grpc.RpcError) as e: service_with_config_rules = copy.deepcopy(service_descriptor) service_with_config_rules['service_config']['config_rules'].extend(service_config_rules) service_client.CreateService(Service(**service_with_config_rules)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg_head = 'service.service_config.config_rules([' msg_tail = ']) is invalid; RPC method CreateService does not accept Config Rules. '\ 'Config Rules should be configured after creating the service.' except_msg = str(e.value.details()) assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) with pytest.raises(grpc.RpcError) as e: service_with_constraints = copy.deepcopy(service_descriptor) service_with_constraints['service_constraints'].extend(service_constraints) service_client.CreateService(Service(**service_with_constraints)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg_head = 'service.service_constraints([' msg_tail = ']) is invalid; RPC method CreateService does not accept Constraints. '\ 'Constraints should be configured after creating the service.' except_msg = str(e.value.details()) assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) def test_service_create_correct( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, service_device_uuids, get_device_descriptor, get_connect_rules, service_client : ServiceClient): # pylint: disable=redefined-outer-name service_client.CreateService(Service(**service_descriptor)) def test_service_get_created( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, service_device_uuids, get_device_descriptor, get_connect_rules, context_client : ContextClient): # pylint: disable=redefined-outer-name service_data = context_client.GetService(ServiceId(**service_id)) LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data))) def test_service_update( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, service_device_uuids, get_device_descriptor, get_connect_rules, context_client : ContextClient, # pylint: disable=redefined-outer-name service_client : ServiceClient): # pylint: disable=redefined-outer-name # Configure service_with_settings = copy.deepcopy(service_descriptor) service_with_settings['service_endpoint_ids'].extend(service_endpoint_ids) service_with_settings['service_config']['config_rules'].extend(service_config_rules) service_with_settings['service_constraints'].extend(service_constraints) service_client.UpdateService(Service(**service_with_settings)) for endpoint_id in service_endpoint_ids: device_id = endpoint_id['device_id'] device_data = context_client.GetDevice(DeviceId(**device_id)) for i,config_rule in enumerate(device_data.device_config.config_rules): LOGGER.info('device_data[{:s}][#{:d}] => {:s}'.format( str(device_id), i, grpc_message_to_json_string(config_rule))) # Deconfigure service_with_settings = copy.deepcopy(service_descriptor) service_with_settings['service_endpoint_ids'].extend([]) # remove endpoints service_client.UpdateService(Service(**service_with_settings)) for endpoint_id in service_endpoint_ids: device_id = endpoint_id['device_id'] device_data = context_client.GetDevice(DeviceId(**device_id)) for i,config_rule in enumerate(device_data.device_config.config_rules): LOGGER.info('device_data[{:s}][#{:d}] => {:s}'.format( str(device_id), i, grpc_message_to_json_string(config_rule))) def test_service_get_updated( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, service_device_uuids, get_device_descriptor, get_connect_rules, context_client : ContextClient): # pylint: disable=redefined-outer-name service_data = context_client.GetService(ServiceId(**service_id)) LOGGER.info('service_data = {:s}'.format(grpc_message_to_json_string(service_data))) def test_service_delete( self, service_id, service_descriptor, service_endpoint_ids, service_config_rules, service_constraints, service_device_uuids, get_device_descriptor, get_connect_rules, service_client : ServiceClient): # pylint: disable=redefined-outer-name service_client.DeleteService(ServiceId(**service_id))