Commit c3efbd9c authored by Ferran Cañellas's avatar Ferran Cañellas
Browse files

Refactor to put the logic in the core

parent 36b98484
Loading
Loading
Loading
Loading
+22 −43
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
from itertools import product
from typing import Dict

from pydantic import ValidationError
from src import logger
from src.network.core.network_interface import NetworkManagementInterface
from . import common
from . import schemas
from src.network.core.network_interface import NetworkManagementInterface, build_flows
from ...core import common
from ...core import schemas

log = logger.get_logger(__name__)

flow_id_mapping = {
    "qos-e": 3,
    "qos-s": 4,
    "qos-m": 5,
    "qos-l": 6
}
flow_id_mapping = {"qos-e": 3, "qos-s": 4, "qos-m": 5, "qos-l": 6}

def flatten_port_spec(ports_spec: schemas.PortsSpec | None)-> list[str]:
    has_ports = False
    has_ranges = False
    flat_ports = []
    if ports_spec and ports_spec.ports:
        has_ports = True
        flat_ports.extend([str(port) for port in ports_spec.ports])
    if ports_spec and ports_spec.ranges:
        has_ranges = True
        flat_ports.extend([f"{range.from_}-{range.to}" for range in ports_spec.ranges])
    if not has_ports and not has_ranges:
        flat_ports.append("0-65535")
    return flat_ports

class NetworkManager(NetworkManagementInterface):
    """
@@ -58,6 +39,19 @@ class NetworkManager(NetworkManagementInterface):
            log.error(f"Failed to initialize Open5GSClient: {e}")
            raise e

    def core_specific_validation(self, session_info: schemas.CreateSession):
        if session_info.qosProfile not in flow_id_mapping.keys():
            raise ValidationError(
                f"Open5Gs only supports these qos-profiles: {', '.join(flow_id_mapping.keys())}"
            )

    def add_core_specific_parameters(
        self, session_info: schemas.AsSessionWithQoSSubscription
    ) -> None:
        session_info.supportedFeatures = schemas.SupportedFeatures("003C")
        flow_id = flow_id_mapping[session_info.qosProfile]
        session_info.flowInfo = build_flows(flow_id, session_info)

    # --- Implementation of NetworkManagementInterface methods ---
    def create_qod_session(self, session_info: Dict) -> Dict:
        """
@@ -67,31 +61,15 @@ class NetworkManager(NetworkManagementInterface):
        url = f"{self.base_url}/{self.scs_as_id}/subscriptions"
        # Raises ValidationError if the object is not valid.
        valid_session_info = schemas.CreateSession.model_validate(session_info)
        if valid_session_info.qosProfile not in flow_id_mapping.keys():
            raise ValidationError(f"Open5Gs only supports these qos-profiles: {", ".join(flow_id_mapping.keys())}")

        flow_id = flow_id_mapping[valid_session_info.qosProfile]
        device_ip = valid_session_info.device.ipv4Address or session_info.device.ipv4Address
        server_ip = valid_session_info.applicationServer.ipv4Address or valid_session_info.applicationServer.ipv6Address
        device_ports = flatten_port_spec(valid_session_info.devicePorts)
        server_ports = flatten_port_spec(valid_session_info.applicationServerPorts)
        ports_combis = list(product(device_ports, server_ports))

        flow_descrs = []
        for device_port, server_port in ports_combis:
            flow_descrs.append(f"permit in ip from {device_ip} {device_port} to {server_ip} {server_port}")
            flow_descrs.append(f"permit out ip from {device_ip} {device_port} to {server_ip} {server_port}")
        flows = [schemas.FlowInfo(
            flowId=flow_id,
            flowDescriptions=[", ".join(flow_descrs)]
        )]
        subscription = schemas.AsSessionWithQoSSubscription(
            supportedFeatures=schemas.SupportedFeatures("003C"),
            flowInfo=flows,
            notificationDestination=valid_session_info.sink,
            qosReference=valid_session_info.qosProfile,
            ueIpv4Addr=valid_session_info.device.ipv4Address,
            ueIpv6Addr=valid_session_info.device.ipv6Address,
            usageThreshold=schemas.UsageThreshold(duration=valid_session_info.duration),
        )
        self.add_core_specific_parameters(subscription)
        common.open5gs_post(url, subscription)

    def get_qod_session(self, session_id: str) -> Dict:
