Commit 2c8c18ef authored by Pablo Armingol's avatar Pablo Armingol
Browse files

feat: add custom IPoWDM device driver with synthetic sampling capabilities

parent 1c9a3553
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -255,6 +255,7 @@ enum DeviceDriverEnum {
  DEVICEDRIVER_GNMI_NOKIA_SRLINUX = 19;
  DEVICEDRIVER_OPENROADM = 20;
  DEVICEDRIVER_RESTCONF_OPENCONFIG = 21;
  DEVICEDRIVER_CUSTOM_IPOWDM = 22;
}

enum DeviceOperationalStatusEnum {
+21 −0
Original line number Diff line number Diff line
# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES

SPECIAL_RESOURCE_MAPPINGS = {
    RESOURCE_ENDPOINTS        : '/endpoints',
    RESOURCE_INTERFACES       : '/interfaces',
    RESOURCE_NETWORK_INSTANCES: '/net-instances',
}
+334 −0
Original line number Diff line number Diff line
# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import anytree, json, logging, pytz, queue, re, threading
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Optional, Tuple, Union
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.job import Job
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from device.service.driver_api.AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value
from .Constants import SPECIAL_RESOURCE_MAPPINGS
from .SyntheticSamplingParameters import SyntheticSamplingParameters, do_sampling
from .Tools import compose_resource_endpoint, connect_to_xr_agent
import requests


LOGGER = logging.getLogger(__name__)

RE_GET_ENDPOINT_FROM_INTERFACE = re.compile(r'^\/interface\[([^\]]+)\].*')

DRIVER_NAME = 'custom_ipowdm'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})

