Commit ce5a8ab0 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Device component:

- removed internal in-memory database
- added storage of connect-related config rules in context and added driver pre-loading when Device component starts
- re-organized code of EmulatedDriver
- re-organized code to improve clarity
- minor code bug resolutions
- code cleanup
parent febe0556
Loading
Loading
Loading
Loading
+3 −7
Original line number Diff line number Diff line
@@ -14,14 +14,11 @@

from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
from .MonitoringLoops import MonitoringLoops
from .monitoring.MonitoringLoops import MonitoringLoops

# Custom gRPC settings
# Multiple clients might keep connections alive waiting for RPC methods to be executed.
@@ -32,9 +29,8 @@ class DeviceService(GenericGrpcService):
    def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None:
        port = get_service_port_grpc(ServiceNameEnum.DEVICE)
        super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
        database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.monitoring_loops = MonitoringLoops(database)
        self.device_servicer = DeviceServiceServicerImpl(database, driver_instance_cache, self.monitoring_loops)
        self.monitoring_loops = MonitoringLoops()
        self.device_servicer = DeviceServiceServicerImpl(driver_instance_cache, self.monitoring_loops)

    def install_servicers(self):
        self.monitoring_loops.start()
+89 −377

File changed.

Preview size limit exceeded, changes collapsed.

+286 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import Any, Dict, List, Tuple
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig
from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.Tools import grpc_message_to_json
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
from .monitoring.MonitoringLoops import MonitoringLoops

ERROR_ENDPOINT    = 'Device({:s}): GetConfig retrieved malformed Endpoint({:s})'
ERROR_GET         = 'Device({:s}): Unable to Get resource(key={:s}); error({:s})'
ERROR_GET_INIT    = 'Device({:s}): Unable to Get Initial resource(key={:s}); error({:s})'
ERROR_SET         = 'Device({:s}): Unable to Set resource(key={:s}, value={:s}); error({:s})'
ERROR_DELETE      = 'Device({:s}): Unable to Delete resource(key={:s}, value={:s}); error({:s})'
ERROR_SAMPLETYPE  = 'Device({:s})/EndPoint({:s}): SampleType({:s}/{:s}) not supported'
ERROR_SUBSCRIBE   = 'Device({:s}): Unable to Subscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
                    'error({:s})'
ERROR_MISSING_KPI = 'Device({:s}): Kpi({:s}) not found'
ERROR_UNSUBSCRIBE = 'Device({:s}): Unable to Unsubscribe subscription(key={:s}, duration={:s}, interval={:s}); '+\
                    'error({:s})'

def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
    connection_config_rules = dict()
    unexpected_config_rules = list()
    for config_rule in device_config.config_rules:
        is_action_set = (config_rule.action == ConfigActionEnum.CONFIGACTION_SET)
        is_custom_rule = (config_rule.WhichOneof('config_rule') == 'custom')
        if is_action_set and is_custom_rule and (config_rule.custom.resource_key.startswith('_connect/')):
            connect_attribute = config_rule.custom.resource_key.replace('_connect/', '')
            connection_config_rules[connect_attribute] = config_rule.custom.resource_value
        else:
            unexpected_config_rules.append(config_rule)

    if len(unexpected_config_rules) > 0:
        unexpected_config_rules = grpc_message_to_json(device_config)
        unexpected_config_rules = unexpected_config_rules['config_rules']
        unexpected_config_rules = list(filter(
            lambda cr: cr.get('custom', {})['resource_key'].replace('_connect/', '') not in connection_config_rules,
            unexpected_config_rules))
        str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
        raise InvalidArgumentException(
            'device.device_config.config_rules', str_unexpected_config_rules,
            extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
                            'with "_connect/" tag. Others should be configured after adding the device.')

    return connection_config_rules

def get_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]:
    connect_rules = dict()
    for config_rule in device_config.config_rules:
        if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
        if config_rule.WhichOneof('config_rule') != 'custom': continue
        if not config_rule.custom.resource_key.startswith('_connect/'): continue
        connect_attribute = config_rule.custom.resource_key.replace('_connect/', '')
        connect_rules[connect_attribute] = config_rule.custom.resource_value
    return connect_rules

def check_no_endpoints(device_endpoints) -> None:
    if len(device_endpoints) == 0: return
    unexpected_endpoints = []
    for device_endpoint in device_endpoints:
        unexpected_endpoints.append(grpc_message_to_json(device_endpoint))
    str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
    raise InvalidArgumentException(
        'device.device_endpoints', str_unexpected_endpoints,
        extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\
                        'interrogation of the physical device.')

