Skip to content
Snippets Groups Projects
EmulatedDriver.py 13.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • import anytree, logging, pytz, queue, random, threading
    from datetime import datetime, timedelta
    
    from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
    
    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.job import Job
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.schedulers.background import BackgroundScheduler
    
    from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
    
    from device.service.database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type
    from device.service.driver_api._Driver import (
        RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES,
        _Driver)
    
    from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
    
    
    LOGGER = logging.getLogger(__name__)
    
    
    SPECIAL_RESOURCE_MAPPINGS = {
        RESOURCE_ENDPOINTS        : '/endpoints',
        RESOURCE_INTERFACES       : '/interfaces',
        RESOURCE_NETWORK_INSTANCES: '/net-instances',
    }
    
    def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Tuple[str, Any]:
        endpoint_uuid = endpoint_data.get('uuid')
        if endpoint_uuid is None: return None
        endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS)
        endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid)
    
        endpoint_type = endpoint_data.get('type')
        if endpoint_type is None: return None
    
        endpoint_sample_types = endpoint_data.get('sample_types')
        if endpoint_sample_types is None: return None
        sample_types = {}
        for endpoint_sample_type in endpoint_sample_types:
            try:
                kpi_sample_type : ORM_KpiSampleType = grpc_to_enum__kpi_sample_type(endpoint_sample_type)
            except: # pylint: disable=bare-except
                LOGGER.warning('Unknown EndpointSampleType({:s}) for Endpoint({:s}). Ignoring and continuing...'.format(
                    str(endpoint_sample_type), str(endpoint_data)))
                continue
            metric_name = kpi_sample_type.name.lower()
            monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name)
            sample_types[endpoint_sample_type] = monitoring_resource_key
    
        endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types}
        return endpoint_resource_key, endpoint_resource_value
    
    
    def do_sampling(resource_key : str, out_samples : queue.Queue):
    
        out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random()))
    
    
    class EmulatedDriver(_Driver):
    
        def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called
    
            self.__lock = threading.Lock()
    
            self.__initial = TreeNode('.')
            self.__running = TreeNode('.')
    
    
            endpoints = settings.get('endpoints', [])
            endpoint_resources = []
            for endpoint in endpoints:
                endpoint_resource = compose_resource_endpoint(endpoint)
                if endpoint_resource is None: continue
                endpoint_resources.append(endpoint_resource)
            self.SetConfig(endpoint_resources)
    
    
            self.__started = threading.Event()
    
            self.__terminate = threading.Event()
            self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
            self.__scheduler.configure(
                jobstores = {'default': MemoryJobStore()},
                executors = {'default': ThreadPoolExecutor(max_workers=1)},
                job_defaults = {'coalesce': False, 'max_instances': 3},
                timezone=pytz.utc)
            self.__out_samples = queue.Queue()
    
            # If started, assume it is already connected
            if self.__started.is_set(): return True
    
    
            # Connect triggers activation of sampling events that will be scheduled based on subscriptions
            self.__scheduler.start()
    
    
            # Indicate the driver is now connected to the device
            self.__started.set()
    
            return True
    
        def Disconnect(self) -> bool:
    
            # Trigger termination of loops and processes
            self.__terminate.set()
    
    
            # If not started, assume it is already disconnected
            if not self.__started.is_set(): return True
    
    
            # Disconnect triggers deactivation of sampling events
            self.__scheduler.shutdown()
    
        def GetInitialConfig(self) -> List[Tuple[str, Any]]:
            with self.__lock:
                return dump_subtree(self.__initial)
    
        def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
    
            chk_type('resources', resource_keys, list)
            with self.__lock:
    
                if len(resource_keys) == 0: return dump_subtree(self.__running)
    
                results = []
                resolver = anytree.Resolver(pathattr='name')
                for i,resource_key in enumerate(resource_keys):
                    str_resource_name = 'resource_key[#{:d}]'.format(i)
                    try:
                        chk_string(str_resource_name, resource_key, allow_empty=False)
    
                        resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key)
    
                        resource_path = resource_key.split('/')
    
                    except Exception as e: # pylint: disable=broad-except
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                        results.append((resource_key, e)) # if validation fails, store the exception
    
                    resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
    
                    # if not found, resource_node is None
    
                    if resource_node is None: continue
                    results.extend(dump_subtree(resource_node))
    
                return results
    
        def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
            chk_type('resources', resources, list)
            if len(resources) == 0: return []
            results = []
            resolver = anytree.Resolver(pathattr='name')
            with self.__lock:
                for i,resource in enumerate(resources):
                    str_resource_name = 'resources[#{:d}]'.format(i)
                    try:
                        chk_type(str_resource_name, resource, (list, tuple))
    
                        chk_length(str_resource_name, resource, min_length=2, max_length=2)
    
                        resource_key,resource_value = resource
    
                        chk_string(str_resource_name, resource_key, allow_empty=False)
    
                        resource_path = resource_key.split('/')
    
                    except Exception as e: # pylint: disable=broad-except
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
    
                        results.append(e) # if validation fails, store the exception
                        continue
    
                    try:
                        resource_value = json.loads(resource_value)
                    except: # pylint: disable=broad-except
                        pass
    
    
                    set_subnode_value(resolver, self.__running, resource_path, resource_value)
    
        def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
            chk_type('resources', resources, list)
            if len(resources) == 0: return []
    
            results = []
            resolver = anytree.Resolver(pathattr='name')
            with self.__lock:
    
                for i,resource in enumerate(resources):
                    str_resource_name = 'resources[#{:d}]'.format(i)
    
                        chk_type(str_resource_name, resource, (list, tuple))
                        chk_length(str_resource_name, resource, min_length=2, max_length=2)
                        resource_key,_ = resource
    
                        chk_string(str_resource_name, resource_key, allow_empty=False)
                        resource_path = resource_key.split('/')
    
                    except Exception as e: # pylint: disable=broad-except
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
    
                        results.append(e) # if validation fails, store the exception
                        continue
    
                    resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
    
                    # if not found, resource_node is None
                    if resource_node is None:
                        results.append(False)
    
                        continue
    
                    parent = resource_node.parent
                    children = list(parent.children)
                    children.remove(resource_node)
                    parent.children = tuple(children)
                    results.append(True)
    
        def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
            chk_type('subscriptions', subscriptions, list)
            if len(subscriptions) == 0: return []
    
            results = []
            resolver = anytree.Resolver(pathattr='name')
            with self.__lock:
    
                for i,subscription in enumerate(subscriptions):
                    str_subscription_name = 'subscriptions[#{:d}]'.format(i)
    
                        chk_type(str_subscription_name, subscription, (list, tuple))
                        chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
                        resource_key,sampling_duration,sampling_interval = subscription
                        chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
    
                        resource_path = resource_key.split('/')
    
                        chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                        chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                    except Exception as e: # pylint: disable=broad-except
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
    
                        results.append(e) # if validation fails, store the exception
                        continue
    
    
                    start_date,end_date = None,None
                    if sampling_duration <= 1.e-12:
                        start_date = datetime.utcnow()
                        end_date = start_date + timedelta(seconds=sampling_duration)
    
                    job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
                    job = self.__scheduler.add_job(
    
                        do_sampling, args=(resource_key, self.__out_samples), kwargs={},
    
                        id=job_id, trigger='interval', seconds=sampling_interval,
                        start_date=start_date, end_date=end_date, timezone=pytz.utc)
    
                    subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
    
                    set_subnode_value(resolver, self.__running, subscription_path, job)
    
        def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
            chk_type('subscriptions', subscriptions, list)
            if len(subscriptions) == 0: return []
    
            results = []
            resolver = anytree.Resolver(pathattr='name')
            with self.__lock:
    
                for i,resource in enumerate(subscriptions):
                    str_subscription_name = 'resources[#{:d}]'.format(i)
    
                        chk_type(str_subscription_name, resource, (list, tuple))
                        chk_length(str_subscription_name, resource, min_length=3, max_length=3)
    
                        resource_key,sampling_duration,sampling_interval = resource
    
                        chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
    
                        resource_path = resource_key.split('/')
    
                        chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                        chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                    except Exception as e: # pylint: disable=broad-except
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
    
                        results.append(e) # if validation fails, store the exception
                        continue
    
                    subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
    
                    subscription_node = get_subnode(resolver, self.__running, subscription_path)
    
    
                    # if not found, resource_node is None
                    if subscription_node is None:
                        results.append(False)
                        continue
    
                    job : Job = getattr(subscription_node, 'value', None)
                    if job is None or not isinstance(job, Job):
    
                        raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource)))
    
                    job.remove()
    
                    parent = subscription_node.parent
                    children = list(parent.children)
                    children.remove(subscription_node)
                    parent.children = tuple(children)
    
                    results.append(True)
            return results
    
        def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]:
            while not self.__terminate.is_set():
                try:
                    sample = self.__out_samples.get(block=blocking, timeout=0.1)
                except queue.Empty:
                    if blocking: continue
                    return
                if sample is None: continue
                yield sample