Commit f75c82be authored by Adrian Pino's avatar Adrian Pino
Browse files

Merge branch 'main' into feature/universal-client-invocation

parents 557ec7c7 21790d5c
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -66,6 +66,7 @@ pyzmq==26.4.0
referencing==0.36.2
requests==2.32.3
rpds-py==0.24.0
shortuuid==1.0.13
six==1.17.0
soupsieve==2.6
stack-data==0.6.3
+126 −17
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
from typing import Dict
##
# Copyright (c) 2025 Netsoft Group, EURECOM.
# All rights reserved.
#
# This file is part of the Open SDK
#
# Contributors:
#   - Giulio Carota (giulio.carota@eurecom.fr)
##


from src import logger
from src.network.core.network_interface import NetworkManagementInterface
from src.network.core.schemas import (
    AsSessionWithQoSSubscription,
    CreateSession,
    CreateTrafficInfluence,
    FlowInfo,
    Snssai,
    TrafficInfluSub,
)

log = logger.get_logger(__name__)
supportedQos = ["qos-e", "qos-s", "qos-m", "qos-l"]


# Placeholder for the OAI Network Management Client
class NetworkManager(NetworkManagementInterface):
    def __init__(self, base_url: str, scs_as_id: str):
        pass
    def __init__(self, base_url: str, scs_as_id: str = None):
        """
        Initialize Network Client for OAI Core Network
        The currently supported features are:
         - QoD
         - Traffic Influence
        """
        try:
            super().__init__()
            self.base_url = base_url
            self.scs_as_id = scs_as_id
            log.info(
                f"Initialized OaiNefClient with base_url: {self.base_url} and scs_as_id: {self.scs_as_id}"
            )

    def create_qod_session(self, session_info: Dict) -> Dict:
        pass
        except Exception as e:
            log.error(f"Failed to initialize OaiNefClient: {e}")
            raise e

    def get_qod_session(self, session_id: str) -> Dict:
        pass
    def core_specific_qod_validation(self, session_info: CreateSession):
        """
        Validates core-specific parameters for the session creation.

    def delete_qod_session(self, session_id: str) -> None:
        pass
        args:
            session_info: The session information to validate.

        raises:
            ValidationError: If the session information does not meet core-specific requirements.
        """
        if session_info.qosProfile.root not in supportedQos:
            raise OaiValidationError(
                f"QoS profile {session_info.qosProfile} not supported by OAI, supported profiles are {supportedQos}"
            )

# Note:
# As this class is inheriting from NetworkManagementInterface, it is
# expected to implement all the abstract methods defined in that interface.
#
# In case this network adapter doesn't support a specific method, it should
# be marked as NotImplementedError.
        if session_info.device is None or session_info.device.ipv4Address is None:
            raise OaiValidationError("OAI requires UE IPv4 Address to activate QoS")

        if session_info.applicationServer.ipv4Address is None:
            raise OaiValidationError("OAI requires App IPv4 Address to activate QoS")
        return

    def add_core_specific_qod_parameters(
        self,
        session_info: CreateSession,
        subscription: AsSessionWithQoSSubscription,
    ) -> None:
        device_ip = _retrieve_ue_ipv4(session_info)
        server_ip = _retrieve_app_ipv4(session_info)

        # build flow descriptor in oai format using device ip and server ip
        flow_descriptor = f"permit out ip from {device_ip}/32 to {server_ip}/32"
        _add_qod_flow_descriptor(subscription, flow_descriptor)
        _add_qod_snssai(subscription, 1, "FFFFFF")
        subscription.dnn = "oai"

    def add_core_specific_ti_parameters(
        self,
        traffic_influence_info: CreateTrafficInfluence,
        subscription: TrafficInfluSub,
    ):
        # todo oai add dnn, ssnai, afServiceId
        subscription.dnn = "oai"
        subscription.add_snssai(1, "FFFFFF")
        subscription.afServiceId = self.scs_as_id

    def core_specific_traffic_influence_validation(
        self, traffic_influence_info: CreateTrafficInfluence
    ) -> None:
        """
        Validates core-specific parameters for the session creation.

        args:
            session_info: The session information to validate.

        raises:
            ValidationError: If the session information does not meet core-specific requirements.
        """
        # Placeholder for core-specific validation logic
        # This method should be overridden by subclasses if needed

        if (
            traffic_influence_info.device is None
            or traffic_influence_info.device.ipv4Address is None
        ):
            raise OaiValidationError(
                "OAI requires UE IPv4 Address to activate Traffic Influence"
            )


def _retrieve_ue_ipv4(session_info: CreateSession):
    return session_info.device.ipv4Address.root.privateAddress


def _retrieve_app_ipv4(session_info: CreateSession):
    return session_info.applicationServer.ipv4Address


def _add_qod_flow_descriptor(
    qos_sub: AsSessionWithQoSSubscription, flow_desriptor: str
):
    qos_sub.flowInfo = list()
    qos_sub.flowInfo.append(
        FlowInfo(flowId=len(qos_sub.flowInfo) + 1, flowDescriptions=[flow_desriptor])
    )


