Loading src/network/clients/open5gs/client.py +0 −38 Original line number Diff line number Diff line Loading @@ -52,44 +52,6 @@ class NetworkManager(NetworkManagementInterface): flow_id = flow_id_mapping[session_info.qosProfile] session_info.flowInfo = build_flows(flow_id, session_info) # --- Implementation of NetworkManagementInterface methods --- def create_qod_session(self, session_info: Dict) -> Dict: """ Creates a QoD session based on the CAMARA QoD API input. Maps the CAMARA QoD POST /sessions to Open5GS NEF POST /{scsAsId}/subscriptions. """ url = f"{self.base_url}/{self.scs_as_id}/subscriptions" # Raises ValidationError if the object is not valid. valid_session_info = schemas.CreateSession.model_validate(session_info) subscription = schemas.AsSessionWithQoSSubscription( notificationDestination=valid_session_info.sink, qosReference=valid_session_info.qosProfile, ueIpv4Addr=valid_session_info.device.ipv4Address, ueIpv6Addr=valid_session_info.device.ipv6Address, usageThreshold=schemas.UsageThreshold(duration=valid_session_info.duration), ) self.add_core_specific_parameters(subscription) common.open5gs_post(url, subscription) def get_qod_session(self, session_id: str) -> Dict: """ Retrieves a specific Open5GS QoS Subscription details. Maps CAMARA QoD GET /sessions/{sessionId} to Open5GS NEF GET / {scsAsId}/subscriptions/{subscriptionId}. """ url = f"{self.base_url}/{self.scs_as_id}/subscriptions/{session_id}" common.open5gs_get(url) def delete_qod_session(self, session_id: str) -> None: """ Deletes a specific Open5GS QoS Subscription. Maps CAMARA QoD DELETE /sessions/{sessionId} to Open5GS NEF DELETE / {scsAsId}/subscriptions/{subscriptionId}. """ url = f"{self.base_url}/{self.scs_as_id}/subscriptions/{session_id}" common.open5gs_delete(url) # Note: # As this class is inheriting from NetworkManagementInterface, it is Loading Loading
src/network/clients/open5gs/client.py +0 −38 Original line number Diff line number Diff line Loading @@ -52,44 +52,6 @@ class NetworkManager(NetworkManagementInterface): flow_id = flow_id_mapping[session_info.qosProfile] session_info.flowInfo = build_flows(flow_id, session_info) # --- Implementation of NetworkManagementInterface methods --- def create_qod_session(self, session_info: Dict) -> Dict: """ Creates a QoD session based on the CAMARA QoD API input. Maps the CAMARA QoD POST /sessions to Open5GS NEF POST /{scsAsId}/subscriptions. """ url = f"{self.base_url}/{self.scs_as_id}/subscriptions" # Raises ValidationError if the object is not valid. valid_session_info = schemas.CreateSession.model_validate(session_info) subscription = schemas.AsSessionWithQoSSubscription( notificationDestination=valid_session_info.sink, qosReference=valid_session_info.qosProfile, ueIpv4Addr=valid_session_info.device.ipv4Address, ueIpv6Addr=valid_session_info.device.ipv6Address, usageThreshold=schemas.UsageThreshold(duration=valid_session_info.duration), ) self.add_core_specific_parameters(subscription) common.open5gs_post(url, subscription) def get_qod_session(self, session_id: str) -> Dict: """ Retrieves a specific Open5GS QoS Subscription details. Maps CAMARA QoD GET /sessions/{sessionId} to Open5GS NEF GET / {scsAsId}/subscriptions/{subscriptionId}. """ url = f"{self.base_url}/{self.scs_as_id}/subscriptions/{session_id}" common.open5gs_get(url) def delete_qod_session(self, session_id: str) -> None: """ Deletes a specific Open5GS QoS Subscription. Maps CAMARA QoD DELETE /sessions/{sessionId} to Open5GS NEF DELETE / {scsAsId}/subscriptions/{subscriptionId}. """ url = f"{self.base_url}/{self.scs_as_id}/subscriptions/{session_id}" common.open5gs_delete(url) # Note: # As this class is inheriting from NetworkManagementInterface, it is Loading