Skip to content
Snippets Groups Projects
Commit 0a351b59 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Telemetry backend driver added.

- Emulated driver is added.
- MetricGenerated is added.
- Emulated driver test is added.
- backend requirement file is updated.
- script added to test emulated driver.
parent ca614228
No related branches found
No related tags found
3 merge requests!346Draft: support for restconf protocol,!345Draft: support ipinfusion devices via netconf,!287Resolve "(CTTC) Implement Telemetry Backend Collector Emulated"
...@@ -21,12 +21,13 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" ...@@ -21,12 +21,13 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/"
# Set the list of components, separated by spaces, you want to build images for, and deploy. # Set the list of components, separated by spaces, you want to build images for, and deploy.
export TFS_COMPONENTS="context device pathcomp service slice nbi webui" export TFS_COMPONENTS="context device pathcomp service slice nbi webui"
# export TFS_COMPONENTS="context device"
# Uncomment to activate Monitoring (old) # Uncomment to activate Monitoring (old)
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
# Uncomment to activate Monitoring Framework (new) # Uncomment to activate Monitoring Framework (new)
#export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics automation" export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics automation"
# Uncomment to activate QoS Profiles # Uncomment to activate QoS Profiles
#export TFS_COMPONENTS="${TFS_COMPONENTS} qos_profile" #export TFS_COMPONENTS="${TFS_COMPONENTS} qos_profile"
......
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# export KFK_SERVER_ADDRESS='127.0.0.1:9092'
# CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
# export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=debug --log-cli-level=info --verbose \
telemetry/backend/tests/test_emulated.py
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union
# Special resource names to request to the driver to retrieve the specified
# configuration/structural resources.
# These resource names should be used with GetConfig() method.
RESOURCE_ENDPOINTS = '__endpoints__'
RESOURCE_INTERFACES = '__interfaces__'
RESOURCE_NETWORK_INSTANCES = '__network_instances__'
RESOURCE_ROUTING_POLICIES = '__routing_policies__'
RESOURCE_SERVICES = '__services__'
RESOURCE_ACL = '__acl__'
RESOURCE_INVENTORY = '__inventory__'
class _Driver:
def __init__(self, name : str, address: str, port: int, **settings) -> None:
""" Initialize Driver.
Parameters:
address : str
The address of the device
port : int
The port of the device
**settings
Extra settings required by the driver.
"""
self._name = name
self._address = address
self._port = port
self._settings = settings
@property
def name(self): return self._name
@property
def address(self): return self._address
@property
def port(self): return self._port
@property
def settings(self): return self._settings
def Connect(self) -> bool:
""" Connect to the Device.
Returns:
succeeded : bool
Boolean variable indicating if connection succeeded.
"""
raise NotImplementedError()
def Disconnect(self) -> bool:
""" Disconnect from the Device.
Returns:
succeeded : bool
Boolean variable indicating if disconnection succeeded.
"""
raise NotImplementedError()
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
""" Retrieve initial configuration of entire device.
Returns:
values : List[Tuple[str, Any]]
List of tuples (resource key, resource value) for
resource keys.
"""
raise NotImplementedError()
def GetConfig(self, resource_keys: List[str] = []) -> \
List[Tuple[str, Union[Any, None, Exception]]]:
""" Retrieve running configuration of entire device or
selected resource keys.
Parameters:
resource_keys : List[str]
List of keys pointing to the resources to be retrieved.
Returns:
values : List[Tuple[str, Union[Any, None, Exception]]]
List of tuples (resource key, resource value) for
resource keys requested. If a resource is found,
the appropriate value type must be retrieved.
If a resource is not found, None must be retrieved as
value for that resource. In case of Exception,
the Exception must be retrieved as value.
"""
raise NotImplementedError()
def SetConfig(self, resources: List[Tuple[str, Any]]) -> \
List[Union[bool, Exception]]:
""" Create/Update configuration for a list of resources.
Parameters:
resources : List[Tuple[str, Any]]
List of tuples, each containing a resource_key pointing the
resource to be modified, and a resource_value containing
the new value to be set.
Returns:
results : List[Union[bool, Exception]]
List of results for resource key changes requested.
Return values must be in the same order as the
resource keys requested. If a resource is properly set,
True must be retrieved; otherwise, the Exception that is
raised during the processing must be retrieved.
"""
raise NotImplementedError()
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \
List[Union[bool, Exception]]:
""" Delete configuration for a list of resources.
Parameters:
resources : List[Tuple[str, Any]]
List of tuples, each containing a resource_key pointing the
resource to be modified, and a resource_value containing
possible additionally required values to locate
the value to be removed.
Returns:
results : List[Union[bool, Exception]]
List of results for resource key deletions requested.
Return values must be in the same order as the resource keys
requested. If a resource is properly deleted, True must be
retrieved; otherwise, the Exception that is raised during
the processing must be retrieved.
"""
raise NotImplementedError()
def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \
List[Union[bool, Exception]]:
""" Subscribe to state information of entire device or
selected resources. Subscriptions are incremental.
Driver should keep track of requested resources.
Parameters:
subscriptions : List[Tuple[str, float, float]]
List of tuples, each containing a resource_key pointing the
resource to be subscribed, a sampling_duration, and a
sampling_interval (both in seconds with float
representation) defining, respectively, for how long
monitoring should last, and the desired monitoring interval
for the resource specified.
Returns:
results : List[Union[bool, Exception]]
List of results for resource key subscriptions requested.
Return values must be in the same order as the resource keys
requested. If a resource is properly subscribed,
True must be retrieved; otherwise, the Exception that is
raised during the processing must be retrieved.
"""
raise NotImplementedError()
def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) \
-> List[Union[bool, Exception]]:
""" Unsubscribe from state information of entire device
or selected resources. Subscriptions are incremental.
Driver should keep track of requested resources.
Parameters:
subscriptions : List[str]
List of tuples, each containing a resource_key pointing the
resource to be subscribed, a sampling_duration, and a
sampling_interval (both in seconds with float
representation) defining, respectively, for how long
monitoring should last, and the desired monitoring interval
for the resource specified.
Returns:
results : List[Union[bool, Exception]]
List of results for resource key un-subscriptions requested.
Return values must be in the same order as the resource keys
requested. If a resource is properly unsubscribed,
True must be retrieved; otherwise, the Exception that is
raised during the processing must be retrieved.
"""
raise NotImplementedError()
def GetState(
self, blocking=False, terminate : Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
""" Retrieve last collected values for subscribed resources.
Operates as a generator, so this method should be called once and will
block until values are available. When values are available,
it should yield each of them and block again until new values are
available. When the driver is destroyed, GetState() can return instead
of yield to terminate the loop.
Terminate enables to request interruption of the generation.
Examples:
# keep looping waiting for extra samples (generator loop)
terminate = threading.Event()
i = 0
for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True, terminate=terminate):
process(timestamp, resource_key, resource_value)
i += 1
if i == 10: terminate.set()
# just retrieve accumulated samples
samples = my_driver.GetState(blocking=False, terminate=terminate)
# or (as classical loop)
i = 0
for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False, terminate=terminate):
process(timestamp, resource_key, resource_value)
i += 1
if i == 10: terminate.set()
Parameters:
blocking : bool
Select the driver behaviour. In both cases, the driver will
first retrieve the samples accumulated and available in the
internal queue. Then, if blocking, the driver does not
terminate the loop and waits for additional samples to come,
thus behaving as a generator. If non-blocking, the driver
terminates the loop and returns. Non-blocking behaviour can
be used for periodically polling the driver, while blocking
can be used when a separate thread is in charge of
collecting the samples produced by the driver.
terminate : threading.Event
Signals the interruption of the GetState method as soon as
possible.
Returns:
results : Iterator[Tuple[float, str, Any]]
Sequences of state sample. Each State sample contains a
float Unix-like timestamps of the samples in seconds with up
to microsecond resolution, the resource_key of the sample,
and its resource_value.
Only resources with an active subscription must be
retrieved. Interval and duration of the sampling process are
specified when creating the subscription using method
SubscribeState(). Order of values yielded is arbitrary.
"""
raise NotImplementedError()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This diff is collapsed.
from types import NoneType
import numpy as np
import random
import logging
import queue
import time
LOGGER = logging.getLogger(__name__)
class SyntheticMetricsGenerator():
"""
This collector class generates synthetic network metrics based on the current network state.
The metrics include packet_in, packet_out, bytes_in, bytes_out, packet_loss (percentage), packet_drop_count, byte_drop_count, and latency.
The network state can be 'good', 'moderate', or 'poor', and it affects the generated metrics accordingly.
"""
def __init__(self, metric_queue=None, network_state="good"):
LOGGER.info("Initiaitng Emulator")
super().__init__()
self.metric_queue = metric_queue if metric_queue is not None else queue.Queue()
self.network_state = network_state
self.running = True
self.set_initial_parameter_values() # update this method to set the initial values for the parameters
def set_initial_parameter_values(self):
self.bytes_per_pkt = random.uniform(65, 150)
self.states = ["good", "moderate", "poor"]
self.state_probabilities = {
"good" : [0.9, 0.1, 0.0],
"moderate": [0.2, 0.7, 0.1],
"poor" : [0.0, 0.3, 0.7]
}
if self.network_state == "good":
self.packet_in = random.uniform(700, 900)
elif self.network_state == "moderate":
self.packet_in = random.uniform(300, 700)
else:
self.packet_in = random.uniform(100, 300)
def generate_synthetic_data_point(self, resource_key, sample_type_ids):
"""
Generates a synthetic data point based on the current network state.
Parameters:
resource_key (str): The key associated with the resource for which the data point is generated.
Returns:
tuple: A tuple containing the timestamp, resource key, and a dictionary of generated metrics.
"""
if self.network_state == "good":
packet_loss = random.uniform(0.01, 0.1)
random_noise = random.uniform(1,10)
latency = random.uniform(5, 25)
elif self.network_state == "moderate":
packet_loss = random.uniform(0.1, 1)
random_noise = random.uniform(10, 40)
latency = random.uniform(25, 100)
elif self.network_state == "poor":
packet_loss = random.uniform(1, 3)
random_noise = random.uniform(40, 100)
latency = random.uniform(100, 300)
else:
raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.")
period = 60 * 60 * random.uniform(10, 100)
amplitude = random.uniform(50, 100)
sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in
packet_in = sin_wave + ((sin_wave/100) * random_noise)
packet_out = packet_in - ((packet_in / 100) * packet_loss)
bytes_in = packet_in * self.bytes_per_pkt
bytes_out = packet_out * self.bytes_per_pkt
packet_drop_count = packet_in * (packet_loss / 100)
byte_drop_count = packet_drop_count * self.bytes_per_pkt
state_prob = self.state_probabilities[self.network_state]
self.network_state = random.choices(self.states, state_prob)[0]
print (self.network_state)
generated_samples = {
"packet_in" : int(packet_in), "packet_out" : int(packet_out), "bytes_in" : float(bytes_in),
"bytes_out" : float(bytes_out), "packet_loss": float(packet_loss), "packet_drop_count" : int(packet_drop_count),
"latency" : float(latency), "byte_drop_count": float(byte_drop_count)
}
requested_metrics = self.metric_id_mapper(sample_type_ids)
generated_samples = {metric: generated_samples[metric] for metric in requested_metrics}
return (time.time(), resource_key, generated_samples)
def metric_id_mapper(self, sample_type_ids):
"""
Maps the sample type IDs to the corresponding metric names.
Parameters:
sample_type_ids (list): A list of sample type IDs.
Returns:
list: A list of metric names.
"""
metric_names = []
for sample_type_id in sample_type_ids:
if sample_type_id == 102:
metric_names.append("packet_in")
elif sample_type_id == 101:
metric_names.append("packet_out")
elif sample_type_id == 103:
metric_names.append("packet_drop_count")
elif sample_type_id == 202:
metric_names.append("bytes_in")
elif sample_type_id == 201:
metric_names.append("bytes_out")
elif sample_type_id == 203:
metric_names.append("byte_drop_count")
elif sample_type_id == 701:
metric_names.append("latency")
else:
raise ValueError(f"Invalid sample type ID: {sample_type_id}")
return metric_names
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# Configure logging to ensure logs appear on the console
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def create_test_configuration():
return {
"config_rules": [
{"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}},
{"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": 8080}},
{"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {
"endpoints": [
{"uuid": "eth0", "type": "ethernet", "sample_types": [101, 102]},
{"uuid": "eth1", "type": "ethernet", "sample_types": [201, 202]},
{"uuid": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}
]
}}},
{"action": 1, "custom": {"resource_key": "/interface[eth0]/settings", "resource_value": {
"name": "eth0", "enabled": True
}}},
{"action": 1, "custom": {"resource_key": "/interface[eth1]/settings", "resource_value": {
"name": "eth1", "enabled": False
}}},
{"action": 1, "custom": {"resource_key": "/interface[13/1/2]/settings", "resource_value": {
"name": "13/1/2", "enabled": True
}}}
]
}
# This method is used to create a specific configuration to be used in the test case test_get_config in the test_EmulatedDriver.py file
def create_specific_config_keys():
# config = create_test_configuration()
keys_to_return = ["_connect/settings/endpoints/eth1", "/interface/[13/1/2]/settings", "_connect/address"]
return keys_to_return
# return {rule["custom"]["resource_key"]: rule["custom"]["resource_value"] for rule in config["config_rules"] if rule["custom"]["resource_key"] in keys_to_return}
# write a method to create a specific configuration to be used in the test case test_delete_config in the test_EmulatedDriver1.py file
def create_config_for_delete():
keys_to_delete = ["_connect/settings/endpoints/eth0", "/interface/[eth1]", "_connect/port"]
return keys_to_delete
# write a method to generate subscription for specific endpoints.
def create_test_subscriptions():
return [("_connect/settings/endpoints/eth1", 10, 2),
("_connect/settings/endpoints/13/1/2", 15, 3),
("_connect/settings/endpoints/eth0", 8, 2)]
def create_unscubscribe_subscriptions():
return [("_connect/settings/endpoints/eth1", 10, 2),
("_connect/settings/endpoints/13/1/2", 15, 3),
("_connect/settings/endpoints/eth0", 8, 2)]
\ No newline at end of file
import logging
import time
import pytest
from telemetry.backend.drivers.emulated.EmulatedDriver import EmulatedDriver
from telemetry.backend.tests.messages_emulated import (
create_test_configuration,
create_specific_config_keys,
create_config_for_delete,
create_test_subscriptions,
)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@pytest.fixture
def setup_driver():
"""Sets up an EmulatedDriver instance for testing."""
yield EmulatedDriver(address="127.0.0.1", port=8080)
@pytest.fixture
def connected_configured_driver(setup_driver):
driver = setup_driver # EmulatedDriver(address="127.0.0.1", port=8080)
driver.Connect()
driver.SetConfig(create_test_configuration())
yield driver
driver.Disconnect()
def test_connect(setup_driver):
logger.info(">>> test_connect <<<")
driver = setup_driver
assert driver.Connect() is True
assert driver.connected is True
def test_disconnect(setup_driver):
logger.info(">>> test_disconnect <<<")
driver = setup_driver
driver.Connect()
assert driver.Disconnect() is True
assert driver.connected is False
def test_set_config(setup_driver):
logger.info(">>> test_set_config <<<")
driver = setup_driver
driver.Connect()
config = create_test_configuration()
results = driver.SetConfig(config)
assert all(result is True for result in results)
def test_get_config(connected_configured_driver):
logger.info(">>> test_get_config <<<")
resource_keys = create_specific_config_keys()
results = connected_configured_driver.GetConfig(resource_keys)
for key, value in results:
assert key in create_specific_config_keys()
assert value is not None
def test_delete_config(connected_configured_driver):
logger.info(">>> test_delete_config <<<")
resource_keys = create_config_for_delete()
results = connected_configured_driver.DeleteConfig(resource_keys)
assert all(result is True for result in results)
def test_subscribe_state(connected_configured_driver):
logger.info(">>> test_subscribe_state <<<")
subscriptions = create_test_subscriptions()
results = connected_configured_driver.SubscribeState(subscriptions)
assert all(result is True for result in results)
def test_unsubscribe_state(connected_configured_driver):
logger.info(">>> test_unsubscribe_state <<<")
subscriptions = create_test_subscriptions()
connected_configured_driver.SubscribeState(subscriptions)
results = connected_configured_driver.UnsubscribeState(subscriptions)
assert all(result is True for result in results)
def test_get_state(connected_configured_driver):
logger.info(">>> test_get_state <<<")
subscriptions = create_test_subscriptions()
connected_configured_driver.SubscribeState(subscriptions)
logger.info(f"Subscribed to state: {subscriptions}. waiting for 3 seconds ...")
time.sleep(3)
state_iterator = connected_configured_driver.GetState(blocking=False)
states = list(state_iterator)
assert len(states) > 0
...@@ -12,7 +12,9 @@ ...@@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
anytree==2.8.0
APScheduler==3.10.1 APScheduler==3.10.1
numpy==2.2.1
psycopg2-binary==2.9.3 psycopg2-binary==2.9.3
python-dateutil==2.8.2 python-dateutil==2.8.2
python-json-logger==2.0.2 python-json-logger==2.0.2
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment