Commit bf27b2c1 authored by Adrian Pino's avatar Adrian Pino Committed by GitHub
Browse files

Merge pull request #66 from SunriseOpenOperatorPlatform/refactor/network-logic-from-clients-to-core

Refactor/network logic from clients to core
parents 939c7c21 c3efbd9c
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ prompt_toolkit==3.0.50
ptyprocess==0.7.0
pure_eval==0.2.3
pydantic==2.10.6
pydantic-extra-types==2.10.3
pydantic_core==2.27.2
Pygments==2.19.1
pyproject_hooks==1.2.0
+36 −4
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
from typing import Dict

from pydantic import ValidationError
from src import logger
from src.network.core.network_interface import NetworkManagementInterface
from src.network.core.network_interface import NetworkManagementInterface, build_flows
from ...core import common
from ...core import schemas

log = logger.get_logger(__name__)

flow_id_mapping = {"qos-e": 3, "qos-s": 4, "qos-m": 5, "qos-l": 6}


class NetworkManager(NetworkManagementInterface):
    """
@@ -34,13 +39,38 @@ class NetworkManager(NetworkManagementInterface):
            log.error(f"Failed to initialize Open5GSClient: {e}")
            raise e

    def core_specific_validation(self, session_info: schemas.CreateSession):
        if session_info.qosProfile not in flow_id_mapping.keys():
            raise ValidationError(
                f"Open5Gs only supports these qos-profiles: {', '.join(flow_id_mapping.keys())}"
            )

    def add_core_specific_parameters(
        self, session_info: schemas.AsSessionWithQoSSubscription
    ) -> None:
        session_info.supportedFeatures = schemas.SupportedFeatures("003C")
        flow_id = flow_id_mapping[session_info.qosProfile]
        session_info.flowInfo = build_flows(flow_id, session_info)

    # --- Implementation of NetworkManagementInterface methods ---
    def create_qod_session(self, session_info: Dict) -> Dict:
        """
        Creates a QoD session based on the CAMARA QoD API input.
        Maps the CAMARA QoD POST /sessions to Open5GS NEF POST /{scsAsId}/subscriptions.
        """
        pass
        url = f"{self.base_url}/{self.scs_as_id}/subscriptions"
        # Raises ValidationError if the object is not valid.
        valid_session_info = schemas.CreateSession.model_validate(session_info)

        subscription = schemas.AsSessionWithQoSSubscription(
            notificationDestination=valid_session_info.sink,
            qosReference=valid_session_info.qosProfile,
            ueIpv4Addr=valid_session_info.device.ipv4Address,
            ueIpv6Addr=valid_session_info.device.ipv6Address,
            usageThreshold=schemas.UsageThreshold(duration=valid_session_info.duration),
        )
        self.add_core_specific_parameters(subscription)
        common.open5gs_post(url, subscription)

    def get_qod_session(self, session_id: str) -> Dict:
        """
@@ -48,7 +78,8 @@ class NetworkManager(NetworkManagementInterface):
        Maps CAMARA QoD GET /sessions/{sessionId} to Open5GS NEF GET /
        {scsAsId}/subscriptions/{subscriptionId}.
        """
        pass
        url = f"{self.base_url}/{self.scs_as_id}/subscriptions/{session_id}"
        common.open5gs_get(url)

    def delete_qod_session(self, session_id: str) -> None:
        """
@@ -56,7 +87,8 @@ class NetworkManager(NetworkManagementInterface):
        Maps CAMARA QoD DELETE /sessions/{sessionId} to Open5GS NEF DELETE /
        {scsAsId}/subscriptions/{subscriptionId}.
        """
        pass
        url = f"{self.base_url}/{self.scs_as_id}/subscriptions/{session_id}"
        common.open5gs_delete(url)


# Note:
+0 −40
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
# Common utilities (errors, HTTP helpers) used by the Open5GS client implementation (client.py).
from typing import Optional

from pydantic import BaseModel

from src import logger
from src.network.clients.errors import NetworkPlatformError

log = logger.get_logger(__name__)


class Open5GSError(NetworkPlatformError):
    pass


class Open5GSErrorResponse(BaseModel):
    message: str
    detail: dict


# --- HTTP Request Helper Functions ---
def open5gs_post(url: str, model_payload: BaseModel) -> dict:
    """
    Placeholder for the POST request function."""
    pass


def open5gs_get(url: str, params: Optional[dict] = None) -> dict:
    """
    Placeholder for the GET request function.
    """
    pass


def open5gs_delete(url: str) -> None:
    """
    Placeholder for the DELETE request function.
    """
    pass
+0 −23
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
# This file defines the Pydantic models that represent the data structures (schemas)
# for the requests sent to and responses received from the Open5GS NEF API,
# specifically focusing on the APIs needed to support CAMARA QoD.

from pydantic import BaseModel


# Dummy examples of Pydantic models for the Open5GS NEF API.
class Open5GSQoSSubscription(BaseModel):
    """
    Represents the payload for creating a QoS subscription in Open5GS.
    """

    pass


class CamaraQoDSessionInfo(BaseModel):
    """
    Represents the input data for creating a QoD session.
    """

    pass
+64 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
# Common utilities (errors, HTTP helpers) used by the Open5GS client implementation (client.py).

from pydantic import BaseModel
import requests

from src import logger

log = logger.get_logger(__name__)


def _make_request(method: str, url: str, data=None):
    try:
        headers = None
        if method == "POST" or method == "PUT":
            headers = {
                "Content-Type": "application/json",
                "accept": "application/json",
            }
        elif method == "GET":
            headers = {
                "accept": "application/json",
            }
        response = requests.request(method, url, headers=headers, data=data)
        response.raise_for_status()
        if response.content:
            return response.json()
    except requests.exceptions.HTTPError as e:
        raise CoreHttpError(e) from e
    except requests.exceptions.ConnectionError as e:
        raise CoreHttpError("connection error") from e


# QoD methods
def as_session_with_qos_post(
    base_url: str, scs_as_id: str, model_payload: BaseModel
) -> dict:
    data = model_payload.model_dump_json(exclude_none=True)
    url = as_session_with_qos_build_url(base_url, scs_as_id)
    return _make_request("POST", url, data=data)


def as_session_with_qos_get(base_url: str, scs_as_id: str, session_id: str) -> dict:
    url = as_session_with_qos_build_url(base_url, scs_as_id, session_id)
    return _make_request("GET", url)


def as_session_with_qos_delete(base_url: str, scs_as_id: str, session_id: str):
    url = as_session_with_qos_build_url(base_url, scs_as_id, session_id)
    return _make_request("DELETE", url)


def as_session_with_qos_build_url(
    base_url: str, scs_as_id: str, session_id: str = None
):
    url = f"{base_url}/3gpp-as-session-with-qos/v1/{scs_as_id}/subscriptions"
    if session_id is not None and len(session_id) > 0:
        return f"{url}/{session_id}"
    else:
        return url


class CoreHttpError(Exception):
    pass
Loading