"""Tests for the Service component's gRPC API.

The fixtures in this module start Context, Device, Service and (mock)
Monitoring gRPC servers and hand out clients connected to them on 127.0.0.1.
The tests then drive a create / update / delete cycle for the emulated L3NM
service.

NOTE: the tests rely on state left behind by earlier tests (for example,
``test_prepare_environment`` registers the context, topology and devices used
afterwards), so they are meant to run in definition order.
"""
import copy
import logging
import operator
import os
from typing import Tuple

import grpc
import pytest
from google.protobuf.json_format import MessageToDict

from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from context.Config import (
    GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as CONTEXT_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as CONTEXT_GRPC_GRACE_PERIOD)
from context.client.ContextClient import ContextClient
from context.proto.context_pb2 import Context, DeviceId, Topology, Device
from context.service.grpc_server.ContextService import ContextService
from device.Config import (
    GRPC_SERVICE_PORT as DEVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as DEVICE_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as DEVICE_GRPC_GRACE_PERIOD)
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from device.tests.MockMonitoringService import MockMonitoringService
from monitoring.Config import (
    GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD)
from monitoring.client.monitoring_client import MonitoringClient
from service.Config import (
    GRPC_SERVICE_PORT as SERVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as SERVICE_GRPC_MAX_WORKERS,
    GRPC_GRACE_PERIOD as SERVICE_GRPC_GRACE_PERIOD)
from service.client.ServiceClient import ServiceClient
from service.proto.context_pb2 import Service, ServiceId
from service.service.ServiceService import ServiceService
from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from service.service.service_handlers import SERVICE_HANDLERS
from service.tests.Service_L3NM_EMU import (
    SERVICE_L3NM_EMU, SERVICE_L3NM_EMU_CONFIG_RULES, SERVICE_L3NM_EMU_CONSTRAINTS, SERVICE_L3NM_EMU_ENDPOINT_IDS,
    SERVICE_L3NM_EMU_ID)
#from device.service.MonitoringLoops import MonitoringLoops
from .CommonObjects import CONTEXT, DEVICE_EMU1, DEVICE_EMU2, DEVICE_EMU_CONNECT_RULES, TOPOLOGY

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Shift every configured gRPC port by 10000 so the test servers bind to
# non-privileged ports.
CONTEXT_GRPC_SERVICE_PORT = 10000 + CONTEXT_GRPC_SERVICE_PORT # avoid privileged ports
DEVICE_GRPC_SERVICE_PORT = 10000 + DEVICE_GRPC_SERVICE_PORT # avoid privileged ports
SERVICE_GRPC_SERVICE_PORT = 10000 + SERVICE_GRPC_SERVICE_PORT # avoid privileged ports
MONITORING_GRPC_SERVICE_PORT = 10000 + MONITORING_GRPC_SERVICE_PORT # avoid privileged ports

DEFAULT_REDIS_SERVICE_HOST = '127.0.0.1'
DEFAULT_REDIS_SERVICE_PORT = 6379
DEFAULT_REDIS_DATABASE_ID = 0

# Redis settings; each value can be overridden through the environment variable
# of the same name. Values read from the environment are strings, while the
# defaults for port and database id are ints.
REDIS_CONFIG = {
    'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST),
    'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT),
    'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ),
}

# Each scenario is (name, database backend, database settings,
# message broker backend, message broker settings).
SCENARIOS = [
    ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ),
    #('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG),
]


@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS)
def context_db_mb(request) -> Tuple[Database, MessageBroker]:
    """Build the Database and MessageBroker for the current scenario.

    Parametrized over SCENARIOS. Yields a ``(database, message_broker)`` tuple
    and terminates the message broker on teardown.
    """
    name,db_backend,db_settings,mb_backend,mb_settings = request.param
    # Lazy %-style arguments: the message is only rendered if the record is
    # actually emitted; the rendered text is the same as str.format with str().
    LOGGER.info(
        'Running scenario %s db_backend=%s, db_settings=%s, mb_backend=%s, mb_settings=%s...',
        name, db_backend.value, db_settings, mb_backend.value, mb_settings)
    _database = Database(get_database_backend(backend=db_backend, **db_settings))
    _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
    yield _database, _message_broker
    _message_broker.terminate()