@@ -112,6 +90,7 @@ class NetworkManager(NetworkManagementInterface):
        url = f"{self.base_url}/{self.scs_as_id}/subscriptions/{session_id}"
        common.open5gs_delete(url)


# Note:
# As this class is inheriting from NetworkManagementInterface, it is
# expected to implement all the abstract methods defined in that interface.
+0 −43
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
# Common utilities (errors, HTTP helpers) used by the Open5GS client implementation (client.py).
from typing import Optional

from pydantic import BaseModel

from src import logger
from src.network.clients.errors import NetworkPlatformError

log = logger.get_logger(__name__)


class Open5GSError(NetworkPlatformError):
    pass


class Open5GSErrorResponse(BaseModel):
    message: str
    detail: dict


# --- HTTP Request Helper Functions ---
def open5gs_post(url: str, model_payload: BaseModel) -> dict:
    """
    Placeholder for the POST request function."""
    response = requests.post(url, model_payload)
    return response.json()


def open5gs_get(url: str, params: Optional[dict] = None) -> dict:
    """
    Placeholder for the GET request function.
    """
    response = requests.get(url, params=params)
    return response.json()


def open5gs_delete(url: str) -> None:
    """
    Placeholder for the DELETE request function.
    """
    requests.delete(url)
+64 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-
# Common utilities (errors, HTTP helpers) used by the Open5GS client implementation (client.py).

from pydantic import BaseModel
import requests

from src import logger

log = logger.get_logger(__name__)


def _make_request(method: str, url: str, data=None):
    try:
        headers = None
        if method == "POST" or method == "PUT":
            headers = {
                "Content-Type": "application/json",
                "accept": "application/json",
            }
        elif method == "GET":
            headers = {
                "accept": "application/json",
            }
        response = requests.request(method, url, headers=headers, data=data)
        response.raise_for_status()
        if response.content:
            return response.json()
    except requests.exceptions.HTTPError as e:
        raise CoreHttpError(e) from e
    except requests.exceptions.ConnectionError as e:
        raise CoreHttpError("connection error") from e


# QoD methods
def as_session_with_qos_post(
    base_url: str, scs_as_id: str, model_payload: BaseModel
) -> dict:
    data = model_payload.model_dump_json(exclude_none=True)
    url = as_session_with_qos_build_url(base_url, scs_as_id)
    return _make_request("POST", url, data=data)


def as_session_with_qos_get(base_url: str, scs_as_id: str, session_id: str) -> dict:
    url = as_session_with_qos_build_url(base_url, scs_as_id, session_id)
    return _make_request("GET", url)


def as_session_with_qos_delete(base_url: str, scs_as_id: str, session_id: str):
    url = as_session_with_qos_build_url(base_url, scs_as_id, session_id)
    return _make_request("DELETE", url)


def as_session_with_qos_build_url(
    base_url: str, scs_as_id: str, session_id: str = None
):
    url = f"{base_url}/3gpp-as-session-with-qos/v1/{scs_as_id}/subscriptions"
    if session_id is not None and len(session_id) > 0:
        return f"{url}/{session_id}"
    else:
        return url


class CoreHttpError(Exception):
    pass
+94 −5
Original line number Diff line number Diff line
@@ -10,8 +10,56 @@
#   - Reza Mosahebfard (reza.mosahebfard@i2cat.net)
##
from abc import ABC, abstractmethod
from itertools import product
from typing import Dict

from src import logger
from src.network.core import common, schemas

log = logger.get_logger(__name__)


