Commit 66abc4a0 authored by Adrian Pino's avatar Adrian Pino Committed by GitHub
Browse files

Merge pull request #71 from SunriseOpenOperatorPlatform/feature/add-network-oai

Traffic Influence support and improvements
parents d36ee4ca f1f0b696
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -66,6 +66,7 @@ pyzmq==26.4.0
referencing==0.36.2
requests==2.32.3
rpds-py==0.24.0
shortuuid==1.0.13
six==1.17.0
soupsieve==2.6
stack-data==0.6.3
+126 −17
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
from typing import Dict
##
# Copyright (c) 2025 Netsoft Group, EURECOM.
# All rights reserved.
#
# This file is part of the Open SDK
#
# Contributors:
#   - Giulio Carota (giulio.carota@eurecom.fr)
##


from src import logger
from src.network.core.network_interface import NetworkManagementInterface
from src.network.core.schemas import (
    AsSessionWithQoSSubscription,
    CreateSession,
    CreateTrafficInfluence,
    FlowInfo,
    Snssai,
    TrafficInfluSub,
)

log = logger.get_logger(__name__)
supportedQos = ["qos-e", "qos-s", "qos-m", "qos-l"]


# Placeholder for the OAI Network Management Client
class NetworkManager(NetworkManagementInterface):
    def __init__(self, base_url: str, scs_as_id: str):
        pass
    def __init__(self, base_url: str, scs_as_id: str = None):
        """
        Initialize Network Client for OAI Core Network
        The currently supported features are:
         - QoD
         - Traffic Influence
        """
        try:
            super().__init__()
            self.base_url = base_url
            self.scs_as_id = scs_as_id
            log.info(
                f"Initialized OaiNefClient with base_url: {self.base_url} and scs_as_id: {self.scs_as_id}"
            )

    def create_qod_session(self, session_info: Dict) -> Dict:
        pass
        except Exception as e:
            log.error(f"Failed to initialize OaiNefClient: {e}")
            raise e

    def get_qod_session(self, session_id: str) -> Dict:
        pass
    def core_specific_qod_validation(self, session_info: CreateSession):
        """
        Validates core-specific parameters for the session creation.

    def delete_qod_session(self, session_id: str) -> None:
        pass
        args:
            session_info: The session information to validate.

        raises:
            ValidationError: If the session information does not meet core-specific requirements.
        """
        if session_info.qosProfile.root not in supportedQos:
            raise OaiValidationError(
                f"QoS profile {session_info.qosProfile} not supported by OAI, supported profiles are {supportedQos}"
            )

# Note:
# As this class is inheriting from NetworkManagementInterface, it is
# expected to implement all the abstract methods defined in that interface.
#
# In case this network adapter doesn't support a specific method, it should
# be marked as NotImplementedError.
        if session_info.device is None or session_info.device.ipv4Address is None:
            raise OaiValidationError("OAI requires UE IPv4 Address to activate QoS")

        if session_info.applicationServer.ipv4Address is None:
            raise OaiValidationError("OAI requires App IPv4 Address to activate QoS")
        return

    def add_core_specific_qod_parameters(
        self,
        session_info: CreateSession,
        subscription: AsSessionWithQoSSubscription,
    ) -> None:
        device_ip = _retrieve_ue_ipv4(session_info)
        server_ip = _retrieve_app_ipv4(session_info)

        # build flow descriptor in oai format using device ip and server ip
        flow_descriptor = f"permit out ip from {device_ip}/32 to {server_ip}/32"
        _add_qod_flow_descriptor(subscription, flow_descriptor)
        _add_qod_snssai(subscription, 1, "FFFFFF")
        subscription.dnn = "oai"

    def add_core_specific_ti_parameters(
        self,
        traffic_influence_info: CreateTrafficInfluence,
        subscription: TrafficInfluSub,
    ):
        # todo oai add dnn, ssnai, afServiceId
        subscription.dnn = "oai"
        subscription.add_snssai(1, "FFFFFF")
        subscription.afServiceId = self.scs_as_id

    def core_specific_traffic_influence_validation(
        self, traffic_influence_info: CreateTrafficInfluence
    ) -> None:
        """
        Validates core-specific parameters for the session creation.

        args:
            session_info: The session information to validate.

        raises:
            ValidationError: If the session information does not meet core-specific requirements.
        """
        # Placeholder for core-specific validation logic
        # This method should be overridden by subclasses if needed

        if (
            traffic_influence_info.device is None
            or traffic_influence_info.device.ipv4Address is None
        ):
            raise OaiValidationError(
                "OAI requires UE IPv4 Address to activate Traffic Influence"
            )


