# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, math, queue, random, re, threading
from datetime import datetime, timezone
from typing import Optional, Tuple

LOGGER = logging.getLogger(__name__)

RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)')

MSG_ERROR_PARSE = '[get] unable to extract endpoint-metric from monitoring_resource_key "{:s}"'
MSG_INFO = '[get] monitoring_resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}'

class SyntheticSamplingParameters:
    def __init__(self) -> None:
        self.__lock = threading.Lock()
        # Maps '{endpoint_uuid}-{metric_sense}' to the tuple
        # (amplitude, phase, period, offset, avg_bytes_per_packet)
        self.__data = {}
        # Endpoints currently configured; unconfigured endpoints report flat traffic
        self.__configured_endpoints = set()

    def set_endpoint_configured(self, endpoint_uuid : str):
        with self.__lock:
            self.__configured_endpoints.add(endpoint_uuid)

    def unset_endpoint_configured(self, endpoint_uuid : str):
        with self.__lock:
            self.__configured_endpoints.discard(endpoint_uuid)

    def get(self, monitoring_resource_key : str) -> Optional[Tuple[float, float, float, float, float]]:
        with self.__lock:
            match = RE_GET_ENDPOINT_METRIC.match(monitoring_resource_key)
            if match is None:
                LOGGER.error(MSG_ERROR_PARSE.format(monitoring_resource_key))
                return None
            endpoint_uuid = match.group(1)

            # If the endpoint is not configured, generate flat synthetic traffic pinned at 0
            if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0)

            metric = match.group(2)
            metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '')

            LOGGER.info(MSG_INFO.format(monitoring_resource_key, endpoint_uuid, metric, metric_sense))

            parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense)
            parameters = self.__data.get(parameters_key)
            if parameters is not None: return parameters

            # No parameters yet for this endpoint/metric: generate them. Values are
            # expressed in packets; byte-based metrics are derived later using
            # avg_bytes_per_packet.
            amplitude  = 1.e7 * random.random()
            phase      = 60 * random.random()
            period     = 3600 * random.random()
            offset     = 1.e8 * random.random() + amplitude
            avg_bytes_per_packet = random.randint(500, 1500)
            parameters = (amplitude, phase, period, offset, avg_bytes_per_packet)
            return self.__data.setdefault(parameters_key, parameters)

def do_sampling(
    synthetic_sampling_parameters : SyntheticSamplingParameters, monitoring_resource_key : str,
    out_samples : queue.Queue
) -> None:
    parameters = synthetic_sampling_parameters.get(monitoring_resource_key)
    if parameters is None: return
    amplitude, phase, period, offset, avg_bytes_per_packet = parameters

    if 'bytes' in monitoring_resource_key.lower():
        # Parameters are packet-based; scale amplitude and offset to bytes
        amplitude = avg_bytes_per_packet * amplitude
        offset = avg_bytes_per_packet * offset

    # Current UTC timestamp in seconds since the epoch
    timestamp = datetime.now(timezone.utc).timestamp()
    # Sinusoidal waveform plus a small random noise component; abs() keeps samples non-negative
    waveform  = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset
    noise     = amplitude * random.random()
    value     = abs(0.95 * waveform + 0.05 * noise)
    out_samples.put_nowait((timestamp, monitoring_resource_key, value))
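
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module):
# it shows how SyntheticSamplingParameters and do_sampling could be driven
# together. The endpoint UUID and monitoring resource key below are
# hypothetical examples chosen to match RE_GET_ENDPOINT_METRIC.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    ssp = SyntheticSamplingParameters()
    ssp.set_endpoint_configured('endpoint-1')

    samples : queue.Queue = queue.Queue()
    resource_key = '/device[dev-1]/endpoint[endpoint-1]/state/packets_received'

    # Produce a few samples; a real driver would call do_sampling periodically.
    for _ in range(3):
        do_sampling(ssp, resource_key, samples)

    while not samples.empty():
        timestamp, key, value = samples.get_nowait()
        LOGGER.info('sample: timestamp=%f key=%s value=%f', timestamp, key, value)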