Commit f25eb3dc authored by Waleed Akbar's avatar Waleed Akbar
Browse files

feat: ECOC 2026 - Pluggable Demo

- Added NETCONF Server to overcome unavaiability of Dataplane limitation
parent 40b0fc60
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
    NETCONF_PORT=830 \
    NETCONF_USERNAME=netconf \
    NETCONF_PASSWORD=netconf \
    NETCONF_DATA_FILE=/opt/netconf-mock/data/telemetry.json
    NETCONF_INITIAL_POWER=-5.0

WORKDIR /opt/netconf-mock

+97 −101
Original line number Diff line number Diff line
# NETCONF OpenConfig Optical-Channel Mock Server
# NETCONF OpenConfig Optical Mock Server

This project provides a lightweight Dockerized NETCONF-over-SSH mock server for testing a NETCONF collector against OpenConfig-style optical-channel operational telemetry.
A lightweight NETCONF-over-SSH mock for the ECOC26 pluggables tests.  It
exposes OpenConfig-style optical-channel operational state for the TFS
`DEVICEDRIVER_NETCONF_OC_PLUGGABLE` collector and the ZSM failure notification
flow.

It supports the target path:
The server generates **dynamic** telemetry — it does **not** read from a
static file.  Power values follow a biased random walk, and Pre-FEC BER is
derived from the current power using a realistic optical degradation curve.

```text
/components/component[name=<channel_name>]/optical-channel/state/<metric_leaf>/instant
```

Namespaces:

```text
ocp = http://openconfig.net/yang/platform
oct = http://openconfig.net/yang/terminal-device
```

## What this mock server is

This is a protocol-level NETCONF mock. It is useful when you want to test whether your NETCONF collector can:

- open a NETCONF-over-SSH session on port 830;
- exchange NETCONF `<hello>` messages;
- send `<get>` RPCs with subtree filters;
- parse OpenConfig-shaped XML responses under `components/component/optical-channel/state`.

## What this mock server is not
## Dynamic Telemetry Model

It is not a full YANG-validating server. It does not use Sysrepo/libyang, does not enforce OpenConfig schema constraints, and does not implement configuration datastores. For schema-accurate testing, use Sysrepo + Netopeer2 and load the relevant OpenConfig YANG modules.
Only two ECOC26 metrics are served on a single optical channel (`channel-1`):

## Standards alignment
| KPI sample type | OpenConfig leaf | ECOC26 code |
| --- | --- | --- |
| `KPISAMPLETYPE_PRE_FEC_BER_PLUGGABLE` | `pre-fec-ber` | 2301 |
| `KPISAMPLETYPE_RECEIVED_POWER_PLUGGABLE` | `input-power` | 2302 |

- NETCONF protocol operation style follows RFC 6241.
- NETCONF-over-SSH transport follows the RFC 6242 SSH subsystem model.
- This mock advertises only NETCONF `base:1.0`, therefore it uses the `]]>]]>` end-of-message delimiter instead of NETCONF 1.1 chunked framing.
- Optical telemetry is operational state, therefore clients should use `<get>`, not `<get-config>`.
### Power random walk

## Run with Docker
Power starts at `-5 dBm` and drifts between **-1 dBm** (best) and **-45 dBm**
(worst).  Each poll applies a biased step of `[-0.8, +0.3]`, so the signal
degrades over time (simulating a failing link).

```bash
./scripts/run.sh
```
### BER derivation

Equivalent manual commands:
Pre-FEC BER follows a realistic optical model:

```bash
docker build -t netconf-oc-optical-mock:latest .

docker run -d \
  --name netconf-oc-optical-mock \
  -p 830:830 \
  -e NETCONF_USERNAME=netconf \
  -e NETCONF_PASSWORD=netconf \
  -v "$(pwd)/data/telemetry.json:/opt/netconf-mock/data/telemetry.json:ro" \
  netconf-oc-optical-mock:latest
```

Credentials:

```text
username: netconf
password: netconf
port:     830
log₁₀(BER) = -12 + (|power_dbm| - 5) × 0.3
```

## Run with Docker Compose
| Power | Meaning | Pre-FEC BER |
| --- | --- | --- |
| -5 dBm | nominal | ~1×10⁻¹² |
| -15 dBm | slight loss | ~1×10⁻⁹ |
| -25 dBm | degraded | ~1×10⁻⁶ |
| -35 dBm | failing | ~1×10⁻³ |
| -45 dBm | link down | ~1 |