def _add_qod_snssai(qos_sub: AsSessionWithQoSSubscription, sst: int, sd: str = None):
    qos_sub.snssai = Snssai(sst=sst, sd=sd)


class OaiValidationError(Exception):
    pass
+7 −5
Original line number Diff line number Diff line
@@ -42,17 +42,19 @@ class NetworkManager(NetworkManagementInterface):
            raise e

    def core_specific_validation(self, session_info: schemas.CreateSession):
        if session_info.qosProfile not in flow_id_mapping.keys():
        if session_info.qosProfile.root not in flow_id_mapping.keys():
            raise ValidationError(
                f"Open5Gs only supports these qos-profiles: {', '.join(flow_id_mapping.keys())}"
            )

    def add_core_specific_parameters(
        self, session_info: schemas.AsSessionWithQoSSubscription
        self,
        session_info: schemas.CreateSession,
        subscription: schemas.AsSessionWithQoSSubscription,
    ) -> None:
        session_info.supportedFeatures = schemas.SupportedFeatures("003C")
        flow_id = flow_id_mapping[session_info.qosProfile]
        session_info.flowInfo = build_flows(flow_id, session_info)
        subscription.supportedFeatures = schemas.SupportedFeatures("003C")
        flow_id = flow_id_mapping[session_info.qosProfile.root]
        subscription.flowInfo = build_flows(flow_id, session_info)

    # --- Implementation of NetworkManagementInterface methods ---
    def create_qod_session(self, session_info: Dict) -> Dict:
+31 −1
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
# Common utilities (errors, HTTP helpers) used by the Open5GS client implementation (client.py).
# Common utilities (errors, HTTP helpers) used by the core network interface (network_interface.py).

