Commit 20904f10 authored by Adrian Pino's avatar Adrian Pino Committed by GitHub
Browse files

Merge pull request #135 from...

Merge pull request #135 from SunriseOpenOperatorPlatform/feature/implement-camara-location-retrieval-tfs

Implement Camara Location Retrieval TF
parents 7e2d59fd 598c0abc
Loading
Loading
Loading
Loading
+31 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-

# Contributors:
#   - Ferran Cañellas (ferran.canellas@i2cat.net)
#   - Panagiotis Pavlidis (p.pavlidis@iit.demokritos.gr)
##
from pydantic import ValidationError

from sunrise6g_opensdk import logger
@@ -56,6 +61,32 @@ class NetworkManager(BaseNetworkClient):
        flow_id = flow_id_mapping[session_info.qosProfile.root]
        subscription.flowInfo = build_flows(flow_id, session_info)

    def core_specific_monitoring_event_validation(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> None:
        """Check core specific elements that required for location retrieval in NEF."""
        if retrieve_location_request.device is None:
            raise ValidationError(
                "Open5GS requires a device to be specified for location retrieval in NEF."
            )

    def add_core_specific_location_parameters(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> schemas.MonitoringEventSubscriptionRequest:
        """Add core specific location parameters to support location retrieval scenario in NEF."""
        return schemas.MonitoringEventSubscriptionRequest(
            msisdn=retrieve_location_request.device.phoneNumber.root.lstrip("+"),
            notificationDestination="http://127.0.0.1:8001",
            monitoringType=schemas.MonitoringType.LOCATION_REPORTING,
            locationType=schemas.LocationType.LAST_KNOWN,
        )
        # subscription.msisdn = retrieve_location_request.device.phoneNumber.root.lstrip('+')
        # monitoringType = schemas.MonitoringType.LOCATION_REPORTING
        # locationType = schemas.LocationType.LAST_KNOWN
        # locationType = schemas.LocationType.CURRENT_LOCATION
        # maximumNumberOfReports = 1
        # repPeriod = schemas.DurationSec(root=20)


# Note:
# As this class is inheriting from BaseNetworkClient, it is
+124 −3
Original line number Diff line number Diff line
@@ -7,12 +7,15 @@
#   - Reza Mosahebfard (reza.mosahebfard@i2cat.net)
#   - Ferran Cañellas (ferran.canellas@i2cat.net)
#   - Giulio Carota (giulio.carota@eurecom.fr)
#   - Panagiotis Pavlidis (p.pavlidis@iit.demokritos.gr)
##
import uuid
from datetime import datetime, timedelta, timezone
from itertools import product
from typing import Dict

from sunrise6g_opensdk import logger
from sunrise6g_opensdk.network.adapters.errors import NetworkPlatformError
from sunrise6g_opensdk.network.core import common, schemas

log = logger.get_logger(__name__)
@@ -75,7 +78,7 @@ class BaseNetworkClient:
    Class for Network Resource Management.

    This class provides shared logic and extension points for different
    Network 5G Cores (e.g., Open5GS, OAI, Open5GCopre-commit run --all-filesre) interacting with
    Network 5G Cores (e.g., Open5GS, OAI, Open5GCore) interacting with
    NEF-like platforms using CAMARA APIs.
    """

@@ -104,6 +107,15 @@ class BaseNetworkClient:
        """
        pass

    def add_core_specific_location_parameters(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> schemas.MonitoringEventSubscriptionRequest:
        """
        Placeholder for adding core-specific parameters to the location subscription.
        This method should be overridden by subclasses to implement specific logic.
        """
        pass

    def core_specific_qod_validation(self, session_info: schemas.CreateSession) -> None:
        """
        Validates core-specific parameters for the session creation.
@@ -134,6 +146,22 @@ class BaseNetworkClient:
        # This method should be overridden by subclasses if needed
        pass

    def core_specific_monitoring_event_validation(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> None:
        """
        Validates core-specific parameters for the monitoring event subscription.

        args:
            retrieve_location_request: The request information to validate.

        raises:
            ValidationError: If the request information does not meet core-specific requirements.
        """
        # Placeholder for core-specific validation logic
        # This method should be overwritten by subclasses if needed
        pass

    def _build_qod_subscription(
        self, session_info: Dict
    ) -> schemas.AsSessionWithQoSSubscription:
@@ -174,7 +202,7 @@ class BaseNetworkClient:
            ipv4Addr=str(device_ip),
            notificationDestination=sink_url,
        )
        subscription.add_flow_descriptor(flow_desriptor=flow_descriptor)
        subscription.add_flow_descriptor(flow_descriptor=flow_descriptor)
        subscription.add_traffic_route(dnai=edge_zone)

        self.add_core_specific_ti_parameters(traffic_influence_data, subscription)
@@ -201,6 +229,98 @@ class BaseNetworkClient:
        )
        return camara_ti

    def _build_monitoring_event_subscription(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> schemas.MonitoringEventSubscriptionRequest:
        self.core_specific_monitoring_event_validation(retrieve_location_request)
        subscription_3gpp = self.add_core_specific_location_parameters(
            retrieve_location_request
        )
        device = retrieve_location_request.device
        subscription_3gpp.externalId = device.networkAccessIdentifier
        subscription_3gpp.ipv4Addr = device.ipv4Address
        subscription_3gpp.ipv6Addr = device.ipv6Address
        # subscription.msisdn = device.phoneNumber.root.lstrip('+')
        # subscription.notificationDestination = "http://127.0.0.1:8001"

        return subscription_3gpp

    def _compute_camara_last_location_time(
        self, event_time: datetime, age_of_location_info_min: int = None
    ) -> datetime:
        """
        Computes the last location time based on the event time and age of location info.

        args:
            event_time: ISO 8601 datetime, e.g. "2025-06-18T12:30:00Z"
            age_of_location_info_min: unsigned int, age of location info in minutes

        returns:
            datetime object representing the last location time in UTC.
        """
        if age_of_location_info_min is not None:
            last_location_time = event_time - timedelta(
                minutes=age_of_location_info_min
            )
            return last_location_time.replace(tzinfo=timezone.utc)
        else:
            return event_time.replace(tzinfo=timezone.utc)

    def create_monitoring_event_subscription(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> schemas.Location:
        """
        Creates a Monitoring Event subscription based on CAMARA Location API input.

        args:
            retrieve_location_request: Dictionary containing location retrieval details conforming to
                                        the CAMARA Location API parameters.

        returns:
            dictionary containing the created subscription details, including its ID.
        """
        subscription = self._build_monitoring_event_subscription(
            retrieve_location_request
        )
        response = common.monitoring_event_post(
            self.base_url, self.scs_as_id, subscription
        )

        monitoring_event_report = schemas.MonitoringEventReport(**response)
        if monitoring_event_report.locationInfo is None:
            log.error(
                "Failed to retrieve location information from monitoring event report"
            )
            raise NetworkPlatformError(
                "Location information not found in monitoring event report"
            )
        geo_area = monitoring_event_report.locationInfo.geographicArea
        report_event_time = monitoring_event_report.eventTime
        age_of_location_info = None
        if monitoring_event_report.locationInfo.ageOfLocationInfo is not None:
            age_of_location_info = (
                monitoring_event_report.locationInfo.ageOfLocationInfo.duration
            )
        last_location_time = self._compute_camara_last_location_time(
            report_event_time, age_of_location_info
        )
        log.debug(f"Last Location time is {last_location_time}")
        camara_point_list: list[schemas.Point] = []
        for point in geo_area.polygon.point_list.geographical_coords:
            camara_point_list.append(
                schemas.Point(latitude=point.lat, longitude=point.lon)
            )
        camara_polygon = schemas.Polygon(
            areaType=schemas.AreaType.polygon,
            boundary=schemas.PointList(camara_point_list),
        )

        camara_location = schemas.Location(
            area=camara_polygon, lastLocationTime=last_location_time
        )

        return camara_location

    def create_qod_session(self, session_info: Dict) -> Dict:
        """
        Creates a QoS session based on CAMARA QoD API input.
@@ -345,4 +465,5 @@ class BaseNetworkClient:
        r = common.traffic_influence_get(self.base_url, self.scs_as_id)
        return [self._build_camara_ti(item) for item in r]

    # Placeholder for other CAMARA APIs (e.g: Location-retrieval, etc.)

# Placeholder for other CAMARA APIs
+17 −0
Original line number Diff line number Diff line
@@ -30,6 +30,23 @@ def _make_request(method: str, url: str, data=None):
        raise CoreHttpError("connection error") from e


# Monitoring Event Methods
def monitoring_event_post(
    base_url: str, scs_as_id: str, model_payload: BaseModel
) -> dict:
    data = model_payload.model_dump_json(exclude_none=True, by_alias=True)
    url = monitoring_event_build_url(base_url, scs_as_id)
    return _make_request("POST", url, data=data)


def monitoring_event_build_url(base_url: str, scs_as_id: str, session_id: str = None):
    url = f"{base_url}/3gpp-monitoring-event/v1/{scs_as_id}/subscriptions"
    if session_id is not None and len(session_id) > 0:
        return f"{url}/{session_id}"
    else:
        return url


# QoD methods
def as_session_with_qos_post(
    base_url: str, scs_as_id: str, model_payload: BaseModel
+309 −4
Original line number Diff line number Diff line
@@ -7,10 +7,18 @@ import ipaddress
from datetime import datetime
from enum import Enum
from ipaddress import IPv4Address, IPv6Address
from typing import Annotated
from typing import Annotated, Literal
from uuid import UUID

from pydantic import AnyUrl, BaseModel, ConfigDict, Field, NonNegativeInt, RootModel
from pydantic import (
    AnyHttpUrl,
    AnyUrl,
    BaseModel,
    ConfigDict,
    Field,
    NonNegativeInt,
    RootModel,
)
from pydantic_extra_types.mac_address import MacAddress

from sunrise6g_opensdk.logger import setup_logger
@@ -213,11 +221,11 @@ class TrafficInfluSub(BaseModel): # Replace with a meaningful name
    )
    suppFeat: str | None = None

    def add_flow_descriptor(self, flow_desriptor: str):
    def add_flow_descriptor(self, flow_descriptor: str):
        self.trafficFilters = list()
        self.trafficFilters.append(
            FlowInfo(
                flowId=len(self.trafficFilters) + 1, flowDescriptions=[flow_desriptor]
                flowId=len(self.trafficFilters) + 1, flowDescriptions=[flow_descriptor]
            )
        )

@@ -229,6 +237,197 @@ class TrafficInfluSub(BaseModel): # Replace with a meaningful name
        self.snssai = Snssai(sst=sst, sd=sd)


# Monitoring Event API


class DurationMin(BaseModel):
    duration: int = Field(
        0,
        description="Unsigned integer identifying a period of time in units of minutes",
        ge=0,
    )


class PlmnId(BaseModel):
    mcc: str = Field(
        ...,
        description="String encoding a Mobile Country Code, comprising of 3 digits.",
    )
    mnc: str = Field(
        ...,
        description="String encoding a Mobile Network Code, comprising of 2 or 3 digits.",
    )


# The enumeration Accuracy represents a desired granularity of accuracy of the requested location information.
class Accuracy(str, Enum):
    cgi_ecgi = (
        "CGI_ECGI"  # The AF requests to be notified using cell level location accuracy.
    )
    ta_ra = (
        "TA_RA"  # The AF requests to be notified using TA/RA level location accuracy.
    )
    geo_area = "GEO_AREA"  # The AF requests to be notified using the geographical area accuracy.
    civic_addr = "CIVIC_ADDR"  # The AF requests to be notified using the civic address accuracy. #EDGEAPP


# If locationType set to "LAST_KNOWN_LOCATION", the monitoring event request from AF shall be only for one-time monitoring request
class LocationType(str, Enum):
    CURRENT_LOCATION = (
        "CURRENT_LOCATION"  # The AF requests to be notified for current location.
    )
    LAST_KNOWN = (
        "LAST_KNOWN_LOCATION"  # The AF requests to be notified for last known location.
    )


# This data type represents a monitoring event type.
class MonitoringType(str, Enum):
    LOCATION_REPORTING = "LOCATION_REPORTING"


class LocationFailureCause(str, Enum):
    position_denied = "POSITIONING_DENIED"  # Positioning is denied.
    unsupported_by_ue = "UNSUPPORTED_BY_UE"  # Positioning is not supported by UE.
    not_registered_ue = "NOT_REGISTERED_UE"  # UE is not registered.
    unspecified = "UNSPECIFIED"  # Unspecified cause.


class GeographicalCoordinates(BaseModel):
    lon: float = Field(..., description="Longitude coordinate.")
    lat: float = Field(..., description="Latitude coordinate.")


class PointListNef(BaseModel):
    geographical_coords: list[GeographicalCoordinates] = Field(
        ...,
        description="List of geographical coordinates defining the points.",
        min_length=3,
        max_length=15,
    )


class NefPolygon(BaseModel):
    point_list: PointListNef = Field(
        ..., description="List of points defining the polygon."
    )


class GeographicArea(BaseModel):
    polygon: NefPolygon | None = Field(
        None, description="Identifies a polygonal geographic area."
    )


# This data type represents the user location information which is sent from the NEF to the AF.
class LocationInfo(BaseModel):
    ageOfLocationInfo: DurationMin | None = Field(
        None,
        description="Indicates the elapsed time since the last network contact of the UE.",
    )
    cellId: str | None = Field(None, description="Cell ID where the UE is located.")
    trackingAreaId: str | None = Field(
        None, description="TrackingArea ID where the UE is located."
    )
    enodeBId: str | None = Field(None, description="eNodeB ID where the UE is located.")
    routingAreaId: str | None = Field(
        None, description="Routing Area ID where the UE is located"
    )
    plmnId: PlmnId | None = Field(None, description="PLMN ID where the UE is located.")
    twanId: str | None = Field(None, description="TWAN ID where the UE is located.")
    geographicArea: GeographicArea | None = Field(
        None,
        description="Identifies a geographic area of the user where the UE is located.",
    )


class MonitoringEventSubscriptionRequest(BaseModel):
    accuracy: Accuracy | None = Field(
        None,
        description="Accuracy represents a desired granularity of accuracy of the requested location information.",
    )
    externalId: str | None = Field(
        None, description="Identifies a user clause 4.6.2 TS 23.682 (optional)"
    )
    msisdn: str | None = Field(
        None,
        description="Identifies the MS internal PSTN/ISDN number allocated for a UE.",
    )
    ipv4Addr: IPv4Address | None = Field(
        None, description="Identifies the Ipv4 address."
    )
    ipv6Addr: IPv6Address | None = Field(
        None, description="Identifies the Ipv6 address."
    )
    notificationDestination: AnyHttpUrl = Field(
        ...,
        description="URI of a notification destination that the T8 message shall be delivered to.",
    )
    monitoringType: MonitoringType = Field(
        ..., description="Enumeration of monitoring type. Refer to clause 5.3.2.4.3."
    )
    maximumNumberOfReports: int | None = Field(
        None,
        description="Identifies the maximum number of event reports to be generated by the AMF to the NEF and then the AF.",
    )
    monitorExpireTime: datetime | None = Field(
        None,
        description="Identifies the absolute time at which the related monitoring event request is considered to expire.",
    )
    locationType: LocationType | None = Field(
        None,
        description="Indicates whether the request is for Current Location, Initial Location, or Last Known Location.",
    )
    repPeriod: DurationSec | None = Field(
        None, description="Identifies the periodic time for the event reports."
    )
    minimumReportInterval: DurationSec | None = Field(
        None,
        description="identifies a minimum time interval between Location Reporting notifications",
    )


# This data type represents a monitoring event notification which is sent from the NEF to the AF.
class MonitoringEventReport(BaseModel):
    externalId: str | None = Field(
        None, description="Identifies a user, clause 4.6.2 TS 23.682"
    )
    msisdn: str | None = Field(
        None,
        description="Identifies the MS internal PSTN/ISDN number allocated for a UE.",
    )
    locationInfo: LocationInfo | None = Field(
        None, description="Indicates the user location related information."
    )
    locFailureCause: LocationFailureCause | None = Field(
        None, description="Indicates the location positioning failure cause."
    )
    monitoringType: MonitoringType = Field(
        ...,
        description="Identifies the type of monitoring as defined in clause 5.3.2.4.3.",
    )
    eventTime: datetime | None = Field(
        None,
        description="Identifies when the event is detected or received. Shall be included for each group of UEs.",
    )


# This data type represents a monitoring notification which is sent from the NEF to the AF.
class MonitoringNotification(BaseModel):
    subscription: AnyHttpUrl = Field(
        ...,
        description="Link to the subscription resource to which this notification is related.",
    )
    monitoringEventReports: list[MonitoringEventReport] | None = Field(
        None,
        description="Each element identifies a monitoring event report (optional).",
    )
    cancelInd: bool | None = Field(
        False,
        description="Indicates whether to request to cancel the corresponding monitoring subscription. Set to false or omitted otherwise.",
    )


###############################################################
###############################################################
# CAMARA Models
@@ -308,6 +507,112 @@ class Device(BaseModel):
    ipv6Address: DeviceIpv6Address | None = None


class RetrievalLocationRequest(BaseModel):
    """
    Request to retrieve the location of a device. Device is not required when using a 3-legged access token.
    """

    device: Annotated[
        Device | None,
        Field(None, description="End-user device able to connect to a mobile network."),
    ]
    maxAge: Annotated[
        int | None,
        Field(
            None,
            description="Maximum age of the location information which is accepted for the location retrieval (in seconds).",
        ),
    ]
    maxSurface: Annotated[
        int | None,
        Field(
            None,
            description="Maximum surface in square meters which is accepted by the client for the location retrieval.",
            ge=1,
            examples=[1000000],
        ),
    ]


class AreaType(str, Enum):
    circle = "CIRCLE"  # The area is defined as a circle.
    polygon = "POLYGON"  # The area is defined as a polygon.


class Point(BaseModel):
    latitude: Annotated[
        float,
        Field(
            description="Latitude component of a location.",
            examples=["50.735851"],
            ge=-90,
            le=90,
        ),
    ]
    longitude: Annotated[
        float,
        Field(
            ...,
            description="Longitude component of location.",
            examples=["7.10066"],
            ge=-180,
            le=180,
        ),
    ]


class PointList(
    RootModel[
        Annotated[
            list[Point],
            Field(
                min_length=3,
                max_length=15,
                description="List of points defining the area.",
            ),
        ]
    ]
):
    pass


class Circle(BaseModel):
    areaType: Literal[AreaType.circle]
    center: Annotated[Point, Field(description="Center point of the circle.")]
    radius: Annotated[float, Field(description="Radius of the circle.", ge=1)]


class Polygon(BaseModel):
    areaType: Literal[AreaType.polygon]
    boundary: Annotated[
        PointList, Field(description="List of points defining the polygon.")
    ]


Area = Annotated[Circle | Polygon, Field(discriminator="areaType")]


class LastLocationTime(
    RootModel[
        Annotated[
            datetime,
            Field(
                description="Last date and time when the device was localized.",
                examples="2023-09-07T10:40:52Z",
            ),
        ]
    ]
):
    pass


class Location(BaseModel):
    lastLocationTime: Annotated[
        LastLocationTime, Field(description="Last known location time.")
    ]
    area: Annotated[Area, Field(description="Geographical area of the location.")]


class ApplicationServerIpv4Address(RootModel[str]):
    root: Annotated[
        str,
+128 −0
Original line number Diff line number Diff line
# Contributors:
#   - Panagiotis Pavlidis (p.pavlidis@iit.demokritos.gr)
##

import pytest

from sunrise6g_opensdk.common.sdk import Sdk as sdkclient
from sunrise6g_opensdk.network.core.base_network_client import BaseNetworkClient
from sunrise6g_opensdk.network.core.common import CoreHttpError
from sunrise6g_opensdk.network.core.schemas import (
    AreaType,
    Device,
    Location,
    MonitoringEventSubscriptionRequest,
    Point,
    PointList,
    Polygon,
    RetrievalLocationRequest,
)

# --- Test config ---
client_specs = {
    "network": {
        "client_name": "open5gs",
        "base_url": "http://127.0.0.1:8000/",
        "scs_as_id": "af_1",
    }
}
clients = sdkclient.create_adapters_from(client_specs)
network_client: BaseNetworkClient = clients.get("network")


# Test full input data from Camara Payload
# {
#   "phoneNumber": "+1234567890",
#   "networkAccessIdentifier": "user123@example.com",
#   "ipv4Address": {
#     "publicAddress": "198.51.100.10",
#     "privateAddress": "10.0.0.1",
#     "publicPort": 12345
#   },
#   "ipv6Address": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
# }

camara_payload_input_data = RetrievalLocationRequest(
    device=Device(phoneNumber="+306912345678")
)


# Sample output test data 3GPP MonitoringEventSubscription Request Payload
# {
#   "msisdn": "+306912345678",
#   "notificationDestination": "https://af.example.com/location_notifications",
#   "monitoringType": "LOCATION_REPORTING",
#   "locationType": "CURRENT_LOCATION"
# }

output_msisdn = camara_payload_input_data.device.phoneNumber.root.lstrip("+")
expected_3gpp_output_data = MonitoringEventSubscriptionRequest(
    msisdn=output_msisdn,
    notificationDestination="http://127.0.0.1:8001",
    monitoringType="LOCATION_REPORTING",
    locationType="LAST_KNOWN_LOCATION",
)


# Example:
#
# {
#     "lastLocationTime": "2025-06-23T20:47:22Z",
#     "area": {
#         "areaType": "POLYGON",
#         "boundary": [
#         {
#             "latitude": 37.9838,
#             "longitude": 23.7275
#         },
#         {
#             "latitude": 37.98,
#             "longitude": 23.75
#         },
#         {
#             "latitude": 37.97,
#             "longitude": 23.73
#         },
#         {   "latitude": 37.975,
#             "longtitude": 23.71
#         }
#         ]
#     }
# }

point1 = Point(latitude=37.9838, longitude=23.7275)
point2 = Point(latitude=37.98, longitude=23.75)
point3 = Point(latitude=37.97, longitude=23.73)
point4 = Point(latitude=37.975, longitude=23.71)
point_list = PointList(root=[point1, point2, point3, point4])

polygon_area = Polygon(areaType=AreaType.polygon, boundary=point_list)

expected_camara_output_data = Location(
    lastLocationTime="2025-06-23T20:47:22Z",
    area=polygon_area,
)


# --- TEST CASES ---


def test_camara_tf_3gpp_event() -> None:
    actual_result = network_client._build_monitoring_event_subscription(
        retrieve_location_request=camara_payload_input_data
    )
    assert (
        actual_result == expected_3gpp_output_data
    ), f"Expected actual_result ({actual_result}) \n to be equal to expected_result ({expected_3gpp_output_data}), but they were not."


def test_create_monitoring_event():
    try:
        actual_result = network_client.create_monitoring_event_subscription(
            retrieve_location_request=camara_payload_input_data
        )
        assert (
            actual_result == expected_camara_output_data
        ), f"Expected actual_result ({actual_result}) \n to be equal to expected_result ({expected_camara_output_data}), but they were not."
    except CoreHttpError as e:
        pytest.fail(f"Failed to retrieve event report: {e}")