## Supported NETCONF Behavior

- SSH password authentication with the `netconf` subsystem.
- NETCONF `base:1.0` framing with the `]]>]]>` end-of-message delimiter.
- Server `<hello>` with these capabilities:
  - `urn:ietf:params:netconf:base:1.0`
  - `http://openconfig.net/yang/platform?module=openconfig-platform`
  - `http://openconfig.net/yang/terminal-device?module=openconfig-terminal-device`
- `<get>` RPCs — returns current `channel-1` values (advances simulation).
- `<close-session>` RPCs.
- `<get-config>` returns `operation-not-supported`.

## Runtime Configuration

| Variable | Default | Purpose |
| --- | --- | --- |
| `NETCONF_HOST` | `0.0.0.0` | Address to bind |
| `NETCONF_PORT` | `830` | NETCONF SSH port |
| `NETCONF_USERNAME` | `netconf` | Login username |
| `NETCONF_PASSWORD` | `netconf` | Login password |
| `NETCONF_INITIAL_POWER` | `-5.0` | Starting power in dBm |
| `NETCONF_SESSION_ID` | `100` | Session ID in hello |
| `LOG_LEVEL` | `INFO` | Python logging level |

## Run Locally with Docker

```bash
docker compose up -d --build
./scripts/run.sh
```

## Test with ncclient
Credentials: `localhost:830`, username `netconf`, password `netconf`.

After the container is running:
## Deploy to Kubernetes

```bash
./scripts/test_with_ncclient.sh
./scripts/deploy_k8s.sh
```

Expected response includes:
Service endpoint: `netconf-oc-optical-mock.netconf-mock.svc.cluster.local:830`

```xml
<components xmlns="http://openconfig.net/yang/platform">
  <component>
    <name>och-1</name>
    <optical-channel xmlns="http://openconfig.net/yang/terminal-device">
      <state>
        <name>och-1</name>
        <pre-fec-ber>
          <instant>1.2e-6</instant>
        </pre-fec-ber>
      </state>
    </optical-channel>
  </component>
</components>
```
The ConfigMap mount is no longer needed — the server generates values internally.

## Example NETCONF `<get>` RPC
## Example NETCONF Get

```xml
<rpc message-id="101" xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
@@ -103,12 +90,11 @@ Expected response includes:
    <filter type="subtree">
      <components xmlns="http://openconfig.net/yang/platform">
        <component>
          <name>och-1</name>
          <name>channel-1</name>
          <optical-channel xmlns="http://openconfig.net/yang/terminal-device">
            <state>
              <pre-fec-ber>
                <instant/>
              </pre-fec-ber>
              <input-power><instant/></input-power>
              <pre-fec-ber><instant/></pre-fec-ber>
            </state>
          </optical-channel>
        </component>
@@ -118,31 +104,41 @@ Expected response includes:
</rpc>
```

## Add or change channels and metrics

Edit `data/telemetry.json`.

Example metric format:

```json
{
  "pre-fec-ber": {
    "instant": "1.2e-6"
  }
}
```

The server will return any metric leaf/container found under:
Example response (values vary — generated dynamically):

```text
components.<component-name>.optical-channel.state.<metric-name>.instant
```xml
<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
  <ocp:components xmlns:ocp="http://openconfig.net/yang/platform">
    <ocp:component>
      <ocp:name>channel-1</ocp:name>
      <ocp:state>
        <ocp:name>channel-1</ocp:name>
        <ocp:type>OPTICAL_CHANNEL</ocp:type>
      </ocp:state>
      <oct:optical-channel xmlns:oct="http://openconfig.net/yang/terminal-device">
        <oct:state>
          <oct:input-power>
            <oct:instant>-5.34</oct:instant>
          </oct:input-power>
          <oct:pre-fec-ber>
            <oct:instant>1.27e-12</oct:instant>
          </oct:pre-fec-ber>
        </oct:state>
      </oct:optical-channel>
    </ocp:component>
  </ocp:components>
</data>
```

## Notes for integration with your collector

Use `<get>` because the target path is operational state. If your collector currently sends `<get-config>`, this mock intentionally returns an `operation-not-supported` RPC error to expose that mistake early.
## ECOC26 ZSM Flow

This server returns full or filtered data according to the subtree filter. Filtering is intentionally simple and supports:
1. Deploy this mock to Kubernetes with `./scripts/deploy_k8s.sh`.
2. Load the ECOC26 automation topology (device IP2 → mock service).
3. The collector polls `channel-1` every few seconds, reading `input-power` and
   `pre-fec-ber`.
4. Power degrades over time (biased random walk).  When it drops into the
   analyzer threshold range (e.g. `[-30, -5]` dBm), the analyzer triggers.
5. The policy action `GENERATE_FAILURE_NOTIFICATION` fires.

- `component/name` selection;
- direct metric selection under `optical-channel/state`.
To force a quick trigger, set `NETCONF_INITIAL_POWER` to a value inside the
threshold, e.g. `-15.0`.
+91 −106
Original line number Diff line number Diff line
@@ -11,22 +11,28 @@ Scope:
    ocp = http://openconfig.net/yang/platform
    oct = http://openconfig.net/yang/terminal-device

Dynamic telemetry model
-----------------------
Only two ECOC26 metrics are served — input-power and pre-fec-ber — for a
single optical channel (channel-1).

Power starts at an average value and adds/subtracts random drift each poll,
staying within [-1, -45] dBm. BER is derived from power: the weaker the
signal (more negative), the worse the BER.

This mock is intended for collector/SBI integration tests. It is not a full
YANG-validating NETCONF server and does not implement sysrepo/libyang semantics.
YANG-validating NETCONF server.
"""

from __future__ import annotations

import asyncio
import json
import logging
import math
import os
import pathlib
import random
import signal
import sys
import tempfile
from dataclasses import dataclass
from typing import Any, Iterable
from xml.etree import ElementTree as ET

import asyncssh
@@ -53,100 +59,74 @@ def local_name(tag: str) -> str:
    return tag


def first_child_by_local_name(element: ET.Element, wanted: str) -> ET.Element | None:
    for child in list(element):
        if local_name(child.tag) == wanted:
            return child
    return None


def children_by_local_name(element: ET.Element, wanted: str) -> list[ET.Element]:
    return [child for child in list(element) if local_name(child.tag) == wanted]


def iter_descendants_by_local_name(element: ET.Element, wanted: str) -> Iterable[ET.Element]:
    for child in element.iter():
        if child is not element and local_name(child.tag) == wanted:
            yield child


def xml_to_string(element: ET.Element) -> str:
    return ET.tostring(element, encoding="unicode", short_empty_elements=True)


@dataclass(frozen=True)
class RequestSelection:
    component_names: set[str] | None
    metric_names: set[str] | None
# ── ECOC26 supported metrics ──────────────────────────────────────────────────
# KPISAMPLETYPE_RECEIVED_POWER_PLUGGABLE (2302) → input-power
# KPISAMPLETYPE_PRE_FEC_BER_PLUGGABLE    (2301) → pre-fec-ber
SUPPORTED_CHANNELS = {"channel-1"}
SUPPORTED_METRICS  = {"input-power", "pre-fec-ber"}


class OpenConfigOpticalDataStore:
    def __init__(self, data_file: str) -> None:
        self.data_file = pathlib.Path(data_file)
        self._data: dict[str, Any] = {}
        self.reload()
# ═══════════════════════════════════════════════════════════════════════════════
# Dynamic optical metric generator
# ═══════════════════════════════════════════════════════════════════════════════

    def reload(self) -> None:
        with self.data_file.open("r", encoding="utf-8") as fh:
            self._data = json.load(fh)
        if "components" not in self._data or not isinstance(self._data["components"], dict):
            raise ValueError("telemetry JSON must contain a top-level 'components' object")
        LOG.info("Loaded mock telemetry from %s", self.data_file)
class OpticalSignalSimulator:
    """Optical signal with random power drift and derived BER.

    def build_data_element(self, selection: RequestSelection) -> ET.Element:
        data = ET.Element(qname(NETCONF_NS, "data"))
        components = ET.SubElement(data, qname(OC_PLATFORM_NS, "components"))
    Power wanders randomly within [POWER_MIN, POWER_MAX] on each step.
    Pre-FEC BER is derived from the current power level.
    """

        all_components: dict[str, Any] = self._data.get("components", {})
        requested_names = selection.component_names or set(all_components.keys())
    POWER_MIN  = -45.0   # dBm — worst (link failure)
    POWER_MAX  = -1.0    # dBm — best  (no attenuation)

        for component_name in sorted(requested_names):
            component_data = all_components.get(component_name)
            if component_data is None:
                continue
            self._append_component(components, component_name, component_data, selection.metric_names)
    # Random-walk step range (symmetric)
    STEP_MIN   = -0.5
    STEP_MAX   = +0.5

        return data
    def __init__(self, initial_power: float = -5.0) -> None:
        self._power = max(self.POWER_MIN, min(self.POWER_MAX, initial_power))

    def _append_component(
        self,
        components: ET.Element,
        component_name: str,
        component_data: dict[str, Any],
        selected_metrics: set[str] | None,
    ) -> None:
        component = ET.SubElement(components, qname(OC_PLATFORM_NS, "component"))
        ET.SubElement(component, qname(OC_PLATFORM_NS, "name")).text = component_name
    # ── public API ────────────────────────────────────────────────────────────

        component_state = ET.SubElement(component, qname(OC_PLATFORM_NS, "state"))
        ET.SubElement(component_state, qname(OC_PLATFORM_NS, "name")).text = component_name
        component_type = component_data.get("type")
        if component_type:
            ET.SubElement(component_state, qname(OC_PLATFORM_NS, "type")).text = str(component_type)

        optical_state_data = (
            component_data.get("optical-channel", {})
            .get("state", {})
        )

        optical_channel = ET.SubElement(component, qname(OC_TERMINAL_DEVICE_NS, "optical-channel"))
        state = ET.SubElement(optical_channel, qname(OC_TERMINAL_DEVICE_NS, "state"))
    @property
    def power(self) -> float:
        """Current optical input power in dBm."""
        return round(self._power, 2)

        for key, value in optical_state_data.items():
            if selected_metrics is not None and key not in selected_metrics and key not in {"name"}:
                continue
    @property
    def pre_fec_ber(self) -> float:
        """Pre-FEC BER derived from current power.

            metric_elem = ET.SubElement(state, qname(OC_TERMINAL_DEVICE_NS, key))
            if isinstance(value, dict):
                for child_key, child_value in value.items():
                    ET.SubElement(metric_elem, qname(OC_TERMINAL_DEVICE_NS, child_key)).text = str(child_value)
            else:
                metric_elem.text = str(value)
        Weaker signal (more negative dBm) → worse BER.
        Uses a simple exponential model bounded to [0, 1].
        """
        exponent = -12.0 + (abs(self._power) - 5.0) * 0.3
        exponent = min(exponent, 0.0)               # BER ≤ 1
        return 10.0 ** exponent

    def step(self) -> None:
        """Advance the simulation one polling interval."""
        delta = random.uniform(self.STEP_MIN, self.STEP_MAX)
        self._power = max(self.POWER_MIN, min(self.POWER_MAX, self._power + delta))

    # ── value formatting ──────────────────────────────────────────────────────

    def get_value(self, metric: str) -> str | None:
        if metric == "input-power":
            return str(self.power)
        if metric == "pre-fec-ber":
            return f"{self.pre_fec_ber:.2e}"
        return None


class NetconfProtocol:
    def __init__(self, datastore: OpenConfigOpticalDataStore) -> None:
        self.datastore = datastore
    def __init__(self, simulator: OpticalSignalSimulator) -> None:
        self.simulator = simulator
        self.session_id = os.getenv("NETCONF_SESSION_ID", "100")

    def server_hello(self) -> str:
@@ -187,8 +167,9 @@ class NetconfProtocol:
        LOG.info("Received NETCONF operation <%s> message-id=%s", op_name, message_id)

        if op_name == "get":
            selection = self._extract_selection(operation)
            data = self.datastore.build_data_element(selection)
            # Advance the simulation on every poll for realistic time-varying data
            self.simulator.step()
            data = self._build_data_element()
            reply = self._rpc_reply(message_id)
            reply.append(data)
            return xml_to_string(reply) + EOM, False
@@ -211,31 +192,35 @@ class NetconfProtocol:

        return self._rpc_error(message_id, "operation-not-supported", f"Unsupported operation: {op_name}") + EOM, False

    def _extract_selection(self, get_operation: ET.Element) -> RequestSelection:
        filter_elem = first_child_by_local_name(get_operation, "filter")
        if filter_elem is None:
            return RequestSelection(component_names=None, metric_names=None)

        component_names: set[str] = set()
        metric_names: set[str] = set()

        for component in iter_descendants_by_local_name(filter_elem, "component"):
            name_elem = first_child_by_local_name(component, "name")
            if name_elem is not None and name_elem.text:
                component_names.add(name_elem.text.strip())

        for state in iter_descendants_by_local_name(filter_elem, "state"):
            parent_candidates = list(state)
            for child in parent_candidates:
                metric = local_name(child.tag)
                # Ignore generic state/name selectors and include actual requested metric leaves/containers.
                if metric not in {"name", "type"}:
                    metric_names.add(metric)

        return RequestSelection(
            component_names=component_names or None,
            metric_names=metric_names or None,
        )
    # ── dynamic data builder ──────────────────────────────────────────────────

    def _build_data_element(self) -> ET.Element:
        """Build a <data> payload with current simulator values for channel-1."""
        data = ET.Element(qname(NETCONF_NS, "data"))
        components = ET.SubElement(data, qname(OC_PLATFORM_NS, "components"))
        self._append_component(components, "channel-1")
        return data

    def _append_component(self, components: ET.Element, channel_name: str) -> None:
        component = ET.SubElement(components, qname(OC_PLATFORM_NS, "component"))
        ET.SubElement(component, qname(OC_PLATFORM_NS, "name")).text = channel_name

        component_state = ET.SubElement(component, qname(OC_PLATFORM_NS, "state"))
        ET.SubElement(component_state, qname(OC_PLATFORM_NS, "name")).text = channel_name
        ET.SubElement(component_state, qname(OC_PLATFORM_NS, "type")).text = "OPTICAL_CHANNEL"

        optical_channel = ET.SubElement(component, qname(OC_TERMINAL_DEVICE_NS, "optical-channel"))
        state = ET.SubElement(optical_channel, qname(OC_TERMINAL_DEVICE_NS, "state"))

        # input-power
        power_elem = ET.SubElement(state, qname(OC_TERMINAL_DEVICE_NS, "input-power"))
        ET.SubElement(power_elem, qname(OC_TERMINAL_DEVICE_NS, "instant")).text = self.simulator.get_value("input-power")

        # pre-fec-ber
        ber_elem = ET.SubElement(state, qname(OC_TERMINAL_DEVICE_NS, "pre-fec-ber"))
        ET.SubElement(ber_elem, qname(OC_TERMINAL_DEVICE_NS, "instant")).text = self.simulator.get_value("pre-fec-ber")

    # ── RPC helpers ───────────────────────────────────────────────────────────

    def _rpc_reply(self, message_id: str) -> ET.Element:
        return ET.Element(qname(NETCONF_NS, "rpc-reply"), {"message-id": message_id})
@@ -326,10 +311,10 @@ async def run_server() -> None:
    port = int(os.getenv("NETCONF_PORT", "830"))
    username = os.getenv("NETCONF_USERNAME", "netconf")
    password = os.getenv("NETCONF_PASSWORD", "netconf")
    data_file = os.getenv("NETCONF_DATA_FILE", "/opt/netconf-mock/data/telemetry.json")
    initial_power = float(os.getenv("NETCONF_INITIAL_POWER", "-5.0"))

    datastore = OpenConfigOpticalDataStore(data_file)
    protocol = NetconfProtocol(datastore)
    simulator = OpticalSignalSimulator(initial_power=initial_power)
    protocol = NetconfProtocol(simulator)
    host_key = await create_host_key()

    server = await asyncssh.create_server(
+1 −3
Original line number Diff line number Diff line
@@ -9,7 +9,5 @@ services:
      NETCONF_PORT: "830"
      NETCONF_USERNAME: "netconf"
      NETCONF_PASSWORD: "netconf"
      NETCONF_DATA_FILE: "/opt/netconf-mock/data/telemetry.json"
    volumes:
      - ./data/telemetry.json:/opt/netconf-mock/data/telemetry.json:ro
      NETCONF_INITIAL_POWER: "-5.0"
    restart: unless-stopped
+1 −8
Original line number Diff line number Diff line
@@ -23,14 +23,7 @@ docker push "${IMAGE_REGISTRY}"
echo "Applying K8s manifests..."
kubectl apply -f "${SCRIPT_DIR}/k8s-manifest.yaml"

# Apply ConfigMap from telemetry.json (--dry-run + apply = idempotent upsert)
echo "Applying ConfigMap from telemetry.json..."
kubectl create configmap netconf-mock-data \
  --from-file=telemetry.json="${MOCK_DIR}/data/telemetry.json" \
  -n "${NAMESPACE}" \
  --dry-run=client -o yaml | kubectl apply -f -

# Restart deployment to pick up any ConfigMap changes
# Restart deployment to pick up any image changes
kubectl rollout restart deployment/netconf-oc-optical-mock -n "${NAMESPACE}"

echo "Waiting for deployment to be ready..."
Loading