def flatten_port_spec(ports_spec: schemas.PortsSpec | None) -> list[str]:
    has_ports = False
    has_ranges = False
    flat_ports = []
    if ports_spec and ports_spec.ports:
        has_ports = True
        flat_ports.extend([str(port) for port in ports_spec.ports])
    if ports_spec and ports_spec.ranges:
        has_ranges = True
        flat_ports.extend([f"{range.from_}-{range.to}" for range in ports_spec.ranges])
    if not has_ports and not has_ranges:
        flat_ports.append("0-65535")
    return flat_ports


def build_flows(
    flow_id: int, session_info: schemas.CreateSession
) -> list[schemas.FlowInfo]:
    device_ports = flatten_port_spec(session_info.devicePorts)
    server_ports = flatten_port_spec(session_info.applicationServerPorts)
    ports_combis = list(product(device_ports, server_ports))

    device_ip = session_info.device.ipv4Address or session_info.device.ipv4Address
    server_ip = (
        session_info.applicationServer.ipv4Address
        or session_info.applicationServer.ipv6Address
    )

    flow_descrs = []
    for device_port, server_port in ports_combis:
        flow_descrs.append(
            f"permit in ip from {device_ip} {device_port} to {server_ip} {server_port}"
        )
        flow_descrs.append(
            f"permit out ip from {device_ip} {device_port} to {server_ip} {server_port}"
        )
    flows = [
        schemas.FlowInfo(flowId=flow_id, flowDescriptions=[", ".join(flow_descrs)])
    ]
    return flows


class NetworkManagementInterface(ABC):
    """
@@ -25,7 +73,32 @@ class NetworkManagementInterface(ABC):
    to their specific NEF capabilities.
    """

    base_url: str
    scs_as_id: str

    @abstractmethod
    def add_core_specific_parameters(x):
        """
        Placeholder for adding core-specific parameters to the subscription.
        This method should be overridden by subclasses to implement specific logic.
        """
        pass

    @abstractmethod
    def core_specific_validation(self, session_info: schemas.CreateSession) -> None:
        """
        Validates core-specific parameters for the session creation.

        args:
            session_info: The session information to validate.

        raises:
            ValidationError: If the session information does not meet core-specific requirements.
        """
        # Placeholder for core-specific validation logic
        # This method should be overridden by subclasses if needed
        pass

    def create_qod_session(self, session_info: Dict) -> Dict:
        """
        Creates a QoS session based on CAMARA QoD API input.
@@ -37,9 +110,19 @@ class NetworkManagementInterface(ABC):
        returns:
            dictionary containing the created session details, including its ID.
        """
        pass
        valid_session_info = schemas.CreateSession.model_validate(session_info)
        self.core_specific_validation(valid_session_info)
        subscription = schemas.AsSessionWithQoSSubscription(
            notificationDestination=valid_session_info.sink,
            qosReference=valid_session_info.qosProfile,
            ueIpv4Addr=valid_session_info.device.ipv4Address,
            ueIpv6Addr=valid_session_info.device.ipv6Address,
            usageThreshold=schemas.UsageThreshold(duration=valid_session_info.duration),
        )
        self.add_core_specific_parameters(subscription)
        url = f"{self.base_url}/{self.scs_as_id}/subscriptions"
        common.as_session_with_qos_post(self.base_url, self.scs_as_id, subscription)

    @abstractmethod
    def get_qod_session(self, session_id: str) -> Dict:
        """
        Retrieves details of a specific Quality on Demand (QoS) session.
@@ -50,9 +133,12 @@ class NetworkManagementInterface(ABC):
        returns:
            Dictionary containing the details of the requested QoS session.
        """
        pass
        session = common.as_session_with_qos_get(
            self.base_url, self.scs_as_id, session_id=session_id
        )
        log.info(f"QoD session retrived successfully [id={session_id}]")
        return session

    @abstractmethod
    def delete_qod_session(self, session_id: str) -> None:
        """
        Deletes a specific Quality on Demand (QoS) session.
@@ -63,7 +149,10 @@ class NetworkManagementInterface(ABC):
        returns:
            None
        """
        pass
        common.as_session_with_qos_delete(
            self.base_url, self.scs_as_id, session_id=session_id
        )
        log.info(f"QoD session deleted successfully [id={session_id}]")

    # Placeholder for other CAMARA APIs (e.g., Traffic Influence,
    # Location-retrieval, etc.)