class CustomIpowdmDriver(_Driver):
    def __init__(self, address : str, port : int, **settings) -> None:
        super().__init__(DRIVER_NAME, address, port, **settings)
        self.__lock = threading.Lock()
        self.__address = address
        self.__initial = TreeNode('.')
        self.__running = TreeNode('.')
        self.__subscriptions = TreeNode('.')

        endpoints = self.settings.get('endpoints', [])
        endpoint_resources = []
        for endpoint in endpoints:
            endpoint_resource = compose_resource_endpoint(endpoint)
            if endpoint_resource is None: continue
            endpoint_resources.append(endpoint_resource)
        self.SetConfig(endpoint_resources)

        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
        self.__scheduler.configure(
            jobstores = {'default': MemoryJobStore()},
            executors = {'default': ThreadPoolExecutor(max_workers=1)},
            job_defaults = {'coalesce': False, 'max_instances': 3},
            timezone=pytz.utc)
        self.__out_samples = queue.Queue()
        self.__synthetic_sampling_parameters = SyntheticSamplingParameters()

    def Connect(self) -> bool:
        # If started, assume it is already connected
        if self.__started.is_set(): return True

        # Connect triggers activation of sampling events that will be scheduled based on subscriptions
        self.__scheduler.start()

        # Indicate the driver is now connected to the device
        self.__started.set()
        return True

    def Disconnect(self) -> bool:
        # Trigger termination of loops and processes
        self.__terminate.set()

        # If not started, assume it is already disconnected
        if not self.__started.is_set(): return True

        # Disconnect triggers deactivation of sampling events
        self.__scheduler.shutdown()
        return True

    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        with self.__lock:
            return dump_subtree(self.__initial)

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        chk_type('resources', resource_keys, list)
        with self.__lock:
            if len(resource_keys) == 0: return dump_subtree(self.__running)
            results = []
            resolver = anytree.Resolver(pathattr='name')
            for i,resource_key in enumerate(resource_keys):
                str_resource_name = 'resource_key[#{:d}]'.format(i)
                try:
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key)
                    resource_path = resource_key.split('/')
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                    results.append((resource_key, e)) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None: continue
                results.extend(dump_subtree(resource_node))
            return results

    @metered_subclass_method(METRICS_POOL)
    def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        if 'ipowdm_ruleset' in str(resources):
            connect_to_xr_agent(resources)
            results.append(True)
        else:
            resolver = anytree.Resolver(pathattr='name')
            with self.__lock:
                for i,resource in enumerate(resources):
                    str_resource_name = 'resources[#{:d}]'.format(i)
                    try:
                        chk_type(str_resource_name, resource, (list, tuple))
                        chk_length(str_resource_name, resource, min_length=2, max_length=2)
                        resource_key,resource_value = resource
                        chk_string(str_resource_name, resource_key, allow_empty=False)
                        resource_path = resource_key.split('/')
                    except Exception as e: # pylint: disable=broad-except
                        LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                        results.append(e) # if validation fails, store the exception
                        continue
                    try:
                        resource_value = json.loads(resource_value)
                    except: # pylint: disable=bare-except
                        pass

                    if resource_key.startswith('/ipowdm/service/'):
                        LOGGER.info('[%s] Legacy IPoWDM Service Provisioning: Payload=%s', self.__address, str(resource_value))
                    elif resource_key.startswith('/ipowdm/l3nm/'):
                        LOGGER.info('[%s] L3NM Service Provisioning: Payload=%s', self.__address, str(resource_value))
                    elif resource_key.startswith('/ipowdm/pluggables/'):
                        LOGGER.info('[%s] Pluggables Service Provisioning: Payload=%s', self.__address, str(resource_value))
                        try:
                            key_parts = resource_key.split('/')
                            if len(key_parts) >= 5:
                                service_uuid_raw = key_parts[3]
                                # Normalize UUID: if serviceId contains '-pluggable-', take the part before it
                                if "-pluggable-" in service_uuid_raw:
                                     service_uuid = service_uuid_raw.split("-pluggable-")[0]
                                else:
                                     service_uuid = service_uuid_raw

                                if not hasattr(EmulatedDriver, 'pluggables_pending'):
                                    EmulatedDriver.pluggables_pending = {}

                                if service_uuid in EmulatedDriver.pluggables_pending:
                                    stored_payload = EmulatedDriver.pluggables_pending.pop(service_uuid)
                                    current_payload = resource_value

                                    def format_entry(payload):
                                        config = payload.get('config', {})
                                        return {
                                            'uuid': payload.get('device'),
                                            'power': config.get('target-output-power', 0.0),
                                            'frequency': config.get('frequency', 0.0)
                                        }
                                    combined_data = [
                                        {'src': format_entry(stored_payload)},
                                        {'dst': format_entry(current_payload)}
                                    ]
                                    LOGGER.info('[%s] Pluggables Service Provisioning Aggregated: %s', self.__address, json.dumps(combined_data, indent=2))
                                    # TODO Dynamic IP
                                    url = "http://192.168.88.17:9849/api-v0/transponders"
                                    headers = {'Content-Type': 'application/json'}
                                    response = requests.post(url, json=combined_data, headers=headers)
                                    LOGGER.info('[%s] Pluggables Service Provisioning Response: %s', self.__address, str(response.text))
                                else:
                                    EmulatedDriver.pluggables_pending[service_uuid] = resource_value
                                    LOGGER.debug('[%s] Pluggables Service Partial Provisioning stored for %s', self.__address, service_uuid)
                        except Exception as e:
                            LOGGER.warning("Error processing Pluggables aggregation: %s", str(e))
                            LOGGER.info('[%s] Pluggables Service Provisioning: Payload=%s', self.__address, str(resource_value))

                    set_subnode_value(resolver, self.__running, resource_path, resource_value)

                    match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key)
                    if match is not None:
                        endpoint_uuid = match.group(1)
                        if '.' in endpoint_uuid: endpoint_uuid = endpoint_uuid.split('.')[0]
                        self.__synthetic_sampling_parameters.set_endpoint_configured(endpoint_uuid)

                    results.append(True)
        return results

    @metered_subclass_method(METRICS_POOL)
    def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
        chk_type('resources', resources, list)
        if len(resources) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(resources):
                str_resource_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_resource_name, resource, (list, tuple))
                    chk_length(str_resource_name, resource, min_length=2, max_length=2)
                    resource_key,_ = resource
                    chk_string(str_resource_name, resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                resource_node = get_subnode(resolver, self.__running, resource_path, default=None)
                # if not found, resource_node is None
                if resource_node is None:
                    results.append(False)
                    continue

                match = RE_GET_ENDPOINT_FROM_INTERFACE.match(resource_key)
                if match is not None:
                    endpoint_uuid = match.group(1)
                    if '.' in endpoint_uuid: endpoint_uuid = endpoint_uuid.split('.')[0]
                    self.__synthetic_sampling_parameters.unset_endpoint_configured(endpoint_uuid)

                parent = resource_node.parent
                children = list(parent.children)
                children.remove(resource_node)
                parent.children = tuple(children)
                results.append(True)
        return results

    @metered_subclass_method(METRICS_POOL)
    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('subscriptions', subscriptions, list)
        if len(subscriptions) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,subscription in enumerate(subscriptions):
                str_subscription_name = 'subscriptions[#{:d}]'.format(i)
                try:
                    chk_type(str_subscription_name, subscription, (list, tuple))
                    chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
                    resource_key,sampling_duration,sampling_interval = subscription
                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                start_date,end_date = None,None
                if sampling_duration <= 1.e-12:
                    start_date = datetime.utcnow()
                    end_date = start_date + timedelta(seconds=sampling_duration)

                job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
                job = self.__scheduler.add_job(
                    do_sampling, args=(self.__synthetic_sampling_parameters, resource_key, self.__out_samples),
                    kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date,
                    end_date=end_date, timezone=pytz.utc)

                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                set_subnode_value(resolver, self.__subscriptions, subscription_path, job)
                results.append(True)
        return results

    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('subscriptions', subscriptions, list)
        if len(subscriptions) == 0: return []
        results = []
        resolver = anytree.Resolver(pathattr='name')
        with self.__lock:
            for i,resource in enumerate(subscriptions):
                str_subscription_name = 'resources[#{:d}]'.format(i)
                try:
                    chk_type(str_subscription_name, resource, (list, tuple))
                    chk_length(str_subscription_name, resource, min_length=3, max_length=3)
                    resource_key,sampling_duration,sampling_interval = resource
                    chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                    resource_path = resource_key.split('/')
                    chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                    chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
                except Exception as e: # pylint: disable=broad-except
                    LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key)))
                    results.append(e) # if validation fails, store the exception
                    continue

                subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)]
                subscription_node = get_subnode(resolver, self.__subscriptions, subscription_path)

                # if not found, resource_node is None
                if subscription_node is None:
                    results.append(False)
                    continue

                job : Job = getattr(subscription_node, 'value', None)
                if job is None or not isinstance(job, Job):
                    raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource)))
                job.remove()

                parent = subscription_node.parent
                children = list(parent.children)
                children.remove(subscription_node)
                parent.children = tuple(children)

                results.append(True)
        return results

    def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]:
        while True:
            if self.__terminate.is_set(): break
            if terminate is not None and terminate.is_set(): break
            try:
                sample = self.__out_samples.get(block=blocking, timeout=0.1)
            except queue.Empty:
                if blocking: continue
                return
            if sample is None: continue
            yield sample
