Commit 020f8cb5 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Progress on Device Driver API (intermediate backup):

- Finished preliminary implementation of Emulated Driver
- Emulated Driver uses APScheduler to emulate subscriptions
- Implemented basic unitary testing for Device Driver API
- Still pending to integrate Driver API within Device service
parent cd5c47e1
Loading
Loading
Loading
Loading
+1 −4
Original line number Original line Diff line number Diff line
from typing import List, Tuple
import grpc, logging
import grpc, logging
from prometheus_client import Counter, Histogram
from prometheus_client import Counter, Histogram
from common.Checkers import chk_options, chk_string
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
from common.database.api.Database import Database
from common.database.api.Database import Database
from common.database.api.context.topology.device.OperationalStatus import OperationalStatus, \
from common.database.api.context.topology.device.OperationalStatus import OperationalStatus
    operationalstatus_enum_values, to_operationalstatus_enum
from common.exceptions.ServiceException import ServiceException
from common.exceptions.ServiceException import ServiceException
from device.proto.context_pb2 import DeviceId, Device, Empty
from device.proto.context_pb2 import DeviceId, Device, Empty
from device.proto.device_pb2_grpc import DeviceServiceServicer
from device.proto.device_pb2_grpc import DeviceServiceServicer
+63 −38
Original line number Original line Diff line number Diff line
@@ -29,85 +29,110 @@ class _Driver:
        """
        """
        raise NotImplementedError()
        raise NotImplementedError()


    def GetConfig(self, resources : List[str]) -> List[Union[Any, None]]:
    def GetConfig(self, resource_keys : List[str]) -> List[Union[Any, None, Exception]]:
        """ Retrieve running configuration of entire device, or selected resources.
        """ Retrieve running configuration of entire device, or selected resource keys.
            Parameters:
            Parameters:
                resources : List[str]
                resource_keys : List[str]
                    List of keys pointing to the resources to be retrieved.
                    List of keys pointing to the resources to be retrieved.
            Returns:
            Returns:
                values : List[Union[Any, None]]
                values : List[Union[Any, None, Exception]]
                    List of values for resource keys requested. Values should be in the same order than resource keys
                    List of values for resource keys requested. Return values must be in the same order than resource
                    requested. If a resource is not found, None should be specified in the List for that resource.
                    keys requested. If a resource is found, the appropriate value type must be retrieved, if a resource
                    is not found, None must be retrieved in the List for that resource. In case of Exception processing
                    a resource, the Exception must be retrieved.
        """
        """
        raise NotImplementedError()
        raise NotImplementedError()


    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[bool]:
    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        """ Create/Update configuration for a list of resources.
        """ Create/Update configuration for a list of resources.
            Parameters:
            Parameters:
                resources : List[Tuple[str, Any]]
                resources : List[Tuple[str, Any]]
                    List of tuples, each containing a resource_key pointing the resource to be modified, and a
                    List of tuples, each containing a resource_key pointing the resource to be modified, and a
                    resource_value containing the new value to be set.
                    resource_value containing the new value to be set.
            Returns:
            Returns:
                results : List[bool]
                results : List[Union[bool, Exception]]
                    List of results for changes in resource keys requested. Each result is a boolean value indicating
                    List of results for resource key changes requested. Return values must be in the same order than
                    if the change succeeded. Results should be in the same order than resource keys requested.
                    resource keys requested. If a resource is properly set, True must be retrieved; otherwise, the
                    Exception that is raised during the processing must be retrieved.
        """
        """
        raise NotImplementedError()
        raise NotImplementedError()


    def DeleteConfig(self, resources : List[str]) -> List[bool]:
    def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]:
        """ Delete configuration for a list of resources.
        """ Delete configuration for a list of resource keys.
            Parameters:
            Parameters:
                resources : List[str]
                resource_keys : List[str]
                    List of keys pointing to the resources to be deleted.
                    List of keys pointing to the resources to be deleted.
            Returns:
            Returns:
                results : List[bool]
                results : List[bool]
                    List of results for delete resource keys requested. Each result is a boolean value indicating
                    List of results for resource key deletions requested. Return values must be in the same order than
                    if the delete succeeded. Results should be in the same order than resource keys requested.
                    resource keys requested. If a resource is properly deleted, True must be retrieved; otherwise, the
                    Exception that is raised during the processing must be retrieved.
        """
        """
        raise NotImplementedError()
        raise NotImplementedError()


    def SubscribeState(self, resources : List[Tuple[str, float]]) -> List[bool]:
    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """ Subscribe to state information of entire device, or selected resources. Subscriptions are incremental.
        """ Subscribe to state information of entire device, or selected resources. Subscriptions are incremental.
            Driver should keep track of requested resources.
            Driver should keep track of requested resources.
            Parameters:
            Parameters:
                resources : List[Tuple[str, float]]
                subscriptions : List[Tuple[str, float, float]]
                    List of tuples, each containing a resource_key pointing the resource to be subscribed, and a
                    List of tuples, each containing a resource_key pointing the resource to be subscribed, a
                    sampling_rate in seconds defining the desired monitoring periodicity for that resource.
                    sampling_duration, and a sampling_interval (both in seconds with float representation) defining,
                    Note: sampling_rate values must be a strictly positive floats.
                    respectively, for how long monitoring should last, and the desired monitoring interval for the
                    resource specified.
            Returns:
            Returns:
                results : List[bool]
                results : List[bool]
                    List of results for subscriptions to resource keys requested. Each result is a boolean value
                    List of results for resource key subscriptions requested. Return values must be in the same order
                    indicating if the subscription succeeded. Results should be in the same order than resource keys
                    than resource keys requested. If a resource is properly subscribed, True must be retrieved;
                    requested.
                    otherwise, the Exception that is raised during the processing must be retrieved.
        """
        """
        raise NotImplementedError()
        raise NotImplementedError()


    def UnsubscribeState(self, resources : List[str]) -> List[bool]:
    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        """ Unsubscribe from state information of entire device, or selected resources. Subscriptions are incremental.
        """ Unsubscribe from state information of entire device, or selected resources. Subscriptions are incremental.
            Driver should keep track of requested resources.
            Driver should keep track of requested resources.
            Parameters:
            Parameters:
                resources : List[str]
                subscriptions : List[str]
                    List of resource_keys pointing the resource to be unsubscribed.
                    List of tuples, each containing a resource_key pointing the resource to be subscribed, a
                    sampling_duration, and a sampling_interval (both in seconds with float representation) defining,
                    respectively, for how long monitoring should last, and the desired monitoring interval for the
                    resource specified.
            Returns:
            Returns:
                results : List[bool]
                results : List[Union[bool, Exception]]
                    List of results for unsubscriptions from resource keys requested. Each result is a boolean value
                    List of results for resource key unsubscriptions requested. Return values must be in the same order
                    indicating if the unsubscription succeeded. Results should be in the same order than resource keys
                    than resource keys requested. If a resource is properly unsubscribed, True must be retrieved;
                    requested.
                    otherwise, the Exception that is raised during the processing must be retrieved.
        """
        """
        raise NotImplementedError()
        raise NotImplementedError()


    def GetState(self) -> Iterator[Tuple[str, Any]]:
    def GetState(self, blocking=False) -> Iterator[Tuple[float, str, Any]]:
        """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be
        """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be
            called once and will block until values are available. When values are available, it should yield each of
            called once and will block until values are available. When values are available, it should yield each of
            them and block again until new values are available. When the driver is destroyed, GetState() can return
            them and block again until new values are available. When the driver is destroyed, GetState() can return
            instead of yield to terminate the loop.
            instead of yield to terminate the loop.
            Example:
            Examples:
                for resource_key,resource_value in my_driver.GetState():
                # keep looping waiting for extra samples (generator loop)
                    process(resource_key,resource_value)
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True):
                    process(timestamp, resource_key, resource_value)

                # just retrieve accumulated samples
                samples = my_driver.GetState(blocking=False)
                # or (as classical loop)
                for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False):
                    process(timestamp, resource_key, resource_value)
            Parameters:
                blocking : bool
                    Select the driver behaviour. In both cases, the driver will first retrieve the samples accumulated
                    and available in the internal queue. Then, if blocking, the driver does not terminate the loop and
                    waits for additional samples to come, thus behaving as a generator. If non-blocking, the driver
                    terminates the loop and returns. Non-blocking behaviour can be used for periodically polling the
                    driver, while blocking can be used when a separate thread is in charge of collecting the samples
                    produced by the driver.
            Returns:
            Returns:
                results : Iterator[Tuple[str, Any]]
                results : Iterator[Tuple[float, str, Any]]
                    List of tuples with state samples, each containing a resource_key and a resource_value. Only
                    Sequences of state sample. Each State sample contains a float Unix-like timestamps of the samples in
                    resources with an active subscription must be retrieved. Periodicity of the samples is specified
                    seconds with up to microsecond resolution, the resource_key of the sample, and its resource_value.
                    when creating the subscription using method SubscribeState(). Order of yielded values is arbitrary.
                    Only resources with an active subscription must be retrieved. Interval and duration of the sampling
                    process are specified when creating the subscription using method SubscribeState(). Order of values
                    yielded is arbitrary.
        """
        """
        raise NotImplementedError()
        raise NotImplementedError()
