-
Lluis Gifre Renom authored
Fix unitary tests to align with changes in monitoring component information model.
Lluis Gifre Renom authoredFix unitary tests to align with changes in monitoring component information model.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_unitary_emulated.py 21.83 KiB
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar, copy, dateutil.parser, grpc, json, logging, operator, pytest, queue, time
from datetime import datetime, timezone
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId, DeviceOperationalStatusEnum
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
from device.service.driver_api._Driver import _Driver
from .MockService_Dependencies import MockService_Dependencies
from .PrepareTestScenario import ( # pylint: disable=unused-import
# be careful, order of symbols is important here!
mock_service, device_service, context_client, device_client, monitoring_client, test_prepare_environment)
from .Device_Emulated import (
DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES,
DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_ENDPOINTS_COOKED, DEVICE_EMU_ID,
DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID)
# Keep third-party / helper loggers quiet so that the test output stays readable.
for _noisy_logger_name in ('apscheduler.executors.default', 'apscheduler.scheduler', 'monitoring-client'):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)

# Logger used by the tests in this module; DEBUG so that every message they emit is recorded.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# ----- Test Device Driver Emulated --------------------------------------------
# The Emulated Device Driver tests validate both the Driver API and the Emulated Device Driver itself. Note that
# other Drivers might support a different set of resource paths and attributes/values per resource; however, they
# must all implement the Driver API.
def test_device_emulated_add_error_cases(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Check that AddDevice rejects requests carrying endpoints or non-connection config rules.

    Both requests are expected to fail with gRPC status INVALID_ARGUMENT and with an error detail that
    starts and ends with the fixed fragments checked below.
    """
    # Case 1: the request already contains an endpoint.
    device_with_endpoints = copy.deepcopy(DEVICE_EMU)
    device_with_endpoints['device_endpoints'].append(json_endpoint(DEVICE_EMU_ID, 'ep-id', 'ep-type'))
    _assert_add_device_rejected(
        device_client, device_with_endpoints,
        'device.device_endpoints([',
        ']) is invalid; RPC method AddDevice does not accept Endpoints. '\
        'Endpoints are discovered through interrogation of the physical device.')

    # Case 2: the request contains endpoint config rules in addition to the connection rules.
    device_with_extra_rules = copy.deepcopy(DEVICE_EMU)
    device_with_extra_rules['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
    device_with_extra_rules['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS)
    _assert_add_device_rejected(
        device_client, device_with_extra_rules,
        'device.device_config.config_rules([',
        ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\
        'with "_connect/" tag. Others should be configured after adding the device.')


def _assert_add_device_rejected(
    device_client : DeviceClient, device_json : dict, msg_head : str, msg_tail : str) -> None:
    """Assert that AddDevice(device_json) fails with INVALID_ARGUMENT and the expected error detail.

    The request message is built outside the ``pytest.raises`` context so that only the RPC itself is
    allowed to raise ``grpc.RpcError``; an error while preparing the request fails the test directly.
    """
    device = Device(**device_json)
    with pytest.raises(grpc.RpcError) as exc_info:
        device_client.AddDevice(device)
    assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    except_msg = str(exc_info.value.details())
    # Two separate assertions so that a failure tells which end of the message did not match.
    assert except_msg.startswith(msg_head)
    assert except_msg.endswith(msg_tail)
def test_device_emulated_add_correct(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Add the emulated device with its connection rules and check that a driver instance is cached.

    The custom config rules that Context reports for the device are logged for inspection.
    """
    device_json = copy.deepcopy(DEVICE_EMU)
    device_json['device_config']['config_rules'].extend(DEVICE_EMU_CONNECT_RULES)
    device_client.AddDevice(Device(**device_json))

    # After AddDevice the driver instance cache must hold a driver for this device.
    driver : _Driver = device_service.device_servicer.driver_instance_cache.get(DEVICE_EMU_UUID)
    assert driver is not None

    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    rule_lines = []
    for config_rule in device_data.device_config.config_rules:
        # Only custom rules carry resource_key/resource_value.
        if config_rule.WhichOneof('config_rule') != 'custom': continue
        custom = config_rule.custom
        rule_lines.append('{:s} {:s} = {:s}'.format(
            ConfigActionEnum.Name(config_rule.action), custom.resource_key, custom.resource_value))
    LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format('\n'.join(rule_lines)))
