import anytree, json, logging, math, pytz, queue, random, re, threading from datetime import datetime, timedelta from typing import Any, Dict, Iterator, List, Optional, Tuple, Union from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.job import Job from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.background import BackgroundScheduler from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type from device.service.database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type from device.service.driver_api._Driver import ( RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, _Driver) from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value LOGGER = logging.getLogger(__name__) SPECIAL_RESOURCE_MAPPINGS = { RESOURCE_ENDPOINTS : '/endpoints', RESOURCE_INTERFACES : '/interfaces', RESOURCE_NETWORK_INSTANCES: '/net-instances', } def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Tuple[str, Any]: endpoint_uuid = endpoint_data.get('uuid') if endpoint_uuid is None: return None endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid) endpoint_type = endpoint_data.get('type') if endpoint_type is None: return None endpoint_sample_types = endpoint_data.get('sample_types') if endpoint_sample_types is None: return None sample_types = {} for endpoint_sample_type in endpoint_sample_types: try: kpi_sample_type : ORM_KpiSampleTypeEnum = grpc_to_enum__kpi_sample_type(endpoint_sample_type) except: # pylint: disable=bare-except LOGGER.warning('Unknown EndpointSampleType({:s}) for Endpoint({:s}). Ignoring and continuing...'.format( str(endpoint_sample_type), str(endpoint_data))) continue metric_name = kpi_sample_type.name.lower() monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name) sample_types[endpoint_sample_type] = monitoring_resource_key endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types} return endpoint_resource_key, endpoint_resource_value RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)') RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r'.*\/interface\[([^\]]+)\].*') class SyntheticSamplingParameters: def __init__(self) -> None: self.__lock = threading.Lock() self.__data = {} self.__configured_endpoints = set() def set_endpoint_configured(self, endpoint_uuid : str): with self.__lock: self.__configured_endpoints.add(endpoint_uuid) def unset_endpoint_configured(self, endpoint_uuid : str): with self.__lock: self.__configured_endpoints.discard(endpoint_uuid) def get(self, resource_key : str) -> Tuple[float, float, float, float]: with self.__lock: match = RE_GET_ENDPOINT_METRIC.match(resource_key) if match is None: msg = '[SyntheticSamplingParameters:get] unable to extract endpoint-metric from resource_key "{:s}"' LOGGER.error(msg.format(resource_key)) return (0, 0, 1, 0, 0) endpoint_uuid = match.group(1) # If endpoint is not configured, generate a flat synthetic traffic aligned at 0 if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0) metric = match.group(2) metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '') msg = '[SyntheticSamplingParameters:get] resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}' LOGGER.info(msg.format(resource_key, endpoint_uuid, metric, metric_sense)) parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense) parameters = self.__data.get(parameters_key) if parameters is not None: return parameters # assume packets amplitude = 1.e7 * random.random() phase = 60 * random.random() period = 3600 * random.random() offset = 1.e8 * random.random() + amplitude avg_bytes_per_packet = random.randint(500, 1500) parameters = (amplitude, phase, period, offset, avg_bytes_per_packet) return self.__data.setdefault(parameters_key, parameters) def do_sampling( synthetic_sampling_parameters : SyntheticSamplingParameters, resource_key : str, out_samples : queue.Queue ): amplitude, phase, period, offset, avg_bytes_per_packet = synthetic_sampling_parameters.get(resource_key) if 'bytes' in resource_key.lower(): # convert to bytes amplitude = avg_bytes_per_packet * amplitude offset = avg_bytes_per_packet * offset timestamp = datetime.timestamp(datetime.utcnow()) waveform = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset noise = amplitude * random.random() value = abs(0.95 * waveform + 0.05 * noise) out_samples.put_nowait((timestamp, resource_key, value)) class EmulatedDriver(_Driver): def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called self.__lock = threading.Lock() self.__initial = TreeNode('.') self.__running = TreeNode('.') endpoints = settings.get('endpoints', []) endpoint_resources = [] for endpoint in endpoints: endpoint_resource = compose_resource_endpoint(endpoint) if endpoint_resource is None: continue endpoint_resources.append(endpoint_resource) self.SetConfig(endpoint_resources) self.__started = threading.Event() self.__terminate = threading.Event() self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events self.__scheduler.configure( jobstores = {'default': MemoryJobStore()}, executors = {'default': ThreadPoolExecutor(max_workers=1)}, job_defaults = {'coalesce': False, 'max_instances': 3}, timezone=pytz.utc) self.__out_samples = queue.Queue() self.__synthetic_sampling_parameters = SyntheticSamplingParameters() def Connect(self) -> bool: # If started, assume it is already connected if self.__started.is_set(): return True # Connect triggers activation of sampling events that will be scheduled based on subscriptions self.__scheduler.start() # Indicate the driver is now connected to the device self.__started.set() return True def Disconnect(self) -> bool: # Trigger termination of loops and processes self.__terminate.set() # If not started, assume it is already disconnected if not self.__started.is_set(): return True # Disconnect triggers deactivation of sampling events self.__scheduler.shutdown() return True def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return dump_subtree(self.__initial) def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) with self.__lock: if len(resource_keys) == 0: return dump_subtree(self.__running) results = [] resolver = anytree.Resolver(pathattr='name') for i,resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) try: chk_string(str_resource_name, resource_key, allow_empty=False) resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key) resource_path = resource_key.split('/') except Exception as e: # pylint: disable=broad-except LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) results.append((resource_key, e)) # if validation fails, store the exception continue resource_node = get_subnode(resolver, self.__running, resource_path, default=None) # if not found, resource_node is None if resource_node is None: continue results.extend(dump_subtree(resource_node)) return results def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] resolver = anytree.Resolver(pathattr='name') with self.__lock: for i,resource in enumerate(resources): str_resource_name = 'resources[#{:d}]'.format(i) try: chk_type(str_resource_name, resource, (list, tuple)) chk_length(str_resource_name, resource, min_length=2, max_length=2) resource_key,resource_value = resource chk_string(str_resource_name, resource_key, allow_empty=False) resource_path = resource_key.split('/') except Exception as e: # pylint: disable=broad-except LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue try: resource_value = json.loads(resource_value) except: # pylint: disable=broad-except pass set_subnode_value(resolver, self.__running, resource_path, resource_value) match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key) if match is not None: endpoint_uuid = match.group(1) self.__synthetic_sampling_parameters.set_endpoint_configured(endpoint_uuid) results.append(True) return results def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] results = [] resolver = anytree.Resolver(pathattr='name') with self.__lock: for i,resource in enumerate(resources): str_resource_name = 'resources[#{:d}]'.format(i) try: chk_type(str_resource_name, resource, (list, tuple)) chk_length(str_resource_name, resource, min_length=2, max_length=2) resource_key,_ = resource chk_string(str_resource_name, resource_key, allow_empty=False) resource_path = resource_key.split('/') except Exception as e: # pylint: disable=broad-except LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue resource_node = get_subnode(resolver, self.__running, resource_path, default=None) # if not found, resource_node is None if resource_node is None: results.append(False) continue match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key) if match is not None: endpoint_uuid = match.group(1) self.__synthetic_sampling_parameters.unset_endpoint_configured(endpoint_uuid) parent = resource_node.parent children = list(parent.children) children.remove(resource_node) parent.children = tuple(children) results.append(True) return results def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: chk_type('subscriptions', subscriptions, list) if len(subscriptions) == 0: return [] results = [] resolver = anytree.Resolver(pathattr='name') with self.__lock: for i,subscription in enumerate(subscriptions): str_subscription_name = 'subscriptions[#{:d}]'.format(i) try: chk_type(str_subscription_name, subscription, (list, tuple)) chk_length(str_subscription_name, subscription, min_length=3, max_length=3) resource_key,sampling_duration,sampling_interval = subscription chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) resource_path = resource_key.split('/') chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue start_date,end_date = None,None if sampling_duration <= 1.e-12: start_date = datetime.utcnow() end_date = start_date + timedelta(seconds=sampling_duration) job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval) job = self.__scheduler.add_job( do_sampling, args=(self.__synthetic_sampling_parameters, resource_key, self.__out_samples), kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date, end_date=end_date, timezone=pytz.utc) subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] set_subnode_value(resolver, self.__running, subscription_path, job) results.append(True) return results def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: chk_type('subscriptions', subscriptions, list) if len(subscriptions) == 0: return [] results = [] resolver = anytree.Resolver(pathattr='name') with self.__lock: for i,resource in enumerate(subscriptions): str_subscription_name = 'resources[#{:d}]'.format(i) try: chk_type(str_subscription_name, resource, (list, tuple)) chk_length(str_subscription_name, resource, min_length=3, max_length=3) resource_key,sampling_duration,sampling_interval = resource chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) resource_path = resource_key.split('/') chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) results.append(e) # if validation fails, store the exception continue subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] subscription_node = get_subnode(resolver, self.__running, subscription_path) # if not found, resource_node is None if subscription_node is None: results.append(False) continue job : Job = getattr(subscription_node, 'value', None) if job is None or not isinstance(job, Job): raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource))) job.remove() parent = subscription_node.parent children = list(parent.children) children.remove(subscription_node) parent.children = tuple(children) results.append(True) return results def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]: while True: if self.__terminate.is_set(): break if terminate is not None and terminate.is_set(): break try: sample = self.__out_samples.get(block=blocking, timeout=0.1) except queue.Empty: if blocking: continue return if sample is None: continue yield sample