+86 −0
Original line number Diff line number Diff line
# Copyright 2022-2026 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, math, queue, random, re, threading
from datetime import datetime
from typing import Optional, Tuple

LOGGER = logging.getLogger(__name__)

RE_GET_ENDPOINT_METRIC = re.compile(r'.*\/endpoint\[([^\]]+)\]\/state\/(.*)')

MSG_ERROR_PARSE = '[get] unable to extract endpoint-metric from monitoring_resource_key "{:s}"'
MSG_INFO = '[get] monitoring_resource_key={:s}, endpoint_uuid={:s}, metric={:s}, metric_sense={:s}'

class SyntheticSamplingParameters:
    def __init__(self) -> None:
        self.__lock = threading.Lock()
        self.__data = {}
        self.__configured_endpoints = set()
 
    def set_endpoint_configured(self, endpoint_uuid : str):
        with self.__lock:
            self.__configured_endpoints.add(endpoint_uuid)

    def unset_endpoint_configured(self, endpoint_uuid : str):
        with self.__lock:
            self.__configured_endpoints.discard(endpoint_uuid)

    def get(self, monitoring_resource_key : str) -> Optional[Tuple[float, float, float, float, float]]:
        with self.__lock:
            match = RE_GET_ENDPOINT_METRIC.match(monitoring_resource_key)
            if match is None:
                LOGGER.error(MSG_ERROR_PARSE.format(monitoring_resource_key))
                return None
            endpoint_uuid = match.group(1)

            # If endpoint is not configured, generate a flat synthetic traffic aligned at 0
            if endpoint_uuid not in self.__configured_endpoints: return (0, 0, 1, 0, 0)

            metric = match.group(2)
            metric_sense = metric.lower().replace('packets_', '').replace('bytes_', '')

            LOGGER.debug(MSG_INFO.format(monitoring_resource_key, endpoint_uuid, metric, metric_sense))

            parameters_key = '{:s}-{:s}'.format(endpoint_uuid, metric_sense)
            parameters = self.__data.get(parameters_key)
            if parameters is not None: return parameters

            # assume packets
            amplitude  = 1.e7 * random.random()
            phase      = 60 * random.random()
            period     = 3600 * random.random()
            offset     = 1.e8 * random.random() + amplitude
            avg_bytes_per_packet = random.randint(500, 1500)
            parameters = (amplitude, phase, period, offset, avg_bytes_per_packet)
            return self.__data.setdefault(parameters_key, parameters)

def do_sampling(
    synthetic_sampling_parameters : SyntheticSamplingParameters, monitoring_resource_key : str,
    out_samples : queue.Queue
) -> None:
    parameters = synthetic_sampling_parameters.get(monitoring_resource_key)
    if parameters is None: return
    amplitude, phase, period, offset, avg_bytes_per_packet = parameters

    if 'bytes' in monitoring_resource_key.lower():
        # convert to bytes
        amplitude = avg_bytes_per_packet * amplitude
        offset = avg_bytes_per_packet * offset

    timestamp = datetime.timestamp(datetime.utcnow())
    waveform  = amplitude * math.sin(2 * math.pi * timestamp / period + phase) + offset
    noise     = amplitude * random.random()
    value     = abs(0.95 * waveform + 0.05 * noise)
    out_samples.put_nowait((timestamp, monitoring_resource_key, value))
+134 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading