Loading examples/example.py +1 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,7 @@ def main(): # print(zones) # Network # print("Testing network client function: EXAMPLE_FUNCTION:") # print("Testing network client function: 'get_qod_session'") # network_client.get_qod_session(session_id="example_session_id") Loading src/sunrise6g_opensdk/logger.py +0 −3 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- ## # Copyright 2025-present by Software Networks Area, i2CAT. # All rights reserved. # # This file is part of the Open SDK # # Contributors: Loading src/sunrise6g_opensdk/network/adapters/oai/client.py +7 −23 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- ## # Loading @@ -14,8 +13,6 @@ from sunrise6g_opensdk.network.core.schemas import ( CreateSession, CreateTrafficInfluence, FlowInfo, MonitoringEventSubscriptionRequest, RetrievalLocationRequest, Snssai, TrafficInfluSub, ) Loading @@ -25,13 +22,14 @@ supportedQos = ["qos-e", "qos-s", "qos-m", "qos-l"] class NetworkManager(BaseNetworkClient): def __init__(self, base_url: str, scs_as_id: str = None): """ Initialize Network Client for OAI Core Network The currently supported features are: - QoD - Traffic Influence This client implements the BaseNetworkClient and translates the CAMARA APIs into specific HTTP requests understandable by the OAI NEF API. """ capabilities = {"qod", "traffic_influence"} def __init__(self, base_url: str, scs_as_id: str = None): try: super().__init__() self.base_url = base_url Loading Loading @@ -113,20 +111,6 @@ class NetworkManager(BaseNetworkClient): "OAI requires UE IPv4 Address to activate Traffic Influence" ) def core_specific_monitoring_event_validation( self, retrieve_location_request: RetrievalLocationRequest ) -> None: raise NotImplementedError( "core_specific_monitoring_event_validation not implemented for OAI" ) def add_core_specific_location_parameters( self, retrieve_location_request: RetrievalLocationRequest ) -> MonitoringEventSubscriptionRequest: raise NotImplementedError( "add_core_specific_location_parameters not implemented for OAI" ) def _retrieve_ue_ipv4(session_info: CreateSession): return session_info.device.ipv4Address.root.privateAddress Loading src/sunrise6g_opensdk/network/adapters/open5gcore/client.py +14 −30 Original line number Diff line number Diff line # -*- coding: utf-8 -*- ## # # This file is part of the Open SDK # # Contributors: # - Manar Zaboub (manar.zaboub@fokus.fraunhofer.de) ## from pydantic import ValidationError from sunrise6g_opensdk import logger Loading @@ -20,6 +27,13 @@ qos_support_map = { class NetworkManager(BaseNetworkClient): """ This client implements the BaseNetworkClient and translates the CAMARA APIs into specific HTTP requests understandable by the Open5GCore NEF API. """ capabilities = {"qod"} def __init__(self, base_url: str, scs_as_id: str): if not base_url: raise ValueError("base_url is required and cannot be empty.") Loading Loading @@ -47,33 +61,3 @@ class NetworkManager(BaseNetworkClient): flow_id = qos_support_map[session_info.qosProfile.root] subscription.flowInfo = build_flows(flow_id, session_info) subscription.ueIpv4Addr = "192.168.6.1" # ToDo def add_core_specific_ti_parameters( self, traffic_influence_info: schemas.CreateTrafficInfluence, subscription: schemas.TrafficInfluSub, ): raise NotImplementedError( "add_core_specific_ti_parameters not implemented for Open5GCore" ) def core_specific_traffic_influence_validation( self, traffic_influence_info: schemas.CreateTrafficInfluence ) -> None: raise NotImplementedError( "core_specific_traffic_influence_validation not implemented for Open5GCore" ) def core_specific_monitoring_event_validation( self, retrieve_location_request: schemas.RetrievalLocationRequest ) -> None: raise NotImplementedError( "core_specific_monitoring_event_validation not implemented for Open5GCore" ) def add_core_specific_location_parameters( self, retrieve_location_request: schemas.RetrievalLocationRequest ) -> schemas.MonitoringEventSubscriptionRequest: raise NotImplementedError( "add_core_specific_location_parameters not implemented for Open5GCore" ) src/sunrise6g_opensdk/network/adapters/open5gs/client.py +6 −15 Original line number Diff line number Diff line # -*- coding: utf-8 -*- ## # # This file is part of the Open SDK # # Contributors: # - Ferran Cañellas (ferran.canellas@i2cat.net) # - Panagiotis Pavlidis (p.pavlidis@iit.demokritos.gr) Loading @@ -23,14 +26,10 @@ class NetworkManager(BaseNetworkClient): """ This client implements the BaseNetworkClient and translates the CAMARA APIs into specific HTTP requests understandable by the Open5GS NEF API. Invloved partners and their roles in this implementation: - I2CAT: Responsible for the CAMARA QoD API and its mapping to the 3GPP AsSessionWithQoS API exposed by Open5GS NEF. - NCSRD: Responsible for the CAMARA Location API and its mapping to the 3GPP Monitoring Event API exposed Open5GS NEF. """ capabilities = {"qod", "location_retrieval"} def __init__(self, base_url: str, scs_as_id): """ Initializes the Open5GS Client. Loading Loading @@ -86,11 +85,3 @@ class NetworkManager(BaseNetworkClient): # locationType = schemas.LocationType.CURRENT_LOCATION # maximumNumberOfReports = 1 # repPeriod = schemas.DurationSec(root=20) # Note: # As this class is inheriting from BaseNetworkClient, it is # expected to implement all the abstract methods defined in that interface. # # In case this network adapter doesn't support a specific method, it should # be marked as NotImplementedError. Loading
examples/example.py +1 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,7 @@ def main(): # print(zones) # Network # print("Testing network client function: EXAMPLE_FUNCTION:") # print("Testing network client function: 'get_qod_session'") # network_client.get_qod_session(session_id="example_session_id") Loading
src/sunrise6g_opensdk/logger.py +0 −3 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- ## # Copyright 2025-present by Software Networks Area, i2CAT. # All rights reserved. # # This file is part of the Open SDK # # Contributors: Loading
src/sunrise6g_opensdk/network/adapters/oai/client.py +7 −23 Original line number Diff line number Diff line #!/usr/bin/env python3 # -*- coding: utf-8 -*- ## # Loading @@ -14,8 +13,6 @@ from sunrise6g_opensdk.network.core.schemas import ( CreateSession, CreateTrafficInfluence, FlowInfo, MonitoringEventSubscriptionRequest, RetrievalLocationRequest, Snssai, TrafficInfluSub, ) Loading @@ -25,13 +22,14 @@ supportedQos = ["qos-e", "qos-s", "qos-m", "qos-l"] class NetworkManager(BaseNetworkClient): def __init__(self, base_url: str, scs_as_id: str = None): """ Initialize Network Client for OAI Core Network The currently supported features are: - QoD - Traffic Influence This client implements the BaseNetworkClient and translates the CAMARA APIs into specific HTTP requests understandable by the OAI NEF API. """ capabilities = {"qod", "traffic_influence"} def __init__(self, base_url: str, scs_as_id: str = None): try: super().__init__() self.base_url = base_url Loading Loading @@ -113,20 +111,6 @@ class NetworkManager(BaseNetworkClient): "OAI requires UE IPv4 Address to activate Traffic Influence" ) def core_specific_monitoring_event_validation( self, retrieve_location_request: RetrievalLocationRequest ) -> None: raise NotImplementedError( "core_specific_monitoring_event_validation not implemented for OAI" ) def add_core_specific_location_parameters( self, retrieve_location_request: RetrievalLocationRequest ) -> MonitoringEventSubscriptionRequest: raise NotImplementedError( "add_core_specific_location_parameters not implemented for OAI" ) def _retrieve_ue_ipv4(session_info: CreateSession): return session_info.device.ipv4Address.root.privateAddress Loading
src/sunrise6g_opensdk/network/adapters/open5gcore/client.py +14 −30 Original line number Diff line number Diff line # -*- coding: utf-8 -*- ## # # This file is part of the Open SDK # # Contributors: # - Manar Zaboub (manar.zaboub@fokus.fraunhofer.de) ## from pydantic import ValidationError from sunrise6g_opensdk import logger Loading @@ -20,6 +27,13 @@ qos_support_map = { class NetworkManager(BaseNetworkClient): """ This client implements the BaseNetworkClient and translates the CAMARA APIs into specific HTTP requests understandable by the Open5GCore NEF API. """ capabilities = {"qod"} def __init__(self, base_url: str, scs_as_id: str): if not base_url: raise ValueError("base_url is required and cannot be empty.") Loading Loading @@ -47,33 +61,3 @@ class NetworkManager(BaseNetworkClient): flow_id = qos_support_map[session_info.qosProfile.root] subscription.flowInfo = build_flows(flow_id, session_info) subscription.ueIpv4Addr = "192.168.6.1" # ToDo def add_core_specific_ti_parameters( self, traffic_influence_info: schemas.CreateTrafficInfluence, subscription: schemas.TrafficInfluSub, ): raise NotImplementedError( "add_core_specific_ti_parameters not implemented for Open5GCore" ) def core_specific_traffic_influence_validation( self, traffic_influence_info: schemas.CreateTrafficInfluence ) -> None: raise NotImplementedError( "core_specific_traffic_influence_validation not implemented for Open5GCore" ) def core_specific_monitoring_event_validation( self, retrieve_location_request: schemas.RetrievalLocationRequest ) -> None: raise NotImplementedError( "core_specific_monitoring_event_validation not implemented for Open5GCore" ) def add_core_specific_location_parameters( self, retrieve_location_request: schemas.RetrievalLocationRequest ) -> schemas.MonitoringEventSubscriptionRequest: raise NotImplementedError( "add_core_specific_location_parameters not implemented for Open5GCore" )
src/sunrise6g_opensdk/network/adapters/open5gs/client.py +6 −15 Original line number Diff line number Diff line # -*- coding: utf-8 -*- ## # # This file is part of the Open SDK # # Contributors: # - Ferran Cañellas (ferran.canellas@i2cat.net) # - Panagiotis Pavlidis (p.pavlidis@iit.demokritos.gr) Loading @@ -23,14 +26,10 @@ class NetworkManager(BaseNetworkClient): """ This client implements the BaseNetworkClient and translates the CAMARA APIs into specific HTTP requests understandable by the Open5GS NEF API. Invloved partners and their roles in this implementation: - I2CAT: Responsible for the CAMARA QoD API and its mapping to the 3GPP AsSessionWithQoS API exposed by Open5GS NEF. - NCSRD: Responsible for the CAMARA Location API and its mapping to the 3GPP Monitoring Event API exposed Open5GS NEF. """ capabilities = {"qod", "location_retrieval"} def __init__(self, base_url: str, scs_as_id): """ Initializes the Open5GS Client. Loading Loading @@ -86,11 +85,3 @@ class NetworkManager(BaseNetworkClient): # locationType = schemas.LocationType.CURRENT_LOCATION # maximumNumberOfReports = 1 # repPeriod = schemas.DurationSec(root=20) # Note: # As this class is inheriting from BaseNetworkClient, it is # expected to implement all the abstract methods defined in that interface. # # In case this network adapter doesn't support a specific method, it should # be marked as NotImplementedError.