+10 −5
Original line number Original line Diff line number Diff line
import anytree
import anytree
from typing import Any, List, Optional

from anytree.render import Row


class TreeNode(anytree.node.Node):
class TreeNode(anytree.node.Node):
    def __init__(self, name, parent=None, children=None, **kwargs) -> None:
    def __init__(self, name, parent=None, children=None, **kwargs) -> None:
        super().__init__(name, parent=parent, children=children, **kwargs)
        super().__init__(name, parent=parent, children=children, **kwargs)
        self.value : Optional[Any] = None


    def get_full_path(self):
    def get_full_path(self):
        return self.separator.join([''] + [str(node.name) for node in self.path])
        return self.separator.join([''] + [str(node.name) for node in self.path])
@@ -27,7 +31,7 @@ class RawStyle(anytree.render.AbstractStyle):
        """
        """
        super(RawStyle, self).__init__(u'', u'', u'')
        super(RawStyle, self).__init__(u'', u'', u'')


def get_subnode(resolver : anytree.Resolver, root, path, default=None):
def get_subnode(resolver : anytree.Resolver, root : TreeNode, path : List[str], default : Optional[Any] = None):
    node = root
    node = root
    for path_item in path:
    for path_item in path:
        try:
        try:
@@ -36,22 +40,23 @@ def get_subnode(resolver : anytree.Resolver, root, path, default=None):
            return default
            return default
    return node
    return node


