Skip to content
Snippets Groups Projects
Commit ffa2ce8d authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into...

Merge branch 'develop' of ssh://gifrerenom_labs.etsi.org/tfs/controller into feat/247-cttc-analytics-module-enhancements
parents cfa1829b 4e7a76d8
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!317Resolve "(CTTC) Analytics Module Enhancements"
Showing with 75 additions and 77 deletions
...@@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui" ...@@ -26,7 +26,7 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui"
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
# Uncomment to activate Monitoring Framework (new) # Uncomment to activate Monitoring Framework (new)
export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics automation" #export TFS_COMPONENTS="${TFS_COMPONENTS} kpi_manager kpi_value_writer kpi_value_api telemetry analytics automation"
# Uncomment to activate QoS Profiles # Uncomment to activate QoS Profiles
#export TFS_COMPONENTS="${TFS_COMPONENTS} qos_profile" #export TFS_COMPONENTS="${TFS_COMPONENTS} qos_profile"
......
#!/bin/bash #!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import threading import threading
from typing import Any, Iterator, List, Optional, Tuple, Union from typing import Any, Iterator, List, Optional, Tuple, Union
# Special resource names to request to the driver to retrieve the specified # Special resource names to request to the collector to retrieve the specified
# configuration/structural resources. # configuration/structural resources.
# These resource names should be used with GetConfig() method. # These resource names should be used with GetConfig() method.
RESOURCE_ENDPOINTS = '__endpoints__' RESOURCE_ENDPOINTS = '__endpoints__'
...@@ -27,16 +27,16 @@ RESOURCE_ACL = '__acl__' ...@@ -27,16 +27,16 @@ RESOURCE_ACL = '__acl__'
RESOURCE_INVENTORY = '__inventory__' RESOURCE_INVENTORY = '__inventory__'
class _Driver: class _Collector:
def __init__(self, name : str, address: str, port: int, **settings) -> None: def __init__(self, name : str, address: str, port: int, **settings) -> None:
""" Initialize Driver. """ Initialize Collector.
Parameters: Parameters:
address : str address : str
The address of the device The address of the device
port : int port : int
The port of the device The port of the device
**settings **settings
Extra settings required by the driver. Extra settings required by the collector.
""" """
self._name = name self._name = name
self._address = address self._address = address
...@@ -139,7 +139,7 @@ class _Driver: ...@@ -139,7 +139,7 @@ class _Driver:
List[Union[bool, Exception]]: List[Union[bool, Exception]]:
""" Subscribe to state information of entire device or """ Subscribe to state information of entire device or
selected resources. Subscriptions are incremental. selected resources. Subscriptions are incremental.
Driver should keep track of requested resources. Collector should keep track of requested resources.
Parameters: Parameters:
subscriptions : List[Tuple[str, float, float]] subscriptions : List[Tuple[str, float, float]]
List of tuples, each containing a resource_key pointing the List of tuples, each containing a resource_key pointing the
...@@ -162,7 +162,7 @@ class _Driver: ...@@ -162,7 +162,7 @@ class _Driver:
-> List[Union[bool, Exception]]: -> List[Union[bool, Exception]]:
""" Unsubscribe from state information of entire device """ Unsubscribe from state information of entire device
or selected resources. Subscriptions are incremental. or selected resources. Subscriptions are incremental.
Driver should keep track of requested resources. Collector should keep track of requested resources.
Parameters: Parameters:
subscriptions : List[str] subscriptions : List[str]
List of tuples, each containing a resource_key pointing the List of tuples, each containing a resource_key pointing the
...@@ -188,37 +188,37 @@ class _Driver: ...@@ -188,37 +188,37 @@ class _Driver:
Operates as a generator, so this method should be called once and will Operates as a generator, so this method should be called once and will
block until values are available. When values are available, block until values are available. When values are available,
it should yield each of them and block again until new values are it should yield each of them and block again until new values are
available. When the driver is destroyed, GetState() can return instead available. When the collector is destroyed, GetState() can return instead
of yield to terminate the loop. of yield to terminate the loop.
Terminate enables to request interruption of the generation. Terminate enables to request interruption of the generation.
Examples: Examples:
# keep looping waiting for extra samples (generator loop) # keep looping waiting for extra samples (generator loop)
terminate = threading.Event() terminate = threading.Event()
i = 0 i = 0
for timestamp,resource_key,resource_value in my_driver.GetState(blocking=True, terminate=terminate): for timestamp,resource_key,resource_value in my_collector.GetState(blocking=True, terminate=terminate):
process(timestamp, resource_key, resource_value) process(timestamp, resource_key, resource_value)
i += 1 i += 1
if i == 10: terminate.set() if i == 10: terminate.set()
# just retrieve accumulated samples # just retrieve accumulated samples
samples = my_driver.GetState(blocking=False, terminate=terminate) samples = my_collector.GetState(blocking=False, terminate=terminate)
# or (as classical loop) # or (as classical loop)
i = 0 i = 0
for timestamp,resource_key,resource_value in my_driver.GetState(blocking=False, terminate=terminate): for timestamp,resource_key,resource_value in my_collector.GetState(blocking=False, terminate=terminate):
process(timestamp, resource_key, resource_value) process(timestamp, resource_key, resource_value)
i += 1 i += 1
if i == 10: terminate.set() if i == 10: terminate.set()
Parameters: Parameters:
blocking : bool blocking : bool
Select the driver behaviour. In both cases, the driver will Select the collector behaviour. In both cases, the collector will
first retrieve the samples accumulated and available in the first retrieve the samples accumulated and available in the
internal queue. Then, if blocking, the driver does not internal queue. Then, if blocking, the collector does not
terminate the loop and waits for additional samples to come, terminate the loop and waits for additional samples to come,
thus behaving as a generator. If non-blocking, the driver thus behaving as a generator. If non-blocking, the collector
terminates the loop and returns. Non-blocking behaviour can terminates the loop and returns. Non-blocking behaviour can
be used for periodically polling the driver, while blocking be used for periodically polling the collector, while blocking
can be used when a separate thread is in charge of can be used when a separate thread is in charge of
collecting the samples produced by the driver. collecting the samples produced by the collector.
terminate : threading.Event terminate : threading.Event
Signals the interruption of the GetState method as soon as Signals the interruption of the GetState method as soon as
possible. possible.
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
...@@ -17,8 +17,6 @@ import queue ...@@ -17,8 +17,6 @@ import queue
import logging import logging
import uuid import uuid
import json import json
from telemetry.backend.drivers.emulated.EmulatedHelper import EmulatedDriverHelper
from telemetry.backend.driver_api._Driver import _Driver
from anytree import Node, Resolver from anytree import Node, Resolver
from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
...@@ -26,18 +24,18 @@ from apscheduler.jobstores.memory import MemoryJobStore ...@@ -26,18 +24,18 @@ from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union, Optional from typing import Any, Iterator, List, Tuple, Union, Optional
from telemetry.backend.collector_api._Collector import _Collector
from .EmulatedHelper import EmulatedCollectorHelper
from .SyntheticMetricsGenerator import SyntheticMetricsGenerator from .SyntheticMetricsGenerator import SyntheticMetricsGenerator
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class EmulatedCollector(_Collector):
class EmulatedDriver(_Driver):
""" """
EmulatedDriver is a class that simulates a network driver for testing purposes. EmulatedCollector is a class that simulates a network collector for testing purposes.
It provides functionalities to manage configurations, state subscriptions, and synthetic data generation. It provides functionalities to manage configurations, state subscriptions, and synthetic data generation.
""" """
def __init__(self, address: str, port: int, **settings): def __init__(self, address: str, port: int, **settings):
super().__init__('emulated_driver', address, port, **settings) super().__init__('emulated_collector', address, port, **settings)
self._initial_config = Node('root') # Tree structure for initial config self._initial_config = Node('root') # Tree structure for initial config
self._running_config = Node('root') # Tree structure for running config self._running_config = Node('root') # Tree structure for running config
self._subscriptions = Node('subscriptions') # Tree for state subscriptions self._subscriptions = Node('subscriptions') # Tree for state subscriptions
...@@ -52,11 +50,11 @@ class EmulatedDriver(_Driver): ...@@ -52,11 +50,11 @@ class EmulatedDriver(_Driver):
) )
self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED)
self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED)
self._helper_methods = EmulatedDriverHelper() self._helper_methods = EmulatedCollectorHelper()
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.connected = False # To track connection state self.connected = False # To track connection state
self.logger.info("EmulatedDriver initialized") self.logger.info("EmulatedCollector initialized")
def Connect(self) -> bool: def Connect(self) -> bool:
self.logger.info(f"Connecting to {self.address}:{self.port}") self.logger.info(f"Connecting to {self.address}:{self.port}")
...@@ -68,7 +66,7 @@ class EmulatedDriver(_Driver): ...@@ -68,7 +66,7 @@ class EmulatedDriver(_Driver):
def Disconnect(self) -> bool: def Disconnect(self) -> bool:
self.logger.info(f"Disconnecting from {self.address}:{self.port}") self.logger.info(f"Disconnecting from {self.address}:{self.port}")
if not self.connected: if not self.connected:
self.logger.warning("Driver is not connected. Nothing to disconnect.") self.logger.warning("Collector is not connected. Nothing to disconnect.")
return False return False
self._scheduler.shutdown() self._scheduler.shutdown()
self.connected = False self.connected = False
...@@ -77,7 +75,7 @@ class EmulatedDriver(_Driver): ...@@ -77,7 +75,7 @@ class EmulatedDriver(_Driver):
def _require_connection(self): def _require_connection(self):
if not self.connected: if not self.connected:
raise RuntimeError("Driver is not connected. Please connect before performing operations.") raise RuntimeError("Collector is not connected. Please connect before performing operations.")
def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
self._require_connection() self._require_connection()
...@@ -219,7 +217,7 @@ class EmulatedDriver(_Driver): ...@@ -219,7 +217,7 @@ class EmulatedDriver(_Driver):
# ------- The below methods are kept for debugging purposes (test-case) only --------- # ------- The below methods are kept for debugging purposes (test-case) only ---------
#------------------------------------------------------------------------------------- #-------------------------------------------------------------------------------------
# This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_driver()). # This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_collector()).
def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes. def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes.
self._require_connection() self._require_connection()
results = [] results = []
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
...@@ -17,9 +17,9 @@ import json ...@@ -17,9 +17,9 @@ import json
from typing import Any, List from typing import Any, List
class EmulatedDriverHelper: class EmulatedCollectorHelper:
""" """
Helper class for the emulated driver. Helper class for the emulated collector.
""" """
def __init__(self): def __init__(self):
pass pass
...@@ -48,7 +48,7 @@ class EmulatedDriverHelper: ...@@ -48,7 +48,7 @@ class EmulatedDriverHelper:
# ------- Below function is kept for debugging purposes (test-cases) only ------------- # ------- Below function is kept for debugging purposes (test-cases) only -------------
#-------------------------------------------------------------------------------------- #--------------------------------------------------------------------------------------
# This below methods can be commented but are called by the SetConfig method in EmulatedDriver.py # This below methods can be commented but are called by the SetConfig method in EmulatedCollector.py
def _find_or_create_node(self, name: str, parent: Node) -> Node: def _find_or_create_node(self, name: str, parent: Node) -> Node:
""" """
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import logging import logging
import time import time
import pytest import pytest
from telemetry.backend.drivers.emulated.EmulatedDriver import EmulatedDriver from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector
from telemetry.backend.tests.messages_emulated import ( from telemetry.backend.tests.messages_emulated import (
create_test_configuration, create_test_configuration,
create_specific_config_keys, create_specific_config_keys,
...@@ -27,82 +27,82 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %( ...@@ -27,82 +27,82 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@pytest.fixture @pytest.fixture
def setup_driver(): def setup_collector():
"""Sets up an EmulatedDriver instance for testing.""" """Sets up an EmulatedCollector instance for testing."""
yield EmulatedDriver(address="127.0.0.1", port=8080) yield EmulatedCollector(address="127.0.0.1", port=8080)
@pytest.fixture @pytest.fixture
def connected_configured_driver(setup_driver): def connected_configured_collector(setup_collector):
driver = setup_driver # EmulatedDriver(address="127.0.0.1", port=8080) collector = setup_collector # EmulatedCollector(address="127.0.0.1", port=8080)
driver.Connect() collector.Connect()
driver.SetConfig(create_test_configuration()) collector.SetConfig(create_test_configuration())
yield driver yield collector
driver.Disconnect() collector.Disconnect()
def test_connect(setup_driver): def test_connect(setup_collector):
logger.info(">>> test_connect <<<") logger.info(">>> test_connect <<<")
driver = setup_driver collector = setup_collector
assert driver.Connect() is True assert collector.Connect() is True
assert driver.connected is True assert collector.connected is True
def test_disconnect(setup_driver): def test_disconnect(setup_collector):
logger.info(">>> test_disconnect <<<") logger.info(">>> test_disconnect <<<")
driver = setup_driver collector = setup_collector
driver.Connect() collector.Connect()
assert driver.Disconnect() is True assert collector.Disconnect() is True
assert driver.connected is False assert collector.connected is False
# def test_set_config(setup_driver): # def test_set_config(setup_collector):
# logger.info(">>> test_set_config <<<") # logger.info(">>> test_set_config <<<")
# driver = setup_driver # collector = setup_collector
# driver.Connect() # collector.Connect()
# config = create_test_configuration() # config = create_test_configuration()
# results = driver.SetConfig(config) # results = collector.SetConfig(config)
# assert all(result is True for result in results) # assert all(result is True for result in results)
# def test_get_config(connected_configured_driver): # def test_get_config(connected_configured_collector):
# logger.info(">>> test_get_config <<<") # logger.info(">>> test_get_config <<<")
# resource_keys = create_specific_config_keys() # resource_keys = create_specific_config_keys()
# results = connected_configured_driver.GetConfig(resource_keys) # results = connected_configured_collector.GetConfig(resource_keys)
# for key, value in results: # for key, value in results:
# assert key in create_specific_config_keys() # assert key in create_specific_config_keys()
# assert value is not None # assert value is not None
# def test_delete_config(connected_configured_driver): # def test_delete_config(connected_configured_collector):
# logger.info(">>> test_delete_config <<<") # logger.info(">>> test_delete_config <<<")
# resource_keys = create_config_for_delete() # resource_keys = create_config_for_delete()
# results = connected_configured_driver.DeleteConfig(resource_keys) # results = connected_configured_collector.DeleteConfig(resource_keys)
# assert all(result is True for result in results) # assert all(result is True for result in results)
def test_subscribe_state(connected_configured_driver): def test_subscribe_state(connected_configured_collector):
logger.info(">>> test_subscribe_state <<<") logger.info(">>> test_subscribe_state <<<")
subscriptions = create_test_subscriptions() subscriptions = create_test_subscriptions()
results = connected_configured_driver.SubscribeState(subscriptions) results = connected_configured_collector.SubscribeState(subscriptions)
# logger.info(f"Subscribed result: {results}.") # logger.info(f"Subscribed result: {results}.")
assert results == [False, True, True] # all(result is True for result in results) assert results == [False, True, True] # all(result is True for result in results)
def test_unsubscribe_state(connected_configured_driver): def test_unsubscribe_state(connected_configured_collector):
logger.info(">>> test_unsubscribe_state <<<") logger.info(">>> test_unsubscribe_state <<<")
subscriptions = create_test_subscriptions() subscriptions = create_test_subscriptions()
connected_configured_driver.SubscribeState(subscriptions) connected_configured_collector.SubscribeState(subscriptions)
results = connected_configured_driver.UnsubscribeState(subscriptions) results = connected_configured_collector.UnsubscribeState(subscriptions)
assert results == [False, True, True] # all(result is True for result in results) assert results == [False, True, True] # all(result is True for result in results)
def test_get_state(connected_configured_driver): def test_get_state(connected_configured_collector):
logger.info(">>> test_get_state <<<") logger.info(">>> test_get_state <<<")
subscriptions = create_test_subscriptions() subscriptions = create_test_subscriptions()
connected_configured_driver.SubscribeState(subscriptions) connected_configured_collector.SubscribeState(subscriptions)
logger.info(f"Subscribed to state: {subscriptions}. waiting for 12 seconds ...") logger.info(f"Subscribed to state: {subscriptions}. waiting for 12 seconds ...")
time.sleep(12) time.sleep(12)
state_iterator = connected_configured_driver.GetState(blocking=False) state_iterator = connected_configured_collector.GetState(blocking=False)
states = list(state_iterator) states = list(state_iterator)
assert len(states) > 0 assert len(states) > 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment