diff --git a/README.md b/README.md index b0d2d318a17c9d7c2e95c36b355cbd3a94abd828..74186adec060f1151c520d3dcdf63b9fa23e71f9 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,14 @@ Open source SDK to abstract CAMARA/GSMA Transformation Functions (TFs) for Edge | i2CAT O-RAN controller | WIP | +### O-RAN Platforms + +| Platform | Status | CAMARA QoD | +|------------------|---------|------------| +| i2CAT RIC & NEF | ✅ | ✅ | +| Juniper | ❌ | ❌ | + + --- ## How to Use @@ -166,6 +174,15 @@ API ->> SDK: network_client.create_qos_session(QOS_SESSION_REQUEST) SDK ->> NEF: Equivalent endpoint NEF ->> 5GS: QoS session creation ``` +<<<<<<< HEAD +--- + +## Roadmap for Open SDK 2nd release + +- [ ] Add support to GSMA OPG.02 TFs +- [x] Include JUNIPER O-RAN adapter +======= +>>>>>>> main --- diff --git a/examples/example.py b/examples/example.py index d8ea28090625a74528a34d059f3e002ef8d15bc6..1d176c66a273f92d7707ad6d34d8b12df6007ad4 100644 --- a/examples/example.py +++ b/examples/example.py @@ -5,23 +5,21 @@ from sunrise6g_opensdk.common.sdk import Sdk as sdkclient # For developers def main(): # The module that imports the SDK package, must specify which adapters will be used: adapter_specs = { - "edgecloud": { - "client_name": "kubernetes", - "base_url": "http://IP:PORT", - }, - "network": { - "client_name": "open5gs", - "base_url": "http://IP:PORT", - "scs_as_id": "id_example", - }, + "oran": { + "client_name": "i2cat_ric", + "base_url": "http://127.0.0.1:30000", + "scs_as_id": "scs-test", + } } adapters = sdkclient.create_adapters_from(adapter_specs) - edgecloud_client = adapters.get("edgecloud") - network_client = adapters.get("network") + # edgecloud_client = adapters.get("edgecloud") + # network_client = adapters.get("network") + oran_client = adapters.get("oran") - print("EdgeCloud client ready to be used:", edgecloud_client) - print("Network client ready to be used:", network_client) + # print("EdgeCloud client ready to be used:", edgecloud_client) + # print("Network client ready to be used:", network_client) + print("Oran client ready to be used:", oran_client) # Examples: # EdgeCloud diff --git a/pyproject.toml b/pyproject.toml index 68c5415925fe5665d446e32ab11267c1cfad6df0..632dba4816cabeb3d835bd26c792cd131e232f58 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sunrise6g-opensdk" -version = "1.0.18" +version = "1.0.19" description = "Open source SDK to abstract CAMARA/GSMA Transformation Functions (TFs) for Edge Cloud platforms, 5G network cores and Open RAN solutions." keywords = [ "Federation", diff --git a/src/sunrise6g_opensdk/common/adapters_factory.py b/src/sunrise6g_opensdk/common/adapters_factory.py index f09e5e6c0466cf41723ae7112ffa9cc82d66b0b6..2f25024a0c762ee5180ff585ceab02bd1f636606 100644 --- a/src/sunrise6g_opensdk/common/adapters_factory.py +++ b/src/sunrise6g_opensdk/common/adapters_factory.py @@ -24,6 +24,9 @@ from sunrise6g_opensdk.network.adapters.open5gcore.client import ( from sunrise6g_opensdk.network.adapters.open5gs.client import ( NetworkManager as Open5GSClient, ) +from sunrise6g_opensdk.oran.adapters.i2cat_ric.client import ( + OranManager as OranManageri2CAT, +) def _edgecloud_adapters_factory(client_name: str, base_url: str, **kwargs): @@ -64,15 +67,27 @@ def _network_adapters_factory(client_name: str, base_url: str, **kwargs): ) -# def _oran_adapters_factory(client_name: str, base_url: str): -# # TODO +def _oran_adapters_factory(client_name: str, base_url: str, **kwargs): + if "scs_as_id" not in kwargs: + raise ValueError("Missing required 'scs_as_id' for network adapters.") + scs_as_id = kwargs.pop("scs_as_id") + + oran_factory = { + "i2cat_ric": lambda url, scs_id, **kw: OranManageri2CAT( + base_url=url, scs_as_id=scs_id, **kw + ), + } + try: + return oran_factory[client_name](base_url, scs_as_id, **kwargs) + except KeyError: + raise ValueError(f"Invalid Oran client '{client_name}'. Available: {list(oran_factory)}") class AdaptersFactory: _domain_factories = { "edgecloud": _edgecloud_adapters_factory, "network": _network_adapters_factory, - # "oran": _oran_adapters_factory, + "oran": _oran_adapters_factory, } @classmethod diff --git a/src/sunrise6g_opensdk/oran/clients/__init__.py b/src/sunrise6g_opensdk/oran/adapters/__init__.py similarity index 100% rename from src/sunrise6g_opensdk/oran/clients/__init__.py rename to src/sunrise6g_opensdk/oran/adapters/__init__.py diff --git a/src/sunrise6g_opensdk/oran/adapters/errors.py b/src/sunrise6g_opensdk/oran/adapters/errors.py new file mode 100644 index 0000000000000000000000000000000000000000..4f2c0f62776cd1becb7803465aeeb89f63342357 --- /dev/null +++ b/src/sunrise6g_opensdk/oran/adapters/errors.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +class OranPlatformError(Exception): + pass diff --git a/src/sunrise6g_opensdk/oran/clients/juniper_ric/__init__.py b/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/__init__.py similarity index 100% rename from src/sunrise6g_opensdk/oran/clients/juniper_ric/__init__.py rename to src/sunrise6g_opensdk/oran/adapters/i2cat_ric/__init__.py diff --git a/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/client.py b/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/client.py new file mode 100644 index 0000000000000000000000000000000000000000..813ac94fc8c3f596a37e2399c16dc9133fd103a6 --- /dev/null +++ b/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/client.py @@ -0,0 +1,438 @@ +# -*- coding: utf-8 -*- +## +# +# This file is part of the Open SDK, based on sunrise6g_opensdk.network.core.adapters.open5gs.client +# +# Contributors: +# - Miguel Catalan Cid (miguel.catalan@i2cat.net) +## +import json +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Dict + +from pydantic import ValidationError + +from sunrise6g_opensdk import logger +from sunrise6g_opensdk.oran.core.base_oran_client import BaseOranClient + +from ...core import common as oran_common +from ...core import schemas +from ...core.common import OranHttpError, requires_capability +from . import mappings as mappings_module +from .mappings import flow_id_mapping, policy_mapping, qos_prio_to_oran_prio + +log = logger.get_logger(__name__) + + +class OranManager(BaseOranClient): + """ + This client implements the BaseOranClient and translates the + CAMARA APIs into specific HTTP requests understandable by the i2CAT ORAN NEF API. + """ + + capabilities = {"oran-qod", "oran-performance"} + + def __init__(self, base_url: str, scs_as_id): + """ + Initializes the OranNEFClient Client. + """ + try: + # Set required attributes without invoking BaseOranClient.__init__ + self.base_url = base_url + self.scs_as_id = scs_as_id + # Read mappings on-demand; no background thread + log.info( + f"Initialized OranNEFClient with base_url: {self.base_url} " + f"and scs_as_id: {self.scs_as_id}" + ) + except Exception as e: + log.error(f"Failed to initialize OranNEFClient: {e}") + raise e + + def oran_specific_qod_validation(self, session_info: schemas.CreateSession): + if session_info.qosProfile.root not in qos_prio_to_oran_prio.keys(): + raise ValidationError( + f"OranNEFClient only supports these qos-profiles: {', '.join(qos_prio_to_oran_prio.keys())}" + ) + + def _load_ip_mapping_from_file(self) -> Dict[str, Dict[str, Any]]: + base_dir = Path(__file__).parent + cfg_path = base_dir / "ip_to_plmn_gnb_mapping.json" + if not cfg_path.exists(): + # No file present; nothing to load + return {} + + try: + with cfg_path.open("r", encoding="utf-8") as f: + raw = json.load(f) + except Exception as exc: + log.warning(f"Failed to parse mapping file {cfg_path.name}: {exc}") + return {} + + if not isinstance(raw, dict): + log.warning(f"Mapping file root must be an object/dict: {cfg_path.name}") + return {} + + parsed: Dict[str, Dict[str, Any]] = {} + for ip, entry in raw.items(): + if not isinstance(ip, str) or not isinstance(entry, dict): + continue + try: + mcc = entry.get("mcc") + mnc = entry.get("mnc") + gnb_length = entry.get("gnb_length") + gnb_id = entry.get("gnb_id") + ran_ue_id = entry.get("ran_ue_id") + + if ( + mcc is None + or mnc is None + or gnb_length is None + or gnb_id is None + or ran_ue_id is None + ): + raise ValueError("missing required keys") + + # Coerce types + mcc_str = str(mcc) + mnc_str = str(mnc) + gnb_length_int = int(gnb_length) + gnb_id_int = int(gnb_id) + ran_ue_id_str = str(ran_ue_id) + + parsed[ip] = { + "mcc": mcc_str, + "mnc": mnc_str, + "gnb_length": gnb_length_int, + "gnb_id": gnb_id_int, + "ran_ue_id": ran_ue_id_str, + } + except Exception: + # Skip invalid entries + continue + return parsed + + def _get_ip_mapping(self) -> Dict[str, Dict[str, Any]]: + """Return latest IP->PLMN/gNB mapping, reading file each time if present. + + Falls back to in-module defaults when the external file is missing or invalid. + """ + new_map = self._load_ip_mapping_from_file() + if new_map: + return new_map + return mappings_module.ip_to_plmn_gnb_mapping + + @requires_capability("oran-qod") + def create_qod_session(self, session_info: Dict, return_on_error: bool = False) -> Dict: + """Translate CAMARA QoD session into ORAN policy and submit it.""" + candidates = self._extract_device_ip_candidates(session_info) + scope = self._resolve_scope_from_candidates(candidates) + qos_profile, qos_prio, flow_id = self._normalize_qos_profile(session_info) + scope_with_flow = {**scope, "flow_id": flow_id} + expiry = self._parse_expiry(session_info) + notification_uri = self._extract_notification_uri(session_info) + policy = self._build_policy(scope_with_flow, qos_prio, expiry, notification_uri) + + resp = self._post_policy(policy, return_on_error) + # If return_on_error and there was an error, _post_policy returns a camara-like UNAVAILABLE + if resp.get("qosStatus") == "UNAVAILABLE" and "sessionId" not in resp: + return resp + + policy_id = ( + (resp or {}).get("policy_id") or (resp or {}).get("policyId") or (resp or {}).get("id") + ) + if not policy_id: + raise ValueError("ORAN policy creation did not return an ID") + + return self._build_camara_create_response( + session_info=session_info, + policy_id=policy_id, + qos_profile=qos_profile, + notification_uri=notification_uri, + expiry=expiry, + ) + + def _extract_device_ip_candidates(self, session_info: Dict) -> list[str]: + device = session_info.get("device") or {} + ipv4 = device.get("ipv4Address") if isinstance(device, dict) else None + candidates: list[str] = [] + if isinstance(ipv4, dict): + pub_ip = ipv4.get("publicAddress") + prv_ip = ipv4.get("privateAddress") + if isinstance(pub_ip, str) and pub_ip: + candidates.append(pub_ip) + if isinstance(prv_ip, str) and prv_ip and prv_ip not in candidates: + candidates.append(prv_ip) + elif isinstance(ipv4, str) and ipv4: + candidates.append(ipv4) + if not candidates: + raise ValueError("device.ipv4Address (public/private) must be provided") + return candidates + + def _resolve_scope_from_candidates(self, candidates: list[str]) -> Dict[str, Any]: + ip_map = self._get_ip_mapping() + for ip in candidates: + scope = ip_map.get(ip) + if scope: + return scope + raise ValueError(f"No PLMN/gNB/UE mapping found for device IPs {', '.join(candidates)}") + + def _normalize_qos_profile(self, session_info: Dict) -> tuple[str, int, int]: + qos_profile = session_info.get("qosProfile") + if isinstance(qos_profile, dict): + qos_profile = qos_profile.get("root") or qos_profile.get("value") + if qos_profile not in qos_prio_to_oran_prio: + raise ValidationError( + f"Unsupported qosProfile '{qos_profile}'. Allowed: {', '.join(qos_prio_to_oran_prio.keys())}" + ) + qos_prio = qos_prio_to_oran_prio[qos_profile] + try: + flow_id = flow_id_mapping[qos_profile] + except KeyError: + raise ValidationError(f"No flow_id mapping found for qosProfile '{qos_profile}'") + return qos_profile, qos_prio, flow_id + + def _parse_expiry(self, session_info: Dict) -> int | None: + expiry = session_info.get("duration") + try: + return int(expiry) if expiry is not None else None + except Exception: + return None + + def _extract_notification_uri(self, session_info: Dict) -> str | None: + return session_info.get("notificationDestination") or None + + def _build_policy( + self, + scope_with_flow: Dict[str, Any], + qos_prio: int, + expiry: int | None, + notification_uri: str | None, + ) -> schemas.OranPolicy: + return schemas.OranPolicy( + policyType=policy_mapping["oran-qod"], + policyScope=scope_with_flow, + policyStatement={"qos_prio": qos_prio}, + expiry=expiry, + notificationUri=notification_uri, + ) + + def _post_policy(self, policy: schemas.OranPolicy, return_on_error: bool) -> Dict[str, Any]: + try: + return oran_common.oran_policy_post(self.base_url, self.scs_as_id, policy) + except OranHttpError as e: + if return_on_error: + # Map HTTP error to CAMARA StatusInfo when returning UNAVAILABLE + status_info = None + if e.status_code is not None: + if e.status_code >= 500 or e.status_code in (408, 504): + status_info = "NETWORK_TERMINATED" + elif e.status_code == 410: + status_info = "DELETE_REQUESTED" + + # Align error payload with CAMARA ErrorInfo {status, code, message} + body = e.body if isinstance(e.body, dict) else None + if isinstance(body, dict) and { + "status", + "code", + "message", + }.issubset(body.keys()): + error_info = { + "status": body.get("status"), + "code": body.get("code"), + "message": body.get("message"), + } + else: + # Best-effort default mapping when backend doesn't provide CAMARA ErrorInfo + code_map = { + 400: "INVALID_ARGUMENT", + 401: "UNAUTHENTICATED", + 403: "PERMISSION_DENIED", + 404: "NOT_FOUND", + 409: "CONFLICT", + 410: "GONE", + 413: "REQUEST_TOO_LARGE", + 415: "UNSUPPORTED_MEDIA_TYPE", + 422: "UNPROCESSABLE_ENTITY", + 429: "TOO_MANY_REQUESTS", + } + error_info = { + "status": e.status_code, + "code": code_map.get(e.status_code or 0, "INTERNAL_ERROR"), + "message": ( + (body or {}).get("message") if isinstance(body, dict) else str(e) + ), + } + + return { + "qosStatus": "UNAVAILABLE", + "statusInfo": status_info, + "error": error_info, + } + raise + + def _build_camara_create_response( + self, + *, + session_info: Dict, + policy_id: Any, + qos_profile: str, + notification_uri: str | None, + expiry: int | None, + ) -> Dict[str, Any]: + return { + "sessionId": str(policy_id), + "qosStatus": "REQUESTED", + "duration": expiry if isinstance(expiry, int) else session_info.get("duration"), + "device": session_info.get("device"), + "applicationServer": session_info.get("applicationServer"), + "devicePorts": session_info.get("devicePorts"), + "applicationServerPorts": session_info.get("applicationServerPorts"), + "qosProfile": qos_profile, + "sink": notification_uri, + } + + @requires_capability("oran-qod") + def get_qod_session( + self, + session_id: str, + original_session: Dict | None = None, + fallback_unavailable: bool = False, + ) -> Dict: + """Return CAMARA SessionInfo-style data for a QoD session. + + Fetches the underlying ORAN policy to determine liveness and ID but + intentionally shapes the response to CAMARA fields only, not leaking + ORAN-specific attributes (policyScope, policyStatement, etc.). + """ + try: + resp: Dict[str, Any] = oran_common.oran_policy_get( + self.base_url, self.scs_as_id, session_id + ) + except OranHttpError: + if fallback_unavailable: + # Return a minimal CAMARA-like UNAVAILABLE response instead of raising + return { + "sessionId": str(session_id), + "qosStatus": "UNAVAILABLE", + } + raise + + # Determine policy/session identifier from ORAN response, fallback to provided session_id + policy_id = ( + (resp or {}).get("policy_id") + or (resp or {}).get("policyId") + or (resp or {}).get("id") + or session_id + ) + + # Build a fresh CAMARA-shaped response without ORAN internals + camara_resp: Dict[str, Any] = { + "sessionId": str(policy_id), + "qosStatus": "AVAILABLE", + } + + # Always include startedAt; include expiresAt only if a duration is known + now = datetime.now(timezone.utc) + camara_resp["startedAt"] = now.isoformat().replace("+00:00", "Z") + + expiry_val = (resp or {}).get("expiry") + duration_sec: int | None = None + if isinstance(original_session, dict) and isinstance(original_session.get("duration"), int): + duration_sec = int(original_session.get("duration")) + elif isinstance(expiry_val, int): + duration_sec = int(expiry_val) + if isinstance(duration_sec, int): + camara_resp["expiresAt"] = ( + (now + timedelta(seconds=duration_sec)).isoformat().replace("+00:00", "Z") + ) + + # sink: map ORAN notification URI to CAMARA 'sink' + notif = (resp or {}).get("notificationUri") + if isinstance(notif, str) and notif: + camara_resp["sink"] = notif + + # Enrich with original requested session fields when available (CAMARA keys only) + if isinstance(original_session, dict): + for key in ( + "device", + "applicationServer", + "devicePorts", + "applicationServerPorts", + "qosProfile", + ): + val = original_session.get(key) + if camara_resp.get(key) is None and val is not None: + camara_resp[key] = val + # Map legacy key + if camara_resp.get("sink") is None and original_session.get("notificationDestination"): + camara_resp["sink"] = original_session.get("notificationDestination") + + return camara_resp + + @requires_capability("oran-qod") + def delete_qod_session(self, session_id: str) -> None: + """Delete an ORAN policy by ID (maps to QoD session delete).""" + oran_common.oran_policy_delete(self.base_url, self.scs_as_id, session_id) + + def notification_to_camara_session( + self, notification: Dict[str, Any], original_session: Dict | None = None + ) -> Dict[str, Any]: + """Translate an ORAN notification payload into a CAMARA SessionInfo-like dict. + + Example notification payload: + { + "info_type": "policy_ue_prb_priority", + "subscription_id": "", + "data": { + "policy_status": "ENFORCED", + ... + } + } + + Returns a response shaped like our GET mapping and enriches fields + using the provided CAMARA `original_session` when available. + """ + session_id = ( + notification.get("subscription_id") + or notification.get("sessionId") + or notification.get("id") + ) + data = notification.get("data") or {} + policy_status = str(data.get("policy_status") or "").upper() + + # Map policy status to CAMARA qosStatus + qos_status = "AVAILABLE" if policy_status == "ENFORCED" else "UNAVAILABLE" + + camara_resp: Dict[str, Any] = { + "sessionId": str(session_id) if session_id is not None else None, + "qosStatus": qos_status, + } + + # Always include startedAt; add expiresAt only if original_session carries a duration + now = datetime.now(timezone.utc) + camara_resp["startedAt"] = now.isoformat().replace("+00:00", "Z") + if isinstance(original_session, dict) and isinstance(original_session.get("duration"), int): + duration_sec = int(original_session.get("duration")) + camara_resp["expiresAt"] = ( + (now + timedelta(seconds=duration_sec)).isoformat().replace("+00:00", "Z") + ) + + # Enrich with original CAMARA fields if provided + if isinstance(original_session, dict): + for key in ( + "device", + "applicationServer", + "devicePorts", + "applicationServerPorts", + "qosProfile", + ): + val = original_session.get(key) + if camara_resp.get(key) is None and val is not None: + camara_resp[key] = val + sink = original_session.get("sink") or original_session.get("notificationDestination") + if sink and camara_resp.get("sink") is None: + camara_resp["sink"] = sink + + return camara_resp diff --git a/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/ip_to_plmn_gnb_mapping.json b/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/ip_to_plmn_gnb_mapping.json new file mode 100644 index 0000000000000000000000000000000000000000..82f717b1f0c40a7a342a329b395abd23fa1cc5cb --- /dev/null +++ b/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/ip_to_plmn_gnb_mapping.json @@ -0,0 +1,4 @@ +{ +"192.168.1.10": { "mcc": "001", "mnc": "01", "gnb_length": 28, "gnb_id": 12345, "ran_ue_id":"0000000000000008"}, +"10.45.0.10": { "mcc": "214", "mnc": "07", "gnb_length": 28, "gnb_id": 67890, "ran_ue_id":"0000000000000003"} +} diff --git a/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/mappings.py b/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/mappings.py new file mode 100644 index 0000000000000000000000000000000000000000..94f073d636ba1bf25e4e865da160f14c0805e035 --- /dev/null +++ b/src/sunrise6g_opensdk/oran/adapters/i2cat_ric/mappings.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +## +# +# This file is part of the Open SDK. +# Defines some fixed mappings that in future releases could be done dynamically +# Contributors: +# - Miguel Catalan Cid (miguel.catalan@i2cat.net) +## + +# mapping from QoD to O-RAN policy +qos_prio_to_oran_prio = { + "qos-e": "qos-e", + "qos-s": "qos-s", + "qos-m": "qos-m", + "qos-l": "qos-l", +} +# mapping from QoD to 5qi +flow_id_mapping = {"qos-e": 3, "qos-s": 4, "qos-m": 5, "qos-l": 6} + + +# Maps an IP address to PLMN and gNB identifiers +# Keys: IP (str) +# Values: {"mcc": str, "mnc": str, "gnb_length": int, "gnb_id": int} +ip_to_plmn_gnb_mapping = { + # Example entries: + "192.168.1.10": { + "mcc": "001", + "mnc": "01", + "gnb_length": 28, + "gnb_id": 12345, + "ran_ue_id": "0000000000000001", + }, + "10.10.45.1": { + "mcc": "214", + "mnc": "07", + "gnb_length": 28, + "gnb_id": 67890, + "ran_ue_id": "0000000000000033", + }, +} + +# maps a CAMARA policy request to the information type in the oran NEF +policy_mapping = {"oran-qod": "qod_prb_prio"} diff --git a/src/sunrise6g_opensdk/oran/clients/juniper_ric/client.py b/src/sunrise6g_opensdk/oran/adapters/juniper_ric/__init__.py similarity index 100% rename from src/sunrise6g_opensdk/oran/clients/juniper_ric/client.py rename to src/sunrise6g_opensdk/oran/adapters/juniper_ric/__init__.py diff --git a/src/sunrise6g_opensdk/oran/adapters/juniper_ric/client.py b/src/sunrise6g_opensdk/oran/adapters/juniper_ric/client.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/sunrise6g_opensdk/oran/core/base_oran_client.py b/src/sunrise6g_opensdk/oran/core/base_oran_client.py new file mode 100644 index 0000000000000000000000000000000000000000..f7ac1163be2f5ccc3b2ffc26b0844871fa7f0f35 --- /dev/null +++ b/src/sunrise6g_opensdk/oran/core/base_oran_client.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +## +# This file is part of the Open SDK, based on sunrise6g_opensdk.network.base_network_client +# +# Contributors: +# +# - Miguel Catalán Cid (miguel.catalan@i2cat.net) +## +import threading +import uuid +from typing import Dict + +from sunrise6g_opensdk import logger + +# from sunrise6g_opensdk.oran.adapters.errors import OranPlatformError +from sunrise6g_opensdk.oran.core import common, schemas +from sunrise6g_opensdk.oran.core.common import requires_capability + +log = logger.get_logger(__name__) + + +class BaseOranClient: + """ + Class for Oran Resource Management. + + This class provides shared logic and extension points for different + ORAN frameworks (e.g., i2CAT based on OSC, Juniper RIC) interacting with + O-RAN NEF-like platforms or rapps using CAMARA APIs. + """ + + base_url: str + scs_as_id: str + _refresh_thread: threading.Thread | None = None + _refresh_stop_event: threading.Event | None = None + + @requires_capability("oran-qod") + def add_oran_specific_qod_parameters( + self, + session_info: schemas.CreateSession, + subscription: schemas.AsSessionWithQoSSubscription, + ): + """ + Placeholder for adding core-specific parameters to the subscription. + This method should be overridden by subclasses to implement specific logic. + """ + pass + + @requires_capability("oran-qod") + def core_specific_qod_validation(self, session_info: schemas.CreateSession) -> None: + """ + Validates core-specific parameters for the session creation. + + args: + session_info: The session information to validate. + + raises: + ValidationError: If the session information does not meet core-specific requirements. + """ + # Placeholder for core-specific validation logic + # This method should be overridden by subclasses if needed + pass + + # Periodic dynamic mappings refresh + def refresh_dynamic_mappings(self) -> None: + """ + Hook for subclasses to refresh dynamic mappings/config from external sources. + Default implementation is a no-op. + """ + return + + def _periodic_refresh_loop(self, interval_seconds: int, run_immediately: bool) -> None: + if not run_immediately: + # Initial delay before first refresh + if self._refresh_stop_event and self._refresh_stop_event.wait(interval_seconds): + return + while self._refresh_stop_event and not self._refresh_stop_event.is_set(): + try: + self.refresh_dynamic_mappings() + except Exception as exc: + log.warning(f"Periodic refresh failed: {exc}") + # Wait for next interval or stop event + if self._refresh_stop_event.wait(interval_seconds): + break + + def start_periodic_refresh( + self, interval_seconds: int = 60, run_immediately: bool = True + ) -> None: + """ + Starts a background thread that periodically calls `refresh_dynamic_mappings`. + If already running, it will be restarted with the new parameters. + """ + # Stop any existing loop + self.stop_periodic_refresh() + self._refresh_stop_event = threading.Event() + self._refresh_thread = threading.Thread( + target=self._periodic_refresh_loop, + args=(interval_seconds, run_immediately), + daemon=True, + ) + self._refresh_thread.start() + log.info( + f"Started periodic refresh every {interval_seconds}s (immediate={run_immediately})" + ) + + def stop_periodic_refresh(self) -> None: + """ + Stops the background periodic refresh thread if running. + """ + if self._refresh_stop_event is not None: + self._refresh_stop_event.set() + if self._refresh_thread is not None and self._refresh_thread.is_alive(): + self._refresh_thread.join(timeout=2) + self._refresh_thread = None + self._refresh_stop_event = None + + @requires_capability("qod") + def create_qod_session(self, session_info: Dict) -> Dict: + """ + Creates a QoS session based on CAMARA QoD API input. + + args: + session_info: Dictionary containing session details conforming to + the CAMARA QoD session creation parameters. + + returns: + dictionary containing the created session details, including its ID. + """ + subscription = self._build_qod_subscription(session_info) + response = common.as_session_with_qos_post(self.base_url, self.scs_as_id, subscription) + subscription_info: schemas.AsSessionWithQoSSubscription = ( + schemas.AsSessionWithQoSSubscription(**response) + ) + + session_info = schemas.SessionInfo( + sessionId=schemas.SessionId(uuid.UUID(subscription_info.subscription_id)), + qosStatus=schemas.QosStatus.REQUESTED, + **session_info, + ) + return session_info.model_dump(mode="json", by_alias=True) + + @requires_capability("qod") + def get_qod_session(self, session_id: str) -> Dict: + """ + Retrieves details of a specific Quality on Demand (QoS) session. + + args: + session_id: The unique identifier of the QoS session. + + returns: + Dictionary containing the details of the requested QoS session. + """ + response = common.as_session_with_qos_get( + self.base_url, self.scs_as_id, session_id=session_id + ) + subscription_info = schemas.AsSessionWithQoSSubscription(**response) + flowDesc = subscription_info.flowInfo[0].flowDescriptions[0] + serverIp = flowDesc.split("to ")[1].split("/")[0] + session_info = schemas.SessionInfo( + sessionId=schemas.SessionId(uuid.UUID(subscription_info.subscription_id)), + duration=subscription_info.usageThreshold.duration.root, + sink=subscription_info.notificationDestination.root, + qosProfile=subscription_info.qosReference, + device=schemas.Device( + ipv4Address=schemas.DeviceIpv4Addr1( + publicAddress=subscription_info.ueIpv4Addr, + privateAddress=subscription_info.ueIpv4Addr, + ), + ), + applicationServer=schemas.ApplicationServer( + ipv4Address=schemas.ApplicationServerIpv4Address(serverIp) + ), + qosStatus=schemas.QosStatus.AVAILABLE, + ) + return session_info.model_dump(mode="json", by_alias=True) + + @requires_capability("qod") + def delete_qod_session(self, session_id: str) -> None: + """ + Deletes a specific Quality on Demand (QoS) session. + + args: + session_id: The unique identifier of the QoS session to delete. + + returns: + None + """ + common.as_session_with_qos_delete(self.base_url, self.scs_as_id, session_id=session_id) + log.info(f"QoD session deleted successfully [id={session_id}]") + + # Placeholder for additional CAMARA APIs diff --git a/src/sunrise6g_opensdk/oran/core/common.py b/src/sunrise6g_opensdk/oran/core/common.py new file mode 100644 index 0000000000000000000000000000000000000000..aacd7ff0eec7f67c6a2d00087c90e496b05f6073 --- /dev/null +++ b/src/sunrise6g_opensdk/oran/core/common.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- + +import requests +from pydantic import BaseModel + +from sunrise6g_opensdk import logger + +log = logger.get_logger(__name__) + + +def _make_request(method: str, url: str, data=None): + try: + headers = None + if method == "POST" or method == "PUT": + headers = { + "Content-Type": "application/json", + "accept": "application/json", + } + elif method == "GET": + headers = { + "accept": "application/json", + } + response = requests.request(method, url, headers=headers, data=data) + response.raise_for_status() + if response.content: + return response.json() + except requests.exceptions.HTTPError as e: + status = None + body = None + try: + if e.response is not None: + status = e.response.status_code + try: + body = e.response.json() + except Exception: + body = e.response.text + except Exception: + pass + raise OranHttpError(str(e), status_code=status, body=body) from e + except requests.exceptions.ConnectionError as e: + raise OranHttpError("connection error", status_code=None, body=None) from e + + +class CapabilityNotSupported(Exception): + """Raised when a requested capability is not supported by the core.""" + + pass + + +def requires_capability(feature: str): + def decorator(func): + def wrapper(self, *args, **kwargs): + if feature not in self.capabilities: + # Client name is derived from the module + module_path = self.__module__.split(".") + try: + client_name = module_path[module_path.index("adapters") + 1] + except (ValueError, IndexError): + client_name = self.__class__.__name__ + + raise CapabilityNotSupported( + f"Functionality '{feature}' is nos supported by {client_name}" + ) + return func(self, *args, **kwargs) + + return wrapper + + return decorator + + +class OranHttpError(Exception): + def __init__( + self, message: str, status_code: int | None = None, body: dict | str | None = None + ): + super().__init__(message) + self.status_code = status_code + self.body = body + + +# Subscription Event Methods +def oran_subscription_post(base_url: str, scs_as_id: str, model_payload: BaseModel) -> dict: + data = model_payload.model_dump_json(exclude_none=True, by_alias=True) + url = oran_subscription_build(base_url, scs_as_id) + return _make_request("POST", url, data=data) + + +def oran_subscription_build(base_url: str, scs_as_id: str, session_id: str = None): + url = f"{base_url}/{scs_as_id}/subscriptions" + if session_id is not None and len(session_id) > 0: + return f"{url}/{session_id}" + else: + return url + + +# Policy methods +def oran_policy_post(base_url: str, scs_as_id: str, model_payload: BaseModel) -> dict: + data = model_payload.model_dump_json(exclude_none=True, by_alias=True) + url = oran_policy_build_url(base_url, scs_as_id) + return _make_request("POST", url, data=data) + + +def oran_policy_get(base_url: str, scs_as_id: str, session_id: str) -> dict: + url = oran_policy_build_url(base_url, scs_as_id, session_id) + return _make_request("GET", url) + + +def oran_policy_delete(base_url: str, scs_as_id: str, session_id: str): + url = oran_policy_build_url(base_url, scs_as_id, session_id) + return _make_request("DELETE", url) + + +def oran_policy_build_url(base_url: str, scs_as_id: str, session_id: str = None): + url = f"{base_url}/{scs_as_id}/oran-policies" + if session_id is not None and len(session_id) > 0: + return f"{url}/{session_id}" + else: + # Collection URL should end with a trailing slash to match FastAPI route + return f"{url}/" diff --git a/src/sunrise6g_opensdk/oran/core/oran_interface.py b/src/sunrise6g_opensdk/oran/core/oran_interface.py deleted file mode 100644 index 464090415c47109523e91779d4f40e19495c9cf1..0000000000000000000000000000000000000000 --- a/src/sunrise6g_opensdk/oran/core/oran_interface.py +++ /dev/null @@ -1 +0,0 @@ -# TODO diff --git a/src/sunrise6g_opensdk/oran/core/schemas.py b/src/sunrise6g_opensdk/oran/core/schemas.py new file mode 100644 index 0000000000000000000000000000000000000000..9175343350d8b73e9047c26c6f6c970a5efb779a --- /dev/null +++ b/src/sunrise6g_opensdk/oran/core/schemas.py @@ -0,0 +1,650 @@ +# -*- coding: utf-8 -*- +# This file defines the Pydantic models that represent the data structures (schemas) +# for the requests sent to and responses received from the Open5GS NEF API, +# specifically focusing on the APIs needed to support CAMARA QoD. +## +# This file is part of the Open SDK, based on sunrise6g_opensdk.network.core.schemas.py +# +# Contributors: +# +# - Miguel Catalán Cid (miguel.catalan@i2cat.net) +## + +import ipaddress +from datetime import datetime +from enum import Enum +from ipaddress import IPv4Address, IPv6Address +from typing import Annotated, Literal, Union +from uuid import UUID + +from pydantic import ( + AnyHttpUrl, + AnyUrl, + BaseModel, + ConfigDict, + Field, + NonNegativeInt, + RootModel, +) +from pydantic_extra_types.mac_address import MacAddress + +from sunrise6g_opensdk.logger import setup_logger +from sunrise6g_opensdk.oran.adapters.errors import OranPlatformError + +log = setup_logger(__name__) + + +class FlowDirection(Enum): + """ + DOWNLINK: The corresponding filter applies for traffic to the UE. + UPLINK: The corresponding filter applies for traffic from the UE. + BIDIRECTIONAL: The corresponding filter applies for traffic both to and from the UE. + UNSPECIFIED: The corresponding filter applies for traffic to the UE (downlink), but + has no specific direction declared. The service data flow detection shall apply the + filter for uplink traffic as if the filter was bidirectional. The PCF shall not use + the value UNSPECIFIED in filters created by the network in NW-initiated procedures. + The PCF shall only include the value UNSPECIFIED in filters in UE-initiated + procedures if the same value is received from the SMF. + """ + + DOWNLINK = "DOWNLINK" + UPLINK = "UPLINK" + BIDIRECTIONAL = "BIDIRECTIONAL" + UNSPECIFIED = "UNSPECIFIED" + + +class RequestedQosMonitoringParameter(Enum): + DOWNLINK = "DOWNLINK" + UPLINK = "UPLINK" + ROUND_TRIP = "ROUND_TRIP" + + +class ReportingFrequency(Enum): + EVENT_TRIGGERED = "EVENT_TRIGGERED" + PERIODIC = "PERIODIC" + SESSION_RELEASE = "SESSION_RELEASE" + + +Uinteger = Annotated[int, Field(ge=0)] + + +class DurationSec(RootModel[NonNegativeInt]): + root: NonNegativeInt = Field( + ..., + description="Unsigned integer identifying a period of time in units of \ + seconds.", + ) + + +class Volume(RootModel[NonNegativeInt]): + root: NonNegativeInt = Field( + ..., description="Unsigned integer identifying a volume in units of bytes." + ) + + +class SupportedFeatures(RootModel[str]): + root: str = Field( + ..., + pattern=r"^[A-Fa-f0-9]*$", + description="Hexadecimal string representing supported features.", + ) + + +class Link(RootModel[str]): + root: str = Field( + ..., + description="String formatted according to IETF RFC 3986 identifying a \ + referenced resource.", + ) + + +class FlowDescriptionModel(RootModel[str]): + root: str = Field(..., description="Defines a packet filter of an IP flow.") + + +class EthFlowDescription(BaseModel): + destMacAddr: MacAddress | None = None + ethType: str + fDesc: FlowDescriptionModel | None = None + fDir: FlowDirection | None = None + sourceMacAddr: MacAddress | None = None + vlanTags: list[str] | None = Field(None, max_length=2, min_length=1) + srcMacAddrEnd: MacAddress | None = None + destMacAddrEnd: MacAddress | None = None + + +class UsageThreshold(BaseModel): + duration: DurationSec | None = None + totalVolume: Volume | None = None + downlinkVolume: Volume | None = None + uplinkVolume: Volume | None = None + + +class SponsorInformation(BaseModel): + sponsorId: str = Field(..., description="It indicates Sponsor ID.") + aspId: str = Field(..., description="It indicates Application Service Provider ID.") + + +class QosMonitoringInformationModel(BaseModel): + reqQosMonParams: list[RequestedQosMonitoringParameter] | None = Field(None, min_length=1) + repFreqs: list[ReportingFrequency] | None = Field(None, min_length=1) + repThreshDl: Uinteger | None = None + repThreshUl: Uinteger | None = None + repThreshRp: Uinteger | None = None + waitTime: int | None = None + repPeriod: int | None = None + + +class FlowInfo(BaseModel): + flowId: int = Field(..., description="Indicates the IP flow.") + flowDescriptions: list[str] | None = Field( + None, + description="Indicates the packet filters of the IP flow. Refer to subclause \ + 5.3.8 of 3GPP TS 29.214 for encoding. It shall contain UL and/or DL IP \ + flow description.", + max_length=2, + min_length=1, + ) + + +class Snssai(BaseModel): + sst: int = Field(default=1) + sd: str = Field(default="FFFFFF") + + +class AsSessionWithQoSSubscription(BaseModel): + model_config = ConfigDict(serialize_by_alias=True) + self_: Link | None = Field(None, alias="self") + supportedFeatures: SupportedFeatures | None = None + notificationDestination: Link + flowInfo: list[FlowInfo] | None = Field( + None, description="Describe the data flow which requires QoS.", min_length=1 + ) + ethFlowInfo: list[EthFlowDescription] | None = Field( + None, description="Identifies Ethernet packet flows.", min_length=1 + ) + qosReference: str | None = Field(None, description="Identifies a pre-defined QoS information") + altQoSReferences: list[str] | None = Field( + None, + description="Identifies an ordered list of pre-defined QoS information. The \ + lower the index of the array for a given entry, the higher the priority.", + min_length=1, + ) + ueIpv4Addr: ipaddress.IPv4Address | None = None + ueIpv6Addr: ipaddress.IPv6Address | None = None + macAddr: MacAddress | None = None + snssai: Snssai | None = None + dnn: str | None = None + usageThreshold: UsageThreshold | None = None + sponsorInfo: SponsorInformation | None = None + qosMonInfo: QosMonitoringInformationModel | None = None + + @property + def subscription_id(self) -> str: + """ + Returns the subscription ID, which is the same as the self link. + """ + subscription_id = self.self_.root.split("/")[-1] if self.self_.root else None + if not subscription_id: + log.error("Failed to retrieve QoS session ID from response") + raise OranPlatformError("QoS session ID not found in response") + return subscription_id + + +# Monitoring Event API +class DurationMin(BaseModel): + duration: int = Field( + 0, + description="Unsigned integer identifying a period of time in units of minutes", + ge=0, + ) + + +class PlmnId(BaseModel): + mcc: str = Field( + ..., + description="String encoding a Mobile Country Code, comprising of 3 digits.", + ) + mnc: str = Field( + ..., + description="String encoding a Mobile Network Code, comprising of 2 or 3 digits.", + ) + + +# This data type represents a monitoring event type. +class MonitoringType(str, Enum): + LOCATION_REPORTING = "LOCATION_REPORTING" + + +class LocationFailureCause(str, Enum): + position_denied = "POSITIONING_DENIED" # Positioning is denied. + unsupported_by_ue = "UNSUPPORTED_BY_UE" # Positioning is not supported by UE. + not_registered_ue = "NOT_REGISTERED_UE" # UE is not registered. + unspecified = "UNSPECIFIED" # Unspecified cause. + + +class GeographicalCoordinates(BaseModel): + lon: float = Field(..., description="Longitude coordinate.") + lat: float = Field(..., description="Latitude coordinate.") + + +class PointListNef(BaseModel): + geographical_coords: list[GeographicalCoordinates] = Field( + ..., + description="List of geographical coordinates defining the points.", + min_length=3, + max_length=15, + ) + + +class NefPolygon(BaseModel): + point_list: PointListNef = Field(..., description="List of points defining the polygon.") + + +class GeographicArea(BaseModel): + polygon: NefPolygon | None = Field(None, description="Identifies a polygonal geographic area.") + + +# This data type represents the user location information which is sent from the NEF to the AF. +class LocationInfo(BaseModel): + ageOfLocationInfo: DurationMin | None = Field( + None, + description="Indicates the elapsed time since the last network contact of the UE.", + ) + cellId: str | None = Field(None, description="Cell ID where the UE is located.") + trackingAreaId: str | None = Field(None, description="TrackingArea ID where the UE is located.") + enodeBId: str | None = Field(None, description="eNodeB ID where the UE is located.") + routingAreaId: str | None = Field(None, description="Routing Area ID where the UE is located") + plmnId: PlmnId | None = Field(None, description="PLMN ID where the UE is located.") + twanId: str | None = Field(None, description="TWAN ID where the UE is located.") + geographicArea: GeographicArea | None = Field( + None, + description="Identifies a geographic area of the user where the UE is located.", + ) + + +# This data type represents a monitoring event notification which is sent from the NEF to the AF. +class MonitoringEventReport(BaseModel): + externalId: str | None = Field(None, description="Identifies a user, clause 4.6.2 TS 23.682") + msisdn: str | None = Field( + None, + description="Identifies the MS internal PSTN/ISDN number allocated for a UE.", + ) + locationInfo: LocationInfo | None = Field( + None, description="Indicates the user location related information." + ) + locFailureCause: LocationFailureCause | None = Field( + None, description="Indicates the location positioning failure cause." + ) + monitoringType: MonitoringType = Field( + ..., + description="Identifies the type of monitoring as defined in clause 5.3.2.4.3.", + ) + eventTime: datetime | None = Field( + None, + description="Identifies when the event is detected or received. Shall be included for each group of UEs.", + ) + + +# This data type represents a monitoring notification which is sent from the NEF to the AF. +class MonitoringNotification(BaseModel): + subscription: AnyHttpUrl = Field( + ..., + description="Link to the subscription resource to which this notification is related.", + ) + monitoringEventReports: list[MonitoringEventReport] | None = Field( + None, + description="Each element identifies a monitoring event report (optional).", + ) + cancelInd: bool | None = Field( + False, + description="Indicates whether to request to cancel the corresponding monitoring subscription. Set to false or omitted otherwise.", + ) + + +############################################################### +############################################################### +# CAMARA Models + + +class PhoneNumber(RootModel[str]): + root: Annotated[ + str, + Field( + description="A public identifier addressing a telephone subscription. In mobile networks it corresponds to the MSISDN (Mobile Station International Subscriber Directory Number). In order to be globally unique it has to be formatted in international format, according to E.164 standard, prefixed with '+'.", + examples=["+123456789"], + pattern="^\\+[1-9][0-9]{4,14}$", + ), + ] + + +class NetworkAccessIdentifier(RootModel[str]): + root: Annotated[ + str, + Field( + description="A public identifier addressing a subscription in a mobile network. In 3GPP terminology, it corresponds to the GPSI formatted with the External Identifier ({Local Identifier}@{Domain Identifier}). Unlike the telephone number, the network access identifier is not subjected to portability ruling in force, and is individually managed by each operator.", + examples=["123456789@domain.com"], + ), + ] + + +class SingleIpv4Addr(RootModel[IPv4Address]): + root: Annotated[ + IPv4Address, + Field( + description="A single IPv4 address with no subnet mask", + examples=["203.0.113.0"], + ), + ] + + +class Port(RootModel[int]): + root: Annotated[int, Field(description="TCP or UDP port number", ge=0, le=65535)] + + +class DeviceIpv4Addr1(BaseModel): + publicAddress: SingleIpv4Addr + privateAddress: SingleIpv4Addr + publicPort: Port | None = None + + +class DeviceIpv4Addr2(BaseModel): + publicAddress: SingleIpv4Addr + privateAddress: SingleIpv4Addr | None = None + publicPort: Port + + +class DeviceIpv4Addr(RootModel[DeviceIpv4Addr1 | DeviceIpv4Addr2]): + root: Annotated[ + DeviceIpv4Addr1 | DeviceIpv4Addr2, + Field( + description="The device should be identified by either the public (observed) IP address and port as seen by the application server, or the private (local) and any public (observed) IP addresses in use by the device (this information can be obtained by various means, for example from some DNS servers).\n\nIf the allocated and observed IP addresses are the same (i.e. NAT is not in use) then the same address should be specified for both publicAddress and privateAddress.\n\nIf NAT64 is in use, the device should be identified by its publicAddress and publicPort, or separately by its allocated IPv6 address (field ipv6Address of the Device object)\n\nIn all cases, publicAddress must be specified, along with at least one of either privateAddress or publicPort, dependent upon which is known. In general, mobile devices cannot be identified by their public IPv4 address alone.\n", + examples=[{"publicAddress": "203.0.113.0", "publicPort": 59765}], + ), + ] + + +class DeviceIpv6Address(RootModel[IPv6Address]): + root: Annotated[ + IPv6Address, + Field( + description="The device should be identified by the observed IPv6 address, or by any single IPv6 address from within the subnet allocated to the device (e.g. adding ::0 to the /64 prefix).\n\nThe session shall apply to all IP flows between the device subnet and the specified application server, unless further restricted by the optional parameters devicePorts or applicationServerPorts.\n", + examples=["2001:db8:85a3:8d3:1319:8a2e:370:7344"], + ), + ] + + +class Device(BaseModel): + phoneNumber: PhoneNumber | None = None + networkAccessIdentifier: NetworkAccessIdentifier | None = None + ipv4Address: DeviceIpv4Addr | None = None + ipv6Address: DeviceIpv6Address | None = None + + +class RetrievalLocationRequest(BaseModel): + """ + Request to retrieve the location of a device. Device is not required when using a 3-legged access token. + """ + + device: Annotated[ + Device | None, + Field(None, description="End-user device able to connect to a mobile network."), + ] + maxAge: Annotated[ + int | None, + Field( + None, + description="Maximum age of the location information which is accepted for the location retrieval (in seconds).", + ), + ] + maxSurface: Annotated[ + int | None, + Field( + None, + description="Maximum surface in square meters which is accepted by the client for the location retrieval.", + ge=1, + examples=[1000000], + ), + ] + + +class AreaType(str, Enum): + circle = "CIRCLE" # The area is defined as a circle. + polygon = "POLYGON" # The area is defined as a polygon. + + +class Point(BaseModel): + latitude: Annotated[ + float, + Field( + description="Latitude component of a location.", + examples=["50.735851"], + ge=-90, + le=90, + ), + ] + longitude: Annotated[ + float, + Field( + ..., + description="Longitude component of location.", + examples=["7.10066"], + ge=-180, + le=180, + ), + ] + + +class PointList( + RootModel[ + Annotated[ + list[Point], + Field( + min_length=3, + max_length=15, + description="List of points defining the area.", + ), + ] + ] +): + pass + + +class Circle(BaseModel): + areaType: Literal[AreaType.circle] + center: Annotated[Point, Field(description="Center point of the circle.")] + radius: Annotated[float, Field(description="Radius of the circle.", ge=1)] + + +class Polygon(BaseModel): + areaType: Literal[AreaType.polygon] + boundary: Annotated[PointList, Field(description="List of points defining the polygon.")] + + +Area = Annotated[Circle | Polygon, Field(discriminator="areaType")] + + +class LastLocationTime( + RootModel[ + Annotated[ + datetime, + Field( + description="Last date and time when the device was localized.", + examples="2023-09-07T10:40:52Z", + ), + ] + ] +): + pass + + +class Location(BaseModel): + lastLocationTime: Annotated[LastLocationTime, Field(description="Last known location time.")] + area: Annotated[Area, Field(description="Geographical area of the location.")] + + +class ApplicationServerIpv4Address(RootModel[str]): + root: Annotated[ + str, + Field( + description="IPv4 address may be specified in form
as:\n - address - an IPv4 number in dotted-quad form 1.2.3.4. Only this exact IP number will match the flow control rule.\n - address/mask - an IP number as above with a mask width of the form 1.2.3.4/24.\n In this case, all IP numbers from 1.2.3.0 to 1.2.3.255 will match. The bit width MUST be valid for the IP version.\n", + examples=["198.51.100.0/24"], + ), + ] + + +class ApplicationServerIpv6Address(RootModel[str]): + root: Annotated[ + str, + Field( + description="IPv6 address may be specified in form
as:\n - address - The /128 subnet is optional for single addresses:\n - 2001:db8:85a3:8d3:1319:8a2e:370:7344\n - 2001:db8:85a3:8d3:1319:8a2e:370:7344/128\n - address/mask - an IP v6 number with a mask:\n - 2001:db8:85a3:8d3::0/64\n - 2001:db8:85a3:8d3::/64\n", + examples=["2001:db8:85a3:8d3:1319:8a2e:370:7344"], + ), + ] + + +class ApplicationServer(BaseModel): + ipv4Address: ApplicationServerIpv4Address | None = None + ipv6Address: ApplicationServerIpv6Address | None = None + + +class Range(BaseModel): + from_: Annotated[Port, Field(alias="from")] + to: Port + + +class PortsSpec(BaseModel): + ranges: Annotated[ + list[Range] | None, Field(description="Range of TCP or UDP ports", min_length=1) + ] = None + ports: Annotated[ + list[Port] | None, Field(description="Array of TCP or UDP ports", min_length=1) + ] = None + + +class QosProfileName(RootModel[str]): + root: Annotated[ + str, + Field( + description="A unique name for identifying a specific QoS profile.\nThis may follow different formats depending on the API provider implementation.\nSome options addresses:\n - A UUID style string\n - Support for predefined profiles QOS_S, QOS_M, QOS_L, and QOS_E\n - A searchable descriptive name\nThe set of QoS Profiles that an API provider is offering may be retrieved by means of the QoS Profile API (qos-profile) or agreed on onboarding time.\n", + examples=["voice"], + max_length=256, + min_length=3, + pattern="^[a-zA-Z0-9_.-]+$", + ), + ] + + +class CredentialType(Enum): + PLAIN = "PLAIN" + ACCESSTOKEN = "ACCESSTOKEN" + REFRESHTOKEN = "REFRESHTOKEN" + + +class SinkCredential(BaseModel): + credentialType: Annotated[ + CredentialType, + Field( + description="The type of the credential.\nNote: Type of the credential - MUST be set to ACCESSTOKEN for now\n" + ), + ] + + +class NotificationSink(BaseModel): + sink: str | None + sinkCredential: SinkCredential | None + + +class BaseSessionInfo(BaseModel): + device: Device | None = None + applicationServer: ApplicationServer + devicePorts: Annotated[ + PortsSpec | None, + Field( + description="The ports used locally by the device for flows to which the requested QoS profile should apply. If omitted, then the qosProfile will apply to all flows between the device and the specified application server address and ports" + ), + ] = None + applicationServerPorts: Annotated[ + PortsSpec | None, + Field(description="A list of single ports or port ranges on the application server"), + ] = None + qosProfile: QosProfileName + sink: Annotated[ + AnyUrl | None, + Field( + description="The address to which events about all status changes of the session (e.g. session termination) shall be delivered using the selected protocol.", + examples=["https://endpoint.example.com/sink"], + ), + ] = None + sinkCredential: Annotated[ + SinkCredential | None, + Field( + description="A sink credential provides authentication or authorization information necessary to enable delivery of events to a target." + ), + ] = None + + +class CreateSession(BaseSessionInfo): + duration: Annotated[ + int, + Field( + description="Requested session duration in seconds. Value may be explicitly limited for the QoS profile, as specified in the Qos Profile (see qos-profile API). Implementations can grant the requested session duration or set a different duration, based on network policies or conditions.\n", + examples=[3600], + ge=1, + ), + ] + + +class SessionId(RootModel[UUID]): + root: Annotated[UUID, Field(description="Session ID in UUID format")] + + +class QosStatus(Enum): + REQUESTED = "REQUESTED" + AVAILABLE = "AVAILABLE" + UNAVAILABLE = "UNAVAILABLE" + + +# ORAN Policy models (client-side request) +class OranPolicy(BaseModel): + policy_type: str = Field(..., alias="policyType") + policy_scope: dict = Field(..., alias="policyScope") + policy_statement: dict = Field(..., alias="policyStatement") + expiry: Union[datetime, int, None] = None + notification_uri: AnyHttpUrl | None = Field(None, alias="notificationUri") + + model_config = ConfigDict(populate_by_name=True) + + +class StatusInfo(Enum): + DURATION_EXPIRED = "DURATION_EXPIRED" + NETWORK_TERMINATED = "NETWORK_TERMINATED" + DELETE_REQUESTED = "DELETE_REQUESTED" + + +class SessionInfo(BaseSessionInfo): + sessionId: SessionId + duration: Annotated[ + int, + Field( + description='Session duration in seconds. Implementations can grant the requested session duration or set a different duration, based on network policies or conditions.\n- When `qosStatus` is "REQUESTED", the value is the duration to be scheduled, granted by the implementation.\n- When `qosStatus` is AVAILABLE", the value is the overall duration since `startedAt. When the session is extended, the value is the new overall duration of the session.\n- When `qosStatus` is "UNAVAILABLE", the value is the overall effective duration since `startedAt` until the session was terminated.\n', + examples=[3600], + ge=1, + ), + ] + startedAt: Annotated[ + datetime | None, + Field( + description='Date and time when the QoS status became "AVAILABLE". Not to be returned when `qosStatus` is "REQUESTED". Format must follow RFC 3339 and must indicate time zone (UTC or local).', + examples=["2024-06-01T12:00:00Z"], + ), + ] = None + expiresAt: Annotated[ + datetime | None, + Field( + description='Date and time of the QoS session expiration. Format must follow RFC 3339 and must indicate time zone (UTC or local).\n- When `qosStatus` is "AVAILABLE", it is the limit time when the session is scheduled to finnish, if not terminated by other means.\n- When `qosStatus` is "UNAVAILABLE", it is the time when the session was terminated.\n- Not to be returned when `qosStatus` is "REQUESTED".\nWhen the session is extended, the value is the new expiration time of the session.\n', + examples=["2024-06-01T13:00:00Z"], + ), + ] = None + qosStatus: QosStatus + statusInfo: StatusInfo | None = None diff --git a/tests/common/test_invoke_oran_clients.py b/tests/common/test_invoke_oran_clients.py new file mode 100644 index 0000000000000000000000000000000000000000..d73f02e2fa742bd8cf57f7613dec4057d6659c1f --- /dev/null +++ b/tests/common/test_invoke_oran_clients.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +import pytest + +from sunrise6g_opensdk.common.sdk import Sdk as sdkclient + +ORAN_TEST_CASES = [ + { + "oran": { + "client_name": "i2cat_ric", + "base_url": "http://192.168.40.50:8105", + "scs_as_id": "scs-test", + } + }, +] + + +def id_func(val): + return val["oran"]["client_name"] + + +@pytest.mark.parametrize("adapter_specs", ORAN_TEST_CASES, ids=id_func) +def test_oran_platform_instantiation(adapter_specs): + """Test instantiation of ORAN platform adapters via Sdk.""" + try: + adapters = sdkclient.create_adapters_from(adapter_specs) + except ValueError: + # The factory may not yet expose the ORAN domain; accept as xfail for now + pytest.xfail("ORAN domain not wired in AdaptersFactory yet") + return + + assert "oran" in adapters + oran_client = adapters["oran"] + assert oran_client is not None + # Class name contains OranManager for i2cat_ric adapter + assert "OranManager" in str(type(oran_client)) diff --git a/tests/oran/test_cases.py b/tests/oran/test_cases.py new file mode 100644 index 0000000000000000000000000000000000000000..a86e3f498f2ac4118484208b9a4b72fe3536b4fb --- /dev/null +++ b/tests/oran/test_cases.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +test_cases = [ + { + "oran": { + "client_name": "i2cat_ric", + "base_url": "http://192.168.40.50:8105", + "scs_as_id": "scs", + } + }, +] diff --git a/tests/oran/test_qod_session.py b/tests/oran/test_qod_session.py new file mode 100644 index 0000000000000000000000000000000000000000..0c5db80b199734eb6eb69154fcad86e234c17730 --- /dev/null +++ b/tests/oran/test_qod_session.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +import time +from pprint import pformat + +import pytest + +from sunrise6g_opensdk.common.sdk import Sdk as sdkclient +from sunrise6g_opensdk.oran.core.base_oran_client import BaseOranClient +from sunrise6g_opensdk.oran.core.common import OranHttpError +from tests.oran.test_cases import test_cases + + +@pytest.fixture(scope="module", name="oran_client") +def instantiate_oran_client(request): + """Fixture to create and share an ORAN client across tests""" + adapter_specs = request.param + adapters = sdkclient.create_adapters_from(adapter_specs) + return adapters.get("oran") + + +def id_func(val): + return val["oran"]["client_name"] + + +@pytest.mark.parametrize("oran_client", test_cases, ids=id_func, indirect=True) +def test_create_wait_get_delete_then_missing(oran_client: BaseOranClient): + """ + Create a QoD policy, wait 10s, verify it exists, delete it, then + verify it is no longer retrievable. + """ + camara_session = { + "device": { + "ipv4Address": { + "publicAddress": "10.45.0.10", + "privateAddress": "10.45.0.10", + } + }, + "applicationServer": {"ipv4Address": "192.168.1.10"}, + "devicePorts": {"ranges": [{"from": 0, "to": 65535}]}, + "applicationServerPorts": {"ranges": [{"from": 0, "to": 65535}]}, + "qosProfile": "qos-s", + } + + # Create policy + print("\n===== [Test] CREATE QoD policy =====") + print("[Test] Payload:") + print(pformat(camara_session)) + response = oran_client.create_qod_session(camara_session) + print("\n----- [Test] Create response -----") + print(pformat(response)) + # Save CAMARA-style session info to enrich subsequent GET mapping + camara_session_info = dict(response) + policy_id = response.get("sessionId") or response.get("policy_id") or response.get("policyId") + assert policy_id, "Session ID not returned by create_qod_session" + + # Wait 10 seconds + print("\n===== [Test] WAIT before GET =====") + print("[Test] Sleeping 10s") + time.sleep(10) + + # Verify policy exists + try: + print("\n===== [Test] GET policy =====") + print(f"[Test] policy_id={policy_id}") + # Pass original CAMARA session info so GET can return CAMARA-compliant data + get_resp = oran_client.get_qod_session(policy_id, original_session=camara_session_info) + print("[Test] GET response:") + print(pformat(get_resp)) + except OranHttpError as e: + pytest.fail(f"Policy should exist before deletion: {e}") + + # Delete policy + try: + print("\n===== [Test] DELETE policy =====") + print(f"[Test] policy_id={policy_id}") + delete_resp = oran_client.delete_qod_session(policy_id) + # CAMARA r3.2 specifies 204 No Content for delete + assert delete_resp is None + except OranHttpError as e: + pytest.fail(f"Failed to delete oran policy: {e}") + + # Optional short wait to allow backend cleanup + print("\n===== [Test] WAIT after DELETE =====") + print("[Test] Sleeping 5s") + time.sleep(5) + + # Verify deletion (expect a failure on get) + print("\n===== [Test] GET after DELETE (expect error) =====") + print(f"[Test] policy_id={policy_id}") + with pytest.raises(OranHttpError) as excinfo: + oran_client.get_qod_session(policy_id) + print(f"[Test] Received expected error: {excinfo.value}") diff --git a/tests/oran/test_qod_session_notification.py b/tests/oran/test_qod_session_notification.py new file mode 100644 index 0000000000000000000000000000000000000000..d4bfe4d3976e0ed9767efc619b171b77f5635834 --- /dev/null +++ b/tests/oran/test_qod_session_notification.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +import json +import threading +import time +from http.server import BaseHTTPRequestHandler, HTTPServer +from pprint import pformat +from socketserver import ThreadingMixIn +from typing import List + +import pytest + +from sunrise6g_opensdk.common.sdk import Sdk as sdkclient +from sunrise6g_opensdk.oran.core.base_oran_client import BaseOranClient +from sunrise6g_opensdk.oran.core.common import OranHttpError +from tests.oran.test_cases import test_cases + + +class _ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + daemon_threads = True + + +def _make_handler(storage: List[dict]): + class _Handler(BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) if length > 0 else b"" + try: + payload = json.loads(body.decode("utf-8") or "{}") + except Exception: + payload = {"raw": body.decode("utf-8", errors="ignore")} + record = { + "path": self.path, + "headers": dict(self.headers), + "payload": payload, + "ts": time.time(), + } + storage.append(record) + # Verbose output of received callback + try: + print("\n----- [Notify] Incoming Callback -----") + print("[Notify] Received POST", record["path"]) # status set below + print("[Notify] Headers:", pformat(record["headers"])) + print("[Notify] Payload:", json.dumps(record["payload"], ensure_ascii=False)) + except Exception: + print("[Notify] Received callback at", record["path"]) # best-effort + self.send_response(204) + self.end_headers() + + def log_message(self, fmt, *args): + # Silence server logs during tests + return + + return _Handler + + +@pytest.fixture(scope="module") +def notification_server(): + """Spin up a tiny HTTP server to capture ORAN NEF callbacks. + + Binds to the host used in test_cases; callback URL is printed for visibility. + """ + host, port = "192.168.40.50", 40000 + received: List[dict] = [] + handler = _make_handler(received) + httpd = _ThreadingHTTPServer((host, port), handler) + thread = threading.Thread(target=httpd.serve_forever, daemon=True) + thread.start() + + server = { + "url": f"http://{host}:{port}/callback", + "received": received, + } + print("\n===== [Notify] SERVER START =====") + print(f"[Notify] Listening at {server['url']}") + try: + yield server + finally: + httpd.shutdown() + httpd.server_close() + thread.join(timeout=2) + + +@pytest.fixture(scope="module", name="oran_client") +def instantiate_oran_client(request): + """Fixture to create and share an ORAN client across tests""" + adapter_specs = request.param + adapters = sdkclient.create_adapters_from(adapter_specs) + return adapters.get("oran") + + +def id_func(val): + return val["oran"]["client_name"] + + +def _wait_for_callbacks(store: List[dict], min_new: int, timeout: float, start_len: int = 0): + deadline = time.time() + timeout + while time.time() < deadline: + if len(store) - start_len >= min_new: + print(f"[Notify] Callback(s) arrived: new={len(store) - start_len}, total={len(store)}") + return True + time.sleep(0.2) + return False + + +@pytest.mark.parametrize("oran_client", test_cases, ids=id_func, indirect=True) +def test_qos_session_notification(oran_client: BaseOranClient, notification_server): + """Create policy, wait, verify exists, delete, then expect deletion callback and missing on get.""" + camara_session = { + "device": { + "ipv4Address": { + "publicAddress": "10.45.0.10", + "privateAddress": "10.45.0.10", + } + }, + "applicationServer": {"ipv4Address": "192.168.1.10"}, + "devicePorts": {"ranges": [{"from": 0, "to": 65535}]}, + "applicationServerPorts": {"ranges": [{"from": 0, "to": 65535}]}, + "qosProfile": "qos-e", + "notificationDestination": notification_server["url"], + } + + print("\n===== [Test] CREATE QoD policy (with notifications) =====") + print("[Test] Payload:") + print(pformat(camara_session)) + start_len = len(notification_server["received"]) + response = oran_client.create_qod_session(camara_session) + print("\n----- [Test] Create response -----") + print(pformat(response)) + camara_session_info2 = dict(response) + session_id2 = response.get("sessionId") + assert session_id2, "Session ID not returned by create_qod_session" + + # Expect at least one callback shortly after creation + print("\n===== [Test] WAIT for creation callback =====") + assert _wait_for_callbacks( + notification_server["received"], 1, timeout=10, start_len=start_len + ), "Did not receive any callback after creation" + if len(notification_server["received"]) > start_len: + print("\n----- [Test] New callback(s) after create -----") + for rec in notification_server["received"][start_len:]: + print(pformat(rec)) + try: + transformed = oran_client.notification_to_camara_session( + rec["payload"], original_session=camara_session_info2 + ) + print("[Notify] Transformed to CAMARA:") + print(pformat(transformed)) + except Exception as exc: + print(f"[Notify] Transform error: {exc}") + + # Wait 10 seconds and verify exists + print("\n===== [Test] WAIT before GET =====") + print("[Test] Sleeping 10s") + time.sleep(10) + print("\n===== [Test] GET policy =====") + print(f"[Test] session_id={session_id2}") + try: + get_resp = oran_client.get_qod_session(session_id2, original_session=camara_session_info2) + print("\n----- [Test] GET response -----") + print(pformat(get_resp)) + except OranHttpError as e: + pytest.fail(f"Policy should exist before deletion: {e}") + + # Delete policy + print("\n===== [Test] DELETE policy =====") + print(f"[Test] session_id={session_id2}") + delete_resp = oran_client.delete_qod_session(session_id2) + assert delete_resp is None + + # Give time for deletion callback to arrive + print("\n===== [Test] WAIT for deletion callback =====") + print("[Test] Sleeping 10s") + time.sleep(10) + + # Expect an additional callback (e.g., deletion) + + assert _wait_for_callbacks( + notification_server["received"], 2, timeout=10, start_len=start_len + ), "Did not receive post-deletion callback" + if len(notification_server["received"]) > start_len: + print("\n----- [Test] Callbacks collected -----") + for rec in notification_server["received"][start_len:]: + print(pformat(rec)) + try: + transformed = oran_client.notification_to_camara_session( + rec["payload"], original_session=camara_session_info2 + ) + print("[Notify] Transformed to CAMARA:") + print(pformat(transformed)) + except Exception as exc: + print(f"[Notify] Transform error: {exc}") + + # Verify policy is gone + print("\n===== [Test] GET after DELETE (expect error) =====") + print(f"[Test] session_id={session_id2}") + with pytest.raises(OranHttpError): + oran_client.get_qod_session(session_id2) diff --git a/tests/oran/test_qod_session_with_duration.py b/tests/oran/test_qod_session_with_duration.py new file mode 100644 index 0000000000000000000000000000000000000000..5d6bd86ce75c5554557e1b50f7e8aebb22575f82 --- /dev/null +++ b/tests/oran/test_qod_session_with_duration.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +import time +from pprint import pformat + +import pytest + +from sunrise6g_opensdk.common.sdk import Sdk as sdkclient +from sunrise6g_opensdk.oran.core.base_oran_client import BaseOranClient +from sunrise6g_opensdk.oran.core.common import OranHttpError +from tests.oran.test_cases import test_cases + + +@pytest.fixture(scope="module", name="oran_client") +def instantiate_oran_client(request): + """Fixture to create and share an ORAN client across tests""" + adapter_specs = request.param + adapters = sdkclient.create_adapters_from(adapter_specs) + return adapters.get("oran") + + +def id_func(val): + return val["oran"]["client_name"] + + +@pytest.mark.parametrize("oran_client", test_cases, ids=id_func, indirect=True) +def test_qod_policy_lifecycle_with_expiry(oran_client: BaseOranClient): + """ + Create a policy with a short duration, confirm it exists, then + wait for expiry and confirm it no longer exists. + """ + duration_seconds = 15 + camara_session = { + "duration": duration_seconds, + "device": { + "ipv4Address": { + "publicAddress": "10.45.0.10", + "privateAddress": "10.45.0.10", + } + }, + "applicationServer": {"ipv4Address": "192.168.1.10"}, + "devicePorts": {"ranges": [{"from": 0, "to": 65535}]}, + "applicationServerPorts": {"ranges": [{"from": 0, "to": 65535}]}, + "qosProfile": "qos-e", + } + + # Create policy + print("\n===== [Test] CREATE QoD policy =====") + print("[Test] Payload:") + print(pformat(camara_session)) + response = oran_client.create_qod_session(camara_session) + print("\n----- [Test] Create response -----") + print(pformat(response)) + session_id = response.get("sessionId") + assert session_id, "Session ID not returned by create_qod_session" + + # Immediately check it exists + try: + print("\n===== [Test] GET after create =====") + # Provide original CAMARA response to enrich GET mapping + get_resp = oran_client.get_qod_session(session_id, original_session=dict(response)) + print(f"[Test] policy_id={session_id}") + print(pformat(get_resp)) + except OranHttpError as e: + pytest.fail(f"Policy should exist right after creation: {e}") + + # Wait slightly longer than the duration to ensure expiry + buffer_seconds = 5 + wait_secs = duration_seconds + buffer_seconds + print("\n===== [Test] WAIT for expiry =====") + print(f"[Test] Sleeping {wait_secs}s") + time.sleep(wait_secs) + + # After expiry, the policy should not exist anymore (expect OranHttpError/404) + print("\n===== [Test] GET after expiry (expect error) =====") + print(f"[Test] session_id={session_id}") + with pytest.raises(OranHttpError) as excinfo: + oran_client.get_qod_session(session_id) + print(f"[Test] Received expected error: {excinfo.value}") diff --git a/tests/oran/test_qod_session_with_duration_notification.py b/tests/oran/test_qod_session_with_duration_notification.py new file mode 100644 index 0000000000000000000000000000000000000000..ecd2df56183b56076aa8a200d1b96df86040fa75 --- /dev/null +++ b/tests/oran/test_qod_session_with_duration_notification.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- +import json +import threading +import time +from http.server import BaseHTTPRequestHandler, HTTPServer +from pprint import pformat +from socketserver import ThreadingMixIn +from typing import List + +import pytest + +from sunrise6g_opensdk.common.sdk import Sdk as sdkclient +from sunrise6g_opensdk.oran.core.base_oran_client import BaseOranClient +from sunrise6g_opensdk.oran.core.common import OranHttpError +from tests.oran.test_cases import test_cases + + +class _ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + daemon_threads = True + + +def _make_handler(storage: List[dict]): + class _Handler(BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) if length > 0 else b"" + try: + payload = json.loads(body.decode("utf-8") or "{}") + except Exception: + payload = {"raw": body.decode("utf-8", errors="ignore")} + record = { + "path": self.path, + "headers": dict(self.headers), + "payload": payload, + "ts": time.time(), + } + storage.append(record) + # Verbose output of received callback + try: + print("\n----- [Notify] Incoming Callback -----") + print("[Notify] Received POST", record["path"]) # status set below + print("[Notify] Headers:", pformat(record["headers"])) + print("[Notify] Payload:", json.dumps(record["payload"], ensure_ascii=False)) + except Exception: + print("[Notify] Received callback at", record["path"]) # best-effort + self.send_response(204) + self.end_headers() + + def log_message(self, fmt, *args): + # Silence server logs during tests + return + + return _Handler + + +@pytest.fixture(scope="module") +def notification_server(): + """Spin up a tiny HTTP server to capture ORAN NEF callbacks. + + Binds to the host used in test_cases; callback URL is printed for visibility. + """ + host, port = "192.168.40.50", 40000 + received: List[dict] = [] + handler = _make_handler(received) + httpd = _ThreadingHTTPServer((host, port), handler) + thread = threading.Thread(target=httpd.serve_forever, daemon=True) + thread.start() + + server = { + "url": f"http://{host}:{port}/callback", + "received": received, + } + print("\n===== [Notify] SERVER START =====") + print(f"[Notify] Listening at {server['url']}") + try: + yield server + finally: + httpd.shutdown() + httpd.server_close() + thread.join(timeout=2) + + +@pytest.fixture(scope="module", name="oran_client") +def instantiate_oran_client(request): + """Fixture to create and share an ORAN client across tests""" + adapter_specs = request.param + adapters = sdkclient.create_adapters_from(adapter_specs) + return adapters.get("oran") + + +def id_func(val): + return val["oran"]["client_name"] + + +def _wait_for_callbacks(store: List[dict], min_new: int, timeout: float, start_len: int = 0): + deadline = time.time() + timeout + while time.time() < deadline: + if len(store) - start_len >= min_new: + print(f"[Notify] Callback(s) arrived: new={len(store) - start_len}, total={len(store)}") + return True + time.sleep(0.2) + return False + + +@pytest.mark.parametrize("oran_client", test_cases, ids=id_func, indirect=True) +def test_qod_session_with_duration_notification(oran_client: BaseOranClient, notification_server): + """Create a short-lived policy and expect at least one callback, then expiry notification.""" + duration_seconds = 15 + camara_session = { + "duration": duration_seconds, + "device": { + "ipv4Address": { + "publicAddress": "10.45.0.10", + "privateAddress": "10.45.0.10", + } + }, + "applicationServer": {"ipv4Address": "192.168.1.10"}, + "devicePorts": {"ranges": [{"from": 0, "to": 65535}]}, + "applicationServerPorts": {"ranges": [{"from": 0, "to": 65535}]}, + "qosProfile": "qos-e", + "notificationDestination": notification_server["url"], + } + + print("\n===== [Test] CREATE QoD policy (with notifications) =====") + print("[Test] Payload:") + print(pformat(camara_session)) + start_len = len(notification_server["received"]) + response = oran_client.create_qod_session(camara_session) + print("\n----- [Test] Create response -----") + print(pformat(response)) + camara_session_info = dict(response) + session_id = response.get("sessionId") + assert session_id, "Session ID not returned by create_qod_session" + + # Expect at least one callback shortly after creation + print("\n===== [Test] WAIT for creation callback =====") + assert _wait_for_callbacks( + notification_server["received"], 1, timeout=10, start_len=start_len + ), "Did not receive any callback after creation" + if len(notification_server["received"]) > start_len: + print("\n----- [Test] New callback(s) after create -----") + for rec in notification_server["received"][start_len:]: + print(pformat(rec)) + try: + transformed = oran_client.notification_to_camara_session( + rec["payload"], original_session=camara_session_info + ) + print("[Notify] Transformed to CAMARA:") + print(pformat(transformed)) + except Exception as exc: + print(f"[Notify] Transform error: {exc}") + + # Wait until after expiry + wait_secs = duration_seconds + 5 + print("\n===== [Test] WAIT for expiry =====") + print(f"[Test] Sleeping {wait_secs}s") + time.sleep(wait_secs) + + # After expiry, policy should be gone + print("\n===== [Test] GET after expiry (expect error) =====") + print(f"[Test] session_id={session_id}") + with pytest.raises(OranHttpError): + oran_client.get_qod_session(session_id) + + # Expect at least one additional callback (e.g., expiry) + print("\n===== [Test] WAIT for expiry callback =====") + assert _wait_for_callbacks( + notification_server["received"], 2, timeout=10, start_len=start_len + ), "Did not receive post-expiry callback" + if len(notification_server["received"]) > start_len: + print("\n----- [Test] Callbacks collected -----") + for rec in notification_server["received"][start_len:]: + print(pformat(rec)) + try: + transformed = oran_client.notification_to_camara_session( + rec["payload"], original_session=camara_session_info + ) + print("[Notify] Transformed to CAMARA:") + print(pformat(transformed)) + except Exception as exc: + print(f"[Notify] Transform error: {exc}")