def set_subnode_value(resolver : anytree.Resolver, root, path, value):
def set_subnode_value(resolver : anytree.Resolver, root : TreeNode, path : List[str], value : Any):
    node = root
    node = root
    for path_item in path:
    for path_item in path:
        try:
        try:
            node = resolver.get(node, path_item)
            node = resolver.get(node, path_item)
        except anytree.ChildResolverError:
        except anytree.ChildResolverError:
            node = TreeNode(path_item, parent=node)
            node = TreeNode(path_item, parent=node)
        setattr(node, 'value', value)
    node.value = value


def dump_subtree(root : TreeNode):
def dump_subtree(root : TreeNode):
    if not isinstance(root, TreeNode): raise Exception('root must be a TreeNode')
    if not isinstance(root, TreeNode): raise Exception('root must be a TreeNode')
    results = []
    results = []
    for row in anytree.RenderTree(root, style=RawStyle()):
    for row in anytree.RenderTree(root, style=RawStyle()):
        path = row.node.get_full_path()[2:] # get full path except the heading root placeholder "/."
        node : TreeNode = row.node
        path = node.get_full_path()[2:] # get full path except the heading root placeholder "/."
        if len(path) == 0: continue
        if len(path) == 0: continue
        value = getattr(row.node, 'value', None)
        value = node.value
        if value is None: continue
        if value is None: continue
        results.append((path, value))
        results.append((path, value))
    return results
    return results
+0 −72
Original line number Original line Diff line number Diff line
import asyncio, logging
from typing import Dict

LOGGER = logging.getLogger(__name__)

