From 3de33cb0713ed47d3fae5e08054e3be4c116489c Mon Sep 17 00:00:00 2001
From: armingol <pablo.armingolrobles@telefonica.com>
Date: Fri, 14 Feb 2025 12:36:40 +0100
Subject: [PATCH] code clean up
---
.../service/DeviceServiceServicerImpl.py | 6 +-
src/device/service/Tools.py | 3 +-
.../service/driver_api/DriverInstanceCache.py | 7 +-
src/device/service/drivers/__init__.py | 11 -
.../drivers/openconfig/OpenConfigDriver.py | 16 +-
.../drivers/openconfig/templates/EndPoints.py | 2 +-
.../openconfig/templates/Interfaces.py | 10 +-
.../service/drivers/pon_driver/Constants.py | 21 --
.../service/drivers/pon_driver/PON_Driver.py | 317 ------------------
.../pon_driver/SyntheticSamplingParameters.py | 86 -----
.../service/drivers/pon_driver/Tools.py | 89 -----
.../service/drivers/pon_driver/__init__.py | 14 -
12 files changed, 10 insertions(+), 572 deletions(-)
delete mode 100644 src/device/service/drivers/pon_driver/Constants.py
delete mode 100644 src/device/service/drivers/pon_driver/PON_Driver.py
delete mode 100644 src/device/service/drivers/pon_driver/SyntheticSamplingParameters.py
delete mode 100644 src/device/service/drivers/pon_driver/Tools.py
delete mode 100644 src/device/service/drivers/pon_driver/__init__.py
diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py
index 0a87e33fa..745c83dd2 100644
--- a/src/device/service/DeviceServiceServicerImpl.py
+++ b/src/device/service/DeviceServiceServicerImpl.py
@@ -61,7 +61,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
t0 = time.time()
device_uuid = request.device_id.device_uuid.uuid
-
+
connection_config_rules = check_connect_rules(request.device_config)
check_no_endpoints(request.device_endpoints)
@@ -94,7 +94,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
update_sap_id(device, self.sap_id)
self.sap_id += 1
-
+
t2 = time.time()
self.mutex_queues.add_alias(device_uuid, device_name)
@@ -102,7 +102,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
t3 = time.time()
try:
driver : _Driver = get_driver(self.driver_instance_cache, device)
-
+
t4 = time.time()
errors = []
diff --git a/src/device/service/Tools.py b/src/device/service/Tools.py
index 191dffca6..eace134ea 100644
--- a/src/device/service/Tools.py
+++ b/src/device/service/Tools.py
@@ -555,7 +555,6 @@ def extract_resources(config : dict, device : Device) -> list[list[dict], dict]:
return [resources, conditions]
-
def update_sap_id(device: Device, sap_id: int) -> None:
found = False
@@ -574,4 +573,4 @@ def update_sap_id(device: Device, sap_id: int) -> None:
new_rule.action = ConfigActionEnum.CONFIGACTION_SET
new_rule.custom.resource_key = '_connect/settings'
settings = {'sap_id': str(sap_id)}
- new_rule.custom.resource_value = json.dumps(settings)
\ No newline at end of file
+ new_rule.custom.resource_value = json.dumps(settings)
diff --git a/src/device/service/driver_api/DriverInstanceCache.py b/src/device/service/driver_api/DriverInstanceCache.py
index 4b6881a9f..29c5a5fb9 100644
--- a/src/device/service/driver_api/DriverInstanceCache.py
+++ b/src/device/service/driver_api/DriverInstanceCache.py
@@ -23,7 +23,6 @@ from .DriverFactory import DriverFactory
from .Exceptions import DriverInstanceCacheTerminatedException
from .FilterFields import FilterFieldEnum, get_device_driver_filter_fields
-
LOGGER = logging.getLogger(__name__)
class DriverInstanceCache:
@@ -95,7 +94,6 @@ def get_driver(driver_instance_cache : DriverInstanceCache, device : Device) ->
port = connect_rules.get('port', '0')
settings = connect_rules.get('settings', '{}')
-
try:
settings = json.loads(settings)
except ValueError as e:
@@ -104,9 +102,6 @@ def get_driver(driver_instance_cache : DriverInstanceCache, device : Device) ->
extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.'
) from e
-
-
-
driver : _Driver = driver_instance_cache.get(
device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
driver.Connect()
@@ -117,4 +112,4 @@ def preload_drivers(driver_instance_cache : DriverInstanceCache) -> None:
context_client = ContextClient()
devices = context_client.ListDevices(Empty())
for device in devices.devices: get_driver(driver_instance_cache, device)
-
+
diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py
index adb05f904..e3102cdf5 100644
--- a/src/device/service/drivers/__init__.py
+++ b/src/device/service/drivers/__init__.py
@@ -217,14 +217,3 @@ if LOAD_ALL_DEVICE_DRIVERS:
FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_QKD,
}
]))
-
-if LOAD_ALL_DEVICE_DRIVERS:
- from .pon_driver.PON_Driver import PON_Driver # pylint: disable=wrong-import-position
- DRIVERS.append(
- (PON_Driver, [
- {
- # Close enough, it does optical switching
- FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PON_CONTROLLER,
- FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_PON,
- }
- ]))
\ No newline at end of file
diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py
index df3ad9ec2..594cc51f4 100644
--- a/src/device/service/drivers/openconfig/OpenConfigDriver.py
+++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py
@@ -33,8 +33,6 @@ from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_su
from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse, cli_compose_config
from .RetryDecorator import retry
-
-
DEBUG_MODE = False
logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING)
logging.getLogger('ncclient.transport.ssh').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING)
@@ -42,8 +40,6 @@ logging.getLogger('apscheduler.executors.default').setLevel(logging.INFO if DEBU
logging.getLogger('apscheduler.scheduler').setLevel(logging.INFO if DEBUG_MODE else logging.ERROR)
logging.getLogger('monitoring-client').setLevel(logging.INFO if DEBUG_MODE else logging.ERROR)
-LOGGER = logging.getLogger(__name__)
-
RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*')
RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*")
@@ -78,7 +74,6 @@ class NetconfSessionHandler:
self.__manager_params = settings.get('manager_params', {})
self.__nc_params = settings.get('nc_params', {})
self.__message_renderer = settings.get('message_renderer','jinja')
- LOGGER.info(f"[OpenConfigDriver] Settings recibidos: {settings}")
self.__manager : Manager = None
self.__candidate_supported = False
@@ -287,7 +282,7 @@ class OpenConfigDriver(_Driver):
self.__out_samples = queue.Queue()
self.__netconf_handler = NetconfSessionHandler(self.address, self.port, **(self.settings))
self.__samples_cache = SamplesCache(self.__netconf_handler, self.__logger)
- self.sap_id = 0 # Inicializar sap_id a cero
+ self.sap_id = 0 # Initialize sap_id to zero
def Connect(self) -> bool:
with self.__lock:
@@ -313,7 +308,7 @@ class OpenConfigDriver(_Driver):
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
with self.__lock:
return []
-
+
@metered_subclass_method(METRICS_POOL)
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type('resources', resource_keys, list)
@@ -335,13 +330,6 @@ class OpenConfigDriver(_Driver):
self.__logger.exception(MSG.format(str_resource_name, str(resource_key)))
results.append((resource_key, e)) # if validation fails, store the exception
self.sap_id += 1
- #LOGGER.info("RESULTS---------------")
- #LOGGER.info(results)
- #results.append(('/device/sap_id', {'sap_id': f'SAP_{self.sap_id}'}))
-
- #LOGGER.info("RESULTS---------------")
- #LOGGER.info(results)
- #OpenConfigDriver.sap_id += 1
return results
@metered_subclass_method(METRICS_POOL)
diff --git a/src/device/service/drivers/openconfig/templates/EndPoints.py b/src/device/service/drivers/openconfig/templates/EndPoints.py
index 92f0b4407..26be2e9ca 100644
--- a/src/device/service/drivers/openconfig/templates/EndPoints.py
+++ b/src/device/service/drivers/openconfig/templates/EndPoints.py
@@ -16,7 +16,7 @@ import logging, lxml.etree as ET
from typing import Any, Dict, List, Tuple
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from .Namespace import NAMESPACES
-from .Tools import add_value_from_collection, add_value_from_tag#, get_sap_id
+from .Tools import add_value_from_collection, add_value_from_tag
LOGGER = logging.getLogger(__name__)
diff --git a/src/device/service/drivers/openconfig/templates/Interfaces.py b/src/device/service/drivers/openconfig/templates/Interfaces.py
index 7b972d3f7..db91e2bb0 100644
--- a/src/device/service/drivers/openconfig/templates/Interfaces.py
+++ b/src/device/service/drivers/openconfig/templates/Interfaces.py
@@ -27,8 +27,6 @@ XPATH_IPV6ADDRESSES = ".//ociip:ipv6/ociip:addresses/ociip:address"
def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
response = []
sap_interface_counter = 0
-
-
for xml_interface in xml_data.xpath(XPATH_INTERFACES, namespaces=NAMESPACES):
#LOGGER.info('xml_interface = {:s}'.format(str(ET.tostring(xml_interface))))
@@ -46,7 +44,6 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
interface_name = xml_interface.find('oci:name', namespaces=NAMESPACES)
if interface_name is None or interface_name.text is None: continue
add_value_from_tag(interface, 'name', interface_name)
-
sap_interface = ET.Element('dummy') #SAP-ID
sap_interface.text =str(sap_interface_counter)
@@ -54,7 +51,6 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
add_value_from_tag(interface, 'sap_id', sap_interface)
sap_interface_counter += 1
-
# Get the type of interface according to the vendor's type
if 'ianaift:' in interface_type.text:
interface_type.text = interface_type.text.replace('ianaift:', '') #ADVA
@@ -68,9 +64,8 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
interface_description = xml_interface.find('oci:config/oci:description', namespaces=NAMESPACES)
add_value_from_tag(interface, 'description', interface_description)
-
- sap_subinterface_counter = 65 # 65 = A in ASCII
-
+ sap_subinterface_counter = 65
+
for xml_subinterface in xml_interface.xpath(XPATH_SUBINTERFACES, namespaces=NAMESPACES):
#LOGGER.info('xml_subinterface = {:s}'.format(str(ET.tostring(xml_subinterface))))
@@ -88,7 +83,6 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]:
vlan_id = xml_subinterface.find('ocv:vlan/ocv:match/ocv:single-tagged/ocv:config/ocv:vlan-id', namespaces=NAMESPACES)
add_value_from_tag(subinterface, 'vlan_id', vlan_id, cast=int)
-
sap_subinterface = ET.Element('dummy') #SAP-ID
value = (aux + chr(sap_subinterface_counter))
sap_subinterface.text = value
diff --git a/src/device/service/drivers/pon_driver/Constants.py b/src/device/service/drivers/pon_driver/Constants.py
deleted file mode 100644
index 3d349152e..000000000
--- a/src/device/service/drivers/pon_driver/Constants.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
-
-SPECIAL_RESOURCE_MAPPINGS = {
- RESOURCE_ENDPOINTS : '/endpoints',
- RESOURCE_INTERFACES : '/interfaces',
- RESOURCE_NETWORK_INSTANCES: '/net-instances',
-}
diff --git a/src/device/service/drivers/pon_driver/PON_Driver.py b/src/device/service/drivers/pon_driver/PON_Driver.py
deleted file mode 100644
index cf2c4ac0d..000000000
--- a/src/device/service/drivers/pon_driver/PON_Driver.py
+++ /dev/null
@@ -1,317 +0,0 @@
-# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import anytree, json, logging, pytz, queue, re, threading
-from datetime import datetime, timedelta
-from typing import Any, Iterator, List, Optional, Tuple, Union
-from apscheduler.executors.pool import ThreadPoolExecutor
-from apscheduler.job import Job
-from apscheduler.jobstores.memory import MemoryJobStore
-from apscheduler.schedulers.background import BackgroundScheduler
-from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
-from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
-from device.service.driver_api._Driver import _Driver
-from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
-from .Constants import SPECIAL_RESOURCE_MAPPINGS
-from .SyntheticSamplingParameters import SyntheticSamplingParameters, do_sampling
-from .Tools import compose_resource_endpoint
-import requests
-
-LOGGER = logging.getLogger(__name__)
-
-RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r'^\/interface\[([^\]]+)\].*')
-
-DRIVER_NAME = 'PON_DRIVER'
-METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
-
-
-CONTROLLER_IP = "CONTROLLER-IP-ADDRESS" # REPLACE
-CONTROLLER_PORT = 3333
-API_ENDPOINT = f"http://{CONTROLLER_IP}:{CONTROLLER_PORT}/api/service/"
-
-
-class PON_Driver(_Driver):
- def __init__(self, address : str, port : int, **settings) -> None:
- super().__init__(DRIVER_NAME, address, port, **settings)
- self.__lock = threading.Lock()
- self.__initial = TreeNode('.')
- self.__running = TreeNode('.')
- self.__subscriptions = TreeNode('.')
-
- endpoints = self.settings.get('endpoints', [])
- endpoint_resources = []
- for endpoint in endpoints:
- endpoint_resource = compose_resource_endpoint(endpoint)
- if endpoint_resource is None: continue
- endpoint_resources.append(endpoint_resource)
- self.SetConfig(endpoint_resources)
-
- self.__started = threading.Event()
- self.__terminate = threading.Event()
- self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
- self.__scheduler.configure(
- jobstores = {'default': MemoryJobStore()},
- executors = {'default': ThreadPoolExecutor(max_workers=1)},
- job_defaults = {'coalesce': False, 'max_instances': 3},
- timezone=pytz.utc)
- self.__out_samples = queue.Queue()
- self.__synthetic_sampling_parameters = SyntheticSamplingParameters()
-
- def Connect(self) -> bool:
- # If started, assume it is already connected
- if self.__started.is_set(): return True
-
- # Connect triggers activation of sampling events that will be scheduled based on subscriptions
- self.__scheduler.start()
-
- # Indicate the driver is now connected to the device
- self.__started.set()
- return True
-
- def Disconnect(self) -> bool:
- # Trigger termination of loops and processes
- self.__terminate.set()
-
- # If not started, assume it is already disconnected
- if not self.__started.is_set(): return True
-
- # Disconnect triggers deactivation of sampling events
- self.__scheduler.shutdown()
- return True
-
- @metered_subclass_method(METRICS_POOL)
- def GetInitialConfig(self) -> List[Tuple[str, Any]]:
- with self.__lock:
- return dump_subtree(self.__initial)
-
- @metered_subclass_method(METRICS_POOL)
- def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
- chk_type('resources', resource_keys, list)
- with self.__lock:
- if len(resource_keys) == 0: return dump_subtree(self.__running)
- results = []
- resolver = anytree.Resolver(pathattr='name')
- for i,resource_key in enumerate(resource_keys):
- str_resource_name = 'resource_key[#{:d}]'.format(i)
- try:
- chk_string(str_resource_name, resource_key, allow_empty=False)
- resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key)
- resource_path = resource_key.split('/')
- except Exception as e: # pylint: disable=broad-except
- LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
- results.append((resource_key, e)) # if validation fails, store the exception
- continue
-
- resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
- # if not found, resource_node is None
- if resource_node is None: continue
- results.extend(dump_subtree(resource_node))
- return results
-
- def send_pon_connection_request(self, ont_id: str, cvlan: int, ethernet_port: str, svlan: int, profile: str, bw: int) -> bool:
- """
- Envía una petición POST para establecer la conexión entre la red de acceso PON y la OLS.
- """
- payload = {
- "ont_id": ont_id,
- "cvlan": cvlan,
- "ethernet_port": ethernet_port,
- "svlan": svlan,
- "profile": profile,
- "bw": bw
- }
- headers = {
- "accept": "application/json",
- "Content-Type": "application/json"
- }
-
- try:
- response = requests.post(API_ENDPOINT, headers=headers, json=payload)
- if response.status_code == 200:
- LOGGER.info(f"Connection successfully established: {payload}")
- return True
- else:
- LOGGER.error(f"Connection error: {response.status_code} - {response.text}")
- return False
- except requests.RequestException as e:
- LOGGER.error(f"Error when sending the request: {str(e)}")
- return False
-
- @metered_subclass_method(METRICS_POOL)
- def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
- chk_type('resources', resources, list)
- if len(resources) == 0: return []
- results = []
- resolver = anytree.Resolver(pathattr='name')
- with self.__lock:
- for i,resource in enumerate(resources):
- str_resource_name = 'resources[#{:d}]'.format(i)
- try:
- chk_type(str_resource_name, resource, (list, tuple))
- chk_length(str_resource_name, resource, min_length=2, max_length=2)
- resource_key,resource_value = resource
- chk_string(str_resource_name, resource_key, allow_empty=False)
- resource_path = resource_key.split('/')
-
-
- result = self.send_pon_connection_request(2, 22, 2, 333, "ef", 2500000)
- results.append(result)
-
- except Exception as e:
- LOGGER.exception(f"Error procesando la configuración {resource}: {str(e)}")
- results.append(e)
-
-
-
- #match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key)
- #if match is not None:
- # endpoint_uuid = match.group(1)
- # if '.' in endpoint_uuid: endpoint_uuid = endpoint_uuid.split('.')[0]
- # self.__synthetic_sampling_parameters.set_endpoint_configured(endpoint_uuid)
-
- #results.append(True)
- return results
-
-
- @metered_subclass_method(METRICS_POOL)
- def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
- chk_type('resources', resources, list)
- if len(resources) == 0: return []
- results = []
- resolver = anytree.Resolver(pathattr='name')
- with self.__lock:
- for i,resource in enumerate(resources):
- str_resource_name = 'resources[#{:d}]'.format(i)
- try:
- chk_type(str_resource_name, resource, (list, tuple))
- chk_length(str_resource_name, resource, min_length=2, max_length=2)
- resource_key,_ = resource
- chk_string(str_resource_name, resource_key, allow_empty=False)
- resource_path = resource_key.split('/')
- except Exception as e: # pylint: disable=broad-except
- LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
- results.append(e) # if validation fails, store the exception
- continue
-
- resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
- # if not found, resource_node is None
- if resource_node is None:
- results.append(False)
- continue
-
- match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key)
- if match is not None:
- endpoint_uuid = match.group(1)
- if '.' in endpoint_uuid: endpoint_uuid = endpoint_uuid.split('.')[0]
- self.__synthetic_sampling_parameters.unset_endpoint_configured(endpoint_uuid)
-
- parent = resource_node.parent
- children = list(parent.children)
- children.remove(resource_node)
- parent.children = tuple(children)
- results.append(True)
- return results
-
- @metered_subclass_method(METRICS_POOL)
- def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
- chk_type('subscriptions', subscriptions, list)
- if len(subscriptions) == 0: return []
- results = []
- resolver = anytree.Resolver(pathattr='name')
- with self.__lock:
- for i,subscription in enumerate(subscriptions):
- str_subscription_name = 'subscriptions[#{:d}]'.format(i)
- try:
- chk_type(str_subscription_name, subscription, (list, tuple))
- chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
- resource_key,sampling_duration,sampling_interval = subscription
- chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
- resource_path = resource_key.split('/')
- chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
- chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
- except Exception as e: # pylint: disable=broad-except
- LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
- results.append(e) # if validation fails, store the exception
- continue
-
- start_date,end_date = None,None
- if sampling_duration <= 1.e-12:
- start_date = datetime.utcnow()
- end_date = start_date + timedelta(seconds=sampling_duration)
-
- job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
- job = self.__scheduler.add_job(
- do_sampling, args=(self.__synthetic_sampling_parameters, resource_key, self.__out_samples),
- kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date,
- end_date=end_date, timezone=pytz.utc)
-
- subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
- set_subnode_value(resolver, self.__subscriptions, subscription_path, job)
- results.append(True)
- return results
-
- @metered_subclass_method(METRICS_POOL)
- def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
- chk_type('subscriptions', subscriptions, list)
- if len(subscriptions) == 0: return []
- results = []
- resolver = anytree.Resolver(pathattr='name')
- with self.__lock:
- for i,resource in enumerate(subscriptions):
- str_subscription_name = 'resources[#{:d}]'.format(i)
- try:
- chk_type(str_subscription_name, resource, (list, tuple))
- chk_length(str_subscription_name, resource, min_length=3, max_length=3)
- resource_key,sampling_duration,sampling_interval = resource
- chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
- resource_path = resource_key.split('/')
- chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
- chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
- except Exception as e: # pylint: disable=broad-except
- LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
- results.append(e) # if validation fails, store the exception
- continue
-
- subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
- subscription_node = get_subnode(resolver, self.__subscriptions, subscription_path)
-
- # if not found, resource_node is None
- if subscription_node is None:
- results.append(False)
- continue
-
- job : Job = getattr(subscription_node, 'value', None)
- if job is None or not isinstance(job, Job):
- raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource)))
- job.remove()
-
- parent = subscription_node.parent
- children = list(parent.children)
- children.remove(subscription_node)
- parent.children = tuple(children)
-
- results.append(True)
- return results
-
- def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]:
- while True:
- if self.__terminate.is_set(): break
- if terminate is not None and terminate.is_set(): break
- try:
- sample = self.__out_samples.get(block=blocking, timeout=0.1)
- except queue.Empty:
- if blocking: continue
- return
- if sample is None: continue
- yield sample
diff --git a/src/device/service/drivers/pon_driver/SyntheticSamplingParameters.py b/src/device/service/drivers/pon_driver/SyntheticSamplingParameters.py
deleted file mode 100644
index e25e207e8..000000000
--- a/src/device/service/drivers/pon_driver/SyntheticSamplingParameters.py
+++ /dev/null
@@ -1,86 +0,0 @@
-# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging, math, queue, random, re, threading
-from datetime import datetime
-from typing import Optional, Tuple
-
-LOGGER = logging.getLogger(__name__)
-
-RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)')
-
-MSG_ERROR_PARSE = '[get] unable to extract endpoint-metric from monitoring_resource_key "{:s}"'
-MSG_INFO = '[get] monitoring_resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}'
-
-class SyntheticSamplingParameters:
- def __init__(self) -> None:
- self.__lock = threading.Lock()
- self.__data = {}
- self.__configured_endpoints = set()
-
- def set_endpoint_configured(self, endpoint_uuid : str):
- with self.__lock:
- self.__configured_endpoints.add(endpoint_uuid)
-
- def unset_endpoint_configured(self, endpoint_uuid : str):
- with self.__lock:
- self.__configured_endpoints.discard(endpoint_uuid)
-
- def get(self, monitoring_resource_key : str) -> Optional[Tuple[float, float, float, float, float]]:
- with self.__lock:
- match = RE_GET_ENDPOINT_METRIC.match(monitoring_resource_key)
- if match is None:
- LOGGER.error(MSG_ERROR_PARSE.format(monitoring_resource_key))
- return None
- endpoint_uuid = match.group(1)
-
- # If endpoint is not configured, generate a flat synthetic traffic aligned at 0
- if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0)
-
- metric = match.group(2)
- metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '')
-
- LOGGER.debug(MSG_INFO.format(monitoring_resource_key, endpoint_uuid, metric, metric_sense))
-
- parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense)
- parameters = self.__data.get(parameters_key)
- if parameters is not None: return parameters
-
- # assume packets
- amplitude = 1.e7 * random.random()
- phase = 60 * random.random()
- period = 3600 * random.random()
- offset = 1.e8 * random.random() + amplitude
- avg_bytes_per_packet = random.randint(500, 1500)
- parameters = (amplitude, phase, period, offset, avg_bytes_per_packet)
- return self.__data.setdefault(parameters_key, parameters)
-
-def do_sampling(
- synthetic_sampling_parameters : SyntheticSamplingParameters, monitoring_resource_key : str,
- out_samples : queue.Queue
-) -> None:
- parameters = synthetic_sampling_parameters.get(monitoring_resource_key)
- if parameters is None: return
- amplitude, phase, period, offset, avg_bytes_per_packet = parameters
-
- if 'bytes' in monitoring_resource_key.lower():
- # convert to bytes
- amplitude = avg_bytes_per_packet * amplitude
- offset = avg_bytes_per_packet * offset
-
- timestamp = datetime.timestamp(datetime.utcnow())
- waveform = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset
- noise = amplitude * random.random()
- value = abs(0.95 * waveform + 0.05 * noise)
- out_samples.put_nowait((timestamp, monitoring_resource_key, value))
diff --git a/src/device/service/drivers/pon_driver/Tools.py b/src/device/service/drivers/pon_driver/Tools.py
deleted file mode 100644
index 9f2a105c0..000000000
--- a/src/device/service/drivers/pon_driver/Tools.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import Any, Dict, Optional, Tuple
-from common.proto.kpi_sample_types_pb2 import KpiSampleType
-from common.type_checkers.Checkers import chk_attribute, chk_string, chk_type
-from device.service.driver_api._Driver import RESOURCE_ENDPOINTS
-from .Constants import SPECIAL_RESOURCE_MAPPINGS
-
-LOGGER = logging.getLogger(__name__)
-
-def process_optional_string_field(
- endpoint_data : Dict[str, Any], field_name : str, endpoint_resource_value : Dict[str, Any]
-) -> None:
- field_value = chk_attribute(field_name, endpoint_data, 'endpoint_data', default=None)
- if field_value is None: return
- chk_string('endpoint_data.{:s}'.format(field_name), field_value)
- if len(field_value) > 0: endpoint_resource_value[field_name] = field_value
-
-def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Optional[Tuple[str, Dict]]:
- try:
- # Check type of endpoint_data
- chk_type('endpoint_data', endpoint_data, dict)
-
- # Check endpoint UUID (mandatory)
- endpoint_uuid = chk_attribute('uuid', endpoint_data, 'endpoint_data')
- chk_string('endpoint_data.uuid', endpoint_uuid, min_length=1)
- endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS)
- endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid)
- endpoint_resource_value = {'uuid': endpoint_uuid}
-
- # Check endpoint optional string fields
- process_optional_string_field(endpoint_data, 'name', endpoint_resource_value)
- process_optional_string_field(endpoint_data, 'type', endpoint_resource_value)
- process_optional_string_field(endpoint_data, 'context_uuid', endpoint_resource_value)
- process_optional_string_field(endpoint_data, 'topology_uuid', endpoint_resource_value)
-
- # Check endpoint sample types (optional)
- endpoint_sample_types = chk_attribute('sample_types', endpoint_data, 'endpoint_data', default=[])
- chk_type('endpoint_data.sample_types', endpoint_sample_types, list)
- sample_types = {}
- sample_type_errors = []
- for i,endpoint_sample_type in enumerate(endpoint_sample_types):
- field_name = 'endpoint_data.sample_types[{:d}]'.format(i)
- try:
- chk_type(field_name, endpoint_sample_type, (int, str))
- if isinstance(endpoint_sample_type, int):
- metric_name = KpiSampleType.Name(endpoint_sample_type)
- metric_id = endpoint_sample_type
- elif isinstance(endpoint_sample_type, str):
- metric_id = KpiSampleType.Value(endpoint_sample_type)
- metric_name = endpoint_sample_type
- else:
- str_type = str(type(endpoint_sample_type))
- raise Exception('Bad format: {:s}'.format(str_type)) # pylint: disable=broad-exception-raised
- except Exception as e: # pylint: disable=broad-exception-caught
- MSG = 'Unsupported {:s}({:s}) : {:s}'
- sample_type_errors.append(MSG.format(field_name, str(endpoint_sample_type), str(e)))
-
- metric_name = metric_name.lower().replace('kpisampletype_', '')
- monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name)
- sample_types[metric_id] = monitoring_resource_key
-
- if len(sample_type_errors) > 0:
- # pylint: disable=broad-exception-raised
- raise Exception('Malformed Sample Types:\n{:s}'.format('\n'.join(sample_type_errors)))
-
- if len(sample_types) > 0:
- endpoint_resource_value['sample_types'] = sample_types
-
- if 'location' in endpoint_data:
- endpoint_resource_value['location'] = endpoint_data['location']
-
- return endpoint_resource_key, endpoint_resource_value
- except: # pylint: disable=bare-except
- LOGGER.exception('Problem composing endpoint({:s})'.format(str(endpoint_data)))
- return None
diff --git a/src/device/service/drivers/pon_driver/__init__.py b/src/device/service/drivers/pon_driver/__init__.py
deleted file mode 100644
index 53d5157f7..000000000
--- a/src/device/service/drivers/pon_driver/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
--
GitLab