+69 −27
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ import ipaddress
from enum import Enum
from typing import Annotated

from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt, RootModel
from pydantic import AnyUrl, BaseModel, ConfigDict, Field, NonNegativeInt, RootModel
from pydantic_extra_types.mac_address import MacAddress
from ipaddress import IPv4Address, IPv6Address

@@ -159,36 +159,41 @@ class AsSessionWithQoSSubscription(BaseModel):
###############################################################
# CAMARA Models


class PhoneNumber(RootModel[str]):
    root: Annotated[
        str,
        Field(
            description="A public identifier addressing a telephone subscription. In mobile networks it corresponds to the MSISDN (Mobile Station International Subscriber Directory Number). In order to be globally unique it has to be formatted in international format, according to E.164 standard, prefixed with '+'.",
            examples=['+123456789'],
            pattern='^\\+[1-9][0-9]{4,14}$',
            examples=["+123456789"],
            pattern="^\\+[1-9][0-9]{4,14}$",
        ),
    ]


class NetworkAccessIdentifier(RootModel[str]):
    root: Annotated[
        str,
        Field(
            description='A public identifier addressing a subscription in a mobile network. In 3GPP terminology, it corresponds to the GPSI formatted with the External Identifier ({Local Identifier}@{Domain Identifier}). Unlike the telephone number, the network access identifier is not subjected to portability ruling in force, and is individually managed by each operator.',
            examples=['123456789@domain.com'],
            description="A public identifier addressing a subscription in a mobile network. In 3GPP terminology, it corresponds to the GPSI formatted with the External Identifier ({Local Identifier}@{Domain Identifier}). Unlike the telephone number, the network access identifier is not subjected to portability ruling in force, and is individually managed by each operator.",
            examples=["123456789@domain.com"],
        ),
    ]


class SingleIpv4Addr(RootModel[IPv4Address]):
    root: Annotated[
        IPv4Address,
        Field(
            description='A single IPv4 address with no subnet mask',
            examples=['203.0.113.0'],
            description="A single IPv4 address with no subnet mask",
            examples=["203.0.113.0"],
        ),
    ]


class Port(RootModel[int]):
    root: Annotated[int, Field(description='TCP or UDP port number', ge=0, le=65535)]
    root: Annotated[int, Field(description="TCP or UDP port number", ge=0, le=65535)]


class DeviceIpv4Addr1(BaseModel):
    publicAddress: SingleIpv4Addr
@@ -206,95 +211,132 @@ class DeviceIpv4Addr(RootModel[DeviceIpv4Addr1 | DeviceIpv4Addr2]):
    root: Annotated[
        DeviceIpv4Addr1 | DeviceIpv4Addr2,
        Field(
            description='The device should be identified by either the public (observed) IP address and port as seen by the application server, or the private (local) and any public (observed) IP addresses in use by the device (this information can be obtained by various means, for example from some DNS servers).\n\nIf the allocated and observed IP addresses are the same (i.e. NAT is not in use) then  the same address should be specified for both publicAddress and privateAddress.\n\nIf NAT64 is in use, the device should be identified by its publicAddress and publicPort, or separately by its allocated IPv6 address (field ipv6Address of the Device object)\n\nIn all cases, publicAddress must be specified, along with at least one of either privateAddress or publicPort, dependent upon which is known. In general, mobile devices cannot be identified by their public IPv4 address alone.\n',
            examples=[{'publicAddress': '203.0.113.0', 'publicPort': 59765}],
            description="The device should be identified by either the public (observed) IP address and port as seen by the application server, or the private (local) and any public (observed) IP addresses in use by the device (this information can be obtained by various means, for example from some DNS servers).\n\nIf the allocated and observed IP addresses are the same (i.e. NAT is not in use) then  the same address should be specified for both publicAddress and privateAddress.\n\nIf NAT64 is in use, the device should be identified by its publicAddress and publicPort, or separately by its allocated IPv6 address (field ipv6Address of the Device object)\n\nIn all cases, publicAddress must be specified, along with at least one of either privateAddress or publicPort, dependent upon which is known. In general, mobile devices cannot be identified by their public IPv4 address alone.\n",
            examples=[{"publicAddress": "203.0.113.0", "publicPort": 59765}],
        ),
    ]