class EventSequence:
    # all values expressed as float in seconds

    def __init__(self, name, event_rate, start_delay=0, duration=0, num_repeats=0) -> None:
        self.name        = name        # name of this event sequence
        self.start_delay = start_delay # seconds to wait before first event; 0 means trigger first event immediately
        self.duration    = duration    # seconds the event sequence should elapse, includes wait times, 0 means infinity
        self.num_repeats = num_repeats # number of times the event should be triggered, 0 means infinity
        self.event_rate  = event_rate  # wait time between events, must be positive float

        # Internal variables:
        self.__next_delay       = self.start_delay  # delay until next event
        self.__termination_time = None              # termination time, if duration > 0, else None and runs infinity
        self.__terminate        = asyncio.Event()   # do not execute more iterations when terminate is set

    def schedule(self):
        loop = asyncio.get_event_loop()
        current_time = loop.time()

        if (self.num_repeats == 0) and (self.duration > 0):
            self.__termination_time = current_time + self.duration

        if self.__termination_time and (current_time > self.__termination_time): return

        LOGGER.info('Scheduling {} for #{} time to run after {} seconds...'.format(
            self.name, self.num_repeats, self.__next_delay))
        loop.call_later(self.__next_delay, self.run)
        self.__next_delay = self.event_rate

    def terminate(self): self.__terminate.set()
    def terminate_is_set(self): return self.__terminate.is_set

    def run(self):
        if self.terminate_is_set(): return
        LOGGER.info('Running {} for #{} time...'.format(self.name, self.num_repeats))
        self.schedule()
        self.num_repeats += 1

class DiscreteEventSimulator:
    def __init__(self) -> None:
        self.__eventsequences : Dict[str, EventSequence] = {}

    def add_event_sequence(self, event_sequence : EventSequence):
        if event_sequence.name in self.__eventsequences: return
        self.__eventsequences[event_sequence.name] = event_sequence
        event_sequence.schedule()
        return event_sequence

    def remove_event_sequence(self, event_sequence : EventSequence):
        if event_sequence.name not in self.__eventsequences: return
        event_sequence = self.__eventsequences.pop(event_sequence.name)
        event_sequence.terminate()

async def terminate(des : DiscreteEventSimulator, es: EventSequence):
    des.remove_event_sequence(es)

async def async_main():
    des = DiscreteEventSimulator()
    es1 = des.add_event_sequence(EventSequence('ES1-10s', 1.0, start_delay=3.0, duration=10.0))
    es2 = des.add_event_sequence(EventSequence('ES2-inf', 1.5, start_delay=3.0))
    es3 = des.add_event_sequence(EventSequence('ES3-10r', 2.0, num_repeats=5))
    loop = asyncio.get_event_loop()
    loop.call_later(60.0, terminate, des, es3)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main())
+99 −16
Original line number Original line Diff line number Diff line
import anytree, logging, threading
import anytree, logging, pytz, queue, random, threading
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union
from typing import Any, Iterator, List, Tuple, Union
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from common.Checkers import chk_float, chk_length, chk_string, chk_type
from common.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.driver_api._Driver import _Driver
from device.service.drivers.emulated.tools.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
from .AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value


LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)


def sample(resource_key : str, out_samples : queue.Queue):
    out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random()))

class EmulatedDriver(_Driver):
class EmulatedDriver(_Driver):
    def __init__(self, address : str, port : int, **kwargs) -> None:
    def __init__(self, address : str, port : int, **kwargs) -> None:
        self.__lock = threading.Lock()
        self.__lock = threading.Lock()
        self.__root = TreeNode('.')
        self.__root = TreeNode('.')
        self.__subscriptions = {} # resource_key => (duration, sampling_rate, start_timestamp, end_timestamp)
        self.__terminate = threading.Event()
        self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
        self.__scheduler.configure(
            jobstores = {'default': MemoryJobStore()},
            executors = {'default': ThreadPoolExecutor(max_workers=1)},
            job_defaults = {'coalesce': False, 'max_instances': 3},
            timezone=pytz.utc)
        self.__out_samples = queue.Queue()


    def Connect(self) -> bool:
    def Connect(self) -> bool:
        # Connect triggers activation of sampling events that will be scheduled based on subscriptions
        self.__scheduler.start()
        return True
        return True


    def Disconnect(self) -> bool:
    def Disconnect(self) -> bool:
        # Trigger termination of loops and processes
        self.__terminate.set()
        # Disconnect triggers deactivation of sampling events
        self.__scheduler.shutdown()
        return True
        return True


    def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]:
    def GetConfig(self, resource_keys : List[str] = []) -> List[Union[Any, None, Exception]]:
