# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy_cockroachdb import run_transaction from typing import List, Optional from common.proto.context_pb2 import Empty, QoSProfileId, QoSProfileValueUnitPair, QoSProfile from common.method_wrappers.ServiceExceptions import NotFoundException from common.tools.grpc.Tools import grpc_message_to_json from .models.QoSProfile import QoSProfileModel LOGGER = logging.getLogger(__name__) def grpc_message_to_qos_table_data(message: QoSProfile) -> dict: return [{ 'qos_profile_id' : message.qos_profile_id.uuid, 'name' : message.name, 'description' : message.description, 'status' : message.status, 'targetMinUpstreamRate' : grpc_message_to_json(message.targetMinUpstreamRate), 'maxUpstreamRate' : grpc_message_to_json(message.maxUpstreamRate), 'maxUpstreamBurstRate' : grpc_message_to_json(message.maxUpstreamBurstRate), 'targetMinDownstreamRate' : grpc_message_to_json(message.targetMinDownstreamRate), 'maxDownstreamRate' : grpc_message_to_json(message.maxDownstreamRate), 'maxDownstreamBurstRate' : grpc_message_to_json(message.maxDownstreamBurstRate), 'minDuration' : grpc_message_to_json(message.minDuration), 'maxDuration' : grpc_message_to_json(message.maxDuration), 'priority' : message.priority, 'packetDelayBudget' : grpc_message_to_json(message.packetDelayBudget), 'jitter' : grpc_message_to_json(message.jitter), 'packetErrorLossRate' : message.packetErrorLossRate, }] def qos_table_data_to_grpc_message(data: QoSProfileModel) -> QoSProfile: QoSProfile( qos_profile_id = QoSProfileId(uuid=data.qos_profile_id), name = data.name, description = data.description, status = data.status, targetMinUpstreamRate = QoSProfileValueUnitPair(**data.targetMinUpstreamRate), maxUpstreamRate = QoSProfileValueUnitPair(**data.maxUpstreamRate), maxUpstreamBurstRate = QoSProfileValueUnitPair(**data.maxUpstreamBurstRate), targetMinDownstreamRate = QoSProfileValueUnitPair(**data.targetMinDownstreamRate), maxDownstreamRate = QoSProfileValueUnitPair(**data.maxDownstreamRate), maxDownstreamBurstRate = QoSProfileValueUnitPair(**data.maxDownstreamBurstRate), minDuration = QoSProfileValueUnitPair(**data.minDuration), maxDuration = QoSProfileValueUnitPair(**data.maxDuration), priority = data.priority, packetDelayBudget = QoSProfileValueUnitPair(**data.packetDelayBudget), jitter = QoSProfileValueUnitPair(**data.jitter), packetErrorLossRate = data.packetErrorLossRate ) def set_qos_profile(db_engine : Engine, request : QoSProfile) -> QoSProfile: qos_profile_data = grpc_message_to_qos_table_data(request) def callback(session : Session) -> bool: stmt = insert(QoSProfileModel).values(qos_profile_data) session.execute(stmt) return get_qos_profile(db_engine, request.qos_profile_id.uuid) return run_transaction(sessionmaker(bind=db_engine), callback) def delete_qos_profile(db_engine : Engine, request : str) -> Empty: def callback(session : Session) -> bool: num_deleted = session.query(QoSProfileModel).filter_by(qos_profile_id=request).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) return Empty() def get_qos_profile(db_engine : Engine, request : str) -> QoSProfile: def callback(session : Session) -> Optional[QoSProfile]: obj : Optional[QoSProfileModel] = session.query(QoSProfileModel).filter_by(qos_profile_id=request).one_or_none() return None if obj is None else qos_table_data_to_grpc_message(obj) qos_profile = run_transaction(sessionmaker(bind=db_engine), callback) if qos_profile is None: raise NotFoundException('QoSProfile', request) return qos_profile def get_qos_profiles(db_engine : Engine, request : Empty) -> List[QoSProfile]: def callback(session : Session) -> List[QoSProfile]: obj_list : List[QoSProfileModel] = session.query(QoSProfileModel).all() return [qos_table_data_to_grpc_message(obj) for obj in obj_list] return run_transaction(sessionmaker(bind=db_engine), callback)