-
Shayan Hajipour authored
- test_crud.py refactored - test_constraints.py added - start_timestamp of Constraint_Schedule changed to double from float
Shayan Hajipour authored- test_crud.py refactored - test_constraints.py added - start_timestamp of Constraint_Schedule changed to double from float
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_constraints.py 3.79 KiB
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from google.protobuf.json_format import MessageToDict
from common.proto.context_pb2 import QoDConstraintsRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from qos_profile.client.QoSProfileClient import QoSProfileClient
from .conftest import create_qos_profile_from_json
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
qos_profile_data = {
"qos_profile_id": "0afc905f-f1f0-4ae2-9925-9df17140b8bf",
"name": "QCI_2_voice",
"description": "QoS profile for game streaming",
"status": "ACTIVE",
"targetMinUpstreamRate": {
"value": 5,
"unit": "bps"
},
"maxUpstreamRate": {
"value": 5,
"unit": "bps"
},
"maxUpstreamBurstRate": {
"value": 5,
"unit": "bps"
},
"targetMinDownstreamRate": {
"value": 5,
"unit": "bps"
},
"maxDownstreamRate": {
"value": 5,
"unit": "bps"
},
"maxDownstreamBurstRate": {
"value": 5,
"unit": "bps"
},
"minDuration": {
"value": 5,
"unit": "Minutes"
},
"maxDuration": {
"value": 6,
"unit": "Minutes"
},
"priority": 5,
"packetDelayBudget": {
"value": 5,
"unit": "Minutes"
},
"jitter": {
"value": 5,
"unit": "Minutes"
},
"packetErrorLossRate": 3
}
target_qos_profile_constraint = {
"action": "CONSTRAINTACTION_SET",
"qos_profile": {
"target_min_upstream_rate": {
"value": 5,
"unit": "bps"
},
"max_upstream_rate": {
"value": 5,
"unit": "bps"
},
"max_upstream_burst_rate": {
"value": 5,
"unit": "bps"
},
"target_min_downstream_rate": {
"value": 5,
"unit": "bps"
},
"max_downstream_rate": {
"value": 5,
"unit": "bps"
},
"max_downstream_burst_rate": {
"value": 5,
"unit": "bps"
},
"min_duration": {
"value": 5,
"unit": "Minutes"
},
"max_duration": {
"value": 6,
"unit": "Minutes"
},
"priority": 5,
"packet_delay_budget": {
"value": 5,
"unit": "Minutes"
},
"jitter": {
"value": 5,
"unit": "Minutes"
},
"packet_error_loss_rate": 3
}
}
def test_get_constraints(qos_profile_client: QoSProfileClient):
qos_profile = create_qos_profile_from_json(qos_profile_data)
qos_profile_created = qos_profile_client.CreateQoSProfile(qos_profile)
LOGGER.info('qos_profile_data = {:s}'.format(grpc_message_to_json_string(qos_profile_created)))
constraints = list(qos_profile_client.GetConstraintListFromQoSProfile(QoDConstraintsRequest(
qos_profile_id=qos_profile.qos_profile_id, start_timestamp=1726063284.25332, duration=86400)
))
constraint_1 = constraints[0]
constraint_2 = constraints[1]
assert len(constraints) == 2
assert constraint_1.WhichOneof('constraint') == 'qos_profile'
print(MessageToDict(constraint_1, preserving_proto_field_name=True))
assert MessageToDict(constraint_1, preserving_proto_field_name=True) == target_qos_profile_constraint
assert constraint_2.WhichOneof('constraint') == 'schedule'
assert constraint_2.schedule.start_timestamp == 1726063284.25332
assert constraint_2.schedule.duration_days == 1
qos_profile_client.DeleteQoSProfile(qos_profile.qos_profile_id)