@@ -33,6 +54,7 @@ class EmulatedDriver(_Driver):
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    results.append(e) # if validation fails, store the exception
                    continue
                    continue

                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                # if not found, resource_node is None
                results.append(None if resource_node is None else dump_subtree(resource_node))
                results.append(None if resource_node is None else dump_subtree(resource_node))
@@ -55,6 +77,7 @@ class EmulatedDriver(_Driver):
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    results.append(e) # if validation fails, store the exception
                    continue
                    continue

                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                set_subnode_value(resolver, self.__root, resource_path, resource_value)
                results.append(True)
                results.append(True)
        return results
        return results
@@ -74,11 +97,13 @@ class EmulatedDriver(_Driver):
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    results.append(e) # if validation fails, store the exception
                    continue
                    continue

                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                resource_node = get_subnode(resolver, self.__root, resource_path, default=None)
                # if not found, resource_node is None
                # if not found, resource_node is None
                if resource_node is None:
                if resource_node is None:
                    results.append(False)
                    results.append(False)
                else:
                    continue

                parent = resource_node.parent
                parent = resource_node.parent
                children = list(parent.children)
                children = list(parent.children)
                children.remove(resource_node)
                children.remove(resource_node)
@@ -97,22 +122,80 @@ class EmulatedDriver(_Driver):
                try:
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    resource_key,sampling_duration,sampling_rate = resource
                    resource_key,sampling_duration,sampling_interval = resource
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    resource_path = resource_key.split('/')
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_rate', sampling_rate, min_value=0)
                    chk_float(str_resource_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e:
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    results.append(e) # if validation fails, store the exception
                    continue
                    continue
                set_subnode_value(resolver, self.__root, resource_path, resource_value)

                start_date,end_date = None,None
                if sampling_duration <= 1.e-12:
                    start_date = datetime.utcnow()
                    end_date = start_date + timedelta(seconds=sampling_duration)

                job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
                job = self.__scheduler.add_job(
                    sample, args=(resource_key, self.__out_samples), kwargs={},
                    id=job_id, trigger='interval', seconds=sampling_interval,
                    start_date=start_date, end_date=end_date, timezone=pytz.utc)

                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                set_subnode_value(resolver, self.__root, subscription_path, job)
                results.append(True)
                results.append(True)
        return results
        return results


    def UnsubscribeState(self, resources : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, allowed_lengths=3)
                    resource_key,sampling_duration,sampling_interval = resource
                    chk_string(str_resource_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_resource_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_resource_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e:
                    LOGGER.exception('Exception validating {}: {}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue


    def UnsubscribeState(self, resources : List[str]) -> List[bool]:
                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
        raise NotImplementedError()
                subscription_node = get_subnode(resolver, self.__root, subscription_path)
                
                
    def GetState(self) -> Iterator[Tuple[str, Any]]:
                # if not found, resource_node is None
        raise NotImplementedError()
                if subscription_node is None:
                    results.append(False)
                    continue

                job : Job = getattr(subscription_node, 'value', None)
                if job is None or not isinstance(job, Job):
                    raise Exception('Malformed subscription node or wrong resource key: {}'.format(str(resource)))
                job.remove()

                parent = subscription_node.parent
                children = list(parent.children)
                children.remove(subscription_node)
                parent.children = tuple(children)

                results.append(True)
        return results

    def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
        while not self.__terminate.is_set():
            try:
                sample = self.__out_samples.get(block=blocking, timeout=0.1)
            except queue.Empty:
                if blocking: continue
                return
            if sample is None: continue
            yield sample
Loading