def _retrieve_ue_ipv4(session_info: CreateSession):
    return session_info.device.ipv4Address.root.privateAddress


def _retrieve_app_ipv4(session_info: CreateSession):
    return session_info.applicationServer.ipv4Address


def _add_qod_flow_descriptor(
    qos_sub: AsSessionWithQoSSubscription, flow_desriptor: str
):
    qos_sub.flowInfo = list()
    qos_sub.flowInfo.append(
        FlowInfo(flowId=len(qos_sub.flowInfo) + 1, flowDescriptions=[flow_desriptor])
    )


def _add_qod_snssai(qos_sub: AsSessionWithQoSSubscription, sst: int, sd: str = None):
    qos_sub.snssai = Snssai(sst=sst, sd=sd)


class OaiValidationError(Exception):
    pass
+31 −1
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
# Common utilities (errors, HTTP helpers) used by the Open5GS client implementation (client.py).
# Common utilities (errors, HTTP helpers) used by the core network interface (network_interface.py).

import requests
from pydantic import BaseModel
@@ -60,5 +60,35 @@ def as_session_with_qos_build_url(
        return url


# Traffic Influence Methods
def traffic_influence_post(
    base_url: str, scs_as_id: str, model_payload: BaseModel
) -> dict:
    data = model_payload.model_dump_json(exclude_none=True)
    url = traffic_influence_build_url(base_url, scs_as_id)
    return _make_request("POST", url, data=data)


def traffic_influence_delete(base_url: str, scs_as_id: str, session_id: str):
    url = traffic_influence_build_url(base_url, scs_as_id, session_id)
    return _make_request("DELETE", url)


def traffic_influence_put(
    base_url: str, scs_as_id: str, session_id: str, model_payload: BaseModel
) -> dict:
    data = model_payload.model_dump_json(exclude_none=True)
    url = traffic_influence_build_url(base_url, scs_as_id, session_id)
    return _make_request("PUT", url, data=data)


def traffic_influence_build_url(base_url: str, scs_as_id: str, session_id: str = None):
    url = f"{base_url}/3gpp-traffic-influence/v1/{scs_as_id}/subscriptions"
    if session_id is not None and len(session_id) > 0:
        return f"{url}/{session_id}"
    else:
        return url


class CoreHttpError(Exception):
    pass
+122 −6
Original line number Diff line number Diff line
@@ -78,7 +78,7 @@ class NetworkManagementInterface(ABC):
    scs_as_id: str

    @abstractmethod
    def add_core_specific_parameters(
    def add_core_specific_qod_parameters(
        self,
        session_info: schemas.CreateSession,
        subscription: schemas.AsSessionWithQoSSubscription,
@@ -90,7 +90,19 @@ class NetworkManagementInterface(ABC):
        pass

    @abstractmethod
    def core_specific_validation(self, session_info: schemas.CreateSession) -> None:
    def add_core_specific_ti_parameters(
        self,
        traffic_influence_info: schemas.CreateTrafficInfluence,
        subscription: schemas.TrafficInfluSub,
    ):
        """
        Placeholder for adding core-specific parameters to the subscription.
        This method should be overridden by subclasses to implement specific logic.
        """
        pass

    @abstractmethod
    def core_specific_qod_validation(self, session_info: schemas.CreateSession) -> None:
        """
        Validates core-specific parameters for the session creation.

@@ -104,13 +116,30 @@ class NetworkManagementInterface(ABC):
        # This method should be overridden by subclasses if needed
        pass

    def _build_subscription(self, session_info: Dict) -> None:
    @abstractmethod
    def core_specific_traffic_influence_validation(
        self, traffic_influence_info: schemas.CreateTrafficInfluence
    ) -> None:
        """
        Validates core-specific parameters for the session creation.

        args:
            session_info: The session information to validate.

        raises:
            ValidationError: If the session information does not meet core-specific requirements.
        """
        # Placeholder for core-specific validation logic
        # This method should be overridden by subclasses if needed
        pass

    def _build_qod_subscription(self, session_info: Dict) -> None:
        valid_session_info = schemas.CreateSession.model_validate(session_info)
        device_ipv4 = None
        if valid_session_info.device.ipv4Address:
            device_ipv4 = valid_session_info.device.ipv4Address.root.publicAddress.root

        self.core_specific_validation(valid_session_info)
        self.core_specific_qod_validation(valid_session_info)
        subscription = schemas.AsSessionWithQoSSubscription(
            notificationDestination=str(valid_session_info.sink),
            qosReference=valid_session_info.qosProfile.root,
@@ -118,7 +147,35 @@ class NetworkManagementInterface(ABC):
            ueIpv6Addr=valid_session_info.device.ipv6Address,
            usageThreshold=schemas.UsageThreshold(duration=valid_session_info.duration),
        )
        self.add_core_specific_parameters(valid_session_info, subscription)
        self.add_core_specific_qod_parameters(valid_session_info, subscription)
        return subscription

    def _build_ti_subscription(self, traffic_influence_info: Dict):

        traffic_influence_data = schemas.CreateTrafficInfluence.model_validate(
            traffic_influence_info
        )
        self.core_specific_traffic_influence_validation(traffic_influence_data)

        device_ip = traffic_influence_data.retrieve_ue_ipv4()
        server_ip = (
            traffic_influence_data.appInstanceId
        )  # assume that the instance id corresponds to its IPv4 address
        sink_url = traffic_influence_data.notificationUri
        edge_zone = traffic_influence_data.edgeCloudZoneId

        # build flow descriptor in oai format using device ip and server ip
        flow_descriptor = f"permit out ip from {device_ip}/32 to {server_ip}/32"

        subscription = schemas.TrafficInfluSub(
            afAppId=traffic_influence_data.appId,
            ipv4Addr=str(device_ip),
            notificationDestination=sink_url,
        )
        subscription.add_flow_descriptor(flow_desriptor=flow_descriptor)
        subscription.add_traffic_route(dnai=edge_zone)

        self.add_core_specific_ti_parameters(traffic_influence_data, subscription)
        return subscription

    def create_qod_session(self, session_info: Dict) -> Dict:
@@ -132,7 +189,7 @@ class NetworkManagementInterface(ABC):
        returns:
            dictionary containing the created session details, including its ID.
        """
        subscription = self._build_subscription(session_info)
        subscription = self._build_qod_subscription(session_info)
        return common.as_session_with_qos_post(
            self.base_url, self.scs_as_id, subscription
        )
@@ -168,5 +225,64 @@ class NetworkManagementInterface(ABC):
        )
        log.info(f"QoD session deleted successfully [id={session_id}]")

    def create_traffic_influence_resource(self, traffic_influence_info: Dict) -> Dict:
        """
        Creates a Traffic Influence resource based on CAMARA TI API input.

        args:
            traffic_influence_info: Dictionary containing traffic influence details conforming to
                                    the CAMARA TI resource creation parameters.

        returns:
            dictionary containing the created traffic influence resource details, including its ID.
        """

        subscription = self._build_ti_subscription(traffic_influence_info)
        response = common.traffic_influence_post(
            self.base_url, self.scs_as_id, subscription
        )

        # retrieve the NEF resource id
        if "self" in response.keys():
            subscription_id = response["self"]
        else:
            subscription_id = None

        traffic_influence_info["trafficInfluenceID"] = subscription_id
        return traffic_influence_info

    def put_traffic_influence_resource(
        self, resource_id: str, traffic_influence_info: Dict
    ) -> Dict:
        """
        Retrieves details of a specific Traffic Influence resource.

        args:
            resource_id: The unique identifier of the Traffic Influence resource.

        returns:
            Dictionary containing the details of the requested Traffic Influence resource.
        """
        subscription = self._build_ti_subscription(traffic_influence_info)
        common.traffic_influence_put(
            self.base_url, self.scs_as_id, resource_id, subscription
        )

        traffic_influence_info.trafficInfluenceID = resource_id
        return traffic_influence_info

    def delete_traffic_influence_resource(self, resource_id: str) -> None:
        """
        Deletes a specific Traffic Influence resource.

        args:
            resource_id: The unique identifier of the Traffic Influence resource to delete.

        returns:
            None
        """
        common.traffic_influence_delete(self.base_url, self.scs_as_id, resource_id)
        return

    # Placeholder for other CAMARA APIs (e.g., Traffic Influence,
    # Location-retrieval, etc.)
+89 −0
Original line number Diff line number Diff line
@@ -127,6 +127,11 @@ class FlowInfo(BaseModel):
    )


class Snssai(BaseModel):
    sst: int = Field(default=1)
    sd: str = Field(default="FFFFFF")


class AsSessionWithQoSSubscription(BaseModel):
    model_config = ConfigDict(serialize_by_alias=True)
    self_: Link | None = Field(None, alias="self")
@@ -147,6 +152,8 @@ class AsSessionWithQoSSubscription(BaseModel):
            lower the index of the array for a given entry, the higher the priority.",
        min_length=1,
    )
    snssai: Snssai | None = None
    dnn: str | None = None
    ueIpv4Addr: ipaddress.IPv4Address | None = None
    ueIpv6Addr: ipaddress.IPv6Address | None = None
    macAddr: MacAddress | None = None