class DeviceIpv6Address(RootModel[IPv6Address]):
    root: Annotated[
        IPv6Address,
        Field(
            description='The device should be identified by the observed IPv6 address, or by any single IPv6 address from within the subnet allocated to the device (e.g. adding ::0 to the /64 prefix).\n\nThe session shall apply to all IP flows between the device subnet and the specified application server, unless further restricted by the optional parameters devicePorts or applicationServerPorts.\n',
            examples=['2001:db8:85a3:8d3:1319:8a2e:370:7344'],
            description="The device should be identified by the observed IPv6 address, or by any single IPv6 address from within the subnet allocated to the device (e.g. adding ::0 to the /64 prefix).\n\nThe session shall apply to all IP flows between the device subnet and the specified application server, unless further restricted by the optional parameters devicePorts or applicationServerPorts.\n",
            examples=["2001:db8:85a3:8d3:1319:8a2e:370:7344"],
        ),
    ]


class Device(BaseModel):
    phoneNumber: PhoneNumber | None = None
    networkAccessIdentifier: NetworkAccessIdentifier | None = None
    ipv4Address: DeviceIpv4Addr | None = None
    ipv6Address: DeviceIpv6Address | None = None


class ApplicationServerIpv4Address(RootModel[str]):
    root: Annotated[
        str,
        Field(
            description='IPv4 address may be specified in form <address/mask> as:\n  - address - an IPv4 number in dotted-quad form 1.2.3.4. Only this exact IP number will match the flow control rule.\n  - address/mask - an IP number as above with a mask width of the form 1.2.3.4/24.\n    In this case, all IP numbers from 1.2.3.0 to 1.2.3.255 will match. The bit width MUST be valid for the IP version.\n',
            examples=['198.51.100.0/24'],
            description="IPv4 address may be specified in form <address/mask> as:\n  - address - an IPv4 number in dotted-quad form 1.2.3.4. Only this exact IP number will match the flow control rule.\n  - address/mask - an IP number as above with a mask width of the form 1.2.3.4/24.\n    In this case, all IP numbers from 1.2.3.0 to 1.2.3.255 will match. The bit width MUST be valid for the IP version.\n",
            examples=["198.51.100.0/24"],
        ),
    ]


class ApplicationServerIpv6Address(RootModel[str]):
    root: Annotated[
        str,
        Field(
            description='IPv6 address may be specified in form <address/mask> as:\n  - address - The /128 subnet is optional for single addresses:\n    - 2001:db8:85a3:8d3:1319:8a2e:370:7344\n    - 2001:db8:85a3:8d3:1319:8a2e:370:7344/128\n  - address/mask - an IP v6 number with a mask:\n    - 2001:db8:85a3:8d3::0/64\n    - 2001:db8:85a3:8d3::/64\n',
            examples=['2001:db8:85a3:8d3:1319:8a2e:370:7344'],
            description="IPv6 address may be specified in form <address/mask> as:\n  - address - The /128 subnet is optional for single addresses:\n    - 2001:db8:85a3:8d3:1319:8a2e:370:7344\n    - 2001:db8:85a3:8d3:1319:8a2e:370:7344/128\n  - address/mask - an IP v6 number with a mask:\n    - 2001:db8:85a3:8d3::0/64\n    - 2001:db8:85a3:8d3::/64\n",
            examples=["2001:db8:85a3:8d3:1319:8a2e:370:7344"],
        ),
    ]


class ApplicationServer(BaseModel):
    ipv4Address: ApplicationServerIpv4Address | None = None
    ipv6Address: ApplicationServerIpv6Address | None = None


