Skip to content
Snippets Groups Projects
Commit 11731ae4 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Device component:

- aggregated error messages in separate file
- corrected update of operational status
- corrected update of resulting device configuration from device instead of composing from rules (for simplicity)
- corrected retrieval of device_uuid in MonitorDeviceKpi
- factorized code to compose gRPC device_config rules from raw config rules
- code cleanup
- added missing logs in unitary test for emulated device driver
parent ae9705af
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!34Context Scalability extensions using CockroachDB + Removal of Stateful database inside Device + other
......@@ -15,12 +15,13 @@
import grpc, logging
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException
from common.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
from common.proto.context_pb2 import Device, DeviceConfig, DeviceId, DeviceOperationalStatusEnum, Empty
from common.proto.device_pb2 import MonitoringSettings
from common.proto.device_pb2_grpc import DeviceServiceServicer
from common.tools.context_queries.Device import get_device
from common.tools.mutex_queues.MutexQueues import MutexQueues
from context.client.ContextClient import ContextClient
from device.service.Errors import ERROR_MISSING_DRIVER, ERROR_MISSING_KPI
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver
from .monitoring.MonitoringLoops import MonitoringLoops
......@@ -32,8 +33,6 @@ LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Device', 'RPC')
ERROR_MISSING_DRIVER = 'Device({:s}) has not been added to this Device instance'
class DeviceServiceServicerImpl(DeviceServiceServicer):
def __init__(self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops) -> None:
LOGGER.debug('Creating Servicer...')
......@@ -97,6 +96,9 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
msg = ERROR_MISSING_DRIVER.format(str(device_uuid))
raise OperationFailedException('ConfigureDevice', extra_details=msg)
if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
device.device_operational_status = request.device_operational_status
# TODO: use of datastores (might be virtual ones) to enable rollbacks
resources_to_set, resources_to_delete = compute_rules_to_add_delete(device, request)
......@@ -110,13 +112,8 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
# Rules updated by configure_rules() and deconfigure_rules() methods.
# Code to be removed soon if not needed.
#running_config_rules = driver.GetConfig()
#for config_rule in running_config_rules:
# if isinstance(config_rule[1], Exception): continue
# config_rule = device.device_config.config_rules.add()
# config_rule.action = ConfigActionEnum.CONFIGACTION_SET
# config_rule.custom.resource_key = config_rule[0]
# config_rule.custom.resource_value = json.dumps(config_rule[1], sort_keys=True)
del device.device_config.config_rules[:]
populate_config_rules(device, driver)
device_id = context_client.SetDevice(device)
return device_id
......@@ -161,10 +158,20 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
manage_kpi_method = subscribe_kpi if subscribe else unsubscribe_kpi
if subscribe:
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
else:
# unsubscribe only carries kpi_uuid; take device_uuid from recorded KPIs
kpi_uuid = request.kpi_id.kpi_id.uuid
kpi_details = self.monitoring_loops.get_kpi_by_uuid(kpi_uuid)
if kpi_details is None:
msg = ERROR_MISSING_KPI.format(str(kpi_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
device_uuid = kpi_details[0]
self.mutex_queues.wait_my_turn(device_uuid)
try:
driver : _Driver = self.driver_instance_cache.get(device_uuid)
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ERROR_MISSING_DRIVER = 'Device({:s}) has not been added to this Device instance'
ERROR_MISSING_KPI = 'Kpi({:s}) not found'
ERROR_BAD_ENDPOINT = 'Device({:s}): GetConfig retrieved malformed Endpoint({:s})'
ERROR_GET = 'Device({:s}): Unable to Get resource(key={:s}); error({:s})'
ERROR_GET_INIT = 'Device({:s}): Unable to Get Initial resource(key={:s}); error({:s})'
ERROR_DELETE = 'Device({:s}): Unable to Delete resource(key={:s}, value={:s}); error({:s})'
ERROR_SET = 'Device({:s}): Unable to Set resource(key={:s}, value={:s}); error({:s})'
ERROR_SAMPLETYPE = 'Device({:s})/EndPoint({:s}): SampleType({:s}/{:s}) not supported'
ERROR_SUBSCRIBE = 'Device({:s}): Unable to Subscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
'error({:s})'
ERROR_UNSUBSCRIBE = 'Device({:s}): Unable to Unsubscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
'error({:s})'
......@@ -13,7 +13,7 @@
# limitations under the License.
import json
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Tuple, Union
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig
......@@ -22,18 +22,9 @@ from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.Tools import grpc_message_to_json
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
from .monitoring.MonitoringLoops import MonitoringLoops
ERROR_ENDPOINT = 'Device({:s}): GetConfig retrieved malformed Endpoint({:s})'
ERROR_GET = 'Device({:s}): Unable to Get resource(key={:s}); error({:s})'
ERROR_GET_INIT = 'Device({:s}): Unable to Get Initial resource(key={:s}); error({:s})'
ERROR_SET = 'Device({:s}): Unable to Set resource(key={:s}, value={:s}); error({:s})'
ERROR_DELETE = 'Device({:s}): Unable to Delete resource(key={:s}, value={:s}); error({:s})'
ERROR_SAMPLETYPE = 'Device({:s})/EndPoint({:s}): SampleType({:s}/{:s}) not supported'
ERROR_SUBSCRIBE = 'Device({:s}): Unable to Subscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
'error({:s})'
ERROR_MISSING_KPI = 'Device({:s}): Kpi({:s}) not found'
ERROR_UNSUBSCRIBE = 'Device({:s}): Unable to Unsubscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
'error({:s})'
from .Errors import (
ERROR_BAD_ENDPOINT, ERROR_DELETE, ERROR_GET, ERROR_GET_INIT, ERROR_MISSING_KPI, ERROR_SAMPLETYPE, ERROR_SET,
ERROR_SUBSCRIBE, ERROR_UNSUBSCRIBE)
def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
connection_config_rules = dict()
......@@ -91,7 +82,7 @@ def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : Mon
errors : List[str] = list()
for endpoint in results_getconfig:
if len(endpoint) != 2:
errors.append(ERROR_ENDPOINT.format(device_uuid, str(endpoint)))
errors.append(ERROR_BAD_ENDPOINT.format(device_uuid, str(endpoint)))
continue
resource_key, resource_value = endpoint
......@@ -115,40 +106,35 @@ def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : Mon
return errors
def populate_config_rules(device : Device, driver : _Driver) -> List[str]:
device_uuid = device.device_id.device_uuid.uuid
resources_to_get = ['ALL']
results_getconfig = driver.GetConfig()
def _raw_config_rules_to_grpc(
device_uuid : str, device_config : DeviceConfig, error_template : str, default_config_action : ConfigActionEnum,
raw_config_rules : List[Tuple[str, Union[Any, Exception, None]]]
) -> List[str]:
errors : List[str] = list()
for resource_key, resource_value in zip(resources_to_get, results_getconfig):
for resource_key, resource_value in raw_config_rules:
if isinstance(resource_value, Exception):
errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value)))
errors.append(error_template.format(device_uuid, str(resource_key), str(resource_value)))
continue
config_rule = device.device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule = device_config.config_rules.add()
config_rule.action = default_config_action
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
config_rule.custom.resource_value = \
resource_value if isinstance(resource_value, str) else json.dumps(resource_value, sort_keys=True)
return errors
def populate_config_rules(device : Device, driver : _Driver) -> List[str]:
device_uuid = device.device_id.device_uuid.uuid
results_getconfig = driver.GetConfig()
return _raw_config_rules_to_grpc(
device_uuid, device.device_config, ERROR_GET, ConfigActionEnum.CONFIGACTION_SET, results_getconfig)
def populate_initial_config_rules(device_uuid : str, device_config : DeviceConfig, driver : _Driver) -> List[str]:
results_getinitconfig = driver.GetInitialConfig()
errors : List[str] = list()
for resource_key, resource_value in results_getinitconfig:
if isinstance(resource_value, Exception):
errors.append(ERROR_GET_INIT.format(device_uuid, str(resource_key), str(resource_value)))
continue
config_rule = device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
return errors
return _raw_config_rules_to_grpc(
device_uuid, device_config, ERROR_GET_INIT, ConfigActionEnum.CONFIGACTION_SET, results_getinitconfig)
def compute_rules_to_add_delete(
device : Device, request : Device
......@@ -186,37 +172,27 @@ def configure_rules(device : Device, driver : _Driver, resources_to_set : List[T
device_uuid = device.device_id.device_uuid.uuid
results_setconfig = driver.SetConfig(resources_to_set)
results_setconfig = [
(resource_key, result if isinstance(result, Exception) else resource_value)
for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig)
]
errors : List[str] = list()
for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig):
if isinstance(result, Exception):
errors.append(ERROR_SET.format(device_uuid, str(resource_key), str(resource_value), str(result)))
continue
# add to config of device
config_rule = device.device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
return errors
device_config = DeviceConfig() # ignored; added at the end of ConfigureDevice
return _raw_config_rules_to_grpc(
device_uuid, device_config, ERROR_SET, ConfigActionEnum.CONFIGACTION_SET, results_setconfig)
def deconfigure_rules(device : Device, driver : _Driver, resources_to_delete : List[Tuple[str, Any]]) -> List[str]:
device_uuid = device.device_id.device_uuid.uuid
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
results_deleteconfig = [
(resource_key, result if isinstance(result, Exception) else resource_value)
for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig)
]
errors : List[str] = list()
for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig):
if isinstance(result, Exception):
errors.append(ERROR_DELETE.format(device_uuid, str(resource_key), str(resource_value), str(result)))
continue
# remove from config of device
config_rule = device.device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
return errors
device_config = DeviceConfig() # ignored; added at the end of ConfigureDevice
return _raw_config_rules_to_grpc(
device_uuid, device_config, ERROR_DELETE, ConfigActionEnum.CONFIGACTION_DELETE, results_deleteconfig)
def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
kpi_uuid = request.kpi_id.kpi_id.uuid
......@@ -253,20 +229,11 @@ def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loo
return errors
def unsubscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
kpi_uuid = request.kpi_id.kpi_id.uuid
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
#endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid
#kpi_sample_type = request.kpi_descriptor.kpi_sample_type
# TODO: consider if further validation needs to be done (correct endpoint_uuid?, correct kpi_sample_type?)
#resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
#if resource_key is None:
# kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
# return [ERROR_SAMPLETYPE.format(device_uuid, endpoint_uuid, str(kpi_sample_type), str(kpi_sample_type_name))]
kpi_uuid = request.kpi_id.kpi_id.uuid
kpi_details = monitoring_loops.get_kpi_by_uuid(kpi_uuid)
if kpi_details is None:
return [ERROR_MISSING_KPI.format(str(device_uuid), str(kpi_uuid))]
return [ERROR_MISSING_KPI.format(str(kpi_uuid))]
device_uuid, resource_key, sampling_duration, sampling_interval = kpi_details
......
......@@ -18,7 +18,7 @@ from common.tools.object_factory.Device import (
json_device_emulated_connect_rules, json_device_emulated_packet_router_disabled, json_device_id)
from device.tests.CommonObjects import PACKET_PORT_SAMPLE_TYPES
DEVICE_EMU_UUID = 'EMULATED'
DEVICE_EMU_UUID = 'R1-EMU'
DEVICE_EMU_ID = json_device_id(DEVICE_EMU_UUID)
DEVICE_EMU = json_device_emulated_packet_router_disabled(DEVICE_EMU_UUID)
DEVICE_EMU_EP_UUIDS = ['EP1', 'EP2', 'EP3', 'EP4']
......
......@@ -168,12 +168,14 @@ def test_device_emulated_configure(
config_rule = (
ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'],
json.loads(json.dumps(config_rule['custom']['resource_value'])))
#LOGGER.info('config_rule: {:s} {:s} = {:s}'.format(*config_rule))
assert config_rule in config_rules
for config_rule in DEVICE_EMU_CONFIG_ADDRESSES:
assert 'custom' in config_rule
config_rule = (
ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'],
json.loads(json.dumps(config_rule['custom']['resource_value'])))
#LOGGER.info('config_rule: {:s} {:s} = {:s}'.format(*config_rule))
assert config_rule in config_rules
# Try to reconfigure...
......@@ -222,6 +224,7 @@ def test_device_emulated_configure(
config_rule = (
ConfigActionEnum.Name(config_rule['action']), config_rule['custom']['resource_key'],
config_rule['custom']['resource_value'])
#LOGGER.info('config_rule: {:s} {:s} = {:s}'.format(*config_rule))
assert config_rule in config_rules
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment