Skip to content
Snippets Groups Projects
test_unitary_emulated.py 22.3 KiB
Newer Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import calendar, copy, dateutil.parser, grpc, json, logging, operator, pytest, queue, time
from datetime import datetime, timezone
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId, DeviceOperationalStatusEnum
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api._Driver import _Driver
from .MockService_Dependencies import MockService_Dependencies
from .PrepareTestScenario import ( # pylint: disable=unused-import
    # be careful, order of symbols is important here!
    mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment)

from .Device_Emulated import (
    DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES,
    DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_ENDPOINTS_COOKED, DEVICE_EMU_ID,
    DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID)

logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING)
logging.getLogger('monitoring-client').setLevel(logging.WARNING)

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# ----- Test Device Driver Emulated --------------------------------------------
# Device Driver Emulated tests are used to validate Driver API as well as Emulated Device Driver. Note that other
# Drivers might support a different set of resource paths, and attributes/values per resource; however, they must
# implement the Driver API.

def test_device_emulated_add_error_cases(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name

    with pytest.raises(grpc.RpcError) as e:
        DEVICE_EMU_WITH_ENDPOINTS = copy.deepcopy(DEVICE_EMU)
        DEVICE_EMU_WITH_ENDPOINTS['device_endpoints'].append(json_endpoint(DEVICE_EMU_ID, 'ep-id', 'ep-type'))
        device_client.AddDevice(Device(**DEVICE_EMU_WITH_ENDPOINTS))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg_head = 'device.device_endpoints(['
    msg_tail = ']) is invalid; RPC method AddDevice does not accept Endpoints. '\
               'Endpoints are discovered through interrogation of the physical device.'
    except_msg = str(e.value.details())
    assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)

    with pytest.raises(grpc.RpcError) as e:
        DEVICE_EMU_WITH_EXTRA_RULES = copy.deepcopy(DEVICE_EMU)
        DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
        DEVICE_EMU_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS)
        device_client.AddDevice(Device(**DEVICE_EMU_WITH_EXTRA_RULES))
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    msg_head = 'device.device_config.config_rules(['
    msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\
               'with "_connect/" tag. Others should be configured after adding the device.'
    except_msg = str(e.value.details())
    assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail)


def test_device_emulated_add_correct(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name

    DEVICE_EMU_WITH_CONNECT_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
    device_client.AddDevice(Device(**DEVICE_EMU_WITH_CONNECT_RULES))
    driver_instance_cache = device_service.device_servicer.driver_instance_cache
    driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now
    assert driver is not None

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    config_rules = [
        (ConfigActionEnum.Name(config_rule.action), config_rule.custom.resource_key, config_rule.custom.resource_value)
        for config_rule in device_data.device_config.config_rules
        if config_rule.WhichOneof('config_rule') == 'custom'
    ]
    LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
        '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

def test_device_emulated_get(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name

    initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_EMU_ID))
    LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))

    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))


