diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index 0a87e33faab47088ee7116bfe2e0d3094950b7ed..745c83dd26fbae39e27900b141f5ad787f71e885 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -61,7 +61,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): t0 = time.time() device_uuid = request.device_id.device_uuid.uuid - + connection_config_rules = check_connect_rules(request.device_config) check_no_endpoints(request.device_endpoints) @@ -94,7 +94,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): update_sap_id(device, self.sap_id) self.sap_id += 1 - + t2 = time.time() self.mutex_queues.add_alias(device_uuid, device_name) @@ -102,7 +102,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): t3 = time.time() try: driver : _Driver = get_driver(self.driver_instance_cache, device) - + t4 = time.time() errors = [] diff --git a/src/device/service/Tools.py b/src/device/service/Tools.py index 191dffca682aee75e85d32b10743bcc3ff560698..eace134eafce5b0b188062349076512b7069e71c 100644 --- a/src/device/service/Tools.py +++ b/src/device/service/Tools.py @@ -555,7 +555,6 @@ def extract_resources(config : dict, device : Device) -> list[list[dict], dict]: return [resources, conditions] - def update_sap_id(device: Device, sap_id: int) -> None: found = False @@ -574,4 +573,4 @@ def update_sap_id(device: Device, sap_id: int) -> None: new_rule.action = ConfigActionEnum.CONFIGACTION_SET new_rule.custom.resource_key = '_connect/settings' settings = {'sap_id': str(sap_id)} - new_rule.custom.resource_value = json.dumps(settings) \ No newline at end of file + new_rule.custom.resource_value = json.dumps(settings) diff --git a/src/device/service/driver_api/DriverInstanceCache.py b/src/device/service/driver_api/DriverInstanceCache.py index 4b6881a9fca0dcd02d393440899d9ce54522ad55..29c5a5fb97cd939874fe62e9be60f20d045702e0 100644 --- a/src/device/service/driver_api/DriverInstanceCache.py +++ b/src/device/service/driver_api/DriverInstanceCache.py @@ -23,7 +23,6 @@ from .DriverFactory import DriverFactory from .Exceptions import DriverInstanceCacheTerminatedException from .FilterFields import FilterFieldEnum, get_device_driver_filter_fields - LOGGER = logging.getLogger(__name__) class DriverInstanceCache: @@ -95,7 +94,6 @@ def get_driver(driver_instance_cache : DriverInstanceCache, device : Device) -> port = connect_rules.get('port', '0') settings = connect_rules.get('settings', '{}') - try: settings = json.loads(settings) except ValueError as e: @@ -104,9 +102,6 @@ def get_driver(driver_instance_cache : DriverInstanceCache, device : Device) -> extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.' ) from e - - - driver : _Driver = driver_instance_cache.get( device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings) driver.Connect() @@ -117,4 +112,4 @@ def preload_drivers(driver_instance_cache : DriverInstanceCache) -> None: context_client = ContextClient() devices = context_client.ListDevices(Empty()) for device in devices.devices: get_driver(driver_instance_cache, device) - + diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index adb05f904fc28afaf5a4447caef47db4aae3ff6f..e3102cdf523a4e0b551873bb8f0c423db00aebf0 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -217,14 +217,3 @@ if LOAD_ALL_DEVICE_DRIVERS: FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_QKD, } ])) - -if LOAD_ALL_DEVICE_DRIVERS: - from .pon_driver.PON_Driver import PON_Driver # pylint: disable=wrong-import-position - DRIVERS.append( - (PON_Driver, [ - { - # Close enough, it does optical switching - FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PON_CONTROLLER, - FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_PON, - } - ])) \ No newline at end of file diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py index df3ad9ec237109a26c9df174086e2e97854bb36a..594cc51f4330c44f2be8eebebade9ff1af158d93 100644 --- a/src/device/service/drivers/openconfig/OpenConfigDriver.py +++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py @@ -33,8 +33,6 @@ from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_su from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse, cli_compose_config from .RetryDecorator import retry - - DEBUG_MODE = False logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING) logging.getLogger('ncclient.transport.ssh').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING) @@ -42,8 +40,6 @@ logging.getLogger('apscheduler.executors.default').setLevel(logging.INFO if DEBU logging.getLogger('apscheduler.scheduler').setLevel(logging.INFO if DEBUG_MODE else logging.ERROR) logging.getLogger('monitoring-client').setLevel(logging.INFO if DEBUG_MODE else logging.ERROR) -LOGGER = logging.getLogger(__name__) - RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*') RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*") @@ -78,7 +74,6 @@ class NetconfSessionHandler: self.__manager_params = settings.get('manager_params', {}) self.__nc_params = settings.get('nc_params', {}) self.__message_renderer = settings.get('message_renderer','jinja') - LOGGER.info(f"[OpenConfigDriver] Settings recibidos: {settings}") self.__manager : Manager = None self.__candidate_supported = False @@ -287,7 +282,7 @@ class OpenConfigDriver(_Driver): self.__out_samples = queue.Queue() self.__netconf_handler = NetconfSessionHandler(self.address, self.port, **(self.settings)) self.__samples_cache = SamplesCache(self.__netconf_handler, self.__logger) - self.sap_id = 0 # Inicializar sap_id a cero + self.sap_id = 0 # Initialize sap_id to zero def Connect(self) -> bool: with self.__lock: @@ -313,7 +308,7 @@ class OpenConfigDriver(_Driver): def GetInitialConfig(self) -> List[Tuple[str, Any]]: with self.__lock: return [] - + @metered_subclass_method(METRICS_POOL) def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: chk_type('resources', resource_keys, list) @@ -335,13 +330,6 @@ class OpenConfigDriver(_Driver): self.__logger.exception(MSG.format(str_resource_name, str(resource_key))) results.append((resource_key, e)) # if validation fails, store the exception self.sap_id += 1 - #LOGGER.info("RESULTS---------------") - #LOGGER.info(results) - #results.append(('/device/sap_id', {'sap_id': f'SAP_{self.sap_id}'})) - - #LOGGER.info("RESULTS---------------") - #LOGGER.info(results) - #OpenConfigDriver.sap_id += 1 return results @metered_subclass_method(METRICS_POOL) diff --git a/src/device/service/drivers/openconfig/templates/EndPoints.py b/src/device/service/drivers/openconfig/templates/EndPoints.py index 92f0b44079d8bda682ee6180a095a8eb14e11e75..26be2e9cad78b33a55271575427ccf30a1488261 100644 --- a/src/device/service/drivers/openconfig/templates/EndPoints.py +++ b/src/device/service/drivers/openconfig/templates/EndPoints.py @@ -16,7 +16,7 @@ import logging, lxml.etree as ET from typing import Any, Dict, List, Tuple from common.proto.kpi_sample_types_pb2 import KpiSampleType from .Namespace import NAMESPACES -from .Tools import add_value_from_collection, add_value_from_tag#, get_sap_id +from .Tools import add_value_from_collection, add_value_from_tag LOGGER = logging.getLogger(__name__) diff --git a/src/device/service/drivers/openconfig/templates/Interfaces.py b/src/device/service/drivers/openconfig/templates/Interfaces.py index 7b972d3f770a5b37fdcccb05c412e779b557ba60..db91e2bb0bd4ac2b2721df3ca2e6dc902f51479e 100644 --- a/src/device/service/drivers/openconfig/templates/Interfaces.py +++ b/src/device/service/drivers/openconfig/templates/Interfaces.py @@ -27,8 +27,6 @@ XPATH_IPV6ADDRESSES = ".//ociip:ipv6/ociip:addresses/ociip:address" def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: response = [] sap_interface_counter = 0 - - for xml_interface in xml_data.xpath(XPATH_INTERFACES, namespaces=NAMESPACES): #LOGGER.info('xml_interface = {:s}'.format(str(ET.tostring(xml_interface)))) @@ -46,7 +44,6 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: interface_name = xml_interface.find('oci:name', namespaces=NAMESPACES) if interface_name is None or interface_name.text is None: continue add_value_from_tag(interface, 'name', interface_name) - sap_interface = ET.Element('dummy') #SAP-ID sap_interface.text =str(sap_interface_counter) @@ -54,7 +51,6 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: add_value_from_tag(interface, 'sap_id', sap_interface) sap_interface_counter += 1 - # Get the type of interface according to the vendor's type if 'ianaift:' in interface_type.text: interface_type.text = interface_type.text.replace('ianaift:', '') #ADVA @@ -68,9 +64,8 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: interface_description = xml_interface.find('oci:config/oci:description', namespaces=NAMESPACES) add_value_from_tag(interface, 'description', interface_description) - - sap_subinterface_counter = 65 # 65 = A in ASCII - + sap_subinterface_counter = 65 + for xml_subinterface in xml_interface.xpath(XPATH_SUBINTERFACES, namespaces=NAMESPACES): #LOGGER.info('xml_subinterface = {:s}'.format(str(ET.tostring(xml_subinterface)))) @@ -88,7 +83,6 @@ def parse(xml_data : ET.Element) -> List[Tuple[str, Dict[str, Any]]]: vlan_id = xml_subinterface.find('ocv:vlan/ocv:match/ocv:single-tagged/ocv:config/ocv:vlan-id', namespaces=NAMESPACES) add_value_from_tag(subinterface, 'vlan_id', vlan_id, cast=int) - sap_subinterface = ET.Element('dummy') #SAP-ID value = (aux + chr(sap_subinterface_counter)) sap_subinterface.text = value diff --git a/src/device/service/drivers/pon_driver/Constants.py b/src/device/service/drivers/pon_driver/Constants.py deleted file mode 100644 index 3d349152eb03c04222ae376a3aed21605bcc7708..0000000000000000000000000000000000000000 --- a/src/device/service/drivers/pon_driver/Constants.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES - -SPECIAL_RESOURCE_MAPPINGS = { - RESOURCE_ENDPOINTS : '/endpoints', - RESOURCE_INTERFACES : '/interfaces', - RESOURCE_NETWORK_INSTANCES: '/net-instances', -} diff --git a/src/device/service/drivers/pon_driver/PON_Driver.py b/src/device/service/drivers/pon_driver/PON_Driver.py deleted file mode 100644 index cf2c4ac0d9c4f1ddafb6dc4e22d11e60ca2e10c4..0000000000000000000000000000000000000000 --- a/src/device/service/drivers/pon_driver/PON_Driver.py +++ /dev/null @@ -1,317 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import anytree, json, logging, pytz, queue, re, threading -from datetime import datetime, timedelta -from typing import Any, Iterator, List, Optional, Tuple, Union -from apscheduler.executors.pool import ThreadPoolExecutor -from apscheduler.job import Job -from apscheduler.jobstores.memory import MemoryJobStore -from apscheduler.schedulers.background import BackgroundScheduler -from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method -from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type -from device.service.driver_api._Driver import _Driver -from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value -from .Constants import SPECIAL_RESOURCE_MAPPINGS -from .SyntheticSamplingParameters import SyntheticSamplingParameters, do_sampling -from .Tools import compose_resource_endpoint -import requests - -LOGGER = logging.getLogger(__name__) - -RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r'^\/interface\[([^\]]+)\].*') - -DRIVER_NAME = 'PON_DRIVER' -METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) - - -CONTROLLER_IP = "CONTROLLER-IP-ADDRESS" # REPLACE -CONTROLLER_PORT = 3333 -API_ENDPOINT = f"http://{CONTROLLER_IP}:{CONTROLLER_PORT}/api/service/" - - -class PON_Driver(_Driver): - def __init__(self, address : str, port : int, **settings) -> None: - super().__init__(DRIVER_NAME, address, port, **settings) - self.__lock = threading.Lock() - self.__initial = TreeNode('.') - self.__running = TreeNode('.') - self.__subscriptions = TreeNode('.') - - endpoints = self.settings.get('endpoints', []) - endpoint_resources = [] - for endpoint in endpoints: - endpoint_resource = compose_resource_endpoint(endpoint) - if endpoint_resource is None: continue - endpoint_resources.append(endpoint_resource) - self.SetConfig(endpoint_resources) - - self.__started = threading.Event() - self.__terminate = threading.Event() - self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events - self.__scheduler.configure( - jobstores = {'default': MemoryJobStore()}, - executors = {'default': ThreadPoolExecutor(max_workers=1)}, - job_defaults = {'coalesce': False, 'max_instances': 3}, - timezone=pytz.utc) - self.__out_samples = queue.Queue() - self.__synthetic_sampling_parameters = SyntheticSamplingParameters() - - def Connect(self) -> bool: - # If started, assume it is already connected - if self.__started.is_set(): return True - - # Connect triggers activation of sampling events that will be scheduled based on subscriptions - self.__scheduler.start() - - # Indicate the driver is now connected to the device - self.__started.set() - return True - - def Disconnect(self) -> bool: - # Trigger termination of loops and processes - self.__terminate.set() - - # If not started, assume it is already disconnected - if not self.__started.is_set(): return True - - # Disconnect triggers deactivation of sampling events - self.__scheduler.shutdown() - return True - - @metered_subclass_method(METRICS_POOL) - def GetInitialConfig(self) -> List[Tuple[str, Any]]: - with self.__lock: - return dump_subtree(self.__initial) - - @metered_subclass_method(METRICS_POOL) - def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: - chk_type('resources', resource_keys, list) - with self.__lock: - if len(resource_keys) == 0: return dump_subtree(self.__running) - results = [] - resolver = anytree.Resolver(pathattr='name') - for i,resource_key in enumerate(resource_keys): - str_resource_name = 'resource_key[#{:d}]'.format(i) - try: - chk_string(str_resource_name, resource_key, allow_empty=False) - resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key) - resource_path = resource_key.split('/') - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) - results.append((resource_key, e)) # if validation fails, store the exception - continue - - resource_node = get_subnode(resolver, self.__running, resource_path, default=None) - # if not found, resource_node is None - if resource_node is None: continue - results.extend(dump_subtree(resource_node)) - return results - - def send_pon_connection_request(self, ont_id: str, cvlan: int, ethernet_port: str, svlan: int, profile: str, bw: int) -> bool: - """ - Envía una petición POST para establecer la conexión entre la red de acceso PON y la OLS. - """ - payload = { - "ont_id": ont_id, - "cvlan": cvlan, - "ethernet_port": ethernet_port, - "svlan": svlan, - "profile": profile, - "bw": bw - } - headers = { - "accept": "application/json", - "Content-Type": "application/json" - } - - try: - response = requests.post(API_ENDPOINT, headers=headers, json=payload) - if response.status_code == 200: - LOGGER.info(f"Connection successfully established: {payload}") - return True - else: - LOGGER.error(f"Connection error: {response.status_code} - {response.text}") - return False - except requests.RequestException as e: - LOGGER.error(f"Error when sending the request: {str(e)}") - return False - - @metered_subclass_method(METRICS_POOL) - def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: - chk_type('resources', resources, list) - if len(resources) == 0: return [] - results = [] - resolver = anytree.Resolver(pathattr='name') - with self.__lock: - for i,resource in enumerate(resources): - str_resource_name = 'resources[#{:d}]'.format(i) - try: - chk_type(str_resource_name, resource, (list, tuple)) - chk_length(str_resource_name, resource, min_length=2, max_length=2) - resource_key,resource_value = resource - chk_string(str_resource_name, resource_key, allow_empty=False) - resource_path = resource_key.split('/') - - - result = self.send_pon_connection_request(2, 22, 2, 333, "ef", 2500000) - results.append(result) - - except Exception as e: - LOGGER.exception(f"Error procesando la configuración {resource}: {str(e)}") - results.append(e) - - - - #match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key) - #if match is not None: - # endpoint_uuid = match.group(1) - # if '.' in endpoint_uuid: endpoint_uuid = endpoint_uuid.split('.')[0] - # self.__synthetic_sampling_parameters.set_endpoint_configured(endpoint_uuid) - - #results.append(True) - return results - - - @metered_subclass_method(METRICS_POOL) - def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: - chk_type('resources', resources, list) - if len(resources) == 0: return [] - results = [] - resolver = anytree.Resolver(pathattr='name') - with self.__lock: - for i,resource in enumerate(resources): - str_resource_name = 'resources[#{:d}]'.format(i) - try: - chk_type(str_resource_name, resource, (list, tuple)) - chk_length(str_resource_name, resource, min_length=2, max_length=2) - resource_key,_ = resource - chk_string(str_resource_name, resource_key, allow_empty=False) - resource_path = resource_key.split('/') - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) - results.append(e) # if validation fails, store the exception - continue - - resource_node = get_subnode(resolver, self.__running, resource_path, default=None) - # if not found, resource_node is None - if resource_node is None: - results.append(False) - continue - - match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key) - if match is not None: - endpoint_uuid = match.group(1) - if '.' in endpoint_uuid: endpoint_uuid = endpoint_uuid.split('.')[0] - self.__synthetic_sampling_parameters.unset_endpoint_configured(endpoint_uuid) - - parent = resource_node.parent - children = list(parent.children) - children.remove(resource_node) - parent.children = tuple(children) - results.append(True) - return results - - @metered_subclass_method(METRICS_POOL) - def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - chk_type('subscriptions', subscriptions, list) - if len(subscriptions) == 0: return [] - results = [] - resolver = anytree.Resolver(pathattr='name') - with self.__lock: - for i,subscription in enumerate(subscriptions): - str_subscription_name = 'subscriptions[#{:d}]'.format(i) - try: - chk_type(str_subscription_name, subscription, (list, tuple)) - chk_length(str_subscription_name, subscription, min_length=3, max_length=3) - resource_key,sampling_duration,sampling_interval = subscription - chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) - resource_path = resource_key.split('/') - chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) - chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) - results.append(e) # if validation fails, store the exception - continue - - start_date,end_date = None,None - if sampling_duration <= 1.e-12: - start_date = datetime.utcnow() - end_date = start_date + timedelta(seconds=sampling_duration) - - job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval) - job = self.__scheduler.add_job( - do_sampling, args=(self.__synthetic_sampling_parameters, resource_key, self.__out_samples), - kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date, - end_date=end_date, timezone=pytz.utc) - - subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] - set_subnode_value(resolver, self.__subscriptions, subscription_path, job) - results.append(True) - return results - - @metered_subclass_method(METRICS_POOL) - def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - chk_type('subscriptions', subscriptions, list) - if len(subscriptions) == 0: return [] - results = [] - resolver = anytree.Resolver(pathattr='name') - with self.__lock: - for i,resource in enumerate(subscriptions): - str_subscription_name = 'resources[#{:d}]'.format(i) - try: - chk_type(str_subscription_name, resource, (list, tuple)) - chk_length(str_subscription_name, resource, min_length=3, max_length=3) - resource_key,sampling_duration,sampling_interval = resource - chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) - resource_path = resource_key.split('/') - chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) - chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) - results.append(e) # if validation fails, store the exception - continue - - subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] - subscription_node = get_subnode(resolver, self.__subscriptions, subscription_path) - - # if not found, resource_node is None - if subscription_node is None: - results.append(False) - continue - - job : Job = getattr(subscription_node, 'value', None) - if job is None or not isinstance(job, Job): - raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource))) - job.remove() - - parent = subscription_node.parent - children = list(parent.children) - children.remove(subscription_node) - parent.children = tuple(children) - - results.append(True) - return results - - def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]: - while True: - if self.__terminate.is_set(): break - if terminate is not None and terminate.is_set(): break - try: - sample = self.__out_samples.get(block=blocking, timeout=0.1) - except queue.Empty: - if blocking: continue - return - if sample is None: continue - yield sample diff --git a/src/device/service/drivers/pon_driver/SyntheticSamplingParameters.py b/src/device/service/drivers/pon_driver/SyntheticSamplingParameters.py deleted file mode 100644 index e25e207e87256472a6bebf2da5601d409f189b1f..0000000000000000000000000000000000000000 --- a/src/device/service/drivers/pon_driver/SyntheticSamplingParameters.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, math, queue, random, re, threading -from datetime import datetime -from typing import Optional, Tuple - -LOGGER = logging.getLogger(__name__) - -RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)') - -MSG_ERROR_PARSE = '[get] unable to extract endpoint-metric from monitoring_resource_key "{:s}"' -MSG_INFO = '[get] monitoring_resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}' - -class SyntheticSamplingParameters: - def __init__(self) -> None: - self.__lock = threading.Lock() - self.__data = {} - self.__configured_endpoints = set() - - def set_endpoint_configured(self, endpoint_uuid : str): - with self.__lock: - self.__configured_endpoints.add(endpoint_uuid) - - def unset_endpoint_configured(self, endpoint_uuid : str): - with self.__lock: - self.__configured_endpoints.discard(endpoint_uuid) - - def get(self, monitoring_resource_key : str) -> Optional[Tuple[float, float, float, float, float]]: - with self.__lock: - match = RE_GET_ENDPOINT_METRIC.match(monitoring_resource_key) - if match is None: - LOGGER.error(MSG_ERROR_PARSE.format(monitoring_resource_key)) - return None - endpoint_uuid = match.group(1) - - # If endpoint is not configured, generate a flat synthetic traffic aligned at 0 - if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0) - - metric = match.group(2) - metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '') - - LOGGER.debug(MSG_INFO.format(monitoring_resource_key, endpoint_uuid, metric, metric_sense)) - - parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense) - parameters = self.__data.get(parameters_key) - if parameters is not None: return parameters - - # assume packets - amplitude = 1.e7 * random.random() - phase = 60 * random.random() - period = 3600 * random.random() - offset = 1.e8 * random.random() + amplitude - avg_bytes_per_packet = random.randint(500, 1500) - parameters = (amplitude, phase, period, offset, avg_bytes_per_packet) - return self.__data.setdefault(parameters_key, parameters) - -def do_sampling( - synthetic_sampling_parameters : SyntheticSamplingParameters, monitoring_resource_key : str, - out_samples : queue.Queue -) -> None: - parameters = synthetic_sampling_parameters.get(monitoring_resource_key) - if parameters is None: return - amplitude, phase, period, offset, avg_bytes_per_packet = parameters - - if 'bytes' in monitoring_resource_key.lower(): - # convert to bytes - amplitude = avg_bytes_per_packet * amplitude - offset = avg_bytes_per_packet * offset - - timestamp = datetime.timestamp(datetime.utcnow()) - waveform = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset - noise = amplitude * random.random() - value = abs(0.95 * waveform + 0.05 * noise) - out_samples.put_nowait((timestamp, monitoring_resource_key, value)) diff --git a/src/device/service/drivers/pon_driver/Tools.py b/src/device/service/drivers/pon_driver/Tools.py deleted file mode 100644 index 9f2a105c0d9735f486f41fab5bc3069ec9327f65..0000000000000000000000000000000000000000 --- a/src/device/service/drivers/pon_driver/Tools.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import Any, Dict, Optional, Tuple -from common.proto.kpi_sample_types_pb2 import KpiSampleType -from common.type_checkers.Checkers import chk_attribute, chk_string, chk_type -from device.service.driver_api._Driver import RESOURCE_ENDPOINTS -from .Constants import SPECIAL_RESOURCE_MAPPINGS - -LOGGER = logging.getLogger(__name__) - -def process_optional_string_field( - endpoint_data : Dict[str, Any], field_name : str, endpoint_resource_value : Dict[str, Any] -) -> None: - field_value = chk_attribute(field_name, endpoint_data, 'endpoint_data', default=None) - if field_value is None: return - chk_string('endpoint_data.{:s}'.format(field_name), field_value) - if len(field_value) > 0: endpoint_resource_value[field_name] = field_value - -def compose_resource_endpoint(endpoint_data : Dict[str, Any]) -> Optional[Tuple[str, Dict]]: - try: - # Check type of endpoint_data - chk_type('endpoint_data', endpoint_data, dict) - - # Check endpoint UUID (mandatory) - endpoint_uuid = chk_attribute('uuid', endpoint_data, 'endpoint_data') - chk_string('endpoint_data.uuid', endpoint_uuid, min_length=1) - endpoint_resource_path = SPECIAL_RESOURCE_MAPPINGS.get(RESOURCE_ENDPOINTS) - endpoint_resource_key = '{:s}/endpoint[{:s}]'.format(endpoint_resource_path, endpoint_uuid) - endpoint_resource_value = {'uuid': endpoint_uuid} - - # Check endpoint optional string fields - process_optional_string_field(endpoint_data, 'name', endpoint_resource_value) - process_optional_string_field(endpoint_data, 'type', endpoint_resource_value) - process_optional_string_field(endpoint_data, 'context_uuid', endpoint_resource_value) - process_optional_string_field(endpoint_data, 'topology_uuid', endpoint_resource_value) - - # Check endpoint sample types (optional) - endpoint_sample_types = chk_attribute('sample_types', endpoint_data, 'endpoint_data', default=[]) - chk_type('endpoint_data.sample_types', endpoint_sample_types, list) - sample_types = {} - sample_type_errors = [] - for i,endpoint_sample_type in enumerate(endpoint_sample_types): - field_name = 'endpoint_data.sample_types[{:d}]'.format(i) - try: - chk_type(field_name, endpoint_sample_type, (int, str)) - if isinstance(endpoint_sample_type, int): - metric_name = KpiSampleType.Name(endpoint_sample_type) - metric_id = endpoint_sample_type - elif isinstance(endpoint_sample_type, str): - metric_id = KpiSampleType.Value(endpoint_sample_type) - metric_name = endpoint_sample_type - else: - str_type = str(type(endpoint_sample_type)) - raise Exception('Bad format: {:s}'.format(str_type)) # pylint: disable=broad-exception-raised - except Exception as e: # pylint: disable=broad-exception-caught - MSG = 'Unsupported {:s}({:s}) : {:s}' - sample_type_errors.append(MSG.format(field_name, str(endpoint_sample_type), str(e))) - - metric_name = metric_name.lower().replace('kpisampletype_', '') - monitoring_resource_key = '{:s}/state/{:s}'.format(endpoint_resource_key, metric_name) - sample_types[metric_id] = monitoring_resource_key - - if len(sample_type_errors) > 0: - # pylint: disable=broad-exception-raised - raise Exception('Malformed Sample Types:\n{:s}'.format('\n'.join(sample_type_errors))) - - if len(sample_types) > 0: - endpoint_resource_value['sample_types'] = sample_types - - if 'location' in endpoint_data: - endpoint_resource_value['location'] = endpoint_data['location'] - - return endpoint_resource_key, endpoint_resource_value - except: # pylint: disable=bare-except - LOGGER.exception('Problem composing endpoint({:s})'.format(str(endpoint_data))) - return None diff --git a/src/device/service/drivers/pon_driver/__init__.py b/src/device/service/drivers/pon_driver/__init__.py deleted file mode 100644 index 53d5157f750bfb085125cbd33faff1cec5924e14..0000000000000000000000000000000000000000 --- a/src/device/service/drivers/pon_driver/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -