-
Lluis Gifre Renom authored
- removed internal in-memory database - added storage of connect-related config rules in context and added driver pre-loading when Device component starts - re-organized code of EmulatedDriver - re-organized code to improve clarity - minor code bug resolutions - code cleanup
Lluis Gifre Renom authored- removed internal in-memory database - added storage of connect-related config rules in context and added driver pre-loading when Device component starts - re-organized code of EmulatedDriver - re-organized code to improve clarity - minor code bug resolutions - code cleanup
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Tools.py 14.01 KiB
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Any, Dict, List, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.Tools import grpc_message_to_json
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
from .monitoring.MonitoringLoops import MonitoringLoops
ERROR_ENDPOINT = 'Device({:s}): GetConfig retrieved malformed Endpoint({:s})'
ERROR_GET = 'Device({:s}): Unable to Get resource(key={:s}); error({:s})'
ERROR_GET_INIT = 'Device({:s}): Unable to Get Initial resource(key={:s}); error({:s})'
ERROR_SET = 'Device({:s}): Unable to Set resource(key={:s}, value={:s}); error({:s})'
ERROR_DELETE = 'Device({:s}): Unable to Delete resource(key={:s}, value={:s}); error({:s})'
ERROR_SAMPLETYPE = 'Device({:s})/EndPoint({:s}): SampleType({:s}/{:s}) not supported'
ERROR_SUBSCRIBE = 'Device({:s}): Unable to Subscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
'error({:s})'
ERROR_MISSING_KPI = 'Device({:s}): Kpi({:s}) not found'
ERROR_UNSUBSCRIBE = 'Device({:s}): Unable to Unsubscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
'error({:s})'
def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
connection_config_rules = dict()
unexpected_config_rules = list()
for config_rule in device_config.config_rules:
is_action_set = (config_rule.action == ConfigActionEnum.CONFIGACTION_SET)
is_custom_rule = (config_rule.WhichOneof('config_rule') == 'custom')
if is_action_set and is_custom_rule and (config_rule.custom.resource_key.startswith('_connect/')):
connect_attribute = config_rule.custom.resource_key.replace('_connect/', '')
connection_config_rules[connect_attribute] = config_rule.custom.resource_value
else:
unexpected_config_rules.append(config_rule)
if len(unexpected_config_rules) > 0:
unexpected_config_rules = grpc_message_to_json(device_config)
unexpected_config_rules = unexpected_config_rules['config_rules']
unexpected_config_rules = list(filter(
lambda cr: cr.get('custom', {})['resource_key'].replace('_connect/', '') not in connection_config_rules,
unexpected_config_rules))
str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
raise InvalidArgumentException(
'device.device_config.config_rules', str_unexpected_config_rules,
extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
'with "_connect/" tag. Others should be configured after adding the device.')
return connection_config_rules
def get_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
connect_rules = dict()
for config_rule in device_config.config_rules:
if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
if config_rule.WhichOneof('config_rule') != 'custom': continue
if not config_rule.custom.resource_key.startswith('_connect/'): continue
connect_attribute = config_rule.custom.resource_key.replace('_connect/', '')
connect_rules[connect_attribute] = config_rule.custom.resource_value
return connect_rules
def check_no_endpoints(device_endpoints) -> None:
if len(device_endpoints) == 0: return
unexpected_endpoints = []
for device_endpoint in device_endpoints:
unexpected_endpoints.append(grpc_message_to_json(device_endpoint))
str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
raise InvalidArgumentException(
'device.device_endpoints', str_unexpected_endpoints,
extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\
'interrogation of the physical device.')
def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
device_uuid = device.device_id.device_uuid.uuid
resources_to_get = [RESOURCE_ENDPOINTS]
results_getconfig = driver.GetConfig(resources_to_get)
errors : List[str] = list()
for endpoint in results_getconfig:
if len(endpoint) != 2:
errors.append(ERROR_ENDPOINT.format(device_uuid, str(endpoint)))
continue
resource_key, resource_value = endpoint
if isinstance(resource_value, Exception):
errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value)))
continue
endpoint_uuid = resource_value.get('uuid')
device_endpoint = device.device_endpoints.add()
device_endpoint.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
device_endpoint.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME
device_endpoint.endpoint_id.device_id.device_uuid.uuid = device_uuid
device_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid
device_endpoint.endpoint_type = resource_value.get('type')
sample_types : Dict[int, str] = resource_value.get('sample_types', {})
for kpi_sample_type, monitor_resource_key in sample_types.items():
device_endpoint.kpi_sample_types.append(kpi_sample_type)
monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key)
return errors
def populate_config_rules(device : Device, driver : _Driver) -> List[str]:
device_uuid = device.device_id.device_uuid.uuid
resources_to_get = ['ALL']
results_getconfig = driver.GetConfig()
errors : List[str] = list()
for resource_key, resource_value in zip(resources_to_get, results_getconfig):
if isinstance(resource_value, Exception):
errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value)))
continue
config_rule = device.device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
return errors
def populate_initial_config_rules(device_uuid : str, device_config : DeviceConfig, driver : _Driver) -> List[str]:
results_getinitconfig = driver.GetInitialConfig()
errors : List[str] = list()
for resource_key, resource_value in results_getinitconfig:
if isinstance(resource_value, Exception):
errors.append(ERROR_GET_INIT.format(device_uuid, str(resource_key), str(resource_value)))
continue
config_rule = device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
return errors
def compute_rules_to_add_delete(
device : Device, request : Device
) -> Tuple[List[Tuple[str, Any]], List[Tuple[str, Any]]]:
# convert config rules from context into a dictionary
# TODO: add support for non-custom config rules
context_config_rules = {
config_rule.custom.resource_key: config_rule.custom.resource_value
for config_rule in device.device_config.config_rules
if config_rule.WhichOneof('config_rule') == 'custom'
}
# convert config rules from request into a list
# TODO: add support for non-custom config rules
request_config_rules = [
(config_rule.action, config_rule.custom.resource_key, config_rule.custom.resource_value)
for config_rule in request.device_config.config_rules
if config_rule.WhichOneof('config_rule') == 'custom'
]
resources_to_set : List[Tuple[str, Any]] = [] # key, value
resources_to_delete : List[Tuple[str, Any]] = [] # key, value
for action, key, value in request_config_rules:
if action == ConfigActionEnum.CONFIGACTION_SET:
if (key in context_config_rules) and (context_config_rules[key][0] == value): continue
resources_to_set.append((key, value))
elif action == ConfigActionEnum.CONFIGACTION_DELETE:
if key not in context_config_rules: continue
resources_to_delete.append((key, value))
return resources_to_set, resources_to_delete
def configure_rules(device : Device, driver : _Driver, resources_to_set : List[Tuple[str, Any]]) -> List[str]:
device_uuid = device.device_id.device_uuid.uuid
results_setconfig = driver.SetConfig(resources_to_set)
errors : List[str] = list()
for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig):
if isinstance(result, Exception):
errors.append(ERROR_SET.format(device_uuid, str(resource_key), str(resource_value), str(result)))
continue
# add to config of device
config_rule = device.device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
return errors
def deconfigure_rules(device : Device, driver : _Driver, resources_to_delete : List[Tuple[str, Any]]) -> List[str]:
device_uuid = device.device_id.device_uuid.uuid
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
errors : List[str] = list()
for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig):
if isinstance(result, Exception):
errors.append(ERROR_DELETE.format(device_uuid, str(resource_key), str(resource_value), str(result)))
continue
# remove from config of device
config_rule = device.device_config.config_rules.add()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule.custom.resource_key = resource_key
config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)
return errors
def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
kpi_uuid = request.kpi_id.kpi_id.uuid
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid
kpi_sample_type = request.kpi_descriptor.kpi_sample_type
resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
if resource_key is None:
kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
return [
ERROR_SAMPLETYPE.format(
str(device_uuid), str(endpoint_uuid), str(kpi_sample_type), str(kpi_sample_type_name)
)
]
sampling_duration = request.sampling_duration_s # seconds
sampling_interval = request.sampling_interval_s # seconds
resources_to_subscribe = [(resource_key, sampling_duration, sampling_interval)]
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors : List[str] = list()
for (resource_key, duration, interval), result in zip(resources_to_subscribe, results_subscribestate):
if isinstance(result, Exception):
errors.append(ERROR_SUBSCRIBE.format(
str(device_uuid), str(resource_key), str(duration), str(interval), str(result)
))
continue
monitoring_loops.add_kpi(device_uuid, resource_key, kpi_uuid, sampling_duration, sampling_interval)
monitoring_loops.add_device(device_uuid, driver)
return errors
def unsubscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
kpi_uuid = request.kpi_id.kpi_id.uuid
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
#endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid
#kpi_sample_type = request.kpi_descriptor.kpi_sample_type
# TODO: consider if further validation needs to be done (correct endpoint_uuid?, correct kpi_sample_type?)
#resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
#if resource_key is None:
# kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
# return [ERROR_SAMPLETYPE.format(device_uuid, endpoint_uuid, str(kpi_sample_type), str(kpi_sample_type_name))]
kpi_details = monitoring_loops.get_kpi_by_uuid(kpi_uuid)
if kpi_details is None:
return [ERROR_MISSING_KPI.format(str(device_uuid), str(kpi_uuid))]
device_uuid, resource_key, sampling_duration, sampling_interval = kpi_details
resources_to_unsubscribe = [(resource_key, sampling_duration, sampling_interval)]
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors : List[str] = list()
for (resource_key, duration, interval), result in zip(resources_to_unsubscribe, results_unsubscribestate):
if isinstance(result, Exception):
errors.append(ERROR_UNSUBSCRIBE.format(
device_uuid, str(resource_key), str(duration), str(interval), str(result)))
continue
monitoring_loops.remove_kpi(kpi_uuid)
#monitoring_loops.remove_device(device_uuid) # Do not remove; one monitoring_loop/device used by multiple requests
return errors