def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
    device_uuid = device.device_id.device_uuid.uuid

    resources_to_get = [RESOURCE_ENDPOINTS]
    results_getconfig = driver.GetConfig(resources_to_get)

    errors : List[str] = list()
    for endpoint in results_getconfig:
        if len(endpoint) != 2:
            errors.append(ERROR_ENDPOINT.format(device_uuid, str(endpoint)))
            continue

        resource_key, resource_value = endpoint
        if isinstance(resource_value, Exception):
            errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value)))
            continue

        endpoint_uuid = resource_value.get('uuid')

        device_endpoint = device.device_endpoints.add()
        device_endpoint.topology_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
        device_endpoint.topology_id.topology_uuid.uuid = DEFAULT_TOPOLOGY_NAME
        device_endpoint.endpoint_id.device_id.device_uuid.uuid = device_uuid
        device_endpoint.endpoint_id.endpoint_uuid.uuid = endpoint_uuid
        device_endpoint.endpoint_type = resource_value.get('type')

        sample_types : Dict[int, str] = resource_value.get('sample_types', {})
        for kpi_sample_type, monitor_resource_key in sample_types.items():
            device_endpoint.kpi_sample_types.append(kpi_sample_type)
            monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key)

    return errors

def populate_config_rules(device : Device, driver : _Driver) -> List[str]:
    device_uuid = device.device_id.device_uuid.uuid

    resources_to_get = ['ALL']
    results_getconfig = driver.GetConfig()

    errors : List[str] = list()
    for resource_key, resource_value in zip(resources_to_get, results_getconfig):
        if isinstance(resource_value, Exception):
            errors.append(ERROR_GET.format(device_uuid, str(resource_key), str(resource_value)))
            continue

        config_rule = device.device_config.config_rules.add()
        config_rule.action = ConfigActionEnum.CONFIGACTION_SET
        config_rule.custom.resource_key = resource_key
        config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)

    return errors

def populate_initial_config_rules(device_uuid : str, device_config : DeviceConfig, driver : _Driver) -> List[str]:
    results_getinitconfig = driver.GetInitialConfig()

    errors : List[str] = list()
    for resource_key, resource_value in results_getinitconfig:
        if isinstance(resource_value, Exception):
            errors.append(ERROR_GET_INIT.format(device_uuid, str(resource_key), str(resource_value)))
            continue

        config_rule = device_config.config_rules.add()
        config_rule.action = ConfigActionEnum.CONFIGACTION_SET
        config_rule.custom.resource_key = resource_key
        config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)

    return errors

def compute_rules_to_add_delete(
    device : Device, request : Device
) -> Tuple[List[Tuple[str, Any]], List[Tuple[str, Any]]]:
    # convert config rules from context into a dictionary
    # TODO: add support for non-custom config rules
    context_config_rules = {
        config_rule.custom.resource_key: config_rule.custom.resource_value
        for config_rule in device.device_config.config_rules
        if config_rule.WhichOneof('config_rule') == 'custom'
    }

    # convert config rules from request into a list
    # TODO: add support for non-custom config rules
    request_config_rules = [
        (config_rule.action, config_rule.custom.resource_key, config_rule.custom.resource_value)
        for config_rule in request.device_config.config_rules
        if config_rule.WhichOneof('config_rule') == 'custom'
    ]

    resources_to_set    : List[Tuple[str, Any]] = [] # key, value
    resources_to_delete : List[Tuple[str, Any]] = [] # key, value

    for action, key, value in request_config_rules:
        if action == ConfigActionEnum.CONFIGACTION_SET:
            if (key in context_config_rules) and (context_config_rules[key][0] == value): continue
            resources_to_set.append((key, value))
        elif action == ConfigActionEnum.CONFIGACTION_DELETE:
            if key not in context_config_rules: continue
            resources_to_delete.append((key, value))

    return resources_to_set, resources_to_delete

def configure_rules(device : Device, driver : _Driver, resources_to_set : List[Tuple[str, Any]]) -> List[str]:
    device_uuid = device.device_id.device_uuid.uuid

    results_setconfig = driver.SetConfig(resources_to_set)

    errors : List[str] = list()
    for (resource_key, resource_value), result in zip(resources_to_set, results_setconfig):
        if isinstance(result, Exception):
            errors.append(ERROR_SET.format(device_uuid, str(resource_key), str(resource_value), str(result)))
            continue
        # add to config of device
        config_rule = device.device_config.config_rules.add()
        config_rule.action = ConfigActionEnum.CONFIGACTION_SET
        config_rule.custom.resource_key = resource_key
        config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)

    return errors

