Commit 626122a8 authored by Giulio Carota's avatar Giulio Carota
Browse files

feat: QoD support in oai client

parent acb19b76
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -11,6 +11,7 @@ pydantic==2.10.6
pydantic_core==2.27.2
pytest==8.3.2
requests==2.32.3
shortuuid==1.0.13
tomli==2.2.1
typing_extensions==4.12.2
urllib3==2.3.0
+129 −0
Original line number Diff line number Diff line
from typing import Dict
from src import logger
import shortuuid
import time
from pydantic import ValidationError
from src.network.core.network_interface import NetworkManagementInterface
from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription
from src.network.clients.oai.common import (
    oai_as_session_with_qos_post,
    oai_as_session_with_qos_get,
    oai_as_session_with_qos_delete,
    OaiHttpError,
    OaiNetworkError
)

from src.network.clients.oai.utils import camara_qod_to_as_session_with_qos, as_session_with_qos_to_camara_qod

log = logger.get_logger(__name__)

class OaiNefClient(NetworkManagementInterface):
    def __init__(self, base_url: str, scs_as_id: str = None):
        """
        Initialize Network Client for OAI Core Network
        The currently supported features are:
         - QoD
         - Traffic Influence
        """
        try:
            super().__init__()
            self.base_url = base_url
            self.scs_as_id = shortuuid.uuid()
            log.info(f"Initialized OaiNefClient with base_url: {self.base_url} and scs_as_id: {self.scs_as_id}")
        except Exception as e:
            log.error(f"Failed to initialize OaiNefClient: {e}")
            raise e

    #implementation of the NetworkManagementInterface QoD Methods
    def create_qod_session(self, session_info: Dict) -> Dict:
        """
        Creates a QoS session based on CAMARA QoD API input.
        It maps CAMARA QoD API POST /sessions to
        OAI NEF POST /3gpp-as-session-with-qos/v1/{scs_as_id}/subscriptions
        """
        try:
            qod_input = CamaraQoDSessionInfo(**session_info)

            #convert CAMARA QoD to NEF AsSessionWithQos model and do POST
            nef_req = camara_qod_to_as_session_with_qos(qod_input)
            nef_res = oai_as_session_with_qos_post(self.base_url, self.scs_as_id, nef_req)

            #retrieve the NEF resource id
            if "self" in nef_res.keys():
                nef_url = nef_res["self"]
                nef_id = nef_url.split("subscriptions/")[1]
            else:
                raise OaiNetworkError("No valid ID for the created resource was returned")

            #create QoD session detail and return info with resource Id
            qod_input.sessionId = nef_id

            log.info(f"QoD session activated successfully [id={nef_id}]")

            return qod_input

        except ValidationError as e:
            raise OaiNetworkError("Could not validate QoD Session Info data") from e
        except KeyError as e:
            raise OaiNetworkError(f"Missing field in QoD Session Info data: {e}") from e
        except OaiHttpError as e:
            raise OaiNetworkError(f"The network could not enable the QoD Session. It returned {e}") from e
        except OaiNetworkError as e:
            raise


    def get_qod_session(self, session_id: str) -> Dict:
        """
        Retrieves details of a specific Quality on Demand (QoS) session.
        It maps CAMARA QoD API GET /sessions/{sessionId} to
        OAI NEF GET /3gpp-as-session-with-qos/v1/{scs_as_id}/subscriptions/{subscriptionId}
        """
        try:
            res = oai_as_session_with_qos_get(self.base_url, self.scs_as_id, session_id=session_id)
            nef_res = OaiAsSessionWithQosSubscription(**res)
            qod_info = as_session_with_qos_to_camara_qod(nef_res)

            log.info(f"QoD session retrived successfully [id={session_id}]")

            return qod_info
        except ValidationError as e:
            raise OaiNetworkError("Could not validate network response data") from e
        except OaiHttpError as e:
            raise OaiNetworkError(f"The network could not enable the QoD Session. It returned {e}") from e
        except OaiNetworkError as e:
            raise

    def delete_qod_session(self, session_id: str) -> None:
        """
        Deletes a specific Quality on Demand (QoS) session.
        It maps CAMARA QoD API DELETE /sessions/{sessionId} to
        OAI NEF DELETE /3gpp-as-session-with-qos/v1/{scs_as_id}/subscriptions/{subscriptionId}
        """
        try:
            oai_as_session_with_qos_delete(self.base_url, self.scs_as_id, session_id=session_id)

            log.info(f"QoD session deleted successfully [id={session_id}]")

        except OaiHttpError as e:
            raise OaiNetworkError(f"The network could not enable the QoD Session. It returned {e}") from e
        except OaiNetworkError as e:
            raise

    #implementation of the NetworkManagementInterface Traffic Influence Methods
    def create_traffic_influence_resource(self, traffic_influence_info):

        log.error(f"create_traffic_influence_resource not implemented yet")

        raise NotImplementedError()

    def delete_traffic_influence_resource(self, resource_id):

        log.error(f"delete_traffic_influence_resource not implemented yet")

        raise NotImplementedError()

    def get_traffic_influence_resource(self, resource_id):

        log.error(f"get_traffic_influence_resource not implemented yet")

        raise NotImplementedError()
 No newline at end of file