@@ -155,6 +162,56 @@ class AsSessionWithQoSSubscription(BaseModel):
    qosMonInfo: QosMonitoringInformationModel | None = None


class SourceTrafficFilters(BaseModel):
    sourcePort: int


class DestinationTrafficFilters(BaseModel):
    destinationPort: int
    destinationProtocol: str


class TrafficRoute(BaseModel):
    dnai: str


class TrafficInfluSub(BaseModel):  # Replace with a meaningful name
    afServiceId: str | None = None
    afAppId: str
    dnn: str | None = None
    snssai: Snssai | None = None
    trafficFilters: list[FlowInfo] | None = Field(
        None,
        description="Describe the data flow which requires Traffic Influence.",
        min_length=1,
    )
    ipv4Addr: str | None = None
    ipv6Addr: str | None = None

    notificationDestination: str
    trafficRoutes: list[TrafficRoute] | None = Field(
        None,
        description="Describe the list of DNAIs to reach the destination",
        min_length=1,
    )
    suppFeat: str | None = None

    def add_flow_descriptor(self, flow_desriptor: str):
        self.trafficFilters = list()
        self.trafficFilters.append(
            FlowInfo(
                flowId=len(self.trafficFilters) + 1, flowDescriptions=[flow_desriptor]
            )
        )

    def add_traffic_route(self, dnai: str):
        self.trafficRoutes = list()
        self.trafficRoutes.append(TrafficRoute(dnai=dnai))

    def add_snssai(self, sst: int, sd: str = None):
        self.snssai = Snssai(sst=sst, sd=sd)


