Loading src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py +223 −104 Original line number Diff line number Diff line Loading @@ -11,37 +11,35 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json from flask.json import jsonify from flask_restful import Resource, request, Api from flask_restful import Resource, request from enum import Enum import uuid # Enum for session status class SessionStatus(Enum): ACTIVE = 'ACTIVE' INACTIVE = 'INACTIVE' class Rate(): class Rate: def __init__(self, value, unit): self.value = value self.unit = unit class Duration(): class Duration: def __init__(self, value, unit): self.value = value self.unit = unit class SessionCreate(): def __init__(self,source_ipv4_address,destination_ipv4_address,name,session_profile,targetMinUpstreamRate=None,maxUpstreamRate=None, maxUpstreamBurstRate=None,targetMinDownstreamRate=None,maxDownstreamRate=None,maxDownstreamBurstRate=None,minDuration=None, maxDuration=None,priority=None,packetDelayBudget=None,jitter=None,packetErrorLossRate=None,description=None): self.source_ipv4_address = source_ipv4_address self.destination_ipv4_address = destination_ipv4_address class ProfileCreate: def __init__(self, name, description, status, targetMinUpstreamRate=None, maxUpstreamRate=None, maxUpstreamBurstRate=None, targetMinDownstreamRate=None, maxDownstreamRate=None, maxDownstreamBurstRate=None, minDuration=None, maxDuration=None, priority=None, packetDelayBudget=None, jitter=None, packetErrorLossRate=None): self.name = name self.session_profile = session_profile self.description = description self.status = status self.targetMinUpstreamRate = targetMinUpstreamRate self.maxUpstreamRate = maxUpstreamRate self.maxUpstreamBurstRate = maxUpstreamBurstRate Loading @@ -54,13 +52,106 @@ class SessionCreate(): self.packetDelayBudget = packetDelayBudget self.jitter = jitter self.packetErrorLossRate = packetErrorLossRate self.description = description class Session(SessionCreate): def __init__(self,session_id,status,**kwargs): class Profile(ProfileCreate): def __init__(self, profile_id, **kwargs): super().__init__(**kwargs) self.session_id=session_id self.status=status self.profile_id = profile_id profiles = [] class ProfileList(Resource): def post(self): data = request.get_json() profile_id = str(uuid.uuid4()) new_profile = Profile( profile_id=profile_id, name=data.get('name'), description=data.get('description'), status=data.get('status', SessionStatus.ACTIVE.value), targetMinUpstreamRate=data.get('targetMinUpstreamRate'), maxUpstreamRate=data.get('maxUpstreamRate'), maxUpstreamBurstRate=data.get('maxUpstreamBurstRate'), targetMinDownstreamRate=data.get('targetMinDownstreamRate'), maxDownstreamRate=data.get('maxDownstreamRate'), maxDownstreamBurstRate=data.get('maxDownstreamBurstRate'), minDuration=data.get('minDuration'), maxDuration=data.get('maxDuration'), priority=data.get('priority'), packetDelayBudget=data.get('packetDelayBudget'), jitter=data.get('jitter'), packetErrorLossRate=data.get('packetErrorLossRate') ) profiles.append(new_profile.__dict__) return jsonify(new_profile.__dict__), 201 class ProfileDetail(Resource): def get(self, profile_id): profile = next((p for p in profiles if p["profile_id"] == profile_id), None) if profile: return jsonify(profile) return {"message": "Profile not found"}, 404 def put(self, profile_id): data = request.get_json() profile = next((p for p in profiles if p["profile_id"] == profile_id), None) if profile: for key, value in data.items(): if key in profile: profile[key] = value return jsonify(profile) return {"message": "Profile not found"}, 404 class Profile_delete_by_name(Resource): def delete(self, name): global profiles profile = next((p for p in profiles if p["name"] == name), None) if profile: profiles = [p for p in profiles if p["name"] != name] return {"message": "Profile deleted successfully"}, 200 return {"message": "Profile not found"}, 404 class AllProfiles(Resource): def get(self): return jsonify(profiles) class PortRange: def __init__(self, from_port, to_port): self.from_port = from_port self.to_port = to_port class PortDetails: def __init__(self, ranges=None, ports=None): self.ranges = ranges or [] self.ports = ports or [] class Device: def __init__(self, phoneNumber, networkAccessIdentifier, publicAddress, publicPort, ipv6Address): self.phoneNumber = phoneNumber self.networkAccessIdentifier = networkAccessIdentifier self.publicAddress = publicAddress self.publicPort = publicPort self.ipv6Address = ipv6Address class ApplicationServer: def __init__(self, ipv4Address, ipv6Address): self.ipv4Address = ipv4Address self.ipv6Address = ipv6Address class Webhook: def __init__(self, notificationUrl, notificationAuthToken): self.notificationUrl = notificationUrl self.notificationAuthToken = notificationAuthToken class SessionCreate: def __init__(self, device, applicationServer, devicePorts, applicationServerPorts, qosProfile, webhook, duration): self.device = device self.applicationServer = applicationServer self.devicePorts = devicePorts self.applicationServerPorts = applicationServerPorts self.qosProfile = qosProfile self.webhook = webhook self.duration = duration sessions = [] Loading @@ -68,82 +159,92 @@ class SessionList(Resource): def post(self): data = request.get_json() session_id = str(uuid.uuid4()) new_session={ profile_name = data.get('qosProfile') profile = next((p for p in profiles if p["name"] == profile_name), None) if not profile: return jsonify({"error": "QoS profile not found"}), 404 device = Device( phoneNumber=data['device']['phoneNumber'], networkAccessIdentifier=data['device']['networkAccessIdentifier'], publicAddress=data['device']['ipv4Address']['publicAddress'], publicPort=data['device']['ipv4Address']['publicPort'], ipv6Address=data['device']['ipv6Address'] ) applicationServer = ApplicationServer( ipv4Address=data['applicationServer']['ipv4Address'], ipv6Address=data['applicationServer']['ipv6Address'] ) devicePorts = PortDetails( ranges=[PortRange(r['from'], r['to']) for r in data['devicePorts']['ranges']], ports=data['devicePorts']['ports'] ) applicationServerPorts = PortDetails( ranges=[PortRange(r['from'], r['to']) for r in data['applicationServerPorts']['ranges']], ports=data['applicationServerPorts']['ports'] ) webhook = Webhook( notificationUrl=data['webhook']['notificationUrl'], notificationAuthToken=data['webhook']['notificationAuthToken'] ) new_session = SessionCreate( device=device, applicationServer=applicationServer, devicePorts=devicePorts, applicationServerPorts=applicationServerPorts, qosProfile=profile_name, webhook=webhook, duration=data['duration'] ) session_data = { "session_id": session_id, "name": data.get('name'), "source_ipv4_address": data.get('source_ipv4_address'), "destination_ipv4_address": data.get('destination_ipv4_address'), "session_profile": data.get('session_profile'), "targetMinUpstreamRate": data.get('targetMinUpstreamRate'), "maxUpstreamRate": data.get('maxUpstreamRate'), "maxUpstreamBurstRate": data.get('maxUpstreamBurstRate'), "targetMinDownstreamRate": data.get('targetMinDownstreamRate'), "maxDownstreamRate": data.get('maxDownstreamRate'), "maxDownstreamBurstRate": data.get('maxDownstreamBurstRate'), "minDuration": data.get('minDuration'), "maxDuration": data.get('maxDuration'), "priority": data.get('priority'), "packetDelayBudget": data.get('packetDelayBudget'), "jitter": data.get('jitter'), "packetErrorLossRate": data.get('packetErrorLossRate'), "description": data.get('description'), "device": { "phoneNumber": device.phoneNumber, "networkAccessIdentifier": device.networkAccessIdentifier, "ipv4Address": { "publicAddress": device.publicAddress, "publicPort": device.publicPort }, "ipv6Address": device.ipv6Address }, "applicationServer": { "ipv4Address": applicationServer.ipv4Address, "ipv6Address": applicationServer.ipv6Address }, "devicePorts": { "ranges": [{"from": r.from_port, "to": r.to_port} for r in devicePorts.ranges], "ports": devicePorts.ports }, "applicationServerPorts": { "ranges": [{"from": r.from_port, "to": r.to_port} for r in applicationServerPorts.ranges], "ports": applicationServerPorts.ports }, "qosProfile": new_session.qosProfile, "webhook": { "notificationUrl": webhook.notificationUrl, "notificationAuthToken": webhook.notificationAuthToken }, "duration": new_session.duration, "status": SessionStatus.ACTIVE.value } sessions.append(new_session) return jsonify(new_session) class AllSessions(Resource): def get(self): all_sessions = [] for session in sessions: session_details = { "session_id": session["session_id"], "name": session["name"], "source_ipv4_address": session["source_ipv4_address"], "destination_ipv4_address": session["destination_ipv4_address"], "session_profile": session["session_profile"], "targetMinUpstreamRate": session["targetMinUpstreamRate"], "maxUpstreamRate": session["maxUpstreamRate"], "maxUpstreamBurstRate": session["maxUpstreamBurstRate"], "targetMinDownstreamRate": session["targetMinDownstreamRate"], "maxDownstreamRate": session["maxDownstreamRate"], "maxDownstreamBurstRate": session["maxDownstreamBurstRate"], "minDuration": session["minDuration"], "maxDuration": session["maxDuration"], "priority": session["priority"], "packetDelayBudget": session["packetDelayBudget"], "jitter": session["jitter"], "packetErrorLossRate": session["packetErrorLossRate"], "description": session["description"], "status": session["status"] } all_sessions.append(session_details) return jsonify(all_sessions) sessions.append(session_data) return jsonify(session_data), 201 class DeleteAllSessions(Resource): def delete(self): global sessions session_count = len(sessions) if session_count > 0: sessions.clear() return {"message": f"All {session_count} sessions deleted successfully"} else: return {"message": "No sessions to delete"}, 204 class Sessionlist2(Resource): class SessionDetail(Resource): def get(self, session_id): for session in sessions: if session["session_id"] == session_id: session = next((s for s in sessions if s["session_id"] == session_id), None) if session: return jsonify(session) return {"message": "Session not found"}, 404 abort(404, description="Session not found") def delete(self, session_id): global sessions for session in sessions[:]: if session["session_id"] == session_id: sessions.remove(session) return {"message": "Session deleted successfully"} abort(404, description="Session not found") def put(self, session_id): data = request.get_json() session = next((s for s in sessions if s["session_id"] == session_id), None) Loading @@ -152,5 +253,23 @@ class Sessionlist2(Resource): if key in session: session[key] = value return jsonify(session) abort(404, description="Session not found") return {"message": "Session not found"}, 404 def delete(self, session_id): global sessions session = next((s for s in sessions if s["session_id"] == session_id), None) if session: sessions = [s for s in sessions if s["session_id"] != session_id] return {"message": "Session deleted successfully"}, 200 return {"message": "Session not found"}, 404 class AllSessions(Resource): def get(self): return jsonify(sessions) class DeleteAllSessions(Resource): def delete(self): global sessions session_count = len(sessions) sessions = [] return {"message": f"Deleted {session_count} sessions."}, 200 src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py +8 −3 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ from nbi.service.rest_server.RestServer import RestServer from .Resources import ( SessionList,Sessionlist2,AllSessions,DeleteAllSessions Profile_delete_by_name, SessionList,SessionDetail,AllSessions,DeleteAllSessions,ProfileList,ProfileDetail,AllProfiles,Profile_delete_by_name ) URL_PREFIX = '/camara/qod/v0' Loading @@ -24,9 +24,14 @@ RESOURCES = [ # (endpoint_name, resource_class, resource_url) # TODO: Add appropriate endpoints ('camara.qod.session_list', SessionList, '/sessions'), ('camara.qod.session', Sessionlist2, '/sessions/<string:session_id>'), ('camara.qod.session', SessionDetail, '/sessions/<string:session_id>'), ('camara.qod.session_all', AllSessions, '/sessions/all'), ('camara.qod.delete_all_sessions',DeleteAllSessions,'/sessions/delete_all') ('camara.qod.delete_all_sessions',DeleteAllSessions,'/sessions/delete_all'), ('camara.qod.profile_list',ProfileList,'/profiles'), ('camara.qod.profile_detail',ProfileDetail,'/profiles/<string:profile_id>'), ('camara.qod.profile_all',AllProfiles,'/profiles/all'), ('camara.qod.profile_delete_by_name',Profile_delete_by_name,'/profiles/delete_by_name/<string:name>'), ] Loading src/nbi/tests/test_camara_qod.py +64 −140 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import requests,logging import pytest import subprocess import time import logging import requests logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger(__name__) @pytest.fixture(scope="session", autouse=True) def start_server(): server = subprocess.Popen(['python', '-m', '__main__']) time.sleep(5) yield server.terminate() server.wait() @pytest.fixture def create_session(): BASE_URL = 'http://10.1.7.197/camara/qod/v0' URL = f"{BASE_URL}/sessions" data = { "source_ipv4_address": "192.168.1.1", "destination_ipv4_address": "192.168.1.2", "name": "Test Session", "session_profile": "Profile1", "targetMinUpstreamRate": {"value": 100, "unit": "kbps"}, "maxUpstreamRate": {"value": 1000, "unit": "kbps"}, "maxUpstreamBurstRate": {"value": 1500, "unit": "kbps"}, "targetMinDownstreamRate": {"value": 100, "unit": "kbps"}, "maxDownstreamRate": {"value": 1000, "unit": "kbps"}, "maxDownstreamBurstRate": {"value": 1500, "unit": "kbps"}, "minDuration": {"value": 10, "unit": "s"}, "maxDuration": {"value": 60, "unit": "s"}, "priority": 1, "packetDelayBudget": {"value": 100, "unit": "ms"}, "jitter": {"value": 20, "unit": "ms"}, "packetErrorLossRate": 1, "description": "Test session description" } response = requests.post(URL, json=data) print(f"Response Status Code: {response.status_code}") print(f"Response Content: {response.text}") response.raise_for_status() json_response = response.json() return json_response #def get_session(base_url, session_id): # url = f"{base_url}/sessions/{session_id}" # try: # response = requests.get(url) # response.raise_for_status() # return response.json() # except requests.exceptions.RequestException as e: # LOGGER.error(f"Request failed: {e}") # raise # # #def test_create_and_get_session(create_session): # session_data = create_session # print(f"Session Data: {session_data}") # session_id = session_data.get('session_id') # print(f"Session ID: {session_id}") # LOGGER.debug(f"Session ID: {session_id}") # BASE_URL = 'http://10.1.7.197/camara/qod/v0/' # try: # session_details = get_session(BASE_URL, session_id) # LOGGER.debug(f"Session Details: {session_details}") # LOGGER.debug('Session Details={:s}'.format(json.dumps(session_details, sort_keys=True, indent=4))) # print(f"Session Details: {session_details}") # except Exception as e: # LOGGER.error(f"Failed to retrieve session details: {e}") # raise #@pytest.fixture #def test_create_profile(): # response = requests.post(f'{BASE_URL}/profiles', json={ # "name": "Test Profile", # "description": "This is a test profile", # "status": "ACTIVE", # "targetMinUpstreamRate": {"value": 1000, "unit": "Kbps"}, # "maxUpstreamRate": {"value": 5000, "unit": "Kbps"}, # "maxUpstreamBurstRate": {"value": 10000, "unit": "Kbps"}, # "targetMinDownstreamRate": {"value": 2000, "unit": "Kbps"}, # "maxDownstreamRate": {"value": 10000, "unit": "Kbps"}, # "maxDownstreamBurstRate": {"value": 20000, "unit": "Kbps"}, # "minDuration": {"value": 1, "unit": "hours"}, # "maxDuration": {"value": 24, "unit": "hours"}, # "priority": 1, # "packetDelayBudget": {"value": 100, "unit": "ms"}, # "jitter": {"value": 10, "unit": "ms"}, # "packetErrorLossRate": 0.01 # }) # global profile_id # profile_id = response.json()['profile_id'] # # # #def test_get_session(create_session): # session_id = create_session.get('session_id') # print(f"session_id':{session_id}") # BASE_URL = 'http://10.1.7.197/camara/qod/v0' # URL = f"{BASE_URL}/sessions/{session_id}" #def test_update_profile(): # response = requests.put(f'{BASE_URL}/profiles/b6c6a54e-069f-4b1c-95f8-03acc5c8970c', json={ # "description": "AM UPDATING THIS " # }) # # response = requests.get(URL) # assert response.status_code == 200, f"Expected 200, got {response.status_code}" # # response_data = response.json() # assert response_data['session_id'] == session_id # assert response_data['name'] == "Test Session" # #def test_get_all_sessions(create_session): # # BASE_URL = 'http://10.1.7.197/camara/qod/v0' # URL = f"{BASE_URL}/sessions/all" # LOGGER.debug(f'Sending GET request for all sessions...') # response = requests.get(URL, timeout=5) # LOGGER.debug('Received response status code: %s', response.status_code) # LOGGER.debug('Received response content: %s', response.text) def test_delete_session(create_session): session_id = create_session.get('session_id') BASE_URL = 'http://10.1.7.197/camara/qod/v0' URL = f"{BASE_URL}/sessions/ea3852d4-6cdc-4dc3-a8bf-4c16e19836da" def test_delete_profile_by_name(): response = requests.delete(f'{BASE_URL}/profiles/delete_by_name/Test Profile') LOGGER.debug(f'Sending DELETE request for session ID {session_id}...') response = requests.delete(URL, timeout=5) LOGGER.debug('Received response: %s', response.text) LOGGER.debug('Response Headers: %s', response.headers) def test_update_session(create_session): session_data = create_session session_id = session_data.get('session_id') BASE_URL = 'http://10.1.7.197/camara/qod/v0' UPDATE_URL = f"{BASE_URL}/sessions/06222e89-8610-4672-8534-184aa6760c04" updated_data = { "name": "Updated Session Name", "maxUpstreamRate": {"value": 2000, "unit": "kbps"}, "description": "Updated description for the test session" } response = requests.put(UPDATE_URL, json=updated_data) # #def test_create_session(): # response = requests.post(f'{BASE_URL}/sessions', json={ # "device": { # "phoneNumber": "1234567890", # "networkAccessIdentifier": "nai", # "ipv4Address": {"publicAddress": "84.125.93.10", "publicPort": 59765}, # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "applicationServer": { # "ipv4Address": "192.168.0.1/24", # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "devicePorts": { # "ranges": [{"from": 5010, "to": 5020}], # "ports": [5060, 5070] # }, # "applicationServerPorts": { # "ranges": [{"from": 6010, "to": 6020}], # "ports": [6060, 6070] # }, # "qosProfile": "Test Profile", # "webhook": { # "notificationUrl": "https://application-server.com", # "notificationAuthToken": "c8974e592c2fa383d4a3960714" # }, # "duration": {"value": 1, "unit": "hours"} # }) #def test_update_session(): # response = requests.put(f'{BASE_URL}/sessions/88c51c03-13bf-4542-b3f2-43a9edccfc95', json={ # "duration": {"value": 32, "unit": "hours"} # }) #def test_delete_session(): # response = requests.delete(f'{BASE_URL}/sessions/e1b53005-ffba-4e66-b7fe-0c5e022d8d7d') #def test_delete_all_sessions(): # BASE_URL = 'http://10.1.7.197/camara/qod/v0' # DELETE_ALL_URL = f"{BASE_URL}/sessions/delete_all" # response = requests.delete(DELETE_ALL_URL) # print(f"Status Code: {response.status_code}") # print(f"Response: {response.json()}") # assert response.status_code in [200, 204], f"Unexpected status code: {response.status_code}" # if response.status_code == 200: # assert response.json().get("message", "").startswith("All"), "Unexpected message content" # elif response.status_code == 204: # assert response.json() == {}, "Expected empty response for 204 No Content" # print("Test for delete all sessions passed.") No newline at end of file Loading
src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py +223 −104 Original line number Diff line number Diff line Loading @@ -11,37 +11,35 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json from flask.json import jsonify from flask_restful import Resource, request, Api from flask_restful import Resource, request from enum import Enum import uuid # Enum for session status class SessionStatus(Enum): ACTIVE = 'ACTIVE' INACTIVE = 'INACTIVE' class Rate(): class Rate: def __init__(self, value, unit): self.value = value self.unit = unit class Duration(): class Duration: def __init__(self, value, unit): self.value = value self.unit = unit class SessionCreate(): def __init__(self,source_ipv4_address,destination_ipv4_address,name,session_profile,targetMinUpstreamRate=None,maxUpstreamRate=None, maxUpstreamBurstRate=None,targetMinDownstreamRate=None,maxDownstreamRate=None,maxDownstreamBurstRate=None,minDuration=None, maxDuration=None,priority=None,packetDelayBudget=None,jitter=None,packetErrorLossRate=None,description=None): self.source_ipv4_address = source_ipv4_address self.destination_ipv4_address = destination_ipv4_address class ProfileCreate: def __init__(self, name, description, status, targetMinUpstreamRate=None, maxUpstreamRate=None, maxUpstreamBurstRate=None, targetMinDownstreamRate=None, maxDownstreamRate=None, maxDownstreamBurstRate=None, minDuration=None, maxDuration=None, priority=None, packetDelayBudget=None, jitter=None, packetErrorLossRate=None): self.name = name self.session_profile = session_profile self.description = description self.status = status self.targetMinUpstreamRate = targetMinUpstreamRate self.maxUpstreamRate = maxUpstreamRate self.maxUpstreamBurstRate = maxUpstreamBurstRate Loading @@ -54,13 +52,106 @@ class SessionCreate(): self.packetDelayBudget = packetDelayBudget self.jitter = jitter self.packetErrorLossRate = packetErrorLossRate self.description = description class Session(SessionCreate): def __init__(self,session_id,status,**kwargs): class Profile(ProfileCreate): def __init__(self, profile_id, **kwargs): super().__init__(**kwargs) self.session_id=session_id self.status=status self.profile_id = profile_id profiles = [] class ProfileList(Resource): def post(self): data = request.get_json() profile_id = str(uuid.uuid4()) new_profile = Profile( profile_id=profile_id, name=data.get('name'), description=data.get('description'), status=data.get('status', SessionStatus.ACTIVE.value), targetMinUpstreamRate=data.get('targetMinUpstreamRate'), maxUpstreamRate=data.get('maxUpstreamRate'), maxUpstreamBurstRate=data.get('maxUpstreamBurstRate'), targetMinDownstreamRate=data.get('targetMinDownstreamRate'), maxDownstreamRate=data.get('maxDownstreamRate'), maxDownstreamBurstRate=data.get('maxDownstreamBurstRate'), minDuration=data.get('minDuration'), maxDuration=data.get('maxDuration'), priority=data.get('priority'), packetDelayBudget=data.get('packetDelayBudget'), jitter=data.get('jitter'), packetErrorLossRate=data.get('packetErrorLossRate') ) profiles.append(new_profile.__dict__) return jsonify(new_profile.__dict__), 201 class ProfileDetail(Resource): def get(self, profile_id): profile = next((p for p in profiles if p["profile_id"] == profile_id), None) if profile: return jsonify(profile) return {"message": "Profile not found"}, 404 def put(self, profile_id): data = request.get_json() profile = next((p for p in profiles if p["profile_id"] == profile_id), None) if profile: for key, value in data.items(): if key in profile: profile[key] = value return jsonify(profile) return {"message": "Profile not found"}, 404 class Profile_delete_by_name(Resource): def delete(self, name): global profiles profile = next((p for p in profiles if p["name"] == name), None) if profile: profiles = [p for p in profiles if p["name"] != name] return {"message": "Profile deleted successfully"}, 200 return {"message": "Profile not found"}, 404 class AllProfiles(Resource): def get(self): return jsonify(profiles) class PortRange: def __init__(self, from_port, to_port): self.from_port = from_port self.to_port = to_port class PortDetails: def __init__(self, ranges=None, ports=None): self.ranges = ranges or [] self.ports = ports or [] class Device: def __init__(self, phoneNumber, networkAccessIdentifier, publicAddress, publicPort, ipv6Address): self.phoneNumber = phoneNumber self.networkAccessIdentifier = networkAccessIdentifier self.publicAddress = publicAddress self.publicPort = publicPort self.ipv6Address = ipv6Address class ApplicationServer: def __init__(self, ipv4Address, ipv6Address): self.ipv4Address = ipv4Address self.ipv6Address = ipv6Address class Webhook: def __init__(self, notificationUrl, notificationAuthToken): self.notificationUrl = notificationUrl self.notificationAuthToken = notificationAuthToken class SessionCreate: def __init__(self, device, applicationServer, devicePorts, applicationServerPorts, qosProfile, webhook, duration): self.device = device self.applicationServer = applicationServer self.devicePorts = devicePorts self.applicationServerPorts = applicationServerPorts self.qosProfile = qosProfile self.webhook = webhook self.duration = duration sessions = [] Loading @@ -68,82 +159,92 @@ class SessionList(Resource): def post(self): data = request.get_json() session_id = str(uuid.uuid4()) new_session={ profile_name = data.get('qosProfile') profile = next((p for p in profiles if p["name"] == profile_name), None) if not profile: return jsonify({"error": "QoS profile not found"}), 404 device = Device( phoneNumber=data['device']['phoneNumber'], networkAccessIdentifier=data['device']['networkAccessIdentifier'], publicAddress=data['device']['ipv4Address']['publicAddress'], publicPort=data['device']['ipv4Address']['publicPort'], ipv6Address=data['device']['ipv6Address'] ) applicationServer = ApplicationServer( ipv4Address=data['applicationServer']['ipv4Address'], ipv6Address=data['applicationServer']['ipv6Address'] ) devicePorts = PortDetails( ranges=[PortRange(r['from'], r['to']) for r in data['devicePorts']['ranges']], ports=data['devicePorts']['ports'] ) applicationServerPorts = PortDetails( ranges=[PortRange(r['from'], r['to']) for r in data['applicationServerPorts']['ranges']], ports=data['applicationServerPorts']['ports'] ) webhook = Webhook( notificationUrl=data['webhook']['notificationUrl'], notificationAuthToken=data['webhook']['notificationAuthToken'] ) new_session = SessionCreate( device=device, applicationServer=applicationServer, devicePorts=devicePorts, applicationServerPorts=applicationServerPorts, qosProfile=profile_name, webhook=webhook, duration=data['duration'] ) session_data = { "session_id": session_id, "name": data.get('name'), "source_ipv4_address": data.get('source_ipv4_address'), "destination_ipv4_address": data.get('destination_ipv4_address'), "session_profile": data.get('session_profile'), "targetMinUpstreamRate": data.get('targetMinUpstreamRate'), "maxUpstreamRate": data.get('maxUpstreamRate'), "maxUpstreamBurstRate": data.get('maxUpstreamBurstRate'), "targetMinDownstreamRate": data.get('targetMinDownstreamRate'), "maxDownstreamRate": data.get('maxDownstreamRate'), "maxDownstreamBurstRate": data.get('maxDownstreamBurstRate'), "minDuration": data.get('minDuration'), "maxDuration": data.get('maxDuration'), "priority": data.get('priority'), "packetDelayBudget": data.get('packetDelayBudget'), "jitter": data.get('jitter'), "packetErrorLossRate": data.get('packetErrorLossRate'), "description": data.get('description'), "device": { "phoneNumber": device.phoneNumber, "networkAccessIdentifier": device.networkAccessIdentifier, "ipv4Address": { "publicAddress": device.publicAddress, "publicPort": device.publicPort }, "ipv6Address": device.ipv6Address }, "applicationServer": { "ipv4Address": applicationServer.ipv4Address, "ipv6Address": applicationServer.ipv6Address }, "devicePorts": { "ranges": [{"from": r.from_port, "to": r.to_port} for r in devicePorts.ranges], "ports": devicePorts.ports }, "applicationServerPorts": { "ranges": [{"from": r.from_port, "to": r.to_port} for r in applicationServerPorts.ranges], "ports": applicationServerPorts.ports }, "qosProfile": new_session.qosProfile, "webhook": { "notificationUrl": webhook.notificationUrl, "notificationAuthToken": webhook.notificationAuthToken }, "duration": new_session.duration, "status": SessionStatus.ACTIVE.value } sessions.append(new_session) return jsonify(new_session) class AllSessions(Resource): def get(self): all_sessions = [] for session in sessions: session_details = { "session_id": session["session_id"], "name": session["name"], "source_ipv4_address": session["source_ipv4_address"], "destination_ipv4_address": session["destination_ipv4_address"], "session_profile": session["session_profile"], "targetMinUpstreamRate": session["targetMinUpstreamRate"], "maxUpstreamRate": session["maxUpstreamRate"], "maxUpstreamBurstRate": session["maxUpstreamBurstRate"], "targetMinDownstreamRate": session["targetMinDownstreamRate"], "maxDownstreamRate": session["maxDownstreamRate"], "maxDownstreamBurstRate": session["maxDownstreamBurstRate"], "minDuration": session["minDuration"], "maxDuration": session["maxDuration"], "priority": session["priority"], "packetDelayBudget": session["packetDelayBudget"], "jitter": session["jitter"], "packetErrorLossRate": session["packetErrorLossRate"], "description": session["description"], "status": session["status"] } all_sessions.append(session_details) return jsonify(all_sessions) sessions.append(session_data) return jsonify(session_data), 201 class DeleteAllSessions(Resource): def delete(self): global sessions session_count = len(sessions) if session_count > 0: sessions.clear() return {"message": f"All {session_count} sessions deleted successfully"} else: return {"message": "No sessions to delete"}, 204 class Sessionlist2(Resource): class SessionDetail(Resource): def get(self, session_id): for session in sessions: if session["session_id"] == session_id: session = next((s for s in sessions if s["session_id"] == session_id), None) if session: return jsonify(session) return {"message": "Session not found"}, 404 abort(404, description="Session not found") def delete(self, session_id): global sessions for session in sessions[:]: if session["session_id"] == session_id: sessions.remove(session) return {"message": "Session deleted successfully"} abort(404, description="Session not found") def put(self, session_id): data = request.get_json() session = next((s for s in sessions if s["session_id"] == session_id), None) Loading @@ -152,5 +253,23 @@ class Sessionlist2(Resource): if key in session: session[key] = value return jsonify(session) abort(404, description="Session not found") return {"message": "Session not found"}, 404 def delete(self, session_id): global sessions session = next((s for s in sessions if s["session_id"] == session_id), None) if session: sessions = [s for s in sessions if s["session_id"] != session_id] return {"message": "Session deleted successfully"}, 200 return {"message": "Session not found"}, 404 class AllSessions(Resource): def get(self): return jsonify(sessions) class DeleteAllSessions(Resource): def delete(self): global sessions session_count = len(sessions) sessions = [] return {"message": f"Deleted {session_count} sessions."}, 200
src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py +8 −3 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ from nbi.service.rest_server.RestServer import RestServer from .Resources import ( SessionList,Sessionlist2,AllSessions,DeleteAllSessions Profile_delete_by_name, SessionList,SessionDetail,AllSessions,DeleteAllSessions,ProfileList,ProfileDetail,AllProfiles,Profile_delete_by_name ) URL_PREFIX = '/camara/qod/v0' Loading @@ -24,9 +24,14 @@ RESOURCES = [ # (endpoint_name, resource_class, resource_url) # TODO: Add appropriate endpoints ('camara.qod.session_list', SessionList, '/sessions'), ('camara.qod.session', Sessionlist2, '/sessions/<string:session_id>'), ('camara.qod.session', SessionDetail, '/sessions/<string:session_id>'), ('camara.qod.session_all', AllSessions, '/sessions/all'), ('camara.qod.delete_all_sessions',DeleteAllSessions,'/sessions/delete_all') ('camara.qod.delete_all_sessions',DeleteAllSessions,'/sessions/delete_all'), ('camara.qod.profile_list',ProfileList,'/profiles'), ('camara.qod.profile_detail',ProfileDetail,'/profiles/<string:profile_id>'), ('camara.qod.profile_all',AllProfiles,'/profiles/all'), ('camara.qod.profile_delete_by_name',Profile_delete_by_name,'/profiles/delete_by_name/<string:name>'), ] Loading
src/nbi/tests/test_camara_qod.py +64 −140 Original line number Diff line number Diff line # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import requests,logging import pytest import subprocess import time import logging import requests logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger(__name__) @pytest.fixture(scope="session", autouse=True) def start_server(): server = subprocess.Popen(['python', '-m', '__main__']) time.sleep(5) yield server.terminate() server.wait() @pytest.fixture def create_session(): BASE_URL = 'http://10.1.7.197/camara/qod/v0' URL = f"{BASE_URL}/sessions" data = { "source_ipv4_address": "192.168.1.1", "destination_ipv4_address": "192.168.1.2", "name": "Test Session", "session_profile": "Profile1", "targetMinUpstreamRate": {"value": 100, "unit": "kbps"}, "maxUpstreamRate": {"value": 1000, "unit": "kbps"}, "maxUpstreamBurstRate": {"value": 1500, "unit": "kbps"}, "targetMinDownstreamRate": {"value": 100, "unit": "kbps"}, "maxDownstreamRate": {"value": 1000, "unit": "kbps"}, "maxDownstreamBurstRate": {"value": 1500, "unit": "kbps"}, "minDuration": {"value": 10, "unit": "s"}, "maxDuration": {"value": 60, "unit": "s"}, "priority": 1, "packetDelayBudget": {"value": 100, "unit": "ms"}, "jitter": {"value": 20, "unit": "ms"}, "packetErrorLossRate": 1, "description": "Test session description" } response = requests.post(URL, json=data) print(f"Response Status Code: {response.status_code}") print(f"Response Content: {response.text}") response.raise_for_status() json_response = response.json() return json_response #def get_session(base_url, session_id): # url = f"{base_url}/sessions/{session_id}" # try: # response = requests.get(url) # response.raise_for_status() # return response.json() # except requests.exceptions.RequestException as e: # LOGGER.error(f"Request failed: {e}") # raise # # #def test_create_and_get_session(create_session): # session_data = create_session # print(f"Session Data: {session_data}") # session_id = session_data.get('session_id') # print(f"Session ID: {session_id}") # LOGGER.debug(f"Session ID: {session_id}") # BASE_URL = 'http://10.1.7.197/camara/qod/v0/' # try: # session_details = get_session(BASE_URL, session_id) # LOGGER.debug(f"Session Details: {session_details}") # LOGGER.debug('Session Details={:s}'.format(json.dumps(session_details, sort_keys=True, indent=4))) # print(f"Session Details: {session_details}") # except Exception as e: # LOGGER.error(f"Failed to retrieve session details: {e}") # raise #@pytest.fixture #def test_create_profile(): # response = requests.post(f'{BASE_URL}/profiles', json={ # "name": "Test Profile", # "description": "This is a test profile", # "status": "ACTIVE", # "targetMinUpstreamRate": {"value": 1000, "unit": "Kbps"}, # "maxUpstreamRate": {"value": 5000, "unit": "Kbps"}, # "maxUpstreamBurstRate": {"value": 10000, "unit": "Kbps"}, # "targetMinDownstreamRate": {"value": 2000, "unit": "Kbps"}, # "maxDownstreamRate": {"value": 10000, "unit": "Kbps"}, # "maxDownstreamBurstRate": {"value": 20000, "unit": "Kbps"}, # "minDuration": {"value": 1, "unit": "hours"}, # "maxDuration": {"value": 24, "unit": "hours"}, # "priority": 1, # "packetDelayBudget": {"value": 100, "unit": "ms"}, # "jitter": {"value": 10, "unit": "ms"}, # "packetErrorLossRate": 0.01 # }) # global profile_id # profile_id = response.json()['profile_id'] # # # #def test_get_session(create_session): # session_id = create_session.get('session_id') # print(f"session_id':{session_id}") # BASE_URL = 'http://10.1.7.197/camara/qod/v0' # URL = f"{BASE_URL}/sessions/{session_id}" #def test_update_profile(): # response = requests.put(f'{BASE_URL}/profiles/b6c6a54e-069f-4b1c-95f8-03acc5c8970c', json={ # "description": "AM UPDATING THIS " # }) # # response = requests.get(URL) # assert response.status_code == 200, f"Expected 200, got {response.status_code}" # # response_data = response.json() # assert response_data['session_id'] == session_id # assert response_data['name'] == "Test Session" # #def test_get_all_sessions(create_session): # # BASE_URL = 'http://10.1.7.197/camara/qod/v0' # URL = f"{BASE_URL}/sessions/all" # LOGGER.debug(f'Sending GET request for all sessions...') # response = requests.get(URL, timeout=5) # LOGGER.debug('Received response status code: %s', response.status_code) # LOGGER.debug('Received response content: %s', response.text) def test_delete_session(create_session): session_id = create_session.get('session_id') BASE_URL = 'http://10.1.7.197/camara/qod/v0' URL = f"{BASE_URL}/sessions/ea3852d4-6cdc-4dc3-a8bf-4c16e19836da" def test_delete_profile_by_name(): response = requests.delete(f'{BASE_URL}/profiles/delete_by_name/Test Profile') LOGGER.debug(f'Sending DELETE request for session ID {session_id}...') response = requests.delete(URL, timeout=5) LOGGER.debug('Received response: %s', response.text) LOGGER.debug('Response Headers: %s', response.headers) def test_update_session(create_session): session_data = create_session session_id = session_data.get('session_id') BASE_URL = 'http://10.1.7.197/camara/qod/v0' UPDATE_URL = f"{BASE_URL}/sessions/06222e89-8610-4672-8534-184aa6760c04" updated_data = { "name": "Updated Session Name", "maxUpstreamRate": {"value": 2000, "unit": "kbps"}, "description": "Updated description for the test session" } response = requests.put(UPDATE_URL, json=updated_data) # #def test_create_session(): # response = requests.post(f'{BASE_URL}/sessions', json={ # "device": { # "phoneNumber": "1234567890", # "networkAccessIdentifier": "nai", # "ipv4Address": {"publicAddress": "84.125.93.10", "publicPort": 59765}, # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "applicationServer": { # "ipv4Address": "192.168.0.1/24", # "ipv6Address": "2001:db8:85a3:8d3:1319:8a2e:370:7344" # }, # "devicePorts": { # "ranges": [{"from": 5010, "to": 5020}], # "ports": [5060, 5070] # }, # "applicationServerPorts": { # "ranges": [{"from": 6010, "to": 6020}], # "ports": [6060, 6070] # }, # "qosProfile": "Test Profile", # "webhook": { # "notificationUrl": "https://application-server.com", # "notificationAuthToken": "c8974e592c2fa383d4a3960714" # }, # "duration": {"value": 1, "unit": "hours"} # }) #def test_update_session(): # response = requests.put(f'{BASE_URL}/sessions/88c51c03-13bf-4542-b3f2-43a9edccfc95', json={ # "duration": {"value": 32, "unit": "hours"} # }) #def test_delete_session(): # response = requests.delete(f'{BASE_URL}/sessions/e1b53005-ffba-4e66-b7fe-0c5e022d8d7d') #def test_delete_all_sessions(): # BASE_URL = 'http://10.1.7.197/camara/qod/v0' # DELETE_ALL_URL = f"{BASE_URL}/sessions/delete_all" # response = requests.delete(DELETE_ALL_URL) # print(f"Status Code: {response.status_code}") # print(f"Response: {response.json()}") # assert response.status_code in [200, 204], f"Unexpected status code: {response.status_code}" # if response.status_code == 200: # assert response.json().get("message", "").startswith("All"), "Unexpected message content" # elif response.status_code == 204: # assert response.json() == {}, "Expected empty response for 204 No Content" # print("Test for delete all sessions passed.") No newline at end of file