+56 −0
Original line number Diff line number Diff line
from pydantic import BaseModel
from src.network.clients.errors import NetworkPlatformError

import json
import requests

def _make_request(method: str, url: str, data=None):
    try:
        headers = None
        if method == 'POST':
            headers = {
                "Content-Type": "application/json",
                "accept": "application/json",
            }
        elif method == 'GET':
            headers = {
                "accept": "application/json",
            }
        response = requests.request(method, url, headers=headers, data=data)
        response.raise_for_status()
        if response.content:
            return response.json()
    except requests.exceptions.HTTPError as e:
        raise OaiHttpError(e) from e
    except requests.exceptions.ConnectionError as e:
        raise OaiHttpError("connection error") from e


#QoD methods
def oai_as_session_with_qos_post(base_url: str, scs_as_id: str, model_payload: BaseModel) -> dict:
    data = model_payload.model_dump_json(exclude_none=True)
    url = oai_as_session_with_qos_build_url(base_url, scs_as_id)
    return _make_request("POST", url, data=data)


def oai_as_session_with_qos_get(base_url: str, scs_as_id: str, session_id: str) -> dict:
    url = oai_as_session_with_qos_build_url(base_url, scs_as_id, session_id)
    return _make_request("GET", url)


def oai_as_session_with_qos_delete(base_url: str, scs_as_id: str, session_id: str):
    url = oai_as_session_with_qos_build_url(base_url, scs_as_id, session_id)
    return _make_request("DELETE", url)

def oai_as_session_with_qos_build_url(base_url: str, scs_as_id: str, session_id: str = None):
    url = f"{base_url}/3gpp-as-session-with-qos/v1/{scs_as_id}/subscriptions"
    if session_id != None and len(session_id) > 0:
        return f"{url}/{session_id}"
    else:
        return url

class OaiHttpError(Exception):
    pass

class OaiNetworkError(NetworkPlatformError):
    pass
 No newline at end of file
+110 −0
Original line number Diff line number Diff line
from pydantic import BaseModel, Field, AnyHttpUrl
from typing import List, Optional


class Snssai(BaseModel):
    sst: int = Field(default=1)
    sd: str = Field(default="FFFFFF")

class FlowInfoItem(BaseModel):
    flowId: int
    flowDescriptions: List[str]

class OaiAsSessionWithQosSubscription(BaseModel):
    """
    Represents the model to create an AsSessionWithQoS resource inside the OAI NEF.
    """
    supportedFeatures: str = Field(default="12")
    dnn: str = Field(default="oai")
    snssai: Snssai
    flowInfo: List[FlowInfoItem]
    ueIpv4Addr: str
    notificationDestination: str
    qosReference: str
    self: Optional[str] = None
    qosDuration: Optional[int] = None

    def add_flow_descriptor(self, flow_desriptor: str):
        self.flowInfo = list()
        self.flowInfo.append(FlowInfoItem(
            flowId=len(self.flowInfo)+1,
            flowDescriptions=[flow_desriptor]
        ))

    def add_snssai(self, sst: int, sd: str = None):
        self.snssai = Snssai(sst=sst, sd=sd)