def test_device_emulated_get(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Retrieve the device's initial config from Device and its data from Context, and log both.

    The test makes no assertions; it passes as long as both RPCs complete without raising.
    """
    device_id = DeviceId(**DEVICE_EMU_ID)

    initial_config_json = grpc_message_to_json_string(device_client.GetInitialConfig(device_id))
    LOGGER.info('initial_config = {:s}'.format(initial_config_json))

    device_data_json = grpc_message_to_json_string(context_client.GetDevice(device_id))
    LOGGER.info('device_data = {:s}'.format(device_data_json))
def test_device_emulated_configure(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Configure and then reconfigure the emulated device, checking both the driver and Context.

    Steps:
      1. Check that the driver initially reports exactly the cooked endpoints.
      2. Send endpoint rules, address rules and the ENABLED operational status (three ConfigureDevice calls).
      3. Check the driver config and the custom config rules stored in Context.
      4. Send the reconfiguration rules and check driver and Context again.
    """
    driver_instance_cache = device_service.device_servicer.driver_instance_cache
    driver : _Driver = driver_instance_cache.get(DEVICE_EMU_UUID) # we know the driver exists now
    assert driver is not None

    # Step 1: before configuring, the driver config contains only the cooked endpoints.
    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED)
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        assert endpoint_cooked in driver_config

    # Step 2: apply endpoint rules, address rules and the operational status, one request each.
    DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ENDPOINTS)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES))

    DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_CONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_CONFIG_ADDRESSES)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_CONFIG_RULES))

    DEVICE_EMU_WITH_OPERATIONAL_STATUS = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_OPERATIONAL_STATUS['device_operational_status'] = \
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_OPERATIONAL_STATUS))

    # Step 3a: the driver now reports the endpoints (flagged as enabled) plus the address rules.
    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES)
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        # Work on a copy: the module-level constant must not be modified.
        endpoint_cooked = copy.deepcopy(endpoint_cooked)
        endpoint_cooked[1]['enabled'] = True
        assert endpoint_cooked in driver_config
    for config_rule in DEVICE_EMU_CONFIG_ADDRESSES:
        assert 'custom' in config_rule
        rule = (config_rule['custom']['resource_key'], json.loads(config_rule['custom']['resource_value']))
        assert rule in driver_config

    # Step 3b: Context reports the device as enabled and holds every expected custom rule.
    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    assert device_data.device_operational_status == DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
    config_rules = _context_custom_config_rules(device_data)
    #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
    #    '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
    for config_rule in _expected_endpoint_config_rules().values():
        assert 'custom' in config_rule
        assert _as_context_rule(config_rule) in config_rules
    for config_rule in DEVICE_EMU_CONFIG_ADDRESSES:
        assert 'custom' in config_rule
        assert _as_context_rule(config_rule) in config_rules

    # Step 4: try to reconfigure...
    DEVICE_EMU_WITH_RECONFIG_RULES = copy.deepcopy(DEVICE_EMU)
    DEVICE_EMU_WITH_RECONFIG_RULES['device_operational_status'] = \
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
    DEVICE_EMU_WITH_RECONFIG_RULES['device_config']['config_rules'].extend(DEVICE_EMU_RECONFIG_ADDRESSES)
    device_client.ConfigureDevice(Device(**DEVICE_EMU_WITH_RECONFIG_RULES))

    # Expected state: endpoint rules + address rules, with the reconfiguration applied on top
    # (DELETE actions remove the key, any other action replaces/adds it).
    resulting_rules_by_key = _expected_endpoint_config_rules()
    resulting_rules_by_key.update({
        cr['custom']['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ADDRESSES)
    })
    for reconfig_rule in DEVICE_EMU_RECONFIG_ADDRESSES:
        assert 'custom' in reconfig_rule
        resource_key = reconfig_rule['custom']['resource_key']
        if reconfig_rule['action'] == ConfigActionEnum.CONFIGACTION_DELETE:
            resulting_rules_by_key.pop(resource_key, None)
        else:
            resulting_rules_by_key[resource_key] = reconfig_rule
    RESULTING_CONFIG_RULES = list(resulting_rules_by_key.values())
    #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))

    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == len(RESULTING_CONFIG_RULES)
    for config_rule in RESULTING_CONFIG_RULES:
        assert 'custom' in config_rule
        # After the JSON round trip above, driver entries are lists, hence a list is built here.
        resource = [config_rule['custom']['resource_key'], json.loads(config_rule['custom']['resource_value'])]
        assert resource in driver_config

    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    # Same custom-only filter as in step 3b, so both Context reads are interpreted identically.
    config_rules = _context_custom_config_rules(device_data)
    #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
    #    '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
    for config_rule in RESULTING_CONFIG_RULES:
        assert 'custom' in config_rule
        assert _as_context_rule(config_rule) in config_rules


def _expected_endpoint_config_rules() -> dict:
    """Return copies of DEVICE_EMU_CONFIG_ENDPOINTS keyed by resource key, with cooked values merged in.

    For every entry of DEVICE_EMU_ENDPOINTS_COOKED, the attributes in its second element are merged into
    the JSON-encoded resource_value of the rule with the same resource key; the value is re-encoded with
    sorted keys.
    """
    rules_by_key = {cr['custom']['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)}
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        custom = rules_by_key[endpoint_cooked[0]]['custom']
        values = json.loads(custom['resource_value'])
        values.update(endpoint_cooked[1])
        custom['resource_value'] = json.dumps(values, sort_keys=True)
    return rules_by_key


def _context_custom_config_rules(device_data) -> list:
    """Return (action name, resource key, resource value) for each custom config rule in device_data."""
    return [
        (ConfigActionEnum.Name(config_rule.action), config_rule.custom.resource_key, config_rule.custom.resource_value)
        for config_rule in device_data.device_config.config_rules
        if config_rule.WhichOneof('config_rule') == 'custom'
    ]


def _as_context_rule(config_rule : dict) -> tuple:
    """Convert a JSON-style custom config rule into the tuple form built by _context_custom_config_rules."""
    custom = config_rule['custom']
    # resource_value is already a string, so it is compared as-is against the value stored in Context.
    return (ConfigActionEnum.Name(config_rule['action']), custom['resource_key'], custom['resource_value'])
def test_device_emulated_monitor(
    context_client : ContextClient,             # pylint: disable=redefined-outer-name
    device_client : DeviceClient,               # pylint: disable=redefined-outer-name
    device_service : DeviceService,             # pylint: disable=redefined-outer-name
    mock_service : MockService_Dependencies):   # pylint: disable=redefined-outer-name
    """Monitor every KPI sample type of every endpoint and check the samples that are received.

    A monitoring request is issued per (endpoint, KPI sample type) pair. Samples are then read from
    ``mock_service.queue_samples`` until the expected number has arrived or 1.5 times the sampling
    duration has elapsed. Each sample must belong to a requested KPI, carry a timestamp inside the
    monitoring window and hold a float or int value, and every KPI must have received the same expected
    number of samples. Finally, every KPI is unsubscribed.
    """
    device_uuid = DEVICE_EMU_UUID
    json_device_id = DEVICE_EMU_ID

    device_data = context_client.GetDevice(DeviceId(**json_device_id))
    LOGGER.info('device_data = \n{:s}'.format(str(device_data)))

    driver : _Driver = device_service.device_servicer.driver_instance_cache.get(device_uuid)
    assert driver is not None # we know the driver exists now
    #driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    #assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES)

    SAMPLING_DURATION_SEC = 10.0
    SAMPLING_INTERVAL_SEC = 2.0

    # One monitoring request per (endpoint, KPI sample type) pair.
    MONITORING_SETTINGS_LIST = []
    for endpoint in device_data.device_endpoints:
        endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
        for sample_type_id in endpoint.kpi_sample_types:
            sample_type_name = str(KpiSampleType.Name(sample_type_id)).upper().replace('KPISAMPLETYPE_', '')
            kpi_uuid = '{:s}-{:s}-{:s}-kpi_uuid'.format(device_uuid, endpoint_uuid, str(sample_type_id))
            kpi_description = 'Metric {:s} for Endpoint {:s} in Device {:s}'.format(
                sample_type_name, endpoint_uuid, device_uuid)
            MONITORING_SETTINGS_LIST.append({
                'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}},
                'kpi_descriptor': {
                    'kpi_description': kpi_description,
                    'kpi_sample_type': sample_type_id,
                    'device_id': json_device_id,
                    'endpoint_id': json_endpoint_id(json_device_id, endpoint_uuid),
                },
                'sampling_duration_s': SAMPLING_DURATION_SEC,
                'sampling_interval_s': SAMPLING_INTERVAL_SEC,
            })

    # Per-KPI counters of received samples, all starting at zero.
    KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {
        monitoring_settings['kpi_id']['kpi_id']['uuid']: 0
        for monitoring_settings in MONITORING_SETTINGS_LIST
    }

    NUM_SAMPLES_EXPECTED_PER_KPI = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC
    NUM_SAMPLES_EXPECTED = len(MONITORING_SETTINGS_LIST) * NUM_SAMPLES_EXPECTED_PER_KPI

    # Start monitoring the device
    # NOTE(review): utcnow() is naive, so timestamp() interprets it as local time. This is kept as-is
    # because the sample timestamps compared below are produced outside this file and presumably follow
    # the same convention -- confirm against the driver before changing it here.
    t_start_monitoring = datetime.utcnow().timestamp()
    for monitoring_settings in MONITORING_SETTINGS_LIST:
        device_client.MonitorDeviceKpi(MonitoringSettings(**monitoring_settings))

    # wait to receive the expected number of samples
    # if takes more than 1.5 times the sampling duration, assume there is an error
    time_ini = time.time()
    queue_samples : queue.Queue = mock_service.queue_samples
    received_samples = []
    while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5):
        try:
            received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED)
        except queue.Empty:
            # Nothing arrived within this short wait; re-check the loop conditions.
            continue
        #LOGGER.info('received_sample = {:s}'.format(str(received_sample)))
        received_samples.append(received_sample)

    t_end_monitoring = datetime.utcnow().timestamp()

    #LOGGER.info('received_samples = {:s}'.format(str(received_samples)))
    LOGGER.info('len(received_samples) = {:s}'.format(str(len(received_samples))))
    LOGGER.info('NUM_SAMPLES_EXPECTED = {:s}'.format(str(NUM_SAMPLES_EXPECTED)))
    assert len(received_samples) == NUM_SAMPLES_EXPECTED

    for received_sample in received_samples:
        kpi_uuid = received_sample.kpi_id.kpi_id.uuid
        assert kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED
        # The sample must have been taken strictly inside the monitoring window.
        timestamp = float(received_sample.timestamp.timestamp)
        assert timestamp > t_start_monitoring
        assert timestamp < t_end_monitoring
        sample_value = received_sample.kpi_value
        assert sample_value.HasField('floatVal') or sample_value.HasField('intVal')
        kpi_value = getattr(sample_value, sample_value.WhichOneof('value'))
        assert isinstance(kpi_value, (float, int))
        KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED[kpi_uuid] += 1

    LOGGER.info('KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED = {:s}'.format(str(KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED)))
    for num_samples_received in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED.values():
        assert num_samples_received == NUM_SAMPLES_EXPECTED_PER_KPI

    # Unsubscribe monitoring
    for kpi_uuid in KPI_UUIDS__TO__NUM_SAMPLES_RECEIVED:
        MONITORING_SETTINGS_UNSUBSCRIBE = {
            'kpi_id' : {'kpi_id': {'uuid': kpi_uuid}},
            'sampling_duration_s': -1, # negative value in sampling_duration_s or sampling_interval_s means unsubscribe
            'sampling_interval_s': -1, # kpi_id is mandatory to unsubscribe
        }
        device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE))