def test_device_emulated_configure(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name

    driver_instance_cache = device_service.device_servicer.driver_instance_cache
    driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now
    assert driver is not None

    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED)
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        assert endpoint_cooked in driver_config

    DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES))

    DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ADDRESSES)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES))

    DEVICE_EMU_WITH_OPERATIONAL_STATUS = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_OPERATIONAL_STATUS['device_operational_status'] = \
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_OPERATIONAL_STATUS))

    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES)
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        endpoint_cooked = copy.deepcopy(endpoint_cooked)
        endpoint_cooked[1]['enabled'] = True
        assert endpoint_cooked in driver_config
    for config_rule in DEVICE_EMU_CONFIG_ADDRESSES:
        assert 'custom' in config_rule
        rule = (config_rule['custom']['resource_key'], json.loads(config_rule['custom']['resource_value']))
        assert rule in driver_config
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    assert device_data.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED

    config_rules = [
        (ConfigActionEnum.Name(config_rule.action), config_rule.custom.resource_key, config_rule.custom.resource_value)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for config_rule in device_data.device_config.config_rules
        if config_rule.WhichOneof('config_rule') == 'custom'
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    ]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
        '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
    RESULTING_CONFIG_ENDPOINTS = {cr['custom']['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        values = json.loads(RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['custom']['resource_value'])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        values.update(endpoint_cooked[1])
        RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['custom']['resource_value'] = json.dumps(values, sort_keys=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    for config_rule in RESULTING_CONFIG_ENDPOINTS.values():
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        config_rule = (
            ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'],
            json.loads(json.dumps(config_rule['custom']['resource_value'])))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('A config_rule: {:s} {:s} = {:s}'.format(*config_rule))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        assert config_rule in config_rules
    for config_rule in DEVICE_EMU_CONFIG_ADDRESSES:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        config_rule = (
            ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'],
            json.loads(json.dumps(config_rule['custom']['resource_value'])))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        LOGGER.info('B config_rule: {:s} {:s} = {:s}'.format(*config_rule))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        assert config_rule in config_rules

    # Try to reconfigure...

    DEVICE_EMU_WITH_RECONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_RECONFIG_RULES['device_operational_status'] = \
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
    DEVICE_EMU_WITH_RECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_RECONFIG_ADDRESSES)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_RECONFIG_RULES))

    RESULTING_CONFIG_RULES = {cr['custom']['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['custom']['resource_value'])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        values.update(endpoint_cooked[1])
        RESULTING_CONFIG_RULES[endpoint_cooked[0]]['custom']['resource_value'] = json.dumps(values, sort_keys=True)
    RESULTING_CONFIG_RULES.update({
        cr['custom']['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ADDRESSES)
    })
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    for reconfig_rule in DEVICE_EMU_RECONFIG_ADDRESSES:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if reconfig_rule['action'] == ConfigActionEnum.CONFIGACTION_DELETE:
            RESULTING_CONFIG_RULES.pop(reconfig_rule['custom']['resource_key'], None)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        else:
            RESULTING_CONFIG_RULES[reconfig_rule['custom']['resource_key']] = reconfig_rule
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values()
    #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))

    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == len(RESULTING_CONFIG_RULES)
    for config_rule in RESULTING_CONFIG_RULES:
        assert 'custom' in config_rule
        resource = [config_rule['custom']['resource_key'], json.loads(config_rule['custom']['resource_value'])]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        assert resource in driver_config

    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    config_rules = [
        (ConfigActionEnum.Name(config_rule.action), config_rule.custom.resource_key, config_rule.custom.resource_value)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        for config_rule in device_data.device_config.config_rules
    ]
    #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
    #    '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
    for config_rule in RESULTING_CONFIG_RULES:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        config_rule = (
            ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'],
            config_rule['custom']['resource_value'])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #LOGGER.info('config_rule: {:s} {:s} = {:s}'.format(*config_rule))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        assert config_rule in config_rules


def test_device_emulated_monitor(
    context_client : ContextClient,         # pylint: disable=redefined-outer-name
    device_client : DeviceClient,           # pylint: disable=redefined-outer-name
    device_service : DeviceService,         # pylint: disable=redefined-outer-name
    mock_service : MockService_Dependencies):   # pylint: disable=redefined-outer-name

    device_uuid = DEVICE_EMU_UUID
    json_device_id = DEVICE_EMU_ID
    device_id = DeviceId(**json_device_id)
    device_data = context_client.GetDevice(device_id)
    LOGGER.info('device_data = \n{:s}'.format(str(device_data)))

    driver_instance_cache = device_service.device_servicer.driver_instance_cache
    driver : _Driver = driver_instance_cache.get(device_uuid) # we know the driver exists now
    assert driver is not None
    #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    #assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES)

    SAMPLING_DURATION_SEC = 10.0
    SAMPLING_INTERVAL_SEC = 2.0

    MONITORING_SETTINGS_LIST = []
    KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {}
    for endpoint in device_data.device_endpoints:
        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
        for sample_type_id in endpoint.kpi_sample_types:
            sample_type_name = str(KpiSampleType.Name(sample_type_id)).upper().replace('KPISAMPLETYPE_', '')
            kpi_uuid = '{:s}-{:s}-{:s}-kpi_uuid'.format(device_uuid, endpoint_uuid, str(sample_type_id))
            monitoring_settings = {
                'kpi_id'        : {'kpi_id': {'uuid': kpi_uuid}},
                'kpi_descriptor': {
                    'kpi_description': 'Metric {:s} for Endpoint {:s} in Device {:s}'.format(
                        sample_type_name, endpoint_uuid, device_uuid),
                    'kpi_sample_type': sample_type_id,
                    'device_id': json_device_id,
                    'endpoint_id': json_endpoint_id(json_device_id, endpoint_uuid),
                },
                'sampling_duration_s': SAMPLING_DURATION_SEC,
                'sampling_interval_s': SAMPLING_INTERVAL_SEC,
            }
            MONITORING_SETTINGS_LIST.append(monitoring_settings)
            KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] = 0

    NUM_SAMPLES_EXPECTED_PER_KPI = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC
    NUM_SAMPLES_EXPECTED = len(MONITORING_SETTINGS_LIST) * NUM_SAMPLES_EXPECTED_PER_KPI

    # Start monitoring the device
    t_start_monitoring = datetime.timestamp(datetime.utcnow())
    for monitoring_settings in MONITORING_SETTINGS_LIST:
        device_client.MonitorDeviceKpi(MonitoringSettings(**monitoring_settings))

    # wait to receive the expected number of samples
    # if takes more than 1.5 times the sampling duration, assume there is an error
    time_ini = time.time()
    queue_samples : queue.Queue = mock_service.queue_samples
    received_samples = []
    while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5):
        try:
            received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED)
            #LOGGER.info('received_sample = {:s}'.format(str(received_sample)))
            received_samples.append(received_sample)
        except queue.Empty:
            continue

    t_end_monitoring = datetime.timestamp(datetime.utcnow())

    #LOGGER.info('received_samples = {:s}'.format(str(received_samples)))
    LOGGER.info('len(received_samples) = {:s}'.format(str(len(received_samples))))
    LOGGER.info('NUM_SAMPLES_EXPECTED = {:s}'.format(str(NUM_SAMPLES_EXPECTED)))
    assert len(received_samples) == NUM_SAMPLES_EXPECTED
    for received_sample in received_samples:
        kpi_uuid = received_sample.kpi_id.kpi_id.uuid
        assert kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        timestamp = float(received_sample.timestamp.timestamp)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        assert timestamp > t_start_monitoring
        assert timestamp < t_end_monitoring
        assert received_sample.kpi_value.HasField('floatVal') or received_sample.kpi_value.HasField('intVal')
        kpi_value = getattr(received_sample.kpi_value, received_sample.kpi_value.WhichOneof('value'))
        assert isinstance(kpi_value, (float, int))
        KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] += 1

    LOGGER.info('KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {:s}'.format(str(KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED)))
    for kpi_uuid, num_samples_received in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.items():
        assert num_samples_received == NUM_SAMPLES_EXPECTED_PER_KPI

    # Unsubscribe monitoring
    for kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.keys():
        MONITORING_SETTINGS_UNSUBSCRIBE = {
            'kpi_id'             : {'kpi_id': {'uuid': kpi_uuid}},
            'sampling_duration_s': -1, # negative value in sampling_duration_s or sampling_interval_s means unsibscribe
            'sampling_interval_s': -1, # kpi_id is mandatory to unsibscribe
        }
        device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE))