@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
    """Start a ContextService on the test port; stop it on teardown."""
    _service = ContextService(
        context_db_mb[0], context_db_mb[1], port=CONTEXT_GRPC_SERVICE_PORT, max_workers=CONTEXT_GRPC_MAX_WORKERS,
        grace_period=CONTEXT_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name
    """Yield a ContextClient pointed at the local ContextService; close it on teardown.

    ``context_service`` is requested only so the server fixture is set up
    before the client is created.
    """
    _client = ContextClient(address='127.0.0.1', port=CONTEXT_GRPC_SERVICE_PORT)
    yield _client
    _client.close()


@pytest.fixture(scope='session')
def monitoring_service():
    """Start a MockMonitoringService on the test port; stop it on teardown."""
    _service = MockMonitoringService(port=MONITORING_GRPC_SERVICE_PORT, max_workers=MONITORING_GRPC_MAX_WORKERS,
        grace_period=MONITORING_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def monitoring_client(monitoring_service : MockMonitoringService): # pylint: disable=redefined-outer-name
    """Return a MonitoringClient pointed at the local mock monitoring service.

    ``monitoring_service`` is requested only so the server fixture is set up
    before the client is created.
    """
    _client = MonitoringClient(server='127.0.0.1', port=MONITORING_GRPC_SERVICE_PORT)
    # NOTE(review): unlike the other client fixtures, no teardown runs here; the
    # yield/close pair below is disabled. Confirm whether MonitoringClient
    # exposes close() before enabling it.
    #yield _client
    #_client.close()
    return _client


@pytest.fixture(scope='session')
def device_service(
    context_client : ContextClient,         # pylint: disable=redefined-outer-name
    monitoring_client : MonitoringClient):  # pylint: disable=redefined-outer-name
    """Start a DeviceService wired to the context and monitoring clients; stop it on teardown."""
    _driver_factory = DriverFactory(DRIVERS)
    _driver_instance_cache = DriverInstanceCache(_driver_factory)
    _service = DeviceService(
        context_client, monitoring_client, _driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT,
        max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name
    """Yield a DeviceClient pointed at the local DeviceService; close it on teardown.

    ``device_service`` is requested only so the server fixture is set up
    before the client is created.
    """
    _client = DeviceClient(address='127.0.0.1', port=DEVICE_GRPC_SERVICE_PORT)
    yield _client
    _client.close()


@pytest.fixture(scope='session')
def service_service(
    context_client : ContextClient, # pylint: disable=redefined-outer-name
    device_client : DeviceClient):  # pylint: disable=redefined-outer-name
    """Start a ServiceService wired to the context and device clients; stop it on teardown."""
    _service_handler_factory = ServiceHandlerFactory(SERVICE_HANDLERS)
    _service = ServiceService(
        context_client, device_client, _service_handler_factory,
        port=SERVICE_GRPC_SERVICE_PORT, max_workers=SERVICE_GRPC_MAX_WORKERS, grace_period=SERVICE_GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def service_client(service_service : ServiceService): # pylint: disable=redefined-outer-name
    """Yield a ServiceClient pointed at the local ServiceService; close it on teardown.

    ``service_service`` is requested only so the server fixture is set up
    before the client is created.
    """
    _client = ServiceClient(address='127.0.0.1', port=SERVICE_GRPC_SERVICE_PORT)
    yield _client
    _client.close()


def grpc_message_to_json_string(message):
    """Return ``str()`` of the dict produced by MessageToDict for ``message``.

    Despite the name, the result is the Python repr of a dict (not strict
    JSON). Default-valued fields are included, proto field names are kept and
    enums are rendered by name.
    """
    return str(MessageToDict(
        message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False))


def _assert_create_service_rejected(service_client, service_descriptor, msg_head, msg_tail):
    """Check that CreateService rejects ``service_descriptor`` with INVALID_ARGUMENT.

    The error details must start with ``msg_head`` and end with ``msg_tail``.
    Only the RPC itself runs inside ``pytest.raises`` so that no other
    statement can satisfy the expectation.
    """
    request = Service(**service_descriptor)
    with pytest.raises(grpc.RpcError) as e:
        service_client.CreateService(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    except_msg = str(e.value.details())
    assert except_msg.startswith(msg_head)
    assert except_msg.endswith(msg_tail)


def _log_device_config_rules(context_client, endpoint_ids):
    """Log every config rule of the device referenced by each endpoint id."""
    for endpoint_id in endpoint_ids:
        device_id = endpoint_id['device_id']
        device_data = context_client.GetDevice(DeviceId(**device_id))
        for i,config_rule in enumerate(device_data.device_config.config_rules):
            LOGGER.info(
                'device_data[%s][#%d] => %s', device_id, i, grpc_message_to_json_string(config_rule))


def test_prepare_environment(
    context_client : ContextClient, device_client : DeviceClient): # pylint: disable=redefined-outer-name
    """Register the context, the topology and both emulated devices."""
    context_client.SetContext(Context(**CONTEXT))
    context_client.SetTopology(Topology(**TOPOLOGY))

    # Work on deep copies so the shared descriptors imported from CommonObjects
    # are not mutated by adding the connect rules.
    DEVICE_EMU1_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU1)
    DEVICE_EMU1_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
    device_client.AddDevice(Device(**DEVICE_EMU1_WITH_CONNECT_RULES))

    DEVICE_EMU2_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU2)
    DEVICE_EMU2_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
    device_client.AddDevice(Device(**DEVICE_EMU2_WITH_CONNECT_RULES))


def test_service_create_error_cases(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """CreateService must reject descriptors carrying endpoints, config rules or constraints."""
    SERVICE_WITH_ENDPOINTS = copy.deepcopy(SERVICE_L3NM_EMU)
    SERVICE_WITH_ENDPOINTS['service_endpoint_ids'].extend(SERVICE_L3NM_EMU_ENDPOINT_IDS)
    _assert_create_service_rejected(
        service_client, SERVICE_WITH_ENDPOINTS,
        'service.service_endpoint_ids([',
        ']) is invalid; RPC method CreateService does not accept Endpoints. '
        'Endpoints should be configured after creating the service.')

    SERVICE_WITH_CONFIG_RULES = copy.deepcopy(SERVICE_L3NM_EMU)
    SERVICE_WITH_CONFIG_RULES['service_config']['config_rules'].extend(SERVICE_L3NM_EMU_CONFIG_RULES)
    _assert_create_service_rejected(
        service_client, SERVICE_WITH_CONFIG_RULES,
        'service.service_config.config_rules([',
        ']) is invalid; RPC method CreateService does not accept Config Rules. '
        'Config Rules should be configured after creating the service.')

    SERVICE_WITH_CONSTRAINTS = copy.deepcopy(SERVICE_L3NM_EMU)
    SERVICE_WITH_CONSTRAINTS['service_constraints'].extend(SERVICE_L3NM_EMU_CONSTRAINTS)
    _assert_create_service_rejected(
        service_client, SERVICE_WITH_CONSTRAINTS,
        'service.service_constraints([',
        ']) is invalid; RPC method CreateService does not accept Constraints. '
        'Constraints should be configured after creating the service.')


def test_service_create_correct(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Create the service from the bare SERVICE_L3NM_EMU descriptor."""
    service_client.CreateService(Service(**SERVICE_L3NM_EMU))
    #driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now
    #assert driver is not None


def test_service_get_created(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Fetch the service from the Context component and log it."""
    service_data = context_client.GetService(ServiceId(**SERVICE_L3NM_EMU_ID))
    LOGGER.info('service_data = %s', grpc_message_to_json_string(service_data))


def test_service_update(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Update the service with full settings, then again without them.

    After each update the config rules of every device referenced by the
    service endpoints are logged.
    """
    # Configure
    SERVICE_WITH_SETTINGS = copy.deepcopy(SERVICE_L3NM_EMU)
    SERVICE_WITH_SETTINGS['service_endpoint_ids'].extend(SERVICE_L3NM_EMU_ENDPOINT_IDS)
    SERVICE_WITH_SETTINGS['service_config']['config_rules'].extend(SERVICE_L3NM_EMU_CONFIG_RULES)
    SERVICE_WITH_SETTINGS['service_constraints'].extend(SERVICE_L3NM_EMU_CONSTRAINTS)
    service_client.UpdateService(Service(**SERVICE_WITH_SETTINGS))
    _log_device_config_rules(context_client, SERVICE_L3NM_EMU_ENDPOINT_IDS)

    # Deconfigure
    SERVICE_WITH_SETTINGS = copy.deepcopy(SERVICE_L3NM_EMU)
    # extend([]) adds nothing: the fresh copy is sent without the endpoints
    # that were appended in the configure step above.
    SERVICE_WITH_SETTINGS['service_endpoint_ids'].extend([]) # remove endpoints
    service_client.UpdateService(Service(**SERVICE_WITH_SETTINGS))
    _log_device_config_rules(context_client, SERVICE_L3NM_EMU_ENDPOINT_IDS)


def test_service_get_updated(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Fetch the service again after the update and log it."""
    service_data = context_client.GetService(ServiceId(**SERVICE_L3NM_EMU_ID))
    LOGGER.info('service_data = %s', grpc_message_to_json_string(service_data))


def test_service_delete(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    service_client : ServiceClient,     # pylint: disable=redefined-outer-name
    service_service : ServiceService):  # pylint: disable=redefined-outer-name
    """Delete the service through the Service component."""
    service_client.DeleteService(ServiceId(**SERVICE_L3NM_EMU_ID))
    #driver : _Driver = device_service.driver_instance_cache.get(DEVICE_EMU_UUID, {})
    #assert driver is None