def deconfigure_rules(device : Device, driver : _Driver, resources_to_delete : List[Tuple[str, Any]]) -> List[str]:
    device_uuid = device.device_id.device_uuid.uuid

    results_deleteconfig = driver.DeleteConfig(resources_to_delete)

    errors : List[str] = list()
    for (resource_key, resource_value), result in zip(resources_to_delete, results_deleteconfig):
        if isinstance(result, Exception):
            errors.append(ERROR_DELETE.format(device_uuid, str(resource_key), str(resource_value), str(result)))
            continue
        # remove from config of device
        config_rule = device.device_config.config_rules.add()
        config_rule.action = ConfigActionEnum.CONFIGACTION_SET
        config_rule.custom.resource_key = resource_key
        config_rule.custom.resource_value = json.dumps(resource_value, sort_keys=True)

    return errors

def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
    kpi_uuid = request.kpi_id.kpi_id.uuid
    device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
    endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid
    kpi_sample_type = request.kpi_descriptor.kpi_sample_type

    resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
    if resource_key is None:
        kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
        return [
            ERROR_SAMPLETYPE.format(
                str(device_uuid), str(endpoint_uuid), str(kpi_sample_type), str(kpi_sample_type_name)
            )
        ]

    sampling_duration = request.sampling_duration_s # seconds
    sampling_interval = request.sampling_interval_s # seconds

    resources_to_subscribe = [(resource_key, sampling_duration, sampling_interval)]
    results_subscribestate = driver.SubscribeState(resources_to_subscribe)

    errors : List[str] = list()
    for (resource_key, duration, interval), result in zip(resources_to_subscribe, results_subscribestate):
        if isinstance(result, Exception):
            errors.append(ERROR_SUBSCRIBE.format(
                str(device_uuid), str(resource_key), str(duration), str(interval), str(result)
            ))
            continue

    monitoring_loops.add_kpi(device_uuid, resource_key, kpi_uuid, sampling_duration, sampling_interval)
    monitoring_loops.add_device(device_uuid, driver)

    return errors

def unsubscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loops : MonitoringLoops) -> List[str]:
    kpi_uuid = request.kpi_id.kpi_id.uuid
    device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
    #endpoint_uuid = request.kpi_descriptor.endpoint_id.endpoint_uuid.uuid
    #kpi_sample_type = request.kpi_descriptor.kpi_sample_type

    # TODO: consider if further validation needs to be done (correct endpoint_uuid?, correct kpi_sample_type?)
    #resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type)
    #if resource_key is None:
    #    kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '')
    #    return [ERROR_SAMPLETYPE.format(device_uuid, endpoint_uuid, str(kpi_sample_type), str(kpi_sample_type_name))]

    kpi_details = monitoring_loops.get_kpi_by_uuid(kpi_uuid)
    if kpi_details is None:
        return [ERROR_MISSING_KPI.format(str(device_uuid), str(kpi_uuid))]

    device_uuid, resource_key, sampling_duration, sampling_interval = kpi_details

    resources_to_unsubscribe = [(resource_key, sampling_duration, sampling_interval)]
    results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)

    errors : List[str] = list()
    for (resource_key, duration, interval), result in zip(resources_to_unsubscribe, results_unsubscribestate):
        if isinstance(result, Exception):
            errors.append(ERROR_UNSUBSCRIBE.format(
                device_uuid, str(resource_key), str(duration), str(interval), str(result)))
            continue

    monitoring_loops.remove_kpi(kpi_uuid)
    #monitoring_loops.remove_device(device_uuid) # Do not remove; one monitoring_loop/device used by multiple requests

    return errors
+4 −1
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ from common.Settings import (
    wait_for_environment_variables)
from .DeviceService import DeviceService
from .driver_api.DriverFactory import DriverFactory
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.DriverInstanceCache import DriverInstanceCache, preload_drivers
from .drivers import DRIVERS

terminate = threading.Event()
@@ -58,6 +58,9 @@ def main():
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)

    # Initialize drivers with existing devices in context
    preload_drivers(driver_instance_cache)

    # Starting device service
    grpc_service = DeviceService(driver_instance_cache)
    grpc_service.start()