import requests
from pydantic import BaseModel
@@ -60,5 +60,35 @@ def as_session_with_qos_build_url(
        return url


# Traffic Influence Methods
def traffic_influence_post(
    base_url: str, scs_as_id: str, model_payload: BaseModel
) -> dict:
    data = model_payload.model_dump_json(exclude_none=True)
    url = traffic_influence_build_url(base_url, scs_as_id)
    return _make_request("POST", url, data=data)


def traffic_influence_delete(base_url: str, scs_as_id: str, session_id: str):
    url = traffic_influence_build_url(base_url, scs_as_id, session_id)
    return _make_request("DELETE", url)


def traffic_influence_put(
    base_url: str, scs_as_id: str, session_id: str, model_payload: BaseModel
) -> dict:
    data = model_payload.model_dump_json(exclude_none=True)
    url = traffic_influence_build_url(base_url, scs_as_id, session_id)
    return _make_request("PUT", url, data=data)


def traffic_influence_build_url(base_url: str, scs_as_id: str, session_id: str = None):
    url = f"{base_url}/3gpp-traffic-influence/v1/{scs_as_id}/subscriptions"
    if session_id is not None and len(session_id) > 0:
        return f"{url}/{session_id}"
    else:
        return url


class CoreHttpError(Exception):
    pass
+145 −3
Original line number Diff line number Diff line
@@ -36,7 +36,8 @@ def flatten_port_spec(ports_spec: schemas.PortsSpec | None) -> list[str]:


def build_flows(
    flow_id: int, session_info: schemas.CreateSession
    flow_id: int,
    session_info: schemas.CreateSession,
) -> list[schemas.FlowInfo]:
    device_ports = flatten_port_spec(session_info.devicePorts)
    server_ports = flatten_port_spec(session_info.applicationServerPorts)
@@ -78,7 +79,11 @@ class NetworkManagementInterface(ABC):
    scs_as_id: str

    @abstractmethod
    def add_core_specific_parameters(x):
    def add_core_specific_qod_parameters(
        self,
        session_info: schemas.CreateSession,
        subscription: schemas.AsSessionWithQoSSubscription,
    ):
        """
        Placeholder for adding core-specific parameters to the subscription.
        This method should be overridden by subclasses to implement specific logic.
@@ -86,7 +91,19 @@ class NetworkManagementInterface(ABC):
        pass

    @abstractmethod
    def core_specific_validation(self, session_info: schemas.CreateSession) -> None:
    def add_core_specific_ti_parameters(
        self,
        traffic_influence_info: schemas.CreateTrafficInfluence,
        subscription: schemas.TrafficInfluSub,
    ):
        """
        Placeholder for adding core-specific parameters to the subscription.
        This method should be overridden by subclasses to implement specific logic.
        """
        pass

    @abstractmethod
    def core_specific_qod_validation(self, session_info: schemas.CreateSession) -> None:
        """
        Validates core-specific parameters for the session creation.

@@ -100,6 +117,68 @@ class NetworkManagementInterface(ABC):
        # This method should be overridden by subclasses if needed
        pass

    @abstractmethod
    def core_specific_traffic_influence_validation(
        self, traffic_influence_info: schemas.CreateTrafficInfluence
    ) -> None:
        """
        Validates core-specific parameters for the session creation.

        args:
            session_info: The session information to validate.

        raises:
            ValidationError: If the session information does not meet core-specific requirements.
        """
        # Placeholder for core-specific validation logic
        # This method should be overridden by subclasses if needed
        pass

    def _build_qod_subscription(self, session_info: Dict) -> None:
        valid_session_info = schemas.CreateSession.model_validate(session_info)
        device_ipv4 = None
        if valid_session_info.device.ipv4Address:
            device_ipv4 = valid_session_info.device.ipv4Address.root.publicAddress.root

        self.core_specific_qod_validation(valid_session_info)
        subscription = schemas.AsSessionWithQoSSubscription(
            notificationDestination=str(valid_session_info.sink),
            qosReference=valid_session_info.qosProfile.root,
            ueIpv4Addr=device_ipv4,
            ueIpv6Addr=valid_session_info.device.ipv6Address,
            usageThreshold=schemas.UsageThreshold(duration=valid_session_info.duration),
        )
        self.add_core_specific_qod_parameters(valid_session_info, subscription)
        return subscription

    def _build_ti_subscription(self, traffic_influence_info: Dict):

        traffic_influence_data = schemas.CreateTrafficInfluence.model_validate(
            traffic_influence_info
        )
        self.core_specific_traffic_influence_validation(traffic_influence_data)

        device_ip = traffic_influence_data.retrieve_ue_ipv4()
        server_ip = (
            traffic_influence_data.appInstanceId
        )  # assume that the instance id corresponds to its IPv4 address
        sink_url = traffic_influence_data.notificationUri
        edge_zone = traffic_influence_data.edgeCloudZoneId

        # build flow descriptor in oai format using device ip and server ip
        flow_descriptor = f"permit out ip from {device_ip}/32 to {server_ip}/32"

        subscription = schemas.TrafficInfluSub(
            afAppId=traffic_influence_data.appId,
            ipv4Addr=str(device_ip),
            notificationDestination=sink_url,
        )
        subscription.add_flow_descriptor(flow_desriptor=flow_descriptor)
        subscription.add_traffic_route(dnai=edge_zone)

        self.add_core_specific_ti_parameters(traffic_influence_data, subscription)
        return subscription

    def create_qod_session(self, session_info: Dict) -> Dict:
        """
        Creates a QoS session based on CAMARA QoD API input.
@@ -111,6 +190,10 @@ class NetworkManagementInterface(ABC):
        returns:
            dictionary containing the created session details, including its ID.
        """
        subscription = self._build_qod_subscription(session_info)
        return common.as_session_with_qos_post(
            self.base_url, self.scs_as_id, subscription
        )
        valid_session_info = schemas.CreateSession.model_validate(session_info)
        self.core_specific_validation(valid_session_info)
        subscription = schemas.AsSessionWithQoSSubscription(
@@ -155,5 +238,64 @@ class NetworkManagementInterface(ABC):
        )
        log.info(f"QoD session deleted successfully [id={session_id}]")

    def create_traffic_influence_resource(self, traffic_influence_info: Dict) -> Dict:
        """
        Creates a Traffic Influence resource based on CAMARA TI API input.

        args:
            traffic_influence_info: Dictionary containing traffic influence details conforming to
                                    the CAMARA TI resource creation parameters.

        returns:
            dictionary containing the created traffic influence resource details, including its ID.
        """

        subscription = self._build_ti_subscription(traffic_influence_info)
        response = common.traffic_influence_post(
            self.base_url, self.scs_as_id, subscription
        )

        # retrieve the NEF resource id
        if "self" in response.keys():
            subscription_id = response["self"]
        else:
            subscription_id = None

        traffic_influence_info["trafficInfluenceID"] = subscription_id
        return traffic_influence_info

    def put_traffic_influence_resource(
        self, resource_id: str, traffic_influence_info: Dict
    ) -> Dict:
        """
        Retrieves details of a specific Traffic Influence resource.

        args:
            resource_id: The unique identifier of the Traffic Influence resource.

        returns:
            Dictionary containing the details of the requested Traffic Influence resource.
        """
        subscription = self._build_ti_subscription(traffic_influence_info)
        common.traffic_influence_put(
            self.base_url, self.scs_as_id, resource_id, subscription
        )

        traffic_influence_info.trafficInfluenceID = resource_id
        return traffic_influence_info

    def delete_traffic_influence_resource(self, resource_id: str) -> None:
        """
        Deletes a specific Traffic Influence resource.

        args:
            resource_id: The unique identifier of the Traffic Influence resource to delete.

        returns:
            None
        """
        common.traffic_influence_delete(self.base_url, self.scs_as_id, resource_id)
        return

    # Placeholder for other CAMARA APIs (e.g., Traffic Influence,
    # Location-retrieval, etc.)
Loading