diff --git a/src/common/Constants.py b/src/common/Constants.py index 767b21343f89e35c2338b522bcdc71c56aca1815..8b0fa80c158a6782fc529d598b179ddd5c9e3926 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -65,6 +65,7 @@ class ServiceNameEnum(Enum): KPIVALUEAPI = 'kpi-value-api' KPIVALUEWRITER = 'kpi-value-writer' TELEMETRYFRONTEND = 'telemetry-frontend' + QOSPROFILE = 'qos-profile' # Used for test and debugging only DLT_GATEWAY = 'dltgateway' @@ -98,6 +99,7 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.KPIVALUEAPI .value : 30020, ServiceNameEnum.KPIVALUEWRITER .value : 30030, ServiceNameEnum.TELEMETRYFRONTEND .value : 30050, + ServiceNameEnum.QOSPROFILE .value : 30060, # Used for test and debugging only ServiceNameEnum.DLT_GATEWAY .value : 50051, diff --git a/src/context/client/ContextClient.py b/src/context/client/ContextClient.py index 2776a0d294e9a9ee7b00e46bfd3fbb068133741f..024381c54a8de6ec9c305f8ac6ecd8e584e33983 100644 --- a/src/context/client/ContextClient.py +++ b/src/context/client/ContextClient.py @@ -27,7 +27,7 @@ from common.proto.context_pb2 import ( Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, - OpticalConfig, OpticalConfigId, OpticalConfigList + OpticalConfig, OpticalConfigId, OpticalConfigList, QoSProfileId, QoSProfile ) from common.proto.context_pb2_grpc import ContextServiceStub from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub @@ -362,6 +362,41 @@ class ContextClient: LOGGER.debug('GetSliceEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response + @RETRY_DECORATOR + def CreateQoSProfile(self, request : QoSProfile) -> QoSProfile: + LOGGER.debug('CreateQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.CreateQoSProfile(request) + LOGGER.debug('CreateQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def UpdateQoSProfile(self, request : QoSProfile) -> QoSProfile: + LOGGER.debug('UpdateQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.UpdateQoSProfile(request) + LOGGER.debug('UpdateQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def DeleteQoSProfile(self, request : QoSProfileId) -> Empty: + LOGGER.debug('DeleteQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.DeleteQoSProfile(request) + LOGGER.debug('DeleteQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetQoSProfile(self, request : QoSProfileId) -> QoSProfile: + LOGGER.debug('GetQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetQoSProfile(request) + LOGGER.debug('GetQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetQoSProfiles(self, request : Empty) -> Iterator[QoSProfile]: + LOGGER.debug('GetQoSProfiles request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetQoSProfiles(request) + LOGGER.debug('GetQoSProfiles result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def ListConnectionIds(self, request: ServiceId) -> ConnectionIdList: LOGGER.debug('ListConnectionIds request: {:s}'.format(grpc_message_to_json_string(request))) diff --git a/src/context/service/ContextServiceServicerImpl.py b/src/context/service/ContextServiceServicerImpl.py index be32372108e059625801d14c660d18cbe0df677f..ee4bc4de6a80ff07e5f49a80377721ccdfb3ba72 100644 --- a/src/context/service/ContextServiceServicerImpl.py +++ b/src/context/service/ContextServiceServicerImpl.py @@ -24,7 +24,7 @@ from common.proto.context_pb2 import ( Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, - OpticalConfigList, OpticalConfigId, OpticalConfig + OpticalConfigList, OpticalConfigId, OpticalConfig, QoSProfileId, QoSProfile ) from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer @@ -46,6 +46,7 @@ from .database.Slice import ( from .database.Topology import ( topology_delete, topology_get, topology_get_details, topology_list_ids, topology_list_objs, topology_set) from .database.OpticalConfig import set_opticalconfig, select_opticalconfig, get_opticalconfig +from .database.QoSProfile import set_qos_profile, delete_qos_profile, get_qos_profile, get_qos_profiles LOGGER = logging.getLogger(__name__) @@ -251,6 +252,29 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer for message in consume_events(self.messagebroker, {EventTopicEnum.SLICE}): yield message + # ----- QoSProfile ----------------------------------------------------------------------------------------------- + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def CreateQoSProfile(self, request : QoSProfile, context : grpc.ServicerContext) -> QoSProfile: + return set_qos_profile(self.db_engine, request) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def UpdateQoSProfile(self, request : QoSProfile, context : grpc.ServicerContext) -> QoSProfile: + return set_qos_profile(self.db_engine, request) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def DeleteQoSProfile(self, request : QoSProfileId, context : grpc.ServicerContext) -> Empty: + return delete_qos_profile(self.db_engine, request) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetQoSProfile(self, request : QoSProfileId, context : grpc.ServicerContext) -> QoSProfile: + return get_qos_profile(self.db_engine, request) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetQoSProfiles(self, request : Empty, context : grpc.ServicerContext) -> Iterator[QoSProfile]: + yield from get_qos_profiles(self.db_engine, request) + + # ----- Connection ------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) diff --git a/src/context/service/database/QoSProfile.py b/src/context/service/database/QoSProfile.py new file mode 100644 index 0000000000000000000000000000000000000000..8b50f64def2ebd8235e94fade8f651b55c3d125f --- /dev/null +++ b/src/context/service/database/QoSProfile.py @@ -0,0 +1,97 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.engine import Engine +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy_cockroachdb import run_transaction +from typing import List, Optional + +from common.proto.context_pb2 import Empty, QoSProfileId, QoSProfileValueUnitPair, QoSProfile +from common.method_wrappers.ServiceExceptions import NotFoundException +from common.tools.grpc.Tools import grpc_message_to_json +from .models.QoSProfile import QoSProfileModel + +LOGGER = logging.getLogger(__name__) + +def grpc_message_to_qos_table_data(message: QoSProfile) -> dict: + return [{ + 'qos_profile_id' : message.qos_profile_id.uuid, + 'name' : message.name, + 'description' : message.description, + 'status' : message.status, + 'targetMinUpstreamRate' : grpc_message_to_json(message.targetMinUpstreamRate), + 'maxUpstreamRate' : grpc_message_to_json(message.maxUpstreamRate), + 'maxUpstreamBurstRate' : grpc_message_to_json(message.maxUpstreamBurstRate), + 'targetMinDownstreamRate' : grpc_message_to_json(message.targetMinDownstreamRate), + 'maxDownstreamRate' : grpc_message_to_json(message.maxDownstreamRate), + 'maxDownstreamBurstRate' : grpc_message_to_json(message.maxDownstreamBurstRate), + 'minDuration' : grpc_message_to_json(message.minDuration), + 'maxDuration' : grpc_message_to_json(message.maxDuration), + 'priority' : message.priority, + 'packetDelayBudget' : grpc_message_to_json(message.packetDelayBudget), + 'jitter' : grpc_message_to_json(message.jitter), + 'packetErrorLossRate' : message.packetErrorLossRate, + }] + +def qos_table_data_to_grpc_message(data: QoSProfileModel) -> QoSProfile: + QoSProfile( + qos_profile_id = QoSProfileId(uuid=data.qos_profile_id), + name = data.name, + description = data.description, + status = data.status, + targetMinUpstreamRate = QoSProfileValueUnitPair(**data.targetMinUpstreamRate), + maxUpstreamRate = QoSProfileValueUnitPair(**data.maxUpstreamRate), + maxUpstreamBurstRate = QoSProfileValueUnitPair(**data.maxUpstreamBurstRate), + targetMinDownstreamRate = QoSProfileValueUnitPair(**data.targetMinDownstreamRate), + maxDownstreamRate = QoSProfileValueUnitPair(**data.maxDownstreamRate), + maxDownstreamBurstRate = QoSProfileValueUnitPair(**data.maxDownstreamBurstRate), + minDuration = QoSProfileValueUnitPair(**data.minDuration), + maxDuration = QoSProfileValueUnitPair(**data.maxDuration), + priority = data.priority, + packetDelayBudget = QoSProfileValueUnitPair(**data.packetDelayBudget), + jitter = QoSProfileValueUnitPair(**data.jitter), + packetErrorLossRate = data.packetErrorLossRate + ) + +def set_qos_profile(db_engine : Engine, request : QoSProfile) -> QoSProfile: + qos_profile_data = grpc_message_to_qos_table_data(request) + def callback(session : Session) -> bool: + stmt = insert(QoSProfileModel).values(qos_profile_data) + session.execute(stmt) + return get_qos_profile(db_engine, request.qos_profile_id.uuid) + return run_transaction(sessionmaker(bind=db_engine), callback) + +def delete_qos_profile(db_engine : Engine, request : str) -> Empty: + def callback(session : Session) -> bool: + num_deleted = session.query(QoSProfileModel).filter_by(qos_profile_id=request).delete() + return num_deleted > 0 + deleted = run_transaction(sessionmaker(bind=db_engine), callback) + return Empty() + +def get_qos_profile(db_engine : Engine, request : str) -> QoSProfile: + def callback(session : Session) -> Optional[QoSProfile]: + obj : Optional[QoSProfileModel] = session.query(QoSProfileModel).filter_by(qos_profile_id=request).one_or_none() + return None if obj is None else qos_table_data_to_grpc_message(obj) + qos_profile = run_transaction(sessionmaker(bind=db_engine), callback) + if qos_profile is None: + raise NotFoundException('QoSProfile', request) + return qos_profile + +def get_qos_profiles(db_engine : Engine, request : Empty) -> List[QoSProfile]: + def callback(session : Session) -> List[QoSProfile]: + obj_list : List[QoSProfileModel] = session.query(QoSProfileModel).all() + return [qos_table_data_to_grpc_message(obj) for obj in obj_list] + return run_transaction(sessionmaker(bind=db_engine), callback) diff --git a/src/context/service/database/models/QoSProfile.py b/src/context/service/database/models/QoSProfile.py new file mode 100644 index 0000000000000000000000000000000000000000..0ce7365aa6b8c862604c194a2c6e4d3db29a00f5 --- /dev/null +++ b/src/context/service/database/models/QoSProfile.py @@ -0,0 +1,43 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sqlalchemy import Column, Integer, String, JSON +from sqlalchemy.dialects.postgresql import UUID +from typing import TypedDict +from ._Base import _Base + + +class QoSProfileValueUnitPair(TypedDict): + value: int + unit: str + +class QoSProfileModel(_Base): + __tablename__ = 'qos_profile' + + qos_profile_id = Column(UUID(as_uuid=False), primary_key=True) + name = Column(String, nullable=False) + description = Column(String, nullable=False) + status = Column(String, nullable=False) + targetMinUpstreamRate = Column(JSON, nullable=False) + maxUpstreamRate = Column(JSON, nullable=False) + maxUpstreamBurstRate = Column(JSON, nullable=False) + targetMinDownstreamRate = Column(JSON, nullable=False) + maxDownstreamRate = Column(JSON, nullable=False) + maxDownstreamBurstRate = Column(JSON, nullable=False) + minDuration = Column(JSON, nullable=False) + maxDuration = Column(JSON, nullable=False) + priority = Column(Integer, nullable=False) + packetDelayBudget = Column(JSON, nullable=False) + jitter = Column(JSON, nullable=False) + packetErrorLossRate = Column(Integer, nullable=False)