Commit 02ac01d7 authored by Adrian Pino's avatar Adrian Pino
Browse files

Add missing location-retrieval related functions

parent deea5fb8
Loading
Loading
Loading
Loading
+94 −0
Original line number Diff line number Diff line
@@ -10,10 +10,12 @@
#   - Panagiotis Pavlidis (p.pavlidis@iit.demokritos.gr)
##
import uuid
from datetime import datetime, timedelta, timezone
from itertools import product
from typing import Dict

from sunrise6g_opensdk import logger
from sunrise6g_opensdk.network.adapters.errors import NetworkPlatformError
from sunrise6g_opensdk.network.core import common, schemas

log = logger.get_logger(__name__)
@@ -227,6 +229,98 @@ class BaseNetworkClient:
        )
        return camara_ti

    def _build_monitoring_event_subscription(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> schemas.MonitoringEventSubscriptionRequest:
        self.core_specific_monitoring_event_validation(retrieve_location_request)
        subscription_3gpp = self.add_core_specific_location_parameters(
            retrieve_location_request
        )
        device = retrieve_location_request.device
        subscription_3gpp.externalId = device.networkAccessIdentifier
        subscription_3gpp.ipv4Addr = device.ipv4Address
        subscription_3gpp.ipv6Addr = device.ipv6Address
        # subscription.msisdn = device.phoneNumber.root.lstrip('+')
        # subscription.notificationDestination = "http://127.0.0.1:8001"

        return subscription_3gpp

    def _compute_camara_last_location_time(
        self, event_time: datetime, age_of_location_info_min: int = None
    ) -> datetime:
        """
        Computes the last location time based on the event time and age of location info.

        args:
            event_time: ISO 8601 datetime, e.g. "2025-06-18T12:30:00Z"
            age_of_location_info_min: unsigned int, age of location info in minutes

        returns:
            datetime object representing the last location time in UTC.
        """
        if age_of_location_info_min is not None:
            last_location_time = event_time - timedelta(
                minutes=age_of_location_info_min
            )
            return last_location_time.replace(tzinfo=timezone.utc)
        else:
            return event_time.replace(tzinfo=timezone.utc)

    def create_monitoring_event_subscription(
        self, retrieve_location_request: schemas.RetrievalLocationRequest
    ) -> schemas.Location:
        """
        Creates a Monitoring Event subscription based on CAMARA Location API input.

        args:
            retrieve_location_request: Dictionary containing location retrieval details conforming to
                                        the CAMARA Location API parameters.

        returns:
            dictionary containing the created subscription details, including its ID.
        """
        subscription = self._build_monitoring_event_subscription(
            retrieve_location_request
        )
        response = common.monitoring_event_post(
            self.base_url, self.scs_as_id, subscription
        )

        monitoring_event_report = schemas.MonitoringEventReport(**response)
        if monitoring_event_report.locationInfo is None:
            log.error(
                "Failed to retrieve location information from monitoring event report"
            )
            raise NetworkPlatformError(
                "Location information not found in monitoring event report"
            )
        geo_area = monitoring_event_report.locationInfo.geographicArea
        report_event_time = monitoring_event_report.eventTime
        age_of_location_info = None
        if monitoring_event_report.locationInfo.ageOfLocationInfo is not None:
            age_of_location_info = (
                monitoring_event_report.locationInfo.ageOfLocationInfo.duration
            )
        last_location_time = self._compute_camara_last_location_time(
            report_event_time, age_of_location_info
        )
        log.debug(f"Last Location time is {last_location_time}")
        camara_point_list: list[schemas.Point] = []
        for point in geo_area.polygon.point_list.geographical_coords:
            camara_point_list.append(
                schemas.Point(latitude=point.lat, longitude=point.lon)
            )
        camara_polygon = schemas.Polygon(
            areaType=schemas.AreaType.polygon,
            boundary=schemas.PointList(camara_point_list),
        )

        camara_location = schemas.Location(
            area=camara_polygon, lastLocationTime=last_location_time
        )

        return camara_location

    def create_qod_session(self, session_info: Dict) -> Dict:
        """
        Creates a QoS session based on CAMARA QoD API input.