Loading src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py +9 −0 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json import profile from flask.json import jsonify from flask_restful import Resource, request from enum import Enum Loading Loading @@ -115,6 +116,14 @@ class AllProfiles(Resource): def get(self): return jsonify(profiles) class Delete_all_profile(Resource): def delete(self): global profiles profile_count = len(profiles) profiles = [] return {"message": f"Deleted {profile_count} sessions."}, 200 class PortRange: def __init__(self, from_port, to_port): self.from_port = from_port Loading src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py +3 −2 Original line number Diff line number Diff line Loading @@ -14,7 +14,8 @@ from nbi.service.rest_server.RestServer import RestServer from .Resources import ( Profile_delete_by_name, SessionList,SessionDetail,AllSessions,DeleteAllSessions,ProfileList,ProfileDetail,AllProfiles,Profile_delete_by_name Profile_delete_by_name, SessionList,SessionDetail,AllSessions,DeleteAllSessions,ProfileList,ProfileDetail,AllProfiles,Profile_delete_by_name, Delete_all_profile ) URL_PREFIX = '/camara/qod/v0' Loading @@ -31,7 +32,7 @@ RESOURCES = [ ('camara.qod.profile_detail',ProfileDetail,'/profiles/<string:profile_id>'), ('camara.qod.profile_all',AllProfiles,'/profiles/all'), ('camara.qod.profile_delete_by_name',Profile_delete_by_name,'/profiles/delete_by_name/<string:name>'), ('camara.qod.profile_delete_all',Delete_all_profile,'/profiles/delete_all/'), ] Loading src/nbi/tests/test_camara_qod.py +70 −71 Original line number Diff line number Diff line import pytest import requests BASE_URL = 'http://10.1.7.197/camara/qod/v0' # #@pytest.fixture #def test_create_profile(): # response = requests.post(f'{BASE_URL}/profiles', json={ # "name": "Test Profile", # "description": "This is a test profile", # "status": "ACTIVE", # "targetMinUpstreamRate": {"value": 1000, "unit": "Kbps"}, # "maxUpstreamRate": {"value": 5000, "unit": "Kbps"}, # "maxUpstreamBurstRate": {"value": 10000, "unit": "Kbps"}, # "targetMinDownstreamRate": {"value": 2000, "unit": "Kbps"}, # "maxDownstreamRate": {"value": 10000, "unit": "Kbps"}, # "maxDownstreamBurstRate": {"value": 20000, "unit": "Kbps"}, # "minDuration": {"value": 1, "unit": "hours"}, # "maxDuration": {"value": 24, "unit": "hours"}, # "priority": 1, # "packetDelayBudget": {"value": 100, "unit": "ms"}, # "jitter": {"value": 10, "unit": "ms"}, # "packetErrorLossRate": 0.01 # }) # global profile_id # profile_id = response.json()['profile_id'] # # # #def test_update_profile(): # response = requests.put(f'{BASE_URL}/profiles/b6c6a54e-069f-4b1c-95f8-03acc5c8970c', json={ # "description": "AM UPDATING THIS " # }) # # def test_create_profile(): response = requests.post(f'{BASE_URL}/profiles', json={ "name": "Test Profile", "description": "This is a test profile", "status": "ACTIVE", "targetMinUpstreamRate": {"value": 1000, "unit": "Kbps"}, "maxUpstreamRate": {"value": 5000, "unit": "Kbps"}, "maxUpstreamBurstRate": {"value": 10000, "unit": "Kbps"}, "targetMinDownstreamRate": {"value": 2000, "unit": "Kbps"}, "maxDownstreamRate": {"value": 10000, "unit": "Kbps"}, "maxDownstreamBurstRate": {"value": 20000, "unit": "Kbps"}, "minDuration": {"value": 1, "unit": "hours"}, "maxDuration": {"value": 24, "unit": "hours"}, "priority": 1, "packetDelayBudget": {"value": 100, "unit": "ms"}, "jitter": {"value": 10, "unit": "ms"}, "packetErrorLossRate": 0.01 }) def test_update_profile(): response = requests.put(f'{BASE_URL}/profiles/270a9c75-6a4e-452e-9dc5-804fc0204dcb', json={ "description": "AM UPDATING THIS ", "name":"test2" }) def test_delete_profile_by_name(): response = requests.delete(f'{BASE_URL}/profiles/delete_by_name/Test Profile') # #def test_create_session(): # response = requests.post(f'{BASE_URL}/sessions', json={ # "device": { # "phoneNumber": "1234567890", # "networkAccessIdentifier": "nai", # "ipv4Address": {"publicAddress": "84.125.93.10", "publicPort": 59765}, # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "applicationServer": { # "ipv4Address": "192.168.0.1/24", # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "devicePorts": { # "ranges": [{"from": 5010, "to": 5020}], # "ports": [5060, 5070] # }, # "applicationServerPorts": { # "ranges": [{"from": 6010, "to": 6020}], # "ports": [6060, 6070] # }, # "qosProfile": "Test Profile", # "webhook": { # "notificationUrl": "https://application-server.com", # "notificationAuthToken": "c8974e592c2fa383d4a3960714" # }, # "duration": {"value": 1, "unit": "hours"} # }) def test_create_session(): response = requests.post(f'{BASE_URL}/sessions', json={ "device": { "phoneNumber": "1234567890", "networkAccessIdentifier": "nai", "ipv4Address": {"publicAddress": "84.125.93.10", "publicPort": 59765}, "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" }, "applicationServer": { "ipv4Address": "192.168.0.1/24", "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" }, "devicePorts": { "ranges": [{"from": 5010, "to": 5020}], "ports": [5060, 5070] }, "applicationServerPorts": { "ranges": [{"from": 6010, "to": 6020}], "ports": [6060, 6070] }, "qosProfile": "Test Profile", "webhook": { "notificationUrl": "https://application-server.com", "notificationAuthToken": "c8974e592c2fa383d4a3960714" }, "duration": {"value": 1, "unit": "hours"} }) def test_update_session(): response = requests.put(f'{BASE_URL}/sessions/88c51c03-13bf-4542-b3f2-43a9edccfc95', json={ "duration": {"value": 32, "unit": "hours"} }) #def test_update_session(): # response = requests.put(f'{BASE_URL}/sessions/88c51c03-13bf-4542-b3f2-43a9edccfc95', json={ # "duration": {"value": 32, "unit": "hours"} # }) #def test_delete_session(): # response = requests.delete(f'{BASE_URL}/sessions/e1b53005-ffba-4e66-b7fe-0c5e022d8d7d') def test_delete_session(): response = requests.delete(f'{BASE_URL}/sessions/e1b53005-ffba-4e66-b7fe-0c5e022d8d7d') #def test_delete_all_sessions(): #BASE_URL = 'http://10.1.7.197/camara/qod/v0' #DELETE_ALL_URL = f"{BASE_URL}/sessions/delete_all" #response = requests.delete(DELETE_ALL_URL) #def test_delete_all_profiles(): #BASE_URL = 'http://10.1.7.197/camara/qod/v0' #DELETE_ALL_URL_Profile = f"{BASE_URL}/profiles/delete_all" #response = requests.delete(DELETE_ALL_URL_Profile) No newline at end of file Loading
src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py +9 −0 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json import profile from flask.json import jsonify from flask_restful import Resource, request from enum import Enum Loading Loading @@ -115,6 +116,14 @@ class AllProfiles(Resource): def get(self): return jsonify(profiles) class Delete_all_profile(Resource): def delete(self): global profiles profile_count = len(profiles) profiles = [] return {"message": f"Deleted {profile_count} sessions."}, 200 class PortRange: def __init__(self, from_port, to_port): self.from_port = from_port Loading
src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py +3 −2 Original line number Diff line number Diff line Loading @@ -14,7 +14,8 @@ from nbi.service.rest_server.RestServer import RestServer from .Resources import ( Profile_delete_by_name, SessionList,SessionDetail,AllSessions,DeleteAllSessions,ProfileList,ProfileDetail,AllProfiles,Profile_delete_by_name Profile_delete_by_name, SessionList,SessionDetail,AllSessions,DeleteAllSessions,ProfileList,ProfileDetail,AllProfiles,Profile_delete_by_name, Delete_all_profile ) URL_PREFIX = '/camara/qod/v0' Loading @@ -31,7 +32,7 @@ RESOURCES = [ ('camara.qod.profile_detail',ProfileDetail,'/profiles/<string:profile_id>'), ('camara.qod.profile_all',AllProfiles,'/profiles/all'), ('camara.qod.profile_delete_by_name',Profile_delete_by_name,'/profiles/delete_by_name/<string:name>'), ('camara.qod.profile_delete_all',Delete_all_profile,'/profiles/delete_all/'), ] Loading
src/nbi/tests/test_camara_qod.py +70 −71 Original line number Diff line number Diff line import pytest import requests BASE_URL = 'http://10.1.7.197/camara/qod/v0' # #@pytest.fixture #def test_create_profile(): # response = requests.post(f'{BASE_URL}/profiles', json={ # "name": "Test Profile", # "description": "This is a test profile", # "status": "ACTIVE", # "targetMinUpstreamRate": {"value": 1000, "unit": "Kbps"}, # "maxUpstreamRate": {"value": 5000, "unit": "Kbps"}, # "maxUpstreamBurstRate": {"value": 10000, "unit": "Kbps"}, # "targetMinDownstreamRate": {"value": 2000, "unit": "Kbps"}, # "maxDownstreamRate": {"value": 10000, "unit": "Kbps"}, # "maxDownstreamBurstRate": {"value": 20000, "unit": "Kbps"}, # "minDuration": {"value": 1, "unit": "hours"}, # "maxDuration": {"value": 24, "unit": "hours"}, # "priority": 1, # "packetDelayBudget": {"value": 100, "unit": "ms"}, # "jitter": {"value": 10, "unit": "ms"}, # "packetErrorLossRate": 0.01 # }) # global profile_id # profile_id = response.json()['profile_id'] # # # #def test_update_profile(): # response = requests.put(f'{BASE_URL}/profiles/b6c6a54e-069f-4b1c-95f8-03acc5c8970c', json={ # "description": "AM UPDATING THIS " # }) # # def test_create_profile(): response = requests.post(f'{BASE_URL}/profiles', json={ "name": "Test Profile", "description": "This is a test profile", "status": "ACTIVE", "targetMinUpstreamRate": {"value": 1000, "unit": "Kbps"}, "maxUpstreamRate": {"value": 5000, "unit": "Kbps"}, "maxUpstreamBurstRate": {"value": 10000, "unit": "Kbps"}, "targetMinDownstreamRate": {"value": 2000, "unit": "Kbps"}, "maxDownstreamRate": {"value": 10000, "unit": "Kbps"}, "maxDownstreamBurstRate": {"value": 20000, "unit": "Kbps"}, "minDuration": {"value": 1, "unit": "hours"}, "maxDuration": {"value": 24, "unit": "hours"}, "priority": 1, "packetDelayBudget": {"value": 100, "unit": "ms"}, "jitter": {"value": 10, "unit": "ms"}, "packetErrorLossRate": 0.01 }) def test_update_profile(): response = requests.put(f'{BASE_URL}/profiles/270a9c75-6a4e-452e-9dc5-804fc0204dcb', json={ "description": "AM UPDATING THIS ", "name":"test2" }) def test_delete_profile_by_name(): response = requests.delete(f'{BASE_URL}/profiles/delete_by_name/Test Profile') # #def test_create_session(): # response = requests.post(f'{BASE_URL}/sessions', json={ # "device": { # "phoneNumber": "1234567890", # "networkAccessIdentifier": "nai", # "ipv4Address": {"publicAddress": "84.125.93.10", "publicPort": 59765}, # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "applicationServer": { # "ipv4Address": "192.168.0.1/24", # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "devicePorts": { # "ranges": [{"from": 5010, "to": 5020}], # "ports": [5060, 5070] # }, # "applicationServerPorts": { # "ranges": [{"from": 6010, "to": 6020}], # "ports": [6060, 6070] # }, # "qosProfile": "Test Profile", # "webhook": { # "notificationUrl": "https://application-server.com", # "notificationAuthToken": "c8974e592c2fa383d4a3960714" # }, # "duration": {"value": 1, "unit": "hours"} # }) def test_create_session(): response = requests.post(f'{BASE_URL}/sessions', json={ "device": { "phoneNumber": "1234567890", "networkAccessIdentifier": "nai", "ipv4Address": {"publicAddress": "84.125.93.10", "publicPort": 59765}, "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" }, "applicationServer": { "ipv4Address": "192.168.0.1/24", "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" }, "devicePorts": { "ranges": [{"from": 5010, "to": 5020}], "ports": [5060, 5070] }, "applicationServerPorts": { "ranges": [{"from": 6010, "to": 6020}], "ports": [6060, 6070] }, "qosProfile": "Test Profile", "webhook": { "notificationUrl": "https://application-server.com", "notificationAuthToken": "c8974e592c2fa383d4a3960714" }, "duration": {"value": 1, "unit": "hours"} }) def test_update_session(): response = requests.put(f'{BASE_URL}/sessions/88c51c03-13bf-4542-b3f2-43a9edccfc95', json={ "duration": {"value": 32, "unit": "hours"} }) #def test_update_session(): # response = requests.put(f'{BASE_URL}/sessions/88c51c03-13bf-4542-b3f2-43a9edccfc95', json={ # "duration": {"value": 32, "unit": "hours"} # }) #def test_delete_session(): # response = requests.delete(f'{BASE_URL}/sessions/e1b53005-ffba-4e66-b7fe-0c5e022d8d7d') def test_delete_session(): response = requests.delete(f'{BASE_URL}/sessions/e1b53005-ffba-4e66-b7fe-0c5e022d8d7d') #def test_delete_all_sessions(): #BASE_URL = 'http://10.1.7.197/camara/qod/v0' #DELETE_ALL_URL = f"{BASE_URL}/sessions/delete_all" #response = requests.delete(DELETE_ALL_URL) #def test_delete_all_profiles(): #BASE_URL = 'http://10.1.7.197/camara/qod/v0' #DELETE_ALL_URL_Profile = f"{BASE_URL}/profiles/delete_all" #response = requests.delete(DELETE_ALL_URL_Profile) No newline at end of file