Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (15)
......@@ -69,3 +69,10 @@ spec:
name: nbiservice
port:
number: 8080
- path: /()(camara/.*)
pathType: Prefix
backend:
service:
name: nbiservice
port:
number: 8080
File mode changed from 100755 to 100644
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
COVERAGEFILE=$PROJECTDIR/coverage/.coverage
# Destroy old coverage file and configure the correct folder on the .coveragerc file
rm -f $COVERAGEFILE
cat $PROJECTDIR/coverage/.coveragerc.template | sed s+~/tfs-ctrl+$PROJECTDIR+g > $RCFILE
# Run unitary tests and analyze coverage of code at same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_camara_qod_profile.py nbi/tests/test_camara_qos_service.py
......@@ -88,6 +88,8 @@ COPY src/slice/__init__.py slice/__init__.py
COPY src/slice/client/. slice/client/
COPY src/qkd_app/__init__.py qkd_app/__init__.py
COPY src/qkd_app/client/. qkd_app/client/
COPY src/qos_profile/__init__.py qos_profile/__init__.py
COPY src/qos_profile/client/. qos_profile/client/
COPY src/vnt_manager/__init__.py vnt_manager/__init__.py
COPY src/vnt_manager/client/. vnt_manager/client/
RUN mkdir -p /var/teraflow/tests/tools
......
......@@ -22,6 +22,7 @@ from common.Settings import (
)
from .NbiService import NbiService
from .rest_server.RestServer import RestServer
from .rest_server.nbi_plugins.camara_qod import register_camara_qod
from .rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api
from .rest_server.nbi_plugins.ietf_hardware import register_ietf_hardware
from .rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn
......@@ -70,6 +71,7 @@ def main():
grpc_service.start()
rest_server = RestServer()
register_camara_qod(rest_server)
register_etsi_bwm_api(rest_server)
register_ietf_hardware(rest_server)
register_ietf_l2vpn(rest_server) # Registering L2VPN entrypoint
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, grpc, grpc._channel, logging
from typing import Dict
from uuid import uuid4
from flask.json import jsonify
from flask_restful import Resource, request
from common.proto.context_pb2 import QoSProfileId, Uuid,Empty
from common.Constants import DEFAULT_CONTEXT_NAME
from context.client.ContextClient import ContextClient
from qos_profile.client.QoSProfileClient import QoSProfileClient
from service.client.ServiceClient import ServiceClient
from .Tools import (
format_grpc_to_json, grpc_context_id, grpc_service_id,
QOD_2_service, service_2_qod, grpc_message_to_qos_table_data,
create_qos_profile_from_json
)
LOGGER = logging.getLogger(__name__)
class _Resource(Resource):
def __init__(self) -> None:
super().__init__()
self.qos_profile_client = QoSProfileClient()
self.client = ContextClient()
self.service_client = ServiceClient()
#ProfileList Endpoint for posting
class ProfileList(_Resource):
def post(self):
if not request.is_json:
return {"message": "JSON payload is required to proceed"}, 415
request_data: Dict = request.get_json() #get the json from the test function
request_data['qos_profile_id']=str(uuid4()) # Create qos ID randomly using uuid4
# JSON TO GRPC to store the data in the grpc server
try:
qos_profile = create_qos_profile_from_json(request_data)
except:
LOGGER.exception("Error happened while creating QoS profile from json")
return {"message": "Failed to create QoS profile"}, 500
# Send to gRPC server using CreateQosProfile
try:
qos_profile_created = self.qos_profile_client.CreateQoSProfile(qos_profile)
except:
LOGGER.exception("error happened while creating QoS profile")
#gRPC message back to JSON using the helper function
qos_profile_data = grpc_message_to_qos_table_data(qos_profile_created)
LOGGER.info(f'qos_profile_data{qos_profile_data}')
return jsonify(qos_profile_data)
def get(self):
qos_profiles = self.qos_profile_client.GetQoSProfiles(Empty()) #get all of type empty and defined in .proto file ()
qos_profile_list = [] #since it iterates over QoSProfile , create a list to store all QOS profiles
for qos_profile in qos_profiles:
LOGGER.info('qos_profiles = {:s}'.format(str(qos_profiles)))
qos_profile_data = grpc_message_to_qos_table_data(qos_profile) #transform to json
qos_profile_list.append(qos_profile_data) # append to the list
return jsonify(qos_profile_list)
#getting,updating,deleting using the qos profile id
class ProfileDetail(_Resource):
def get(self, qos_profile_id):
id = QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_id)) #this is because we want to GetQOSProfile which takes QOSProfileID
#or
#id=QoSProfileId()
#id.qos_profile_id.uuid=qos_profile_id
#The QoSProfileID is message qod_profile_id of type Uuid
# Uuid is a message uuid of type qos_profile_id which is a string
try:
qos_profile = self.qos_profile_client.GetQoSProfile(id) #get the qosprofile from grpc server according to ID
qos_profile_data = grpc_message_to_qos_table_data(qos_profile) # grpc to json agian to view it on http
return jsonify(qos_profile_data)
except grpc._channel._InactiveRpcError as exc:
if exc.code() == grpc.StatusCode.NOT_FOUND:
LOGGER.warning(f"QoSProfile not found: {qos_profile_id}")
return {"error": f"QoSProfile {qos_profile_id} not found"}, 404
LOGGER.error(f"gRPC error while fetching QoSProfile: {exc}")
return {"error": "Internal Server Error"}, 500
except:
LOGGER.exception(f"Error while fetching QoSProfile")
return {"error": "Internal Server Error"}, 500
def put(self, qos_profile_id):
try:
request_data = request.get_json() # get the json to do the update
if 'qos_profile_id' not in request_data: #ensuring the update on qos profile id
request_data['qos_profile_id'] = qos_profile_id # ID to be updated
qos_profile = create_qos_profile_from_json(request_data) # Transform it again to grpc
qos_profile_updated = self.qos_profile_client.UpdateQoSProfile(qos_profile) # update the profile in the grpc server
return grpc_message_to_qos_table_data(qos_profile_updated), 200
except KeyError as e:
LOGGER.error(f"Missing required key: {e}")
return {"error": f"Missing required key: {str(e)}"}, 400
except grpc._channel._InactiveRpcError as exc:
if exc.code() == grpc.StatusCode.NOT_FOUND:
LOGGER.warning(f"QoSProfile not found for update: {qos_profile_id}")
return {"error": f"QoSProfile {qos_profile_id} not found"}, 404
LOGGER.error(f"gRPC error while updating QoSProfile: {exc}")
return {"error": "Internal Server Error"}, 500
except:
LOGGER.exception(f"Error in PUT /profiles/{qos_profile_id}")
return {"error": "Internal Server Error"}, 500
def delete(self, qos_profile_id):
id = QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_id)) # get id to delete accordingly
try:
qos_profile = self.qos_profile_client.DeleteQoSProfile(id)
qos_profile_data = grpc_message_to_qos_table_data(qos_profile)
return jsonify(qos_profile_data)
except grpc._channel._InactiveRpcError as exc:
if exc.code() == grpc.StatusCode.NOT_FOUND:
LOGGER.warning(f"QoSProfile not found for deletion: {qos_profile_id}")
return {"error": f"QoSProfile {qos_profile_id} not found"}, 404
LOGGER.error(f"gRPC error while deleting QoSProfile: {exc}")
return {"error": "Internal Server Error"}, 500
except:
LOGGER.exception(f"Error in DELETE /profiles/{qos_profile_id}")
return {"error": "Internal Server Error"}, 500
###SESSION##########################################################
class QodInfo(_Resource):
def post(self):
if not request.is_json:
return jsonify({'error': 'Unsupported Media Type', 'message': 'JSON payload is required'}), 415
request_data: Dict = request.get_json()
qos_profile_id = request_data.get('qos_profile_id')
qos_session_id = request_data.get('qos_session_id')
duration = request_data.get('duration')
LOGGER.info(f'qos_profile_id:{qos_profile_id}')
if not qos_profile_id:
return jsonify({'error': 'qos_profile_id is required'}), 400
if qos_session_id:
return jsonify({'error': 'qos_session_id is not allowed in creation'}), 400
service = QOD_2_service(self.client, request_data, qos_profile_id, duration)
stripped_service = copy.deepcopy(service)
stripped_service.ClearField('service_endpoint_ids')
stripped_service.ClearField('service_constraints')
stripped_service.ClearField('service_config')
try:
create_response = format_grpc_to_json(self.service_client.CreateService(stripped_service))
update_response = format_grpc_to_json(self.service_client.UpdateService(service))
response = {
"create_response": create_response,
"update_response": update_response
}
except Exception as e:
LOGGER.error(f"Unexpected error: {str(e)}", exc_info=True)
return jsonify({'error': 'Internal Server Error', 'message': 'An unexpected error occurred'}), 500
return response
#didnt work return jsonify(response)
def get(self):
service_list = self.client.ListServices(grpc_context_id(DEFAULT_CONTEXT_NAME)) #return context id as json
qod_info = [service_2_qod(service) for service in service_list.services] #iterating over service list
LOGGER.info(f"error related to qod_info: {qod_info}")
return qod_info
#didnt work return jsonify(qod_info)
class QodInfoID(_Resource):
def get(self, sessionId: str):
try:
service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, sessionId))
return service_2_qod(service)
except grpc._channel._InactiveRpcError as exc:
if exc.code()==grpc.StatusCode.NOT_FOUND:
LOGGER.warning(f"Qod Session not found: {sessionId}")
return {"error": f"Qod Session {sessionId} not found"}, 404
def put(self, sessionId: str):
try:
request_data: Dict = request.get_json()
session_id = request_data.get('session_id')
if not session_id:
return jsonify({'error': 'sessionId is required'}), 400
qos_profile_id = request_data.get('qos_profile_id')
if not qos_profile_id:
return jsonify({'error': 'qos_profile_id is required'}), 400
duration = request_data.get('duration')
service = self.client.GetService(grpc_service_id(DEFAULT_CONTEXT_NAME, sessionId)) #to get service we should have the context and the session id
if qos_profile_id:
service.name = qos_profile_id # if we provide a new qos profile , update the service name with new qos_profile_id
if duration:
for constraint in service.service_constraints:
if constraint.WhichOneof('constraint') == 'schedule':
constraint.schedule.duration_days = duration
updated_service = self.service_client.UpdateService(service)
qod_response = service_2_qod(updated_service)
return qod_response, 200
except KeyError as e:
LOGGER.error(f"Missing required key: {e}")
return {"error": f"Missing required key: {str(e)}"}, 400
except grpc._channel._InactiveRpcError as exc:
if exc.code() == grpc.StatusCode.NOT_FOUND:
LOGGER.warning(f"Qod Session not found: {sessionId}")
return {"error": f"Qod Session {sessionId} not found"}, 404
LOGGER.error(f"gRPC error while updating Qod Session: {exc}")
return {"error": "Internal Server Error"}, 500
except:
LOGGER.exception(f"Error in PUT /sessions/{sessionId}")
return {"error": "Internal Server Error"}, 500
def delete(self, sessionId: str):
self.service_client.DeleteService(grpc_service_id(DEFAULT_CONTEXT_NAME, sessionId))
return{"Session Deleted"}
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, re, time
from flask.json import jsonify
from common.proto.context_pb2 import (
ContextId, Empty, EndPointId, ServiceId, ServiceStatusEnum, ServiceTypeEnum,
Service,ConfigRule, ConfigRule_Custom,
ConfigActionEnum)
from common.tools.grpc.ConfigRules import update_config_rule_custom
from common.tools.grpc.Tools import grpc_message_to_json
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Service import json_service_id
from uuid import uuid4
from nbi.service.rest_server.nbi_plugins.ietf_network.bindings.networks import network
from qos_profile.client.QoSProfileClient import QoSProfileClient
from context.client.ContextClient import ContextClient
from common.proto.context_pb2 import QoSProfileId, Uuid,Empty,ServiceId
from common.proto.context_pb2 import Uuid, QoSProfileId
from common.proto.qos_profile_pb2 import QoSProfileValueUnitPair, QoSProfile,QoDConstraintsRequest
import logging
from netaddr import IPAddress, IPNetwork
LOGGER = logging.getLogger(__name__)
ENDPOINT_SETTINGS_KEY = '/device[{:s}]/endpoint[{:s}]/vlan[{:d}]/settings'
DEVICE_SETTINGS_KEY = '/device[{:s}]/settings'
RE_CONFIG_RULE_IF_SUBIF = re.compile(r'^\/interface\[([^\]]+)\]\/subinterface\[([^\]]+)\]$')
MEC_CONSIDERED_FIELDS = ['device', 'applicationServer', 'qosProfile', 'sessionId', 'duration', 'startedAt', 'expiresAt', 'qosStatus']
def __init__(self) -> None:
super().__init__()
self.qos_profile_client = QoSProfileClient()
self.client = ContextClient()
def grpc_message_to_qos_table_data(message: QoSProfile) -> dict:
return {
'qos_profile_id' : message.qos_profile_id.qos_profile_id.uuid,
'name' : message.name,
'description' : message.description,
'status' : message.status,
'targetMinUpstreamRate' : grpc_message_to_json(message.targetMinUpstreamRate),
'maxUpstreamRate' : grpc_message_to_json(message.maxUpstreamRate),
'maxUpstreamBurstRate' : grpc_message_to_json(message.maxUpstreamBurstRate),
'targetMinDownstreamRate' : grpc_message_to_json(message.targetMinDownstreamRate),
'maxDownstreamRate' : grpc_message_to_json(message.maxDownstreamRate),
'maxDownstreamBurstRate' : grpc_message_to_json(message.maxDownstreamBurstRate),
'minDuration' : grpc_message_to_json(message.minDuration),
'maxDuration' : grpc_message_to_json(message.maxDuration),
'priority' : message.priority,
'packetDelayBudget' : grpc_message_to_json(message.packetDelayBudget),
'jitter' : grpc_message_to_json(message.jitter),
'packetErrorLossRate' : message.packetErrorLossRate,
}
def create_qos_profile_from_json(qos_profile_data: dict) -> QoSProfile:
def create_QoSProfileValueUnitPair(data) -> QoSProfileValueUnitPair:
return QoSProfileValueUnitPair(value=data['value'], unit=data['unit'])
qos_profile = QoSProfile()
qos_profile.qos_profile_id.CopyFrom(QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_data['qos_profile_id'])))
qos_profile.name = qos_profile_data['name']
qos_profile.description = qos_profile_data['description']
qos_profile.status = qos_profile_data['status']
qos_profile.targetMinUpstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['targetMinUpstreamRate']))
qos_profile.maxUpstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxUpstreamRate']))
qos_profile.maxUpstreamBurstRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxUpstreamBurstRate']))
qos_profile.targetMinDownstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['targetMinDownstreamRate']))
qos_profile.maxDownstreamRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxDownstreamRate']))
qos_profile.maxDownstreamBurstRate.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxDownstreamBurstRate']))
qos_profile.minDuration.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['minDuration']))
qos_profile.maxDuration.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['maxDuration']))
qos_profile.priority = qos_profile_data['priority']
qos_profile.packetDelayBudget.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['packetDelayBudget']))
qos_profile.jitter.CopyFrom(create_QoSProfileValueUnitPair(qos_profile_data['jitter']))
qos_profile.packetErrorLossRate = qos_profile_data['packetErrorLossRate']
return qos_profile
def ip_withoutsubnet(ip_withsubnet,neededip):
network=IPNetwork(ip_withsubnet)
return IPAddress(neededip) in network
def QOD_2_service(client,qod_info: dict,qos_profile_id: int,duration: int) -> Service:
qos_profile_client = QoSProfileClient()
service = Service()
service_config_rules = service.service_config.config_rules
request_cr_key = '/request'
request_cr_value = {k: qod_info[k] for k in MEC_CONSIDERED_FIELDS if k in qod_info}
config_rule = ConfigRule()
config_rule.action = ConfigActionEnum.CONFIGACTION_SET
config_rule_custom = ConfigRule_Custom()
config_rule_custom.resource_key = request_cr_key
config_rule_custom.resource_value = json.dumps(request_cr_value)
config_rule.custom.CopyFrom(config_rule_custom)
service_config_rules.append(config_rule)
if 'device' in qod_info and 'applicationServer' in qod_info:
a_ip = qod_info['device'].get('ipv4Address')
z_ip = qod_info['applicationServer'].get('ipv4Address')
LOGGER.debug('a_ip = {:s}'.format(str(a_ip)))
LOGGER.debug('z_ip = {:s}'.format(str(z_ip)))
if a_ip and z_ip:
devices = client.ListDevices(Empty()).devices
#LOGGER.info('devices = {:s}'.format(str(devices)))
ip_interface_name_dict = {}
for device in devices:
#LOGGER.info('device..uuid = {:s}'.format(str(device.device_id.device_uuid.uuid)))
#LOGGER.info('device..name = {:s}'.format(str(device.name)))
device_endpoint_uuids = {ep.name: ep.endpoint_id.endpoint_uuid.uuid for ep in device.device_endpoints}
#LOGGER.info('device_endpoint_uuids = {:s}'.format(str(device_endpoint_uuids)))
for cr in device.device_config.config_rules:
if cr.WhichOneof('config_rule') != 'custom':
continue
#LOGGER.info('cr = {:s}'.format(str(cr)))
match_subif = RE_CONFIG_RULE_IF_SUBIF.match(cr.custom.resource_key)
if not match_subif:
continue
address_ip =json.loads(cr.custom.resource_value).get('address_ip')
LOGGER.debug('cr..address_ip = {:s}'.format(str(address_ip)))
short_port_name = match_subif.groups()[0]
LOGGER.debug('short_port_name = {:s}'.format(str(short_port_name)))
ip_interface_name_dict[address_ip] = short_port_name
if not (ip_withoutsubnet(a_ip, address_ip) or ip_withoutsubnet(z_ip, address_ip)):
continue
ep_id = EndPointId()
ep_id.endpoint_uuid.uuid = device_endpoint_uuids.get(short_port_name , '')
ep_id.device_id.device_uuid.uuid = device.device_id.device_uuid.uuid
service.service_endpoint_ids.append(ep_id)
LOGGER.debug(f"the ip address{ep_id}")
#LOGGER.info('ip_interface_name_dict = {:s}'.format(str(ip_interface_name_dict)))
settings_cr_key = '/settings'
settings_cr_value = {}
update_config_rule_custom(service_config_rules, settings_cr_key, settings_cr_value)
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
service.service_type = ServiceTypeEnum.SERVICETYPE_L3NM
qod_info["sessionID"]=str(uuid4())
qod_info["context"]='admin'
service.service_id.service_uuid.uuid = qod_info['sessionID']
service.service_id.context_id.context_uuid.uuid = qod_info["context"]
service.name = qod_info.get('qos_profile_id', qos_profile_id)
current_time = time.time()
duration_days = duration # days as i saw it in the proto files
id = QoSProfileId(qos_profile_id=Uuid(uuid=qos_profile_id))
request = QoDConstraintsRequest(qos_profile_id=id,start_timestamp=current_time,duration=duration_days) #defined attributes in proto file for the QoDconstraint rquest message
qos_profiles_constraint = qos_profile_client.GetConstraintListFromQoSProfile(request)
LOGGER.info(f'current time contains{current_time}')
LOGGER.info(f'current duration time contains{duration_days}')
LOGGER.info(f'id : {id}')
for cs in qos_profiles_constraint:
if cs.WhichOneof('constraint') == 'schedule' and cs.WhichOneof('constraint') == 'qos_profile': #the method of which one of
cs.schedule.start_timestamp = current_time
cs.schedule.duration_days = duration_days
cs.qos_profile.qos_profile_id=id
cs.qos_profile.qos_profile_name.CopyFrom(qos_profile_client.name) #i copied this from the qosprofile
LOGGER.info(f'the cs : {cs}')
service.service_constraints.append(cs)
return service
def service_2_qod(service: Service) -> dict:
response = {}
for config_rule in service.service_config.config_rules:
resource_value_json = json.loads(config_rule.custom.resource_value)
LOGGER.info(f"the resource value contains:{resource_value_json}")
if config_rule.custom.resource_key != '/request':
continue
if 'device' in resource_value_json and 'ipv4Address' in resource_value_json['device']:
response['device'] = {
'ipv4Address': resource_value_json['device']['ipv4Address']
}
if 'applicationServer' in resource_value_json and 'ipv4Address' in resource_value_json['applicationServer']:
response['applicationServer'] = {
'ipv4Address': resource_value_json['applicationServer']['ipv4Address']
}
if service.name:
response['qos_profile_id'] = service.name
if service.service_id:
response['sessionId'] = service.service_id.service_uuid.uuid
if service.service_constraints:
for constraint in service.service_constraints:
if constraint.WhichOneof('constraint') == 'schedule':
response['duration'] = float(constraint.schedule.duration_days* (86400))
LOGGER.info(f'the duration in seconds: {response["duration"]}')
response['startedAt'] = int(constraint.schedule.start_timestamp)
response['expiresAt'] = response['startedAt'] + response['duration']
return response
def format_grpc_to_json(grpc_reply):
return jsonify(grpc_message_to_json(grpc_reply))
def grpc_context_id(context_uuid):
return ContextId(**json_context_id(context_uuid))
def grpc_service_id(context_uuid, service_uuid):
return ServiceId(**json_service_id(service_uuid, context_id=json_context_id(context_uuid)))
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from nbi.service.rest_server.RestServer import RestServer
from .Resources import ProfileList, ProfileDetail, QodInfo, QodInfoID
URL_PREFIX = '/camara/qod/v0'
# Use 'path' type since some identifiers might contain char '/' and Flask is unable to recognize them in 'string' type.
RESOURCES = [
# (endpoint_name, resource_class, resource_url)
('camara.qod_session_info', QodInfo, '/sessions'),
('camara.qod_info_session_id', QodInfoID, '/sessions/<sessionId>'),
('camara.qod.profile_list', ProfileList, '/profiles'),
('camara.qod.profile_detail', ProfileDetail, '/profiles/<string:qos_profile_id>'),
]
def register_camara_qod(rest_server : RestServer):
for endpoint_name, resource_class, resource_url in RESOURCES:
rest_server.add_resource(resource_class, URL_PREFIX + resource_url, endpoint=endpoint_name)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from flask import jsonify
import requests
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
def test_create_profile():
BASE_URL = 'http://10.1.7.197/camara/qod/v0'
qos_profile_data={
"name": "QCI_2_voice",
"description": "QoS profile for video streaming",
"status": "ACTIVE",
"targetMinUpstreamRate": {
"value": 10,
"unit": "bps"
},
"maxUpstreamRate": {
"value": 10,
"unit": "bps"
},
"maxUpstreamBurstRate": {
"value": 10,
"unit": "bps"
},
"targetMinDownstreamRate": {
"value": 10,
"unit": "bps"
},
"maxDownstreamRate": {
"value": 10,
"unit": "bps"
},
"maxDownstreamBurstRate": {
"value": 10,
"unit": "bps"
},
"minDuration": {
"value": 12,
"unit": "Minutes"
},
"maxDuration": {
"value": 12,
"unit": "Minutes"
},
"priority": 20,
"packetDelayBudget": {
"value": 12,
"unit": "Minutes"
},
"jitter": {
"value": 12,
"unit": "Minutes"
},
"packetErrorLossRate": 3
}
post_response = requests.post(f'{BASE_URL}/profiles', json=qos_profile_data).json()
id=post_response['qos_profile_id']
get_response = requests.get(f'{BASE_URL}/profiles/{id}').json()
assert post_response['qos_profile_id'] == get_response['qos_profile_id']
assert post_response['jitter'] == get_response['jitter']
assert post_response['maxDownstreamBurstRate'] == get_response['maxDownstreamBurstRate']
assert post_response['maxDownstreamRate'] == get_response['maxDownstreamRate']
assert post_response['maxUpstreamBurstRate'] == get_response['maxUpstreamBurstRate']
assert post_response['maxUpstreamRate'] == get_response['maxUpstreamRate']
assert post_response['minDuration'] == get_response['minDuration']
assert post_response['name'] == get_response['name']
assert post_response['packetDelayBudget'] == get_response['packetDelayBudget']
assert post_response['packetErrorLossRate'] == get_response['packetErrorLossRate']
assert post_response['priority'] == get_response['priority']
assert post_response['status'] == get_response['status']
assert post_response['targetMinDownstreamRate'] == get_response['targetMinDownstreamRate']
assert post_response['targetMinUpstreamRate'] == get_response['targetMinUpstreamRate']
#assert response.status_code == 200, f"Failed to retrieve profile with status code {response.status_code}"
#def test_update_profile():
# qos_profile_id = '0898e7e8-ef15-4522-8e93-623e31c92efa'
# qos_profile_data = {
# "qos_profile_id": "0898e7e8-ef15-4522-8e93-623e31c92efa",
# "name": "Updated Name",
# "description": "NEW GAMING PROFILE",
# "status": "ACTIVE",
# "targetMinUpstreamRate": {
# "value": 20,
# "unit": "bps"
# },
# "maxUpstreamRate": {
# "value": 50,
# "unit": "bps"
# },
# "maxUpstreamBurstRate": {
# "value": 60,
# "unit": "bps"
# },
# "targetMinDownstreamRate": {
# "value": 30,
# "unit": "bps"
# },
# "maxDownstreamRate": {
# "value": 100,
# "unit": "bps"
# },
# "maxDownstreamBurstRate": {
# "value": 70,
# "unit": "bps"
# },
# "minDuration": {
# "value": 15,
# "unit": "Minutes"
# },
# "maxDuration": {
# "value": 25,
# "unit": "Minutes"
# },
# "priority": 15,
# "packetDelayBudget": {
# "value": 10,
# "unit": "Minutes"
# },
# "jitter": {
# "value": 10,
# "unit": "Minutes"
# },
# "packetErrorLossRate": 1
#}
# response = requests.put(f'{BASE_URL}/profiles/{qos_profile_id}', json=qos_profile_data)
# def test_delete_profile_by_id():
# qos_profile_id = '0898e7e8-ef15-4522-8e93-623e31c92efa'
# response = requests.delete(f'{BASE_URL}/profiles/{qos_profile_id}')
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from flask import jsonify
import requests
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
BASE_URL = 'http://10.1.7.197/camara/qod/v0'
#def test_create_SESSION():
# service_data={
# "device":
# {"ipv4Address":"84.75.11.12/25" },
# "applicationServer": {
# "ipv4Address": "192.168.0.1/26",
# },
# "duration":10000000.00,
# "qos_profile_id": "f46d563f-9f1a-44c8-9649-5fde673236d6",
# }
# post_response = requests.post(f'{BASE_URL}/sessions', json=service_data).json()
# #id=post_response['sessionID']
# #get_response = requests.get(f'{BASE_URL}/sessions/{id}').json()
# get_response = requests.get(f'{BASE_URL}/sessions').json()
#def test_delete_session_by_id():
# session_id = '83d4b75a-9b09-40f4-b9a9-f31d591fa319'
# response = requests.delete(f'{BASE_URL}/sessions/{session_id}')
#def test_update_session_by_id():
# session_id='3ac2ff12-d763-4ded-9e13-7e82709b16cd'
# session_data={"session_id":'3ac2ff12-d763-4ded-9e13-7e82709b16cd',
# "device":
# {"ipv4Address":"84.75.11.12/25" },
# "applicationServer": {
# "ipv4Address": "192.168.0.1/26",
# },
# "duration":2000000000.00,
# "qos_profile_id": "28499a15-c1d9-428c-9732-82859a727235"}
# put_response=requests.put(f'{BASE_URL}/sessions/{session_id}',json=session_data).json()
\ No newline at end of file