# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import anytree, json, logging, math, pytz, queue, random, re, threading
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, metered_subclass_method, INF
from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type
from device.service.driver_api._Driver import (
    RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES,
    _Driver)
from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
LOGGER = logging.getLogger(__name__)
SPECIAL_RESOURCE_MAPPINGS = {
    RESOURCE_ENDPOINTS        : '/endpoints',
    RESOURCE_INTERFACES       : '/interfaces',
    RESOURCE_NETWORK_INSTANCES: '/net-instances',
}

def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Optional[Tuple[str, Any]]:
    endpoint_uuid = endpoint_data.get('uuid')
    if endpoint_uuid is None: return None
    endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS)
    endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid)

    endpoint_type = endpoint_data.get('type')
    if endpoint_type is None: return None

    endpoint_sample_types = endpoint_data.get('sample_types')
    if endpoint_sample_types is None: return None

    sample_types = {}
    for endpoint_sample_type in endpoint_sample_types:
        try:
            kpi_sample_type : ORM_KpiSampleTypeEnum = grpc_to_enum__kpi_sample_type(endpoint_sample_type)
        except: # pylint: disable=bare-except
            LOGGER.warning('Unknown EndpointSampleType({:s}) for Endpoint({:s}). Ignoring and continuing...'.format(
                str(endpoint_sample_type), str(endpoint_data)))
            continue
        metric_name = kpi_sample_type.name.lower()
        monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name)
        sample_types[endpoint_sample_type] = monitoring_resource_key

    endpoint_resource_value = {'uuid': endpoint_uuid, 'type': endpoint_type, 'sample_types': sample_types}
    return endpoint_resource_key, endpoint_resource_value

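# Illustrative sketch (hypothetical values): an endpoint descriptor such as
#   {'uuid': 'EP1', 'type': 'copper', 'sample_types': [101, 102]}
# yields a (resource_key, resource_value) pair of the form
#   ('/endpoints/endpoint[EP1]',
#    {'uuid': 'EP1', 'type': 'copper',
#     'sample_types': {101: '/endpoints/endpoint[EP1]/state/<metric_name>', ...}})
# where <metric_name> is the lower-cased name of the KpiSampleType enum member
# mapped from each numeric sample type.
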
RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)')
RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r'.*\/interface\[([^\]]+)\].*')
class SyntheticSamplingParameters:
    def __init__(self) -> None:
        self.__lock = threading.Lock()
        self.__data = {}
        self.__configured_endpoints = set()

    def set_endpoint_configured(self, endpoint_uuid : str):
        with self.__lock:
            self.__configured_endpoints.add(endpoint_uuid)

    def unset_endpoint_configured(self, endpoint_uuid : str):
        with self.__lock:
            self.__configured_endpoints.discard(endpoint_uuid)

    def get(self, resource_key : str) -> Tuple[float, float, float, float, float]:
        with self.__lock:
            match = RE_GET_ENDPOINT_METRIC.match(resource_key)
            if match is None:
                msg = '[SyntheticSamplingParameters:get] unable to extract endpoint-metric from resource_key "{:s}"'
                LOGGER.error(msg.format(resource_key))
                return (0, 0, 1, 0, 0)
            endpoint_uuid = match.group(1)

            # If the endpoint is not configured, generate flat synthetic traffic aligned at 0
            if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0)

            metric = match.group(2)
            metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '')

            msg = '[SyntheticSamplingParameters:get] resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}'
            LOGGER.info(msg.format(resource_key, endpoint_uuid, metric, metric_sense))

            parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense)
            parameters = self.__data.get(parameters_key)
            if parameters is not None: return parameters

            # assume the metric is packet-based; byte-based metrics are derived in do_sampling()
            amplitude = 1.e7 * random.random()
            phase = 60 * random.random()
            period = 3600 * random.random()
            offset = 1.e8 * random.random() + amplitude
            avg_bytes_per_packet = random.randint(500, 1500)
            parameters = (amplitude, phase, period, offset, avg_bytes_per_packet)
            return self.__data.setdefault(parameters_key, parameters)

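# Illustrative sketch: get() returns a 5-tuple
# (amplitude, phase, period, offset, avg_bytes_per_packet). For instance, a
# hypothetical result (5.0e6, 30.0, 1800.0, 6.0e7, 1000) describes a sinusoid
# with an amplitude of 5e6 packets, a phase offset of 30, a 30-minute period,
# a 6e7-packet baseline, and 1000 bytes per packet for byte-based metrics.
# Parameters are memoized per (endpoint, metric sense), so 'packets_received'
# and 'bytes_received' on the same endpoint share one waveform.
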
def do_sampling(
    synthetic_sampling_parameters : SyntheticSamplingParameters, resource_key : str, out_samples : queue.Queue
):
    amplitude, phase, period, offset, avg_bytes_per_packet = synthetic_sampling_parameters.get(resource_key)
    if 'bytes' in resource_key.lower():
        # convert to bytes
        amplitude = avg_bytes_per_packet * amplitude
        offset = avg_bytes_per_packet * offset
    timestamp = datetime.timestamp(datetime.utcnow())
    waveform = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset
    noise = amplitude * random.random()
    value = abs(0.95 * waveform + 0.05 * noise)
    out_samples.put_nowait((timestamp, resource_key, value))

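# Worked example (hypothetical numbers): with amplitude=1e6, phase=0,
# period=3600, and offset=1e7, a sample at timestamp t=900 (a quarter period)
# gives waveform = 1e6 * sin(pi/2) + 1e7 = 1.1e7. Since noise is uniform in
# [0, amplitude), the emitted value falls roughly within
# [0.95*waveform, 0.95*waveform + 0.05*amplitude].
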
HISTOGRAM_BUCKETS = (
    # .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF
    0.0001, 0.00025, 0.00050, 0.00075,
    0.0010, 0.0025, 0.0050, 0.0075,
    0.0100, 0.0250, 0.0500, 0.0750,
    0.1000, 0.2500, 0.5000, 0.7500,
    1.0000, 2.5000, 5.0000, 7.5000,
    10.0, 25.0, 50.0, 75.0,
    100.0, INF
)
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': 'emulated'})
METRICS_POOL.get_or_create('GetInitialConfig', MetricTypeEnum.HISTOGRAM_DURATION, buckets=HISTOGRAM_BUCKETS)
METRICS_POOL.get_or_create('GetConfig', MetricTypeEnum.HISTOGRAM_DURATION, buckets=HISTOGRAM_BUCKETS)
METRICS_POOL.get_or_create('SetConfig', MetricTypeEnum.HISTOGRAM_DURATION, buckets=HISTOGRAM_BUCKETS)
METRICS_POOL.get_or_create('DeleteConfig', MetricTypeEnum.HISTOGRAM_DURATION, buckets=HISTOGRAM_BUCKETS)
METRICS_POOL.get_or_create('SubscribeState', MetricTypeEnum.HISTOGRAM_DURATION, buckets=HISTOGRAM_BUCKETS)
METRICS_POOL.get_or_create('UnsubscribeState', MetricTypeEnum.HISTOGRAM_DURATION, buckets=HISTOGRAM_BUCKETS)
class EmulatedDriver(_Driver):
    def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called
        self.__lock = threading.Lock()
        self.__initial = TreeNode('.')
        self.__running = TreeNode('.')
        self.__subscriptions = TreeNode('.')

        endpoints = settings.get('endpoints', [])
        endpoint_resources = []
        for endpoint in endpoints:
            endpoint_resource = compose_resource_endpoint(endpoint)
            if endpoint_resource is None: continue
            endpoint_resources.append(endpoint_resource)
        self.SetConfig(endpoint_resources)

        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
        self.__scheduler.configure(
            jobstores = {'default': MemoryJobStore()},
            executors = {'default': ThreadPoolExecutor(max_workers=1)},
            job_defaults = {'coalesce': False, 'max_instances': 3},
            timezone=pytz.utc)
        self.__out_samples = queue.Queue()
        self.__synthetic_sampling_parameters = SyntheticSamplingParameters()

    def Connect(self) -> bool:
        # If started, assume it is already connected
        if self.__started.is_set(): return True

        # Connect triggers activation of sampling events that will be scheduled based on subscriptions
        self.__scheduler.start()

        # Indicate the driver is now connected to the device
        self.__started.set()
        return True

    def Disconnect(self) -> bool:
        # Trigger termination of loops and processes
        self.__terminate.set()

        # If not started, assume it is already disconnected
        if not self.__started.is_set(): return True

        # Disconnect triggers deactivation of sampling events
        self.__scheduler.shutdown()
        return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        with self.__lock:
            return dump_subtree(self.__initial)

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        chk_type('resources', resource_keys, list)
        with self.__lock:
            if len(resource_keys) == 0: return dump_subtree(self.__running)
            results = []
            resolver = anytree.Resolver(pathattr='name')
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key)
                    resource_path = resource_key.split('/')
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                    results.append((resource_key, e)) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None: continue
                results.extend(dump_subtree(resource_node))
            return results

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
                    resource_key,resource_value = resource
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                try:
                    resource_value = json.loads(resource_value)
                except: # pylint: disable=bare-except
                    pass

                set_subnode_value(resolver, self.__running, resource_path, resource_value)

                match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key)
                if match is not None:
                    endpoint_uuid = match.group(1)
                    self.__synthetic_sampling_parameters.set_endpoint_configured(endpoint_uuid)

                results.append(True)
        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
                    resource_key,_ = resource
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None:
                    results.append(False)
                    continue

                match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key)
                if match is not None:
                    endpoint_uuid = match.group(1)
                    self.__synthetic_sampling_parameters.unset_endpoint_configured(endpoint_uuid)

                parent = resource_node.parent
                children = list(parent.children)
                children.remove(resource_node)
                parent.children = tuple(children)

                results.append(True)
        return results

    @metered_subclass_method(METRICS_POOL)
    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('subscriptions', subscriptions, list)
        if len(subscriptions) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,subscription in enumerate(subscriptions):
                str_subscription_name = 'subscriptions[#{:d}]'.format(i)
                try:
                    chk_type(str_subscription_name, subscription, (list, tuple))
                    chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
                    resource_key,sampling_duration,sampling_interval = subscription
                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                start_date,end_date = None,None
                if sampling_duration >= 1.e-12:
                    # a positive duration bounds the sampling job; zero means sample indefinitely
                    start_date = datetime.utcnow()
                    end_date = start_date + timedelta(seconds=sampling_duration)

                job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
                job = self.__scheduler.add_job(
                    do_sampling, args=(self.__synthetic_sampling_parameters, resource_key, self.__out_samples),
                    kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date,
                    end_date=end_date, timezone=pytz.utc)

                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                set_subnode_value(resolver, self.__subscriptions, subscription_path, job)
                results.append(True)
        return results

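    # Illustrative sketch: a subscription for resource_key
    # '/endpoints/endpoint[EP1]/state/packets_received' with duration 120 s and
    # interval 5 s is stored in the subscriptions tree under the path
    # ['', 'endpoints', 'endpoint[EP1]', 'state', 'packets_received', '120.000:5.000'],
    # so UnsubscribeState() can locate the exact APScheduler Job by re-deriving
    # the same '{duration:.3f}:{interval:.3f}' leaf from its arguments.
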
    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('subscriptions', subscriptions, list)
        if len(subscriptions) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,subscription in enumerate(subscriptions):
                str_subscription_name = 'subscriptions[#{:d}]'.format(i)
                try:
                    chk_type(str_subscription_name, subscription, (list, tuple))
                    chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
                    resource_key,sampling_duration,sampling_interval = subscription
                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                subscription_node = get_subnode(resolver, self.__subscriptions, subscription_path)

                # if not found, subscription_node is None
                if subscription_node is None:
                    results.append(False)
                    continue

                job : Job = getattr(subscription_node, 'value', None)
                if job is None or not isinstance(job, Job):
                    raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(subscription)))
                job.remove()

                parent = subscription_node.parent
                children = list(parent.children)
                children.remove(subscription_node)
                parent.children = tuple(children)

                results.append(True)
        return results

    def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]:
        while True:
            if self.__terminate.is_set(): break
            if terminate is not None and terminate.is_set(): break
            try:
                sample = self.__out_samples.get(block=blocking, timeout=0.1)
            except queue.Empty:
                if blocking: continue
                return
            if sample is None: continue
            yield sample

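# Minimal usage sketch (hypothetical endpoint data; runnable only inside the
# TeraFlow source tree, since the imports above resolve against it). It shows
# the expected call sequence: Connect, SetConfig, SubscribeState, draining
# samples with GetState, then UnsubscribeState and Disconnect.
if __name__ == '__main__':
    import time
    driver = EmulatedDriver('127.0.0.1', 0, endpoints=[
        {'uuid': 'EP1', 'type': 'copper', 'sample_types': []},
    ])
    driver.Connect()
    # configuring an interface marks endpoint EP1 as active for synthetic traffic
    driver.SetConfig([('/interface[EP1]/settings', '{"enabled": true}')])
    # sample packets_received on EP1 every 1 s for 10 s
    driver.SubscribeState([('/endpoints/endpoint[EP1]/state/packets_received', 10.0, 1.0)])
    time.sleep(3)
    for sample in driver.GetState(blocking=False):
        print(sample) # (timestamp, resource_key, value)
    driver.UnsubscribeState([('/endpoints/endpoint[EP1]/state/packets_received', 10.0, 1.0)])
    driver.Disconnect()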