Commit cd5c47e1 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Initial implementation of EmulatorDriver (backup commit)

- Implemented GetConfig/SetConfig/DeleteConfig
- Implemented unit tests for GetConfig/SetConfig/DeleteConfig
- Implementing simple DiscriteEventSimulator to emulate monitoring
- Remains to implement SubscribeState/UnsubscribeState/GetState
- Remains to implement unit tests for SubscribeState/UnsubscribeState/GetState
parent eda42964
Loading
Loading
Loading
Loading
+54 −7
Original line number Diff line number Diff line
from typing import Any, Set, Union
from typing import Any, List, Set, Sized, Tuple, Union

def chk_none(name : str, value : Any) -> Any:
    if value is not None:
@@ -18,13 +18,60 @@ def chk_type(name : str, value : Any, type_or_types : Union[type, Set[type]] = s
        raise AttributeError(msg.format(str(name), str(value), type(value).__name__, str(type_or_types)))
    return value

def chk_string(name, value, allow_empty=False) -> str:
    chk_not_none(name, value)
def chk_length(
    name : str, value : Sized, allow_empty : bool = False,
    min_length : Union[int, None] = None, max_length : Union[int, None] = None,
    allowed_lengths : Union[None, int, Set[int], List[int], Tuple[int]] = None) -> Any:

    length = len(chk_type(name, value, Sized))

    allow_empty = chk_type('allow_empty for {}'.format(name), allow_empty, bool)
    if not allow_empty and length == 0:
        msg = '{}({}) is out of range: allow_empty({}) min_length({}) max_length({}) allowed_lengths({}).'
        raise AttributeError(msg.format(
            str(name), str(value), str(allow_empty), str(min_length), str(max_length), str(allowed_lengths)))


    if min_length is not None:
        min_length = chk_type('min_length for {}'.format(name), min_length, int)
        if length < min_length:
            msg = '{}({}) is out of range: allow_empty({}) min_length({}) max_length({}) allowed_lengths({}).'
            raise AttributeError(msg.format(
                str(name), str(value), str(allow_empty), str(min_length), str(max_length), str(allowed_lengths)))

    if max_length is not None:
        max_length = chk_type('max_length for {}'.format(name), max_length, int)
        if length > max_length:
            msg = '{}({}) is out of range: allow_empty({}) min_value({}) max_value({}) allowed_lengths({}).'
            raise AttributeError(msg.format(
                str(name), str(value), str(allow_empty), str(max_length), str(max_length), str(allowed_lengths)))

    if allowed_lengths is not None:
        chk_type('allowed_lengths for {}'.format(name), allowed_lengths, (int, set, list, tuple))
        if isinstance(allowed_lengths, int):
            fixed_length = allowed_lengths
            if length != fixed_length:
                msg = '{}({}) is out of range: allow_empty({}) min_value({}) max_value({}) allowed_lengths({}).'
                raise AttributeError(msg.format(
                    str(name), str(value), str(allow_empty), str(max_length), str(max_length), str(allowed_lengths)))
        else:
            for i in allowed_lengths: chk_type('allowed_lengths[#{}] for {}'.format(i, name), i, int)
            if length not in allowed_lengths:
                msg = '{}({}) is out of range: allow_empty({}) min_value({}) max_value({}) allowed_lengths({}).'
                raise AttributeError(msg.format(
                    str(name), str(value), str(allow_empty), str(max_length), str(max_length), str(allowed_lengths)))

    return value

def chk_string(
    name : str, value : Any, allow_empty : bool = False,
    min_length : Union[int, None] = None, max_length : Union[int, None] = None,
    allowed_lengths : Union[None, int, Set[int], List[int], Tuple[int]] = None) -> str:

    chk_type(name, value, str)
    if allow_empty: return value
    if len(value) == 0:
        msg = '{}({}) string is empty.'
        raise AttributeError(msg.format(str(name), str(value)))
    chk_length(
        name, value, allow_empty=allow_empty, min_length=min_length, max_length=max_length,
        allowed_lengths=allowed_lengths)
    return value

def chk_float(name, value, type_or_types=(int, float), min_value=None, max_value=None) -> float:
+2 −1
Original line number Diff line number Diff line
anytree
fastcache
grpcio-health-checking
grpcio
prometheus-client
pytest
pytest-benchmark
redis
+3 −3
Original line number Diff line number Diff line
@@ -29,13 +29,13 @@ class _Driver:
        """
        raise NotImplementedError()

    def GetConfig(self, resource : List[str]) -> List[Union[str, None]]:
    def GetConfig(self, resources : List[str]) -> List[Union[Any, None]]:
        """ Retrieve running configuration of entire device, or selected resources.
            Parameters:
                resources : List[str]
                    List of keys pointing to the resources to be retrieved.
            Returns:
                values : List[Union[str, None]]
                values : List[Union[Any, None]]
                    List of values for resource keys requested. Values should be in the same order than resource keys
                    requested. If a resource is not found, None should be specified in the List for that resource.
        """
@@ -54,7 +54,7 @@ class _Driver:
        """
        raise NotImplementedError()

    def DeleteConfig(self, resource : List[str]) -> List[bool]:
    def DeleteConfig(self, resources : List[str]) -> List[bool]:
        """ Delete configuration for a list of resources.
            Parameters:
                resources : List[str]
+72 −0
Original line number Diff line number Diff line
import asyncio, logging
from typing import Dict

LOGGER = logging.getLogger(__name__)

class EventSequence:
    # all values expressed as float in seconds

    def __init__(self, name, event_rate, start_delay=0, duration=0, num_repeats=0) -> None:
        self.name        = name        # name of this event sequence
        self.start_delay = start_delay # seconds to wait before first event; 0 means trigger first event immediately
        self.duration    = duration    # seconds the event sequence should elapse, includes wait times, 0 means infinity
        self.num_repeats = num_repeats # number of times the event should be triggered, 0 means infinity
        self.event_rate  = event_rate  # wait time between events, must be positive float

        # Internal variables:
        self.__next_delay       = self.start_delay  # delay until next event
        self.__termination_time = None              # termination time, if duration > 0, else None and runs infinity
        self.__terminate        = asyncio.Event()   # do not execute more iterations when terminate is set

    def schedule(self):
        loop = asyncio.get_event_loop()
        current_time = loop.time()

        if (self.num_repeats == 0) and (self.duration > 0):
            self.__termination_time = current_time + self.duration

        if self.__termination_time and (current_time > self.__termination_time): return

        LOGGER.info('Scheduling {} for #{} time to run after {} seconds...'.format(
            self.name, self.num_repeats, self.__next_delay))
        loop.call_later(self.__next_delay, self.run)
        self.__next_delay = self.event_rate

    def terminate(self): self.__terminate.set()
    def terminate_is_set(self): return self.__terminate.is_set

    def run(self):
        if self.terminate_is_set(): return
        LOGGER.info('Running {} for #{} time...'.format(self.name, self.num_repeats))
        self.schedule()
        self.num_repeats += 1

class DiscreteEventSimulator:
    def __init__(self) -> None:
        self.__eventsequences : Dict[str, EventSequence] = {}

    def add_event_sequence(self, event_sequence : EventSequence):
        if event_sequence.name in self.__eventsequences: return
        self.__eventsequences[event_sequence.name] = event_sequence
        event_sequence.schedule()
        return event_sequence

    def remove_event_sequence(self, event_sequence : EventSequence):
        if event_sequence.name not in self.__eventsequences: return
        event_sequence = self.__eventsequences.pop(event_sequence.name)
        event_sequence.terminate()

async def terminate(des : DiscreteEventSimulator, es: EventSequence):
    des.remove_event_sequence(es)

async def async_main():
    des = DiscreteEventSimulator()
    es1 = des.add_event_sequence(EventSequence('ES1-10s', 1.0, start_delay=3.0, duration=10.0))
    es2 = des.add_event_sequence(EventSequence('ES2-inf', 1.5, start_delay=3.0))
    es3 = des.add_event_sequence(EventSequence('ES3-10r', 2.0, num_repeats=5))
    loop = asyncio.get_event_loop()
    loop.call_later(60.0, terminate, des, es3)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())
+118 −0
Original line number Diff line number Diff line
import anytree, logging, threading
from typing import Any, Iterator, List, Tuple, Union
from common.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.drivers.emulated.tools.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value

LOGGER = logging.getLogger(__name__)

class EmulatedDriver(_Driver):
    def __init__(self, address : str, port : int, **kwargs) -> None:
        self.__lock = threading.Lock()
        self.__root = TreeNode('.')
        self.__subscriptions = {} # resource_key => (duration, sampling_rate, start_timestamp, end_timestamp)

    def Connect(self) -> bool:
        return True

    def Disconnect(self) -> bool:
        return True

    def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]:
        chk_type('resources', resource_keys, list)
        with self.__lock:
            if len(resource_keys) == 0: return dump_subtree(self.__root)
            results = []
            resolver = anytree.Resolver(pathattr='name')
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                results.append(None if resource_node is None else dump_subtree(resource_node))
            return results

    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=2)
                    resource_key,resource_value = resource
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                results.append(True)
        return results

    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
        chk_type('resources', resource_keys, list)
        if len(resource_keys) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None:
                    results.append(False)
                else:
                    parent = resource_node.parent
                    children = list(parent.children)
                    children.remove(resource_node)
                    parent.children = tuple(children)
                    results.append(True)
        return results

    def SubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    resource_key,sampling_duration,sampling_rate = resource
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_rate', sampling_rate, min_value=0)
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue
                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                results.append(True)
        return results


    def UnsubscribeState(self, resources : List[str]) -> List[bool]:
        raise NotImplementedError()

    def GetState(self) -> Iterator[Tuple[str, Any]]:
        raise NotImplementedError()
Loading