Loading src/network/clients/oai/client.py +64 −10 Original line number Diff line number Diff line Loading @@ -14,16 +14,19 @@ import shortuuid import time from pydantic import ValidationError from src.network.core.network_interface import NetworkManagementInterface from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription,CamaraTrafficInfluence, TrafficInfluSub from src.network.clients.oai.common import ( oai_as_session_with_qos_post, oai_as_session_with_qos_get, oai_as_session_with_qos_delete, oai_traffic_influence_post, oai_traffic_influence_delete, oai_traffic_influence_put, OaiHttpError, OaiNetworkError ) from src.network.clients.oai.utils import camara_qod_to_as_session_with_qos, as_session_with_qos_to_camara_qod from src.network.clients.oai.utils import camara_qod_to_as_session_with_qos, as_session_with_qos_to_camara_qod, camara_ti_to_3gpp_ti log = logger.get_logger(__name__) Loading Loading @@ -121,19 +124,70 @@ class OaiNefClient(NetworkManagementInterface): #implementation of the NetworkManagementInterface Traffic Influence Methods def create_traffic_influence_resource(self, traffic_influence_info): try: ti_input = CamaraTrafficInfluence(**traffic_influence_info) #convert CAMARA TI to NEF TrafficInflSub model and do POST nef_req = camara_ti_to_3gpp_ti(ti_input) nef_res = oai_traffic_influence_post(self.base_url, self.scs_as_id, nef_req) #retrieve the NEF resource id if "self" in nef_res.keys(): nef_url = nef_res["self"] nef_id = nef_url.split("subscriptions/")[1] else: raise OaiNetworkError("No valid ID for the created resource was returned") #create TI session detail and return info with resource Id ti_input.trafficInfluenceID = nef_id log.info(f"Traffic Influence session activated successfully [id={nef_id}]") return ti_input except ValidationError as e: raise OaiNetworkError("Could not validate Traffic Influence data") from e except KeyError as e: raise OaiNetworkError(f"Missing field in Traffic Influence data: {e}") from e except OaiHttpError as e: raise OaiNetworkError(f"The network could not enable the Traffic Influence Session. It returned {e}") from e except OaiNetworkError as e: raise log.error(f"create_traffic_influence_resource not implemented yet") raise NotImplementedError() def delete_traffic_influence_resource(self, session_id): """ Deletes a specific Traffic Influence (TI) session. It maps CAMARA TI API DELETE /sessions/{sessionId} to OAI NEF DELETE /3gpp-traffic-influence/v1/{scs_as_id}/subscriptions/{subscriptionId} """ try: oai_traffic_influence_delete(self.base_url, self.scs_as_id, session_id=session_id) log.info(f"TI session deleted successfully [id={session_id}]") def delete_traffic_influence_resource(self, resource_id): except OaiHttpError as e: raise OaiNetworkError(f"The network could not delete the TI session. It returned {e}") from e except OaiNetworkError as e: raise log.error(f"delete_traffic_influence_resource not implemented yet") def put_traffic_influence_resource(self, resource_id, traffic_influence_info): try: qod_input = CamaraTrafficInfluence(**traffic_influence_info) raise NotImplementedError() #convert CAMARA TI to NEF TrafficInflSub model and do POST nef_req = camara_ti_to_3gpp_ti(qod_input) updated_res = oai_traffic_influence_put(self.base_url, self.scs_as_id, resource_id, nef_req) def get_traffic_influence_resource(self, resource_id): log.info(f"Traffic Influence resource updated successfully [id={resource_id}]") log.error(f"get_traffic_influence_resource not implemented yet") return qod_input raise NotImplementedError() except ValidationError as e: raise OaiNetworkError("Could not validate Traffic Influence data") from e except KeyError as e: raise OaiNetworkError(f"Missing field in Traffic Influence data: {e}") from e except OaiHttpError as e: raise OaiNetworkError(f"The network could not update the Traffic Influence Session. It returned {e}") from e except OaiNetworkError as e: raise src/network/clients/oai/common.py +25 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,7 @@ import requests def _make_request(method: str, url: str, data=None): try: headers = None if method == 'POST': if method == 'POST' or method == 'PUT': headers = { "Content-Type": "application/json", "accept": "application/json", Loading Loading @@ -60,6 +60,30 @@ def oai_as_session_with_qos_build_url(base_url: str, scs_as_id: str, session_id: else: return url ## Traffic Influence methods def oai_traffic_influence_post(base_url: str, scs_as_id: str, model_payload: BaseModel) -> dict: data = model_payload.model_dump_json(exclude_none=True) url = oai_traffic_influence_build_url(base_url, scs_as_id) return _make_request("POST", url, data=data) def oai_traffic_influence_delete(base_url: str, scs_as_id: str, session_id: str): url = oai_traffic_influence_build_url(base_url, scs_as_id, session_id) return _make_request("DELETE", url) def oai_traffic_influence_put(base_url: str, scs_as_id: str, session_id: str, model_payload: BaseModel) -> dict: data = model_payload.model_dump_json(exclude_none=True) url = oai_traffic_influence_build_url(base_url, scs_as_id, session_id) return _make_request("PUT", url, data=data) def oai_traffic_influence_build_url(base_url: str, scs_as_id: str, session_id: str = None): url = f"{base_url}/3gpp-traffic-influence/v1/{scs_as_id}/subscriptions" if session_id != None and len(session_id) > 0: return f"{url}/{session_id}" else: return url class OaiHttpError(Exception): pass Loading src/network/clients/oai/schemas.py +75 −4 Original line number Diff line number Diff line Loading @@ -16,7 +16,8 @@ class Snssai(BaseModel): sst: int = Field(default=1) sd: str = Field(default="FFFFFF") class FlowInfoItem(BaseModel): class TrafficFilter(BaseModel): flowId: int flowDescriptions: List[str] Loading @@ -27,7 +28,7 @@ class OaiAsSessionWithQosSubscription(BaseModel): supportedFeatures: str = Field(default="12") dnn: str = Field(default="oai") snssai: Snssai flowInfo: List[FlowInfoItem] flowInfo: List[TrafficFilter] ueIpv4Addr: str notificationDestination: str qosReference: str Loading @@ -36,7 +37,7 @@ class OaiAsSessionWithQosSubscription(BaseModel): def add_flow_descriptor(self, flow_desriptor: str): self.flowInfo = list() self.flowInfo.append(FlowInfoItem( self.flowInfo.append(TrafficFilter( flowId=len(self.flowInfo)+1, flowDescriptions=[flow_desriptor] )) Loading Loading @@ -118,3 +119,73 @@ class CamaraQoDSessionInfo(BaseModel): self.device = Device() if self.device.ipv4Address is None: self.device.ipv4Address = Ipv4Address(publicAddress=ipv4) ## traffic_influence schemas class SourceTrafficFilters(BaseModel): sourcePort: int class DestinationTrafficFilters(BaseModel): destinationPort: int destinationProtocol: str class TrafficRoute(BaseModel): dnai: str class NotificationSink(BaseModel): sink: Optional[str] = None sinkCredential: Optional[SinkCredential] = None class TrafficInfluSub(BaseModel): # Replace with a meaningful name afServiceId: str afAppId: str dnn: str snssai: Snssai trafficFilters: List[TrafficFilter] ipv4Addr: str notificationDestination: str trafficRoutes: List[TrafficRoute] suppFeat: str def add_flow_descriptor(self, flow_desriptor: str): self.trafficFilters = list() self.trafficFilters.append(TrafficFilter( flowId=len(self.trafficFilters)+1, flowDescriptions=[flow_desriptor] )) def add_traffic_route(self, dnai: str): self.trafficRoutes = list() self.trafficRoutes.append(TrafficRoute( dnai=dnai )) def add_snssai(self, sst: int, sd: str = None): self.snssai = Snssai(sst=sst, sd=sd) class CamaraTrafficInfluence(BaseModel): trafficInfluenceID : Optional[str] = None apiConsumerId: str appId: str appInstanceId: str edgeCloudRegion: Optional[str] = None edgeCloudZoneId: str sourceTrafficFilters: Optional[SourceTrafficFilters] = None destinationTrafficFilters: Optional[DestinationTrafficFilters] = None notificationUri: Optional[str] = None notificationAuthToken: Optional[str] = None device: Device notificationSink: Optional[NotificationSink] = None def retrieve_ue_ipv4(self): if self.device is not None and self.device.ipv4Address is not None: return self.device.ipv4Address.publicAddress else: raise KeyError("device.ipv4Address.publicAddress") def add_ue_ipv4(self, ipv4: str): if self.device is None: self.device = Device() if self.device.ipv4Address is None: self.device.ipv4Address = Ipv4Address(publicAddress=ipv4) src/network/clients/oai/utils.py +23 −1 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ ## from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription, CamaraTrafficInfluence, TrafficInfluSub from pydantic import BaseModel def camara_qod_to_as_session_with_qos(qod_input: CamaraQoDSessionInfo) -> OaiAsSessionWithQosSubscription : Loading Loading @@ -53,3 +53,25 @@ def as_session_with_qos_to_camara_qod(nef_input: OaiAsSessionWithQosSubscription return qod_info def camara_ti_to_3gpp_ti(ti_input: CamaraTrafficInfluence) -> TrafficInfluSub: device_ip = ti_input.retrieve_ue_ipv4() server_ip = ti_input.appInstanceId #assume that the instance id corresponds to its IPv4 address sink_url = ti_input.notificationSink.sink edge_zone = ti_input.edgeCloudZoneId #build flow descriptor in oai format using device ip and server ip flow_descriptor = f"permit out ip from {device_ip}/32 to {server_ip}/32" nef_traffic_influence = TrafficInfluSub.model_construct() nef_traffic_influence.afAppId = ti_input.appId nef_traffic_influence.afServiceId = "aa" nef_traffic_influence.ipv4Addr = device_ip nef_traffic_influence.notificationDestination = sink_url nef_traffic_influence.add_flow_descriptor(flow_desriptor=flow_descriptor) nef_traffic_influence.add_traffic_route(dnai=edge_zone) nef_traffic_influence.add_snssai(1, "FFFFFF") nef_traffic_influence.dnn ="oai" return nef_traffic_influence No newline at end of file src/network/core/network_interface.py +1 −1 Original line number Diff line number Diff line Loading @@ -81,7 +81,7 @@ class NetworkManagementInterface(ABC): @abstractmethod def get_traffic_influence_resource(self, resource_id: str) -> Dict: def put_traffic_influence_resource(self, resource_id: str, traffic_influence_info: Dict) -> Dict: """ Retrieves details of a specific Traffic Influence resource. Loading Loading
src/network/clients/oai/client.py +64 −10 Original line number Diff line number Diff line Loading @@ -14,16 +14,19 @@ import shortuuid import time from pydantic import ValidationError from src.network.core.network_interface import NetworkManagementInterface from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription,CamaraTrafficInfluence, TrafficInfluSub from src.network.clients.oai.common import ( oai_as_session_with_qos_post, oai_as_session_with_qos_get, oai_as_session_with_qos_delete, oai_traffic_influence_post, oai_traffic_influence_delete, oai_traffic_influence_put, OaiHttpError, OaiNetworkError ) from src.network.clients.oai.utils import camara_qod_to_as_session_with_qos, as_session_with_qos_to_camara_qod from src.network.clients.oai.utils import camara_qod_to_as_session_with_qos, as_session_with_qos_to_camara_qod, camara_ti_to_3gpp_ti log = logger.get_logger(__name__) Loading Loading @@ -121,19 +124,70 @@ class OaiNefClient(NetworkManagementInterface): #implementation of the NetworkManagementInterface Traffic Influence Methods def create_traffic_influence_resource(self, traffic_influence_info): try: ti_input = CamaraTrafficInfluence(**traffic_influence_info) #convert CAMARA TI to NEF TrafficInflSub model and do POST nef_req = camara_ti_to_3gpp_ti(ti_input) nef_res = oai_traffic_influence_post(self.base_url, self.scs_as_id, nef_req) #retrieve the NEF resource id if "self" in nef_res.keys(): nef_url = nef_res["self"] nef_id = nef_url.split("subscriptions/")[1] else: raise OaiNetworkError("No valid ID for the created resource was returned") #create TI session detail and return info with resource Id ti_input.trafficInfluenceID = nef_id log.info(f"Traffic Influence session activated successfully [id={nef_id}]") return ti_input except ValidationError as e: raise OaiNetworkError("Could not validate Traffic Influence data") from e except KeyError as e: raise OaiNetworkError(f"Missing field in Traffic Influence data: {e}") from e except OaiHttpError as e: raise OaiNetworkError(f"The network could not enable the Traffic Influence Session. It returned {e}") from e except OaiNetworkError as e: raise log.error(f"create_traffic_influence_resource not implemented yet") raise NotImplementedError() def delete_traffic_influence_resource(self, session_id): """ Deletes a specific Traffic Influence (TI) session. It maps CAMARA TI API DELETE /sessions/{sessionId} to OAI NEF DELETE /3gpp-traffic-influence/v1/{scs_as_id}/subscriptions/{subscriptionId} """ try: oai_traffic_influence_delete(self.base_url, self.scs_as_id, session_id=session_id) log.info(f"TI session deleted successfully [id={session_id}]") def delete_traffic_influence_resource(self, resource_id): except OaiHttpError as e: raise OaiNetworkError(f"The network could not delete the TI session. It returned {e}") from e except OaiNetworkError as e: raise log.error(f"delete_traffic_influence_resource not implemented yet") def put_traffic_influence_resource(self, resource_id, traffic_influence_info): try: qod_input = CamaraTrafficInfluence(**traffic_influence_info) raise NotImplementedError() #convert CAMARA TI to NEF TrafficInflSub model and do POST nef_req = camara_ti_to_3gpp_ti(qod_input) updated_res = oai_traffic_influence_put(self.base_url, self.scs_as_id, resource_id, nef_req) def get_traffic_influence_resource(self, resource_id): log.info(f"Traffic Influence resource updated successfully [id={resource_id}]") log.error(f"get_traffic_influence_resource not implemented yet") return qod_input raise NotImplementedError() except ValidationError as e: raise OaiNetworkError("Could not validate Traffic Influence data") from e except KeyError as e: raise OaiNetworkError(f"Missing field in Traffic Influence data: {e}") from e except OaiHttpError as e: raise OaiNetworkError(f"The network could not update the Traffic Influence Session. It returned {e}") from e except OaiNetworkError as e: raise
src/network/clients/oai/common.py +25 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,7 @@ import requests def _make_request(method: str, url: str, data=None): try: headers = None if method == 'POST': if method == 'POST' or method == 'PUT': headers = { "Content-Type": "application/json", "accept": "application/json", Loading Loading @@ -60,6 +60,30 @@ def oai_as_session_with_qos_build_url(base_url: str, scs_as_id: str, session_id: else: return url ## Traffic Influence methods def oai_traffic_influence_post(base_url: str, scs_as_id: str, model_payload: BaseModel) -> dict: data = model_payload.model_dump_json(exclude_none=True) url = oai_traffic_influence_build_url(base_url, scs_as_id) return _make_request("POST", url, data=data) def oai_traffic_influence_delete(base_url: str, scs_as_id: str, session_id: str): url = oai_traffic_influence_build_url(base_url, scs_as_id, session_id) return _make_request("DELETE", url) def oai_traffic_influence_put(base_url: str, scs_as_id: str, session_id: str, model_payload: BaseModel) -> dict: data = model_payload.model_dump_json(exclude_none=True) url = oai_traffic_influence_build_url(base_url, scs_as_id, session_id) return _make_request("PUT", url, data=data) def oai_traffic_influence_build_url(base_url: str, scs_as_id: str, session_id: str = None): url = f"{base_url}/3gpp-traffic-influence/v1/{scs_as_id}/subscriptions" if session_id != None and len(session_id) > 0: return f"{url}/{session_id}" else: return url class OaiHttpError(Exception): pass Loading
src/network/clients/oai/schemas.py +75 −4 Original line number Diff line number Diff line Loading @@ -16,7 +16,8 @@ class Snssai(BaseModel): sst: int = Field(default=1) sd: str = Field(default="FFFFFF") class FlowInfoItem(BaseModel): class TrafficFilter(BaseModel): flowId: int flowDescriptions: List[str] Loading @@ -27,7 +28,7 @@ class OaiAsSessionWithQosSubscription(BaseModel): supportedFeatures: str = Field(default="12") dnn: str = Field(default="oai") snssai: Snssai flowInfo: List[FlowInfoItem] flowInfo: List[TrafficFilter] ueIpv4Addr: str notificationDestination: str qosReference: str Loading @@ -36,7 +37,7 @@ class OaiAsSessionWithQosSubscription(BaseModel): def add_flow_descriptor(self, flow_desriptor: str): self.flowInfo = list() self.flowInfo.append(FlowInfoItem( self.flowInfo.append(TrafficFilter( flowId=len(self.flowInfo)+1, flowDescriptions=[flow_desriptor] )) Loading Loading @@ -118,3 +119,73 @@ class CamaraQoDSessionInfo(BaseModel): self.device = Device() if self.device.ipv4Address is None: self.device.ipv4Address = Ipv4Address(publicAddress=ipv4) ## traffic_influence schemas class SourceTrafficFilters(BaseModel): sourcePort: int class DestinationTrafficFilters(BaseModel): destinationPort: int destinationProtocol: str class TrafficRoute(BaseModel): dnai: str class NotificationSink(BaseModel): sink: Optional[str] = None sinkCredential: Optional[SinkCredential] = None class TrafficInfluSub(BaseModel): # Replace with a meaningful name afServiceId: str afAppId: str dnn: str snssai: Snssai trafficFilters: List[TrafficFilter] ipv4Addr: str notificationDestination: str trafficRoutes: List[TrafficRoute] suppFeat: str def add_flow_descriptor(self, flow_desriptor: str): self.trafficFilters = list() self.trafficFilters.append(TrafficFilter( flowId=len(self.trafficFilters)+1, flowDescriptions=[flow_desriptor] )) def add_traffic_route(self, dnai: str): self.trafficRoutes = list() self.trafficRoutes.append(TrafficRoute( dnai=dnai )) def add_snssai(self, sst: int, sd: str = None): self.snssai = Snssai(sst=sst, sd=sd) class CamaraTrafficInfluence(BaseModel): trafficInfluenceID : Optional[str] = None apiConsumerId: str appId: str appInstanceId: str edgeCloudRegion: Optional[str] = None edgeCloudZoneId: str sourceTrafficFilters: Optional[SourceTrafficFilters] = None destinationTrafficFilters: Optional[DestinationTrafficFilters] = None notificationUri: Optional[str] = None notificationAuthToken: Optional[str] = None device: Device notificationSink: Optional[NotificationSink] = None def retrieve_ue_ipv4(self): if self.device is not None and self.device.ipv4Address is not None: return self.device.ipv4Address.publicAddress else: raise KeyError("device.ipv4Address.publicAddress") def add_ue_ipv4(self, ipv4: str): if self.device is None: self.device = Device() if self.device.ipv4Address is None: self.device.ipv4Address = Ipv4Address(publicAddress=ipv4)
src/network/clients/oai/utils.py +23 −1 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ ## from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription from src.network.clients.oai.schemas import CamaraQoDSessionInfo, OaiAsSessionWithQosSubscription, CamaraTrafficInfluence, TrafficInfluSub from pydantic import BaseModel def camara_qod_to_as_session_with_qos(qod_input: CamaraQoDSessionInfo) -> OaiAsSessionWithQosSubscription : Loading Loading @@ -53,3 +53,25 @@ def as_session_with_qos_to_camara_qod(nef_input: OaiAsSessionWithQosSubscription return qod_info def camara_ti_to_3gpp_ti(ti_input: CamaraTrafficInfluence) -> TrafficInfluSub: device_ip = ti_input.retrieve_ue_ipv4() server_ip = ti_input.appInstanceId #assume that the instance id corresponds to its IPv4 address sink_url = ti_input.notificationSink.sink edge_zone = ti_input.edgeCloudZoneId #build flow descriptor in oai format using device ip and server ip flow_descriptor = f"permit out ip from {device_ip}/32 to {server_ip}/32" nef_traffic_influence = TrafficInfluSub.model_construct() nef_traffic_influence.afAppId = ti_input.appId nef_traffic_influence.afServiceId = "aa" nef_traffic_influence.ipv4Addr = device_ip nef_traffic_influence.notificationDestination = sink_url nef_traffic_influence.add_flow_descriptor(flow_desriptor=flow_descriptor) nef_traffic_influence.add_traffic_route(dnai=edge_zone) nef_traffic_influence.add_snssai(1, "FFFFFF") nef_traffic_influence.dnn ="oai" return nef_traffic_influence No newline at end of file
src/network/core/network_interface.py +1 −1 Original line number Diff line number Diff line Loading @@ -81,7 +81,7 @@ class NetworkManagementInterface(ABC): @abstractmethod def get_traffic_influence_resource(self, resource_id: str) -> Dict: def put_traffic_influence_resource(self, resource_id: str, traffic_influence_info: Dict) -> Dict: """ Retrieves details of a specific Traffic Influence resource. Loading