From c6b134a641b9138e284bdfaccdc1c957901355b8 Mon Sep 17 00:00:00 2001 From: rahhal <mrahhal@cttc.es> Date: Fri, 15 Nov 2024 14:15:39 +0000 Subject: [PATCH] Updated version of CAMARA NBI --- scripts/run_tests_locally-nbi-camara-qod.sh | 30 +++++ .../nbi_plugins/camara_qod/Resources.py | 82 ++++------- .../nbi_plugins/camara_qod/Tools.py | 127 ++++++++++++------ .../nbi_plugins/camara_qod/__init__.py | 6 +- src/nbi/tests/test_camara_qod_profile.py | 93 ++----------- src/nbi/tests/test_camara_qos_service.py | 49 +++---- 6 files changed, 178 insertions(+), 209 deletions(-) create mode 100755 scripts/run_tests_locally-nbi-camara-qod.sh diff --git a/scripts/run_tests_locally-nbi-camara-qod.sh b/scripts/run_tests_locally-nbi-camara-qod.sh new file mode 100755 index 000000000..c37a95984 --- /dev/null +++ b/scripts/run_tests_locally-nbi-camara-qod.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src +RCFILE=$PROJECTDIR/coverage/.coveragerc +COVERAGEFILE=$PROJECTDIR/coverage/.coverage + +# Destroy old coverage file and configure the correct folder on the .coveragerc file +rm -f $COVERAGEFILE +cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE + + +# Run unitary tests and analyze coverage of code at same time +# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0 +coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ + nbi/tests/test_camara_qod_profile.py nbi/tests/test_camara_qos_service.py diff --git a/src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py b/src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py index a72de0323..b208b561f 100644 --- a/src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py +++ b/src/nbi/service/rest_server/nbi_plugins/camara_qod/Resources.py @@ -11,27 +11,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from venv import logger from flask.json import jsonify from flask_restful import Resource, request from enum import Enum import grpc._channel -from qos_profile.service.database.QoSProfile import grpc_message_to_qos_table_data -from qos_profile.tests.test_crud import create_qos_profile_from_json,test_update_qos_profile from qos_profile.client.QoSProfileClient import QoSProfileClient from werkzeug.exceptions import UnsupportedMediaType -from common.proto.context_pb2 import QoSProfile, QoSProfileId, Uuid, QoSProfileValueUnitPair,Empty -from common.proto.qos_profile_pb2 import QoSProfile, QoDConstraintsRequest -from context.client.ContextClient import ContextClient -from service.client.ServiceClient import ServiceClient +from common.proto.context_pb2 import QoSProfileId, Uuid,Empty +from common.proto.qos_profile_pb2 import QoDConstraintsRequest from typing import Dict from uuid import uuid4 import grpc, logging +import copy, deepmerge, json, logging +from typing import Dict +from flask_restful import Resource, request +from werkzeug.exceptions import UnsupportedMediaType +from common.Constants import DEFAULT_CONTEXT_NAME +from context.client.ContextClient import ContextClient +from service.client.ServiceClient import ServiceClient +from .Tools import ( + format_grpc_to_json, grpc_context_id, grpc_service_id, QOD_2_service, service_2_qod,grpc_message_to_qos_table_data,create_qos_profile_from_json +) -LOGGER = logging.getLogger(__name__) - - +LOGGER = logging.getLogger(__name__) #Initiate the QoSProfileClient class _Resource(Resource): def __init__(self) -> None: @@ -74,30 +77,6 @@ class ProfileList(_Resource): return jsonify(qos_profile_list) -class ProfileListCons(_Resource): - def get(self): - qos_profile_id = request.args.get("qos_profile_id") - start_timestamp = request.args.get("start_timestamp", type=float) - duration = request.args.get("duration", type=int) - - qos_constraints_request = QoDConstraintsRequest( - qos_profile_id=qos_profile_id, - start_timestamp=start_timestamp, - duration=duration - ) - try: - qos_profiles = self.qos_profile_client.GetConstraintListFromQoSProfile(qos_constraints_request) - qos_profile_list = [ - grpc_message_to_qos_table_data(profile) for profile in qos_profiles - ] - return jsonify(qos_profile_list) - except grpc._channel._InactiveRpcError as exc: - LOGGER.error(f"gRPC error while fetching constraints: {exc}") - return {"error": "Internal Server Error"}, 500 - except Exception as e: - LOGGER.error(f"Error while fetching constraints: {e}") - return {"error": "Internal Server Error"}, 500 - #getting,updating,deleting using the qos profile id class ProfileDetail(_Resource): def get(self, qos_profile_id): @@ -160,25 +139,8 @@ class ProfileDetail(_Resource): LOGGER.error(f"Error in DELETE /profiles/{qos_profile_id}: {e}") return {"error": "Internal Server Error"}, 500 - -import copy, deepmerge, json, logging -from typing import Dict -from flask_restful import Resource, request -from werkzeug.exceptions import UnsupportedMediaType -from common.Constants import DEFAULT_CONTEXT_NAME -from context.client.ContextClient import ContextClient -from service.client.ServiceClient import ServiceClient -from .Tools import ( - format_grpc_to_json, grpc_context_id, grpc_service_id, QOD_2_service, service_2_qod -) - +###SESSION########################################################## LOGGER = logging.getLogger(__name__) -class _Resource(Resource): - def __init__(self) -> None: - super().__init__() - self.client = ContextClient() - self.service_client = ServiceClient() - class qodinfo(_Resource): def post(self): if not request.is_json: @@ -186,12 +148,13 @@ class qodinfo(_Resource): request_data: Dict = request.get_json() qos_profile_id = request_data.get('qos_profile_id') qos_session_id = request_data.get('qos_session_id') + duration = request_data.get('duration') LOGGER.info(f'qos_profile_id:{qos_profile_id}') if not qos_profile_id: return jsonify({'error': 'qos_profile_id is required'}), 400 if qos_session_id: return jsonify({'error': 'qos_session_id is not allowed in creation'}), 400 - service = QOD_2_service(self.client, request_data,qos_profile_id) + service = QOD_2_service(self.client, request_data,qos_profile_id,duration) stripped_service = copy.deepcopy(service) stripped_service.ClearField('service_endpoint_ids') stripped_service.ClearField('service_constraints') @@ -206,8 +169,8 @@ class qodinfo(_Resource): return response def get(self): - service_list = self.client.ListServices(grpc_context_id(DEFAULT_CONTEXT_NAME)) - qod_info = [service_2_qod(service) for service in service_list.services] + service_list = self.client.ListServices(grpc_context_id(DEFAULT_CONTEXT_NAME)) #return context id as json + qod_info = [service_2_qod(service) for service in service_list.services] #iterating over service list LOGGER.info(f"error related to qod_info: {qod_info}") return qod_info @@ -231,7 +194,14 @@ class qodinfoId(_Resource): qos_profile_id = request_data.get('qos_profile_id') if not qos_profile_id: return jsonify({'error': 'qos_profile_id is required'}), 400 - service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, sessionId)) + duration = request_data.get('duration') + service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, sessionId)) #to get service we should have the context and the session id + if qos_profile_id: + service.name = qos_profile_id # if we provide a new qos profile , update the service name with new qos_profile_id + if duration: + for constraint in service.service_constraints: + if constraint.WhichOneof('constraint') == 'schedule': + constraint.schedule.duration_days = duration updated_service = self.service_client.UpdateService(service) qod_response = service_2_qod(updated_service) return qod_response, 200 diff --git a/src/nbi/service/rest_server/nbi_plugins/camara_qod/Tools.py b/src/nbi/service/rest_server/nbi_plugins/camara_qod/Tools.py index 043e5d92a..1b246a79d 100644 --- a/src/nbi/service/rest_server/nbi_plugins/camara_qod/Tools.py +++ b/src/nbi/service/rest_server/nbi_plugins/camara_qod/Tools.py @@ -13,13 +13,11 @@ # limitations under the License. import json, logging, re, time -from decimal import ROUND_HALF_EVEN, Decimal from flask.json import jsonify from common.proto.context_pb2 import ( ContextId, Empty, EndPointId, ServiceId, ServiceStatusEnum, ServiceTypeEnum, - Service, Constraint, Constraint_SLA_Capacity, ConfigRule, ConfigRule_Custom, - ConfigActionEnum,QoSProfile -) + Service,ConfigRule, ConfigRule_Custom, + ConfigActionEnum) from common.tools.grpc.ConfigRules import update_config_rule_custom from common.tools.grpc.Tools import grpc_message_to_json from common.tools.object_factory.Context import json_context_id @@ -27,10 +25,11 @@ from common.tools.object_factory.Service import json_service_id from uuid import uuid4 from nbi.service.rest_server.nbi_plugins.ietf_network.bindings.networks import network from qos_profile.client.QoSProfileClient import QoSProfileClient -#from context.service.database.QoSProfile import grpc_message_to_qos_table_data -from common.proto.context_pb2 import QoSProfile, QoSProfileId, Uuid, QoSProfileValueUnitPair,Empty,ServiceId +from context.client.ContextClient import ContextClient +from common.proto.context_pb2 import QoSProfileId, Uuid,Empty,ServiceId +from common.proto.context_pb2 import Uuid, QoSProfileId +from common.proto.qos_profile_pb2 import QoSProfileValueUnitPair, QoSProfile,QoDConstraintsRequest import logging -import grpc from netaddr import IPAddress, IPNetwork LOGGER = logging.getLogger(__name__) @@ -40,17 +39,60 @@ DEVICE_SETTINGS_KEY = '/device[{:s}]/settings' RE_CONFIG_RULE_IF_SUBIF = re.compile(r'^\/interface\[([^\]]+)\]\/subinterface\[([^\]]+)\]$') MEC_CONSIDERED_FIELDS = ['device', 'applicationServer', 'qosProfile', 'sessionId', 'duration', 'startedAt', 'expiresAt', 'qosStatus'] - def __init__(self) -> None: super().__init__() self.qos_profile_client = QoSProfileClient() + self.client = ContextClient() + +def grpc_message_to_qos_table_data(message: QoSProfile) -> dict: + return { + 'qos_profile_id' : message.qos_profile_id.qos_profile_id.uuid, + 'name' : message.name, + 'description' : message.description, + 'status' : message.status, + 'targetMinUpstreamRate' : grpc_message_to_json(message.targetMinUpstreamRate), + 'maxUpstreamRate' : grpc_message_to_json(message.maxUpstreamRate), + 'maxUpstreamBurstRate' : grpc_message_to_json(message.maxUpstreamBurstRate), + 'targetMinDownstreamRate' : grpc_message_to_json(message.targetMinDownstreamRate), + 'maxDownstreamRate' : grpc_message_to_json(message.maxDownstreamRate), + 'maxDownstreamBurstRate' : grpc_message_to_json(message.maxDownstreamBurstRate), + 'minDuration' : grpc_message_to_json(message.minDuration), + 'maxDuration' : grpc_message_to_json(message.maxDuration), + 'priority' : message.priority, + 'packetDelayBudget' : grpc_message_to_json(message.packetDelayBudget), + 'jitter' : grpc_message_to_json(message.jitter), + 'packetErrorLossRate' : message.packetErrorLossRate, + } + +def create_qos_profile_from_json(qos_profile_data: dict) -> QoSProfile: + def create_QoSProfileValueUnitPair(data) -> QoSProfileValueUnitPair: + return QoSProfileValueUnitPair(value=data['value'], unit=data['unit']) + qos_profile = QoSProfile() + qos_profile.qos_profile_id.CopyFrom(QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_data['qos_profile_id']))) + qos_profile.name = qos_profile_data['name'] + qos_profile.description = qos_profile_data['description'] + qos_profile.status = qos_profile_data['status'] + qos_profile.targetMinUpstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['targetMinUpstreamRate'])) + qos_profile.maxUpstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxUpstreamRate'])) + qos_profile.maxUpstreamBurstRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxUpstreamBurstRate'])) + qos_profile.targetMinDownstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['targetMinDownstreamRate'])) + qos_profile.maxDownstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxDownstreamRate'])) + qos_profile.maxDownstreamBurstRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxDownstreamBurstRate'])) + qos_profile.minDuration.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['minDuration'])) + qos_profile.maxDuration.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxDuration'])) + qos_profile.priority = qos_profile_data['priority'] + qos_profile.packetDelayBudget.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['packetDelayBudget'])) + qos_profile.jitter.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['jitter'])) + qos_profile.packetErrorLossRate = qos_profile_data['packetErrorLossRate'] + return qos_profile def ip_withoutsubnet(ip_withsubnet,neededip): network=IPNetwork(ip_withsubnet) return IPAddress(neededip) in network -def QOD_2_service(client,qod_info: dict,qos_profile_id) -> Service: +def QOD_2_service(client,qod_info: dict,qos_profile_id,duration) -> Service: + qos_profile_client = QoSProfileClient() service = Service() service_config_rules = service.service_config.config_rules @@ -112,34 +154,43 @@ def QOD_2_service(client,qod_info: dict,qos_profile_id) -> Service: update_config_rule_custom(service_config_rules, settings_cr_key, settings_cr_value) service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED service.service_type = ServiceTypeEnum.SERVICETYPE_L3NM - qod_info["sessionID"]=str(uuid4()) qod_info["context"]='admin' service.service_id.service_uuid.uuid = qod_info['sessionID'] service.service_id.context_id.context_uuid.uuid = qod_info["context"] - #service.service_constraints.CopyFrom() = qod_info['contraints'] - #LOGGER.info(f'this is the error: {qod_info["context"]}') - - try: - id = QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_id)) - qos_profile = client.GetQoSProfile(id) - except grpc._channel._InactiveRpcError as exc: - if exc.code() == grpc.StatusCode.NOT_FOUND: - return {"error": f"QoSProfile {qos_profile_id} not found"}, 404 - - if qos_profile.qos_profile_id: - qos_profile_id = qod_info.get('qos_profile_id') - service.name = qod_info.get('QosProfileId', qos_profile_id) + #try: + # id = QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_id)) + # id= QoSProfileId() + # id.qos_profile_id.uuid="qos_profile_id" + # qos_profile = qos_profile_client.GetQoSProfile(id) + #except grpc._channel._InactiveRpcError as exc: + # if exc.code() == grpc.StatusCode.NOT_FOUND: + # return {"error": f"QoSProfile {qos_profile_id} not found"}, 404 + #if qos_profile.qos_profile_id: + # qos_profile_id = qod_info.get('qos_profile_id') + service.name = qod_info.get('qos_profile_id', qos_profile_id) + current_time = time.time() + duration_days = duration # days as i saw it in the proto files + id = QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_id)) + request = QoDConstraintsRequest(qos_profile_id=id,start_timestamp=current_time,duration=duration_days) #defined attributes in proto file for the QoDconstraint rquest message + qos_profiles_constraint = qos_profile_client.GetConstraintListFromQoSProfile(request) + LOGGER.info(f'current time contains{current_time}') + LOGGER.info(f'current duration time contains{duration_days}') + LOGGER.info(f'id : {id}') + for cs in qos_profiles_constraint: + if cs.WhichOneof('constraint') == 'schedule' and cs.WhichOneof('constraint') == 'qos_profile': #the method of which one of + cs.schedule.start_timestamp = current_time + cs.schedule.duration_days = duration_days + cs.qos_profile.qos_profile_id=id + cs.qos_profile.qos_profile_name.CopyFrom(qos_profile_client.name) #i copied this from the qosprofile + LOGGER.info(f'the cs : {cs}') + service.service_constraints.append(cs) #qos_profile = QoSProfile() #qos_profile.qos_profile_id.qos_profile_id.uuid = qod_info['qosProfile'] - - #if 'qosProfile' in qos_profile_list: # qos_profile = QoSProfile() # qos_profile.qos_profile_id.qos_profile_id.uuid = qod_info['qosProfile'] - - return service @@ -167,20 +218,14 @@ def service_2_qod(service: Service) -> dict: if service.service_id: response['sessionId'] = service.service_id.service_uuid.uuid - - if service.timestamp: - response['duration'] = service.timestamp.timestamp - LOGGER.info(f"time stamp contains{response['duration']}") - - current_time = time.time() - response['startedAt'] = int(current_time) - response['expiresAt'] = int(current_time + response['duration']) - -# unixtime = time.time() -# response['timeStamp'] = { -# "seconds": int(unixtime), -# "nanoseconds": int(unixtime % 1 * 1e9) -# } + if service.service_constraints: + for constraint in service.service_constraints: + if constraint.WhichOneof('constraint') == 'schedule': + response['duration'] = float(constraint.schedule.duration_days* (86400)) + LOGGER.info(f'the duration in seconds: {response["duration"]}') + response['startedAt'] = int(constraint.schedule.start_timestamp) + response['expiresAt'] = response['startedAt'] + response['duration'] + return response diff --git a/src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py b/src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py index 9b19a1f9e..ab4fe2bbd 100644 --- a/src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py +++ b/src/nbi/service/rest_server/nbi_plugins/camara_qod/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. from nbi.service.rest_server.RestServer import RestServer -from .Resources import ProfileList, ProfileDetail, qodinfo, qodinfoId, ProfileListCons +from .Resources import ProfileList, ProfileDetail, qodinfo, qodinfoId URL_PREFIX = '/camara/qod/v0' @@ -25,10 +25,6 @@ RESOURCES = [ ('camara.qod_info_session_id', qodinfoId, '/sessions/<sessionId>'), ('camara.qod.profile_list',ProfileList,'/profiles'), ('camara.qod.profile_detail',ProfileDetail,'/profiles/<string:qos_profile_id>'), - ('camara.qod.profile_all',ProfileListCons,'/profiles/constraints'), - #('camara.qod.profile_delete_by_name',Profile_delete_by_name,'/profiles/delete_by_name/<string:name>'), - #('camara.qod.profile_delete_all',Delete_all_profile,'/profiles/delete_all/'), - ] def register_camara_qod(rest_server : RestServer): diff --git a/src/nbi/tests/test_camara_qod_profile.py b/src/nbi/tests/test_camara_qod_profile.py index 7e2eebe79..4408305a9 100644 --- a/src/nbi/tests/test_camara_qod_profile.py +++ b/src/nbi/tests/test_camara_qod_profile.py @@ -88,14 +88,17 @@ def test_create_profile(): assert post_response['status'] == get_response['status'] assert post_response['targetMinDownstreamRate'] == get_response['targetMinDownstreamRate'] assert post_response['targetMinUpstreamRate'] == get_response['targetMinUpstreamRate'] - #assert response.status_code == 200, f"Failed to retrieve profile with status code {response.status_code}" +#assert response.status_code == 200, f"Failed to retrieve profile with status code {response.status_code}" + + + #def test_update_profile(): -# qos_profile_id = '1b4689d8-02a4-4a6c-bd0a-18ffecc1a336' +# qos_profile_id = '0898e7e8-ef15-4522-8e93-623e31c92efa' # qos_profile_data = { -# "qos_profile_id": "1b4689d8-02a4-4a6c-bd0a-18ffecc1a336", +# "qos_profile_id": "0898e7e8-ef15-4522-8e93-623e31c92efa", # "name": "Updated Name", -# "description": "Updated Description", +# "description": "NEW GAMING PROFILE", # "status": "ACTIVE", # "targetMinUpstreamRate": { # "value": 20, @@ -140,87 +143,11 @@ def test_create_profile(): # }, # "packetErrorLossRate": 1 #} -# # response = requests.put(f'{BASE_URL}/profiles/{qos_profile_id}', json=qos_profile_data) -# -#def test_delete_profile_by_id(): -# qos_profile_id = 'adcbc52e-85e1-42e2-aa33-b6d022798fb3' -# response = requests.delete(f'{BASE_URL}/profiles/{qos_profile_id}') - -import logging -from flask import jsonify -import requests - -logging.basicConfig(level=logging.DEBUG) -LOGGER = logging.getLogger() - -# Define the base URL for the API -BASE_URL = 'http://10.1.7.197/camara/qod/v0' - -def test_create_profile(): - # Define the QoS profile data - qos_profile_data = { - "name": "QCI_2_voice", - "description": "QoS profile for video streaming", - "status": "ACTIVE", - "targetMinUpstreamRate": {"value": 10, "unit": "bps"}, - "maxUpstreamRate": {"value": 10, "unit": "bps"}, - "maxUpstreamBurstRate": {"value": 10, "unit": "bps"}, - "targetMinDownstreamRate": {"value": 10, "unit": "bps"}, - "maxDownstreamRate": {"value": 10, "unit": "bps"}, - "maxDownstreamBurstRate": {"value": 10, "unit": "bps"}, - "minDuration": {"value": 12, "unit": "Minutes"}, - "maxDuration": {"value": 12, "unit": "Minutes"}, - "priority": 20, - "packetDelayBudget": {"value": 12, "unit": "Minutes"}, - "jitter": {"value": 12, "unit": "Minutes"}, - "packetErrorLossRate": 3 - } - # Test profile creation - post_response = requests.post(f'{BASE_URL}/profiles', json=qos_profile_data).json() - id = post_response['qos_profile_id'] - # Test profile retrieval - get_response = requests.get(f'{BASE_URL}/profiles/{id}').json() - - # Assertions to check if post and get responses match - assert post_response['qos_profile_id'] == get_response['qos_profile_id'] - assert post_response['jitter'] == get_response['jitter'] - assert post_response['maxDownstreamBurstRate'] == get_response['maxDownstreamBurstRate'] - assert post_response['maxDownstreamRate'] == get_response['maxDownstreamRate'] - assert post_response['maxUpstreamBurstRate'] == get_response['maxUpstreamBurstRate'] - assert post_response['maxUpstreamRate'] == get_response['maxUpstreamRate'] - assert post_response['minDuration'] == get_response['minDuration'] - assert post_response['name'] == get_response['name'] - assert post_response['packetDelayBudget'] == get_response['packetDelayBudget'] - assert post_response['packetErrorLossRate'] == get_response['packetErrorLossRate'] - assert post_response['priority'] == get_response['priority'] - assert post_response['status'] == get_response['status'] - assert post_response['targetMinDownstreamRate'] == get_response['targetMinDownstreamRate'] - assert post_response['targetMinUpstreamRate'] == get_response['targetMinUpstreamRate'] -def test_get_constraints(): - # Replace with actual qos_profile_id you want to test with - qos_profile_id = "contraints" - start_timestamp = 1726063284.25332 - duration = 86400 +# def test_delete_profile_by_id(): +# qos_profile_id = '0898e7e8-ef15-4522-8e93-623e31c92efa' +# response = requests.delete(f'{BASE_URL}/profiles/{qos_profile_id}') - # Send GET request to fetch constraints - response = requests.get(f'{BASE_URL}/profiles/constraints', params={ - "qos_profile_id": qos_profile_id, - "start_timestamp": start_timestamp, - "duration": duration - }) - - # Convert response to JSON and add error checking - if response.status_code == 200: - constraints = response.json() - LOGGER.debug(f"Constraints retrieved: {constraints}") - - # Additional assertions for constraints - assert len(constraints) > 0, "Expected at least one constraint" - first_constraint = constraints[0] - assert "constraint_type" in first_constraint, "Constraint type missing in response" - else: - LOGGER.error(f"Failed to fetch constraints: Status code {response.status_code}") diff --git a/src/nbi/tests/test_camara_qos_service.py b/src/nbi/tests/test_camara_qos_service.py index 247268512..fe4ee3f2e 100644 --- a/src/nbi/tests/test_camara_qos_service.py +++ b/src/nbi/tests/test_camara_qos_service.py @@ -19,34 +19,35 @@ import requests logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger() BASE_URL = 'http://10.1.7.197/camara/qod/v0' +def test_create_SESSION(): + BASE_URL = 'http://10.1.7.197/camara/qod/v0' + service_data={ + "device": + {"ipv4Address":"84.75.11.12/25" }, + "applicationServer": { + "ipv4Address": "192.168.0.1/26", + }, + "duration":10000000.00, + "qos_profile_id": "367d3e3f-96be-4391-af82-e21c042dd9bd", + } + post_response = requests.post(f'{BASE_URL}/sessions', json=service_data).json() + #id=post_response['sessionID'] + #get_response = requests.get(f'{BASE_URL}/sessions/{id}').json() + get_response = requests.get(f'{BASE_URL}/sessions').json() + -#def test_create_SESSION(): -# BASE_URL = 'http://10.1.7.197/camara/qod/v0' -# service_data={ -# "device": -# {"ipv4Address":"84.75.11.12/25" }, -# "applicationServer": { -# "ipv4Address": "192.168.0.1/26", -# }, -# "duration":100.00, -# "qos_profile_id": "6f39d0ae-f1a4-4e05-ad3c-bc77bdbb7fd0", -# } -# post_response = requests.post(f'{BASE_URL}/sessions', json=service_data).json() -# #id=post_response['sessionID'] -# #get_response = requests.get(f'{BASE_URL}/sessions/{id}').json() -# get_response = requests.get(f'{BASE_URL}/sessions').json() #def test_delete_session_by_id(): # session_id = '' # response = requests.delete(f'{BASE_URL}/sessions/{session_id}') #def test_update_session_by_id(): -# session_id='f192a586-4869-4d28-8793-01478fa149041' -# session_data={"session_id":'f192a586-4869-4d28-8793-01478fa14904', -# "device": -# {"ipv4Address":"84.75.11.12/25" }, -# "applicationServer": { -# "ipv4Address": "192.168.0.1/26", -# }, -# "duration":200.00, -# "qos_profile_id": "6f39d0ae-f1a4-4e05-ad3c-bc77bdbb7fd0"} +# session_id='3ac2ff12-d763-4ded-9e13-7e82709b16cd' +# session_data={"session_id":'3ac2ff12-d763-4ded-9e13-7e82709b16cd', +# "device": +# {"ipv4Address":"84.75.11.12/25" }, +# "applicationServer": { +# "ipv4Address": "192.168.0.1/26", +# }, +# "duration":2000000000.00, +# "qos_profile_id": "28499a15-c1d9-428c-9732-82859a727235"} # put_response=requests.put(f'{BASE_URL}/sessions/{session_id}',json=session_data).json() \ No newline at end of file -- GitLab