class PortRange(BaseModel):
    from_: int = Field(alias="from")
    to: int

    class Config:
        populate_by_name = True

class Ports(BaseModel):
    ranges: Optional[List[PortRange]] = None
    ports: Optional[List[int]] = None

class Ipv4Address(BaseModel):
    publicAddress: str
    publicPort: Optional[int] = None

class Device(BaseModel):
    phoneNumber: Optional[str] = None
    networkAccessIdentifier: Optional[str] = None
    ipv4Address: Optional[Ipv4Address] = None
    ipv6Address: Optional[str] = None

class ApplicationServer(BaseModel):
    ipv4Address: Optional[str] = None
    ipv6Address: Optional[str] = None

class SinkCredential(BaseModel):
    credentialType: Optional[str] = None

class CamaraQoDSessionInfo(BaseModel):
    """
    Represents the input data for creating a QoD session.
    """
    duration: int
    qosProfile: str
    applicationServer: ApplicationServer

    device: Optional[Device] = None
    devicePorts: Optional[Ports] = None
    applicationServerPorts: Optional[Ports] = None
    sink: Optional[str] = None
    sinkCredential: Optional[SinkCredential] = None

    #fields only applicable to sessionInfo in responses:
    sessionId: Optional[str] = None
    startedAt: Optional[int] = None
    expiresAt: Optional[int] = None
    qosStatus: Optional[str] = None
    statusInfo: Optional[str] = None


    class Config:
        populate_by_name = True

    def retrieve_ue_ipv4(self):
        if self.device is not None and self.device.ipv4Address is not None:
            return self.device.ipv4Address.publicAddress
        else:
            raise KeyError("device.ipv4Address.publicAddress")

    def retrieve_app_ipv4(self):
        if self.applicationServer.ipv4Address is not None:
            return self.applicationServer.ipv4Address
        else:
            raise KeyError("applicationServer.ipv4Address")

    def add_server_ipv4(self, ipv4: str):
        self.applicationServer = ApplicationServer(ipv4Address = ipv4)


    def add_ue_ipv4(self, ipv4: str):
        if self.device is None:
            self.device = Device()
        if self.device.ipv4Address is None:
            self.device.ipv4Address = Ipv4Address(publicAddress=ipv4)
 No newline at end of file
+44 −0
Original line number Diff line number Diff line
from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription
from pydantic import BaseModel

def camara_qod_to_as_session_with_qos(qod_input: CamaraQoDSessionInfo) -> OaiAsSessionWithQosSubscription :
    device_ip = qod_input.retrieve_ue_ipv4()
    server_ip = qod_input.retrieve_app_ipv4()

    # Extract callback sink and QoS profile
    sink_url = qod_input.sink
    qos_profile = qod_input.qosProfile

    #build flow descriptor in oai format using device ip and server ip
    flow_descriptor = f"permit out ip from {device_ip}/32 to {server_ip}/32"

    #create the nef request model
    nef_req = OaiAsSessionWithQosSubscription.construct()
    nef_req.ueIpv4Addr = device_ip
    nef_req.notificationDestination = sink_url
    nef_req.add_flow_descriptor(flow_desriptor=flow_descriptor)
    nef_req.qosReference = qos_profile
    nef_req.add_snssai(1, "FFFFFF")

    #the qos duration feature is not available yet in oai
    #nef_req.qosDuration = qod_input.duration

    return nef_req


def as_session_with_qos_to_camara_qod(nef_input: OaiAsSessionWithQosSubscription) -> CamaraQoDSessionInfo :
    #create the camara qod model

    qod_info = CamaraQoDSessionInfo.construct()

    flowDesc = nef_input.flowInfo[0].flowDescriptions[0]
    serverIp = flowDesc.split("to ")[1].split("/32")[0]

    qod_info.add_server_ipv4(serverIp)
    qod_info.qosProfile = nef_input.qosReference
    qod_info.add_ue_ipv4(nef_input.ueIpv4Addr)
    qod_info.sink = nef_input.notificationDestination
    qod_info.duration = nef_input.qosDuration

    return qod_info