+0 −122
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools, logging, operator
from enum import Enum
from typing import Dict, List, Tuple, Union
from common.orm.Database import Database
from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import ConfigActionEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from .Tools import fast_hasher, grpc_to_enum, remove_dict_key

LOGGER = logging.getLogger(__name__)

class ORM_ConfigActionEnum(Enum):
    UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
    SET       = ConfigActionEnum.CONFIGACTION_SET
    DELETE    = ConfigActionEnum.CONFIGACTION_DELETE

grpc_to_enum__config_action = functools.partial(
    grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)

class ConfigModel(Model): # pylint: disable=abstract-method
    pk = PrimaryKeyField()

    def dump(self) -> List[Dict]:
        db_config_rule_pks = self.references(ConfigRuleModel)
        config_rules = [ConfigRuleModel(self.database, pk).dump(include_position=True) for pk,_ in db_config_rule_pks]
        config_rules = sorted(config_rules, key=operator.itemgetter('position'))
        return [remove_dict_key(config_rule, 'position') for config_rule in config_rules]

class ConfigRuleModel(Model): # pylint: disable=abstract-method
    pk = PrimaryKeyField()
    config_fk = ForeignKeyField(ConfigModel)
    position = IntegerField(min_value=0, required=True)
    action = EnumeratedField(ORM_ConfigActionEnum, required=True)
    key = StringField(required=True, allow_empty=False)
    value = StringField(required=False, allow_empty=True)

    def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
        result = {
            'action': self.action.value,
            'custom': {
                'resource_key': self.key,
                'resource_value': self.value,
            },
        }
        if include_position: result['position'] = self.position
        return result

def delete_all_config_rules(database : Database, db_parent_pk : str, config_name : str) -> None:
    str_config_key = key_to_str([db_parent_pk, config_name], separator=':')
    db_config : ConfigModel = get_object(database, ConfigModel, str_config_key, raise_if_not_found=False)
    if db_config is None: return
    db_config_rule_pks = db_config.references(ConfigRuleModel)
    for pk,_ in db_config_rule_pks: ConfigRuleModel(database, pk).delete()

def grpc_config_rules_to_raw(grpc_config_rules) -> List[Tuple[ORM_ConfigActionEnum, str, str]]:
    def translate(grpc_config_rule):
        action = grpc_to_enum__config_action(grpc_config_rule.action)
        config_rule_type = str(grpc_config_rule.WhichOneof('config_rule'))
        if config_rule_type != 'custom':
            raise NotImplementedError('ConfigRule of type {:s} is not implemented: {:s}'.format(
                config_rule_type, grpc_message_to_json_string(grpc_config_rule)))
        return action, grpc_config_rule.custom.resource_key, grpc_config_rule.custom.resource_value
    return [translate(grpc_config_rule) for grpc_config_rule in grpc_config_rules]

def get_config_rules(
    database : Database, db_parent_pk : str, config_name : str
    ) -> List[Tuple[ORM_ConfigActionEnum, str, str]]:

    str_config_key = key_to_str([db_parent_pk, config_name], separator=':')
    db_config = get_object(database, ConfigModel, str_config_key, raise_if_not_found=False)
    return [] if db_config is None else [
        # pylint: disable=no-member, protected-access
        (ORM_ConfigActionEnum._value2member_map_.get(config_rule['action']),
            config_rule['custom']['resource_key'], config_rule['custom']['resource_value'])
        for config_rule in db_config.dump()
        if 'custom' in config_rule
    ]

def update_config(
    database : Database, db_parent_pk : str, config_name : str,
    raw_config_rules : List[Tuple[ORM_ConfigActionEnum, str, str]]
) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:

    str_config_key = key_to_str([db_parent_pk, config_name], separator=':')
    result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key)
    db_config, created = result

    db_objects : List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)]

    for position,(action, resource_key, resource_value) in enumerate(raw_config_rules):
        str_rule_key_hash = fast_hasher(resource_key)
        str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':')
        result : Tuple[ConfigRuleModel, bool] = update_or_create_object(
            database, ConfigRuleModel, str_config_rule_key, {
                'config_fk': db_config, 'position': position, 'action': action, 'key': resource_key,
                'value': resource_value,
            })
        db_config_rule, updated = result
        db_objects.append((db_config_rule, updated))

    return db_objects
Loading