def test_device_emulated_deconfigure(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Deconfigure the emulated device in two steps and check the driver and Context afterwards.

    First the address rules are removed and the device is set to DISABLED; the driver must then report
    only the endpoint rules (entries whose value is a 'do_sampling (trigger:' string are ignored). Then
    the endpoint rules are removed; the driver config must be empty, and Context must hold no custom rule
    other than '/endpoints/endpoint' rules and 'do_sampling (trigger:' entries.
    """
    driver : _Driver = device_service.device_servicer.driver_instance_cache.get(DEVICE_EMU_UUID)
    assert driver is not None # we know the driver exists now

    # The original code queries the driver config here; the returned value is not used below.
    driver.GetConfig()
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))

    # Step 1: disable the device and remove the address rules.
    deconfig_addresses_json = copy.deepcopy(DEVICE_EMU)
    deconfig_addresses_json['device_operational_status'] = \
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
    deconfig_addresses_json['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ADDRESSES)
    device_client.ConfigureDevice(Device(**deconfig_addresses_json))

    # Expected remaining rules: the endpoint rules with the cooked endpoint attributes merged in.
    rules_by_key = {cr['custom']['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)}
    for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
        custom = rules_by_key[endpoint_cooked[0]]['custom']
        values = json.loads(custom['resource_value'])
        values.update(endpoint_cooked[1])
        custom['resource_value'] = json.dumps(values, sort_keys=True)
    RESULTING_CONFIG_RULES = rules_by_key.values()

    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
    # Drop entries whose value is a 'do_sampling (trigger:' string before comparing.
    driver_config = [
        config_rule for config_rule in driver_config
        if not (isinstance(config_rule[1], str) and config_rule[1].startswith('do_sampling (trigger:'))
    ]
    LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))
    assert len(driver_config) == len(RESULTING_CONFIG_RULES)
    for expected_rule in RESULTING_CONFIG_RULES:
        assert 'custom' in expected_rule
        # Driver entries are lists after the JSON round trip above, hence a list is built here.
        resource = [expected_rule['custom']['resource_key'], json.loads(expected_rule['custom']['resource_value'])]
        #LOGGER.info('config_rule = {:s}'.format(str(config_rule)))
        assert resource in driver_config

    # Step 2: remove the endpoint rules; nothing must remain in the driver.
    deconfig_endpoints_json = copy.deepcopy(DEVICE_EMU)
    deconfig_endpoints_json['device_config']['config_rules'].extend(DEVICE_EMU_DECONFIG_ENDPOINTS)
    device_client.ConfigureDevice(Device(**deconfig_endpoints_json))

    driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
    driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
    #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
    assert len(driver_config) == 0

    # Context side: every rule must be custom, and only endpoint and sampling entries may remain.
    device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
    config_rules = device_data.device_config.config_rules
    LOGGER.info('config_rules = {:s}'.format(str(config_rules)))
    clean_config_rules = []
    for config_rule in config_rules:
        assert config_rule.WhichOneof('config_rule') == 'custom'
        custom = config_rule.custom
        if custom.resource_key.startswith('/endpoints/endpoint'): continue
        config_rule_value = json.loads(custom.resource_value)
        is_sampling_entry = isinstance(config_rule_value, str) and \
            config_rule_value.startswith('do_sampling (trigger:')
        if not is_sampling_entry:
            clean_config_rules.append(config_rule)
    LOGGER.info('clean_config_rules = {:s}'.format(str(clean_config_rules)))
    assert len(clean_config_rules) == 0
def test_device_emulated_delete(
    context_client : ContextClient,     # pylint: disable=redefined-outer-name
    device_client : DeviceClient,       # pylint: disable=redefined-outer-name
    device_service : DeviceService):    # pylint: disable=redefined-outer-name
    """Delete the emulated device and check that the driver instance cache no longer returns a driver."""
    device_id = DeviceId(**DEVICE_EMU_ID)
    device_client.DeleteDevice(device_id)

    # NOTE(review): the meaning of the second argument ({}) is defined by the cache's get() method, which
    # is not visible in this file -- presumably a filter/lookup argument rather than a default; confirm.
    driver : _Driver = device_service.device_servicer.driver_instance_cache.get(DEVICE_EMU_UUID, {})
    assert driver is None