###############################################################
###############################################################
# CAMARA Models
@@ -301,6 +358,11 @@ class SinkCredential(BaseModel):
    ]


class NotificationSink(BaseModel):
    sink: str | None
    sinkCredential: SinkCredential | None


class BaseSessionInfo(BaseModel):
    device: Device | None = None
    applicationServer: ApplicationServer
@@ -341,3 +403,30 @@ class CreateSession(BaseSessionInfo):
            ge=1,
        ),
    ]


class CreateTrafficInfluence(BaseModel):
    trafficInfluenceID: str | None = None
    apiConsumerId: str | None = None
    appId: str
    appInstanceId: str
    edgeCloudRegion: str | None = None
    edgeCloudZoneId: str | None = None
    sourceTrafficFilters: SourceTrafficFilters | None = None
    destinationTrafficFilters: DestinationTrafficFilters | None = None
    notificationUri: str | None = None
    notificationAuthToken: str | None = None
    device: Device
    notificationSink: NotificationSink | None = None

    def retrieve_ue_ipv4(self):
        if self.device is not None and self.device.ipv4Address is not None:
            return self.device.ipv4Address.root.privateAddress.root
        else:
            raise KeyError("device.ipv4Address.publicAddress")

    def add_ue_ipv4(self, ipv4: str):
        if self.device is None:
            self.device = Device()
        if self.device.ipv4Address is None:
            self.device.ipv4Address = DeviceIpv4Addr(publicAddress=ipv4)
Loading