class Range(BaseModel):
    from_: Annotated[Port, Field(alias='from')]
    from_: Annotated[Port, Field(alias="from")]
    to: Port


class PortsSpec(BaseModel):
    ranges: Annotated[
        list[Range] | None, Field(description='Range of TCP or UDP ports', min_length=1)
        list[Range] | None, Field(description="Range of TCP or UDP ports", min_length=1)
    ] = None
    ports: Annotated[
        list[Port] | None, Field(description='Array of TCP or UDP ports', min_length=1)
        list[Port] | None, Field(description="Array of TCP or UDP ports", min_length=1)
    ] = None


class QosProfileName(RootModel[str]):
    root: Annotated[
        str,
        Field(
            description='A unique name for identifying a specific QoS profile.\nThis may follow different formats depending on the API provider implementation.\nSome options addresses:\n  - A UUID style string\n  - Support for predefined profiles QOS_S, QOS_M, QOS_L, and QOS_E\n  - A searchable descriptive name\nThe set of QoS Profiles that an API provider is offering may be retrieved by means of the QoS Profile API (qos-profile) or agreed on onboarding time.\n',
            examples=['voice'],
            description="A unique name for identifying a specific QoS profile.\nThis may follow different formats depending on the API provider implementation.\nSome options addresses:\n  - A UUID style string\n  - Support for predefined profiles QOS_S, QOS_M, QOS_L, and QOS_E\n  - A searchable descriptive name\nThe set of QoS Profiles that an API provider is offering may be retrieved by means of the QoS Profile API (qos-profile) or agreed on onboarding time.\n",
            examples=["voice"],
            max_length=256,
            min_length=3,
            pattern='^[a-zA-Z0-9_.-]+$',
            pattern="^[a-zA-Z0-9_.-]+$",
        ),
    ]


class CredentialType(Enum):
    PLAIN = "PLAIN"
    ACCESSTOKEN = "ACCESSTOKEN"
    REFRESHTOKEN = "REFRESHTOKEN"


class SinkCredential(BaseModel):
    credentialType: Annotated[
        CredentialType,
        Field(
            description="The type of the credential.\nNote: Type of the credential - MUST be set to ACCESSTOKEN for now\n"
        ),
    ]


class BaseSessionInfo(BaseModel):
    device: Device | None = None
    applicationServer: ApplicationServer
    devicePorts: Annotated[
        PortsSpec | None,
        Field(
            description='The ports used locally by the device for flows to which the requested QoS profile should apply. If omitted, then the qosProfile will apply to all flows between the device and the specified application server address and ports'
            description="The ports used locally by the device for flows to which the requested QoS profile should apply. If omitted, then the qosProfile will apply to all flows between the device and the specified application server address and ports"
        ),
    ] = None
    applicationServerPorts: Annotated[
        PortsSpec | None,
        Field(
            description='A list of single ports or port ranges on the application server'
            description="A list of single ports or port ranges on the application server"
        ),
    ] = None
    qosProfile: QosProfileName
    sink: Annotated[
        AnyUrl | None,
        Field(
            description="The address to which events about all status changes of the session (e.g. session termination) shall be delivered using the selected protocol.",
            examples=["https://endpoint.example.com/sink"],
        ),
    ] = None
    sinkCredential: Annotated[
        SinkCredential | None,
        Field(
            description="A sink credential provides authentication or authorization information necessary to enable delivery of events to a target."
        ),
    ] = None


class CreateSession(BaseSessionInfo):
    duration: Annotated[
        int,
        Field(
            description='Requested session duration in seconds. Value may be explicitly limited for the QoS profile, as specified in the Qos Profile (see qos-profile API). Implementations can grant the requested session duration or set a different duration, based on network policies or conditions.\n',
            description="Requested session duration in seconds. Value may be explicitly limited for the QoS profile, as specified in the Qos Profile (see qos-profile API). Implementations can grant the requested session duration or set a different duration, based on network policies or conditions.\n",
            examples=[3600],
            ge=1,
        ),