def test_device_emulated_deconfigure(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name

    driver_instance_cache = device_service.device_servicer.driver_instance_cache
    driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now
    assert driver is not None

    driver_config = driver.GetConfig()
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))

    DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_DECONFIG_RULES['device_operational_status'] = \
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
    DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ADDRESSES)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES))

    RESULTING_CONFIG_RULES = {cr['custom']['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)}
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        values = json.loads(RESULTING_CONFIG_RULES[endpoint_cooked[0]]['custom']['resource_value'])
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        values.update(endpoint_cooked[1])
        RESULTING_CONFIG_RULES[endpoint_cooked[0]]['custom']['resource_value'] = json.dumps(values, sort_keys=True)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values()
    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
    driver_config = list(filter(
        lambda config_rule: (
            not isinstance(config_rule[1], str) or not config_rule[1].startswith('do_sampling (trigger:')),
        driver_config))
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))
    assert len(driver_config) == len(RESULTING_CONFIG_RULES)
    for config_rule in RESULTING_CONFIG_RULES:
        assert 'custom' in config_rule
        config_rule = [config_rule['custom']['resource_key'], json.loads(config_rule['custom']['resource_value'])]
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        #LOGGER.info('config_rule = {:s}'.format(str(config_rule)))
        assert config_rule in driver_config

    DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_DECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ENDPOINTS)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_DECONFIG_RULES))

    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 0

    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    config_rules = device_data.device_config.config_rules
    LOGGER.info('config_rules = {:s}'.format(str(config_rules)))
    clean_config_rules = []
    for config_rule in config_rules:
        assert config_rule.WhichOneof('config_rule') == 'custom'
        if config_rule.custom.resource_key.startswith('/endpoints/endpoint'): continue
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if config_rule.custom.resource_key.startswith('_connect/'): continue
        try:
            config_rule_value = json.loads(config_rule.custom.resource_value)
        except: # pylint: disable=bare-except
            config_rule_value = config_rule.custom.resource_value
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        if isinstance(config_rule_value, str) and config_rule_value.startswith('do_sampling (trigger:'): continue
        clean_config_rules.append(config_rule)
    LOGGER.info('clean_config_rules = {:s}'.format(str(clean_config_rules)))
    assert len(clean_config_rules) == 0


def test_device_emulated_delete(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name

    device_client.DeleteDevice(DeviceId(**DEVICE_EMU_ID))
    driver_instance_cache = device_service.device_servicer.driver_instance_cache
    driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID, {})
    assert driver is None