Skip to content
Snippets Groups Projects
Commit 0c5c6807 authored by Shayan Hajipour's avatar Shayan Hajipour
Browse files

feat: Move QoSProfile OPs form context to qos_profile module.

parent 4c479a1b
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!257Resolve "Create QoSProfile component"
Showing
with 280 additions and 177 deletions
......@@ -159,6 +159,17 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for QoSProfile"
CRDB_DATABASE_QoSProfile="tfs_qos_profile" # TODO: change by specific configurable environment variable
kubectl create secret generic crdb-qos-profile-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
--from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
--from-literal=CRDB_DATABASE=${CRDB_DATABASE_QoSProfile} \
--from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
--from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with NATS data"
NATS_CLIENT_PORT=$(kubectl --namespace ${NATS_NAMESPACE} get service ${NATS_NAMESPACE} -o 'jsonpath={.spec.ports[?(@.name=="client")].port}')
if [ -z "$NATS_CLIENT_PORT" ]; then
......
......@@ -37,6 +37,9 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: crdb-qos-profile-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30060"]
......@@ -69,4 +72,4 @@ spec:
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
targetPort: 9192
\ No newline at end of file
......@@ -20,7 +20,8 @@
export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/"
# Set the list of components, separated by spaces, you want to build images for, and deploy.
export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator"
# export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator qos_profile"
export TFS_COMPONENTS="context device pathcomp service nbi qos_profile"
# Uncomment to activate Monitoring (old)
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
......
......@@ -68,12 +68,6 @@ service ContextService {
rpc GetSliceEvents (Empty ) returns (stream SliceEvent ) {}
rpc SelectSlice (SliceFilter ) returns ( SliceList ) {}
rpc CreateQoSProfile (QoSProfile ) returns ( QoSProfile ) {}
rpc UpdateQoSProfile (QoSProfile ) returns ( QoSProfile ) {}
rpc DeleteQoSProfile (QoSProfileId ) returns ( Empty ) {}
rpc GetQoSProfile (QoSProfileId ) returns ( QoSProfile ) {}
rpc GetQoSProfiles (Empty ) returns (stream QoSProfile ) {}
rpc ListConnectionIds (ServiceId ) returns ( ConnectionIdList) {}
rpc ListConnections (ServiceId ) returns ( ConnectionList ) {}
rpc GetConnection (ConnectionId ) returns ( Connection ) {}
......@@ -408,43 +402,6 @@ message SliceEvent {
SliceId slice_id = 2;
}
// ----- QoSProfile ----------------------------------------------------------------------------------------------------
message QoSProfileId {
Uuid qos_profile_id = 1;
}
message QoSProfileValueUnitPair {
int32 value = 1;
string unit = 2;
}
message QoDConstraintsRequest {
QoSProfileId qos_profile_id = 1;
double start_timestamp = 2;
float duration = 3;
}
message QoSProfile {
QoSProfileId qos_profile_id = 1;
string name = 2;
string description = 3;
string status = 4;
QoSProfileValueUnitPair targetMinUpstreamRate = 5;
QoSProfileValueUnitPair maxUpstreamRate = 6;
QoSProfileValueUnitPair maxUpstreamBurstRate = 7;
QoSProfileValueUnitPair targetMinDownstreamRate = 8;
QoSProfileValueUnitPair maxDownstreamRate = 9;
QoSProfileValueUnitPair maxDownstreamBurstRate = 10;
QoSProfileValueUnitPair minDuration = 11;
QoSProfileValueUnitPair maxDuration = 12;
int32 priority = 13;
QoSProfileValueUnitPair packetDelayBudget = 14;
QoSProfileValueUnitPair jitter = 15;
int32 packetErrorLossRate = 16;
}
// ----- Connection ----------------------------------------------------------------------------------------------------
message ConnectionId {
Uuid connection_uuid = 1;
......@@ -640,20 +597,14 @@ message Constraint_Exclusions {
repeated LinkId link_ids = 4;
}
message Constraint_QoSProfile {
QoSProfileValueUnitPair target_min_upstream_rate = 1;
QoSProfileValueUnitPair max_upstream_rate = 2;
QoSProfileValueUnitPair max_upstream_burst_rate = 3;
QoSProfileValueUnitPair target_min_downstream_rate = 4;
QoSProfileValueUnitPair max_downstream_rate = 5;
QoSProfileValueUnitPair max_downstream_burst_rate = 6;
QoSProfileValueUnitPair min_duration = 7;
QoSProfileValueUnitPair max_duration = 8;
int32 priority = 9;
QoSProfileValueUnitPair packet_delay_budget = 10;
QoSProfileValueUnitPair jitter = 11;
int32 packet_error_loss_rate = 12;
message QoSProfileId {
context.Uuid qos_profile_id = 1;
}
message Constraint_QoSProfile {
QoSProfileId qos_profile_id = 1;
string qos_profile_name = 2;
}
message Constraint {
......
......@@ -17,11 +17,47 @@ package qos_profile;
import "context.proto";
message QoSProfileId {
context.Uuid qos_profile_id = 1;
}
message QoSProfileValueUnitPair {
int32 value = 1;
string unit = 2;
}
message QoDConstraintsRequest {
QoSProfileId qos_profile_id = 1;
double start_timestamp = 2;
float duration = 3;
}
message QoSProfile {
QoSProfileId qos_profile_id = 1;
string name = 2;
string description = 3;
string status = 4;
QoSProfileValueUnitPair targetMinUpstreamRate = 5;
QoSProfileValueUnitPair maxUpstreamRate = 6;
QoSProfileValueUnitPair maxUpstreamBurstRate = 7;
QoSProfileValueUnitPair targetMinDownstreamRate = 8;
QoSProfileValueUnitPair maxDownstreamRate = 9;
QoSProfileValueUnitPair maxDownstreamBurstRate = 10;
QoSProfileValueUnitPair minDuration = 11;
QoSProfileValueUnitPair maxDuration = 12;
int32 priority = 13;
QoSProfileValueUnitPair packetDelayBudget = 14;
QoSProfileValueUnitPair jitter = 15;
int32 packetErrorLossRate = 16;
}
service QoSProfileService {
rpc CreateQoSProfile (context.QoSProfile ) returns (context.QoSProfile ) {}
rpc UpdateQoSProfile (context.QoSProfile ) returns (context.QoSProfile ) {}
rpc DeleteQoSProfile (context.QoSProfileId ) returns (context.Empty ) {}
rpc GetQoSProfile (context.QoSProfileId ) returns (context.QoSProfile ) {}
rpc GetQoSProfiles (context.Empty ) returns (stream context.QoSProfile ) {}
rpc GetConstraintListFromQoSProfile (context.QoDConstraintsRequest) returns (stream context.Constraint ) {}
rpc CreateQoSProfile (QoSProfile ) returns (QoSProfile ) {}
rpc UpdateQoSProfile (QoSProfile ) returns (QoSProfile ) {}
rpc DeleteQoSProfile (QoSProfileId ) returns (context.Empty ) {}
rpc GetQoSProfile (QoSProfileId ) returns (QoSProfile ) {}
rpc GetQoSProfiles (context.Empty ) returns (stream QoSProfile ) {}
rpc GetConstraintListFromQoSProfile (QoDConstraintsRequest) returns (stream context.Constraint ) {}
}
......@@ -27,7 +27,7 @@ from common.proto.context_pb2 import (
Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList,
Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
OpticalConfig, OpticalConfigId, OpticalConfigList, QoSProfileId, QoSProfile
OpticalConfig, OpticalConfigId, OpticalConfigList
)
from common.proto.context_pb2_grpc import ContextServiceStub
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub
......@@ -362,41 +362,6 @@ class ContextClient:
LOGGER.debug('GetSliceEvents result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def CreateQoSProfile(self, request : QoSProfile) -> QoSProfile:
LOGGER.debug('CreateQoSProfile request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.CreateQoSProfile(request)
LOGGER.debug('CreateQoSProfile result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def UpdateQoSProfile(self, request : QoSProfile) -> QoSProfile:
LOGGER.debug('UpdateQoSProfile request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.UpdateQoSProfile(request)
LOGGER.debug('UpdateQoSProfile result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def DeleteQoSProfile(self, request : QoSProfileId) -> Empty:
LOGGER.debug('DeleteQoSProfile request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.DeleteQoSProfile(request)
LOGGER.debug('DeleteQoSProfile result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetQoSProfile(self, request : QoSProfileId) -> QoSProfile:
LOGGER.debug('GetQoSProfile request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetQoSProfile(request)
LOGGER.debug('GetQoSProfile result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetQoSProfiles(self, request : Empty) -> Iterator[QoSProfile]:
LOGGER.debug('GetQoSProfiles request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetQoSProfiles(request)
LOGGER.debug('GetQoSProfiles result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def ListConnectionIds(self, request: ServiceId) -> ConnectionIdList:
LOGGER.debug('ListConnectionIds request: {:s}'.format(grpc_message_to_json_string(request)))
......
......@@ -24,7 +24,7 @@ from common.proto.context_pb2 import (
Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList,
Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
OpticalConfigList, OpticalConfigId, OpticalConfig, QoSProfileId, QoSProfile
OpticalConfigList, OpticalConfigId, OpticalConfig
)
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.proto.context_pb2_grpc import ContextServiceServicer
......@@ -46,7 +46,6 @@ from .database.Slice import (
from .database.Topology import (
topology_delete, topology_get, topology_get_details, topology_list_ids, topology_list_objs, topology_set)
from .database.OpticalConfig import set_opticalconfig, select_opticalconfig, get_opticalconfig
from .database.QoSProfile import set_qos_profile, delete_qos_profile, get_qos_profile, get_qos_profiles
LOGGER = logging.getLogger(__name__)
......@@ -252,34 +251,6 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
for message in consume_events(self.messagebroker, {EventTopicEnum.SLICE}): yield message
# ----- QoSProfile -----------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def CreateQoSProfile(self, request : QoSProfile, context : grpc.ServicerContext) -> QoSProfile:
return set_qos_profile(self.db_engine, request)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def UpdateQoSProfile(self, request : QoSProfile, context : grpc.ServicerContext) -> QoSProfile:
return set_qos_profile(self.db_engine, request)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteQoSProfile(self, request : QoSProfileId, context : grpc.ServicerContext) -> Empty:
return delete_qos_profile(self.db_engine, request.qos_profile_id.uuid)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetQoSProfile(self, request : QoSProfileId, context : grpc.ServicerContext) -> QoSProfile:
qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid)
if qos_profile is None:
context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found')
context.set_code(grpc.StatusCode.NOT_FOUND)
return QoSProfile()
return qos_profile
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetQoSProfiles(self, request : Empty, context : grpc.ServicerContext) -> Iterator[QoSProfile]:
yield from get_qos_profiles(self.db_engine, request)
# ----- Connection -------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
......
......@@ -56,7 +56,7 @@ RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/qos_profile
WORKDIR /var/teraflow/qos_profile
COPY src/service/requirements.in requirements.in
COPY src/qos_profile/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
......
......@@ -16,7 +16,8 @@ from typing import Iterator
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, QoSProfileId, QoSProfile, QoDConstraintsRequest, Constraint
from common.proto.context_pb2 import Empty
from common.proto.qos_profile_pb2 import QoSProfile, QoSProfileId, QoDConstraintsRequest
from common.proto.qos_profile_pb2_grpc import QoSProfileServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
......
......@@ -13,3 +13,8 @@
# limitations under the License.
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
\ No newline at end of file
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.proto.qos_profile_pb2_grpc import add_QoSProfileServiceServicer_to_server
......@@ -19,10 +20,10 @@ from common.tools.service.GenericGrpcService import GenericGrpcService
from .QoSProfileServiceServicerImpl import QoSProfileServiceServicerImpl
class QoSProfileService(GenericGrpcService):
def __init__(self, cls_name: str = __name__) -> None:
def __init__(self, db_engine: sqlalchemy.engine.Engine, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.QOSPROFILE)
super().__init__(port, cls_name=cls_name)
self.qos_profile_servicer = QoSProfileServiceServicerImpl()
self.qos_profile_servicer = QoSProfileServiceServicerImpl(db_engine)
def install_servicers(self):
add_QoSProfileServiceServicer_to_server(self.qos_profile_servicer, self.server)
......@@ -12,14 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
import grpc, logging, sqlalchemy
from typing import Iterator
import grpc._channel
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import QoDConstraintsRequest, Constraint, ConstraintActionEnum, Constraint_QoSProfile, Constraint_Schedule, Empty, QoSProfileId, QoSProfile
from common.proto.context_pb2 import Constraint, ConstraintActionEnum, Constraint_QoSProfile, Constraint_Schedule, Empty
from common.proto.qos_profile_pb2 import QoSProfile, QoSProfileId, QoDConstraintsRequest
from common.proto.qos_profile_pb2_grpc import QoSProfileServiceServicer
from context.client.ContextClient import ContextClient
from .database.QoSProfile import set_qos_profile, delete_qos_profile, get_qos_profile, get_qos_profiles
LOGGER = logging.getLogger(__name__)
......@@ -27,65 +29,49 @@ LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('QoSProfile', 'RPC')
class QoSProfileServiceServicerImpl(QoSProfileServiceServicer):
def __init__(self ) -> None:
def __init__(self, db_engine: sqlalchemy.engine.Engine) -> None:
LOGGER.debug('Servicer Created')
self.db_engine = db_engine
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def CreateQoSProfile(self, request: QoSProfile, context: grpc.ServicerContext) -> QoSProfile:
context_client = ContextClient()
try:
qos_profile_get = context_client.GetQoSProfile(request.qos_profile_id)
qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid)
if qos_profile is not None:
context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} already exists')
context.set_code(grpc.StatusCode.ALREADY_EXISTS)
return QoSProfile()
except grpc._channel._InactiveRpcError as exc:
if exc.code() != grpc.StatusCode.NOT_FOUND:
raise exc
qos_profile = context_client.CreateQoSProfile(request)
return qos_profile
return set_qos_profile(self.db_engine, request)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def UpdateQoSProfile(self, request: QoSProfile, context: grpc.ServicerContext) -> QoSProfile:
context_client = ContextClient()
try:
_ = context_client.GetQoSProfile(request.qos_profile_id)
except grpc._channel._InactiveRpcError as exc:
if exc.code() == grpc.StatusCode.NOT_FOUND:
context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} not found')
context.set_code(grpc.StatusCode.NOT_FOUND)
return QoSProfile()
qos_profile = context_client.UpdateQoSProfile(request)
return qos_profile
qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid)
if qos_profile is None:
context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} not found')
context.set_code(grpc.StatusCode.NOT_FOUND)
return QoSProfile()
return set_qos_profile(self.db_engine, request)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteQoSProfile(self, request: QoSProfileId, context: grpc.ServicerContext) -> Empty:
context_client = ContextClient()
try:
_ = context_client.GetQoSProfile(request)
except grpc._channel._InactiveRpcError as exc:
if exc.code() == grpc.StatusCode.NOT_FOUND:
context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found')
context.set_code(grpc.StatusCode.NOT_FOUND)
return QoSProfile()
empty = context_client.DeleteQoSProfile(request)
return empty
qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid)
if qos_profile is None:
context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} not found')
context.set_code(grpc.StatusCode.NOT_FOUND)
return QoSProfile()
return delete_qos_profile(self.db_engine, request.qos_profile_id.uuid)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetQoSProfile(self, request: QoSProfileId, context: grpc.ServicerContext) -> QoSProfile:
context_client = ContextClient()
try:
qos_profile = context_client.GetQoSProfile(request)
except grpc._channel._InactiveRpcError as exc:
if exc.code() == grpc.StatusCode.NOT_FOUND:
context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found')
context.set_code(grpc.StatusCode.NOT_FOUND)
return QoSProfile()
qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid)
if qos_profile is None:
context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found')
context.set_code(grpc.StatusCode.NOT_FOUND)
return QoSProfile()
return qos_profile
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetQoSProfiles(self, request: Empty, context: grpc.ServicerContext) -> Iterator[QoSProfile]:
context_client = ContextClient()
yield from context_client.GetQoSProfiles(request)
yield from get_qos_profiles(self.db_engine, request)
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
......
......@@ -19,6 +19,7 @@ from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables
)
from .database.Engine import Engine
from .QoSProfileService import QoSProfileService
terminate = threading.Event()
......@@ -49,8 +50,20 @@ def main():
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Get Database Engine instance and initialize database, if needed
LOGGER.info('Getting SQLAlchemy DB Engine...')
db_engine = Engine.get_engine()
if db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return -1
try:
Engine.create_database(db_engine)
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))
# Starting service service
grpc_service = QoSProfileService()
grpc_service = QoSProfileService(db_engine)
grpc_service.start()
# Wait for Ctrl+C or termination signal
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy, sqlalchemy_utils
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
APP_NAME = 'tfs'
ECHO = False # true: dump SQL commands and transactions executed
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@qos-profileservice.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
class Engine:
@staticmethod
def get_engine() -> sqlalchemy.engine.Engine:
crdb_uri = get_setting('CRDB_URI', default=None)
if crdb_uri is None:
CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT')
CRDB_DATABASE = get_setting('CRDB_DATABASE')
CRDB_USERNAME = get_setting('CRDB_USERNAME')
CRDB_PASSWORD = get_setting('CRDB_PASSWORD')
CRDB_SSLMODE = get_setting('CRDB_SSLMODE')
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try:
engine = sqlalchemy.create_engine(
crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
return None
return engine
@staticmethod
def create_database(engine : sqlalchemy.engine.Engine) -> None:
if not sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.create_database(engine.url)
@staticmethod
def drop_database(engine : sqlalchemy.engine.Engine) -> None:
if sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.drop_database(engine.url)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -13,13 +13,14 @@
# limitations under the License.
from sqlalchemy import Column, Integer, String, JSON
from sqlalchemy.dialects.postgresql import UUID
from ._Base import _Base
class QoSProfileModel(_Base):
__tablename__ = 'qos_profile'
qos_profile_id = Column(String, primary_key=True)
qos_profile_id = Column(UUID(as_uuid=False), primary_key=True)
name = Column(String, nullable=False)
description = Column(String, nullable=False)
status = Column(String, nullable=False)
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy
from typing import Any, List
from sqlalchemy.orm import Session, sessionmaker, declarative_base
from sqlalchemy.sql import text
from sqlalchemy_cockroachdb import run_transaction
_Base = declarative_base()
def create_performance_enhancers(db_engine : sqlalchemy.engine.Engine) -> None:
def index_storing(
index_name : str, table_name : str, index_fields : List[str], storing_fields : List[str]
) -> Any:
str_index_fields = ','.join(['"{:s}"'.format(index_field) for index_field in index_fields])
str_storing_fields = ','.join(['"{:s}"'.format(storing_field) for storing_field in storing_fields])
INDEX_STORING = 'CREATE INDEX IF NOT EXISTS {:s} ON "{:s}" ({:s}) STORING ({:s});'
return text(INDEX_STORING.format(index_name, table_name, str_index_fields, str_storing_fields))
statements = [
index_storing('device_configrule_device_uuid_rec_idx', 'device_configrule', ['device_uuid'], [
'position', 'kind', 'action', 'data', 'created_at', 'updated_at'
]),
index_storing('service_configrule_service_uuid_rec_idx', 'service_configrule', ['service_uuid'], [
'position', 'kind', 'action', 'data', 'created_at', 'updated_at'
]),
index_storing('slice_configrule_slice_uuid_rec_idx', 'slice_configrule', ['slice_uuid'], [
'position', 'kind', 'action', 'data', 'created_at', 'updated_at'
]),
index_storing('connection_service_uuid_rec_idx', 'connection', ['service_uuid'], [
'settings', 'created_at', 'updated_at'
]),
index_storing('service_constraint_service_uuid_rec_idx', 'service_constraint', ['service_uuid'], [
'position', 'kind', 'data', 'created_at', 'updated_at'
]),
index_storing('slice_constraint_slice_uuid_rec_idx', 'slice_constraint', ['slice_uuid'], [
'position', 'kind', 'data', 'created_at', 'updated_at'
]),
index_storing('endpoint_device_uuid_rec_idx', 'endpoint', ['device_uuid'], [
'topology_uuid', 'name', 'endpoint_type', 'kpi_sample_types', 'created_at', 'updated_at'
]),
index_storing('qos_profile_context_uuid_rec_idx', 'qos_profile', ['context_uuid'], [
'service_name', 'service_type', 'service_status', 'created_at', 'updated_at'
]),
index_storing('slice_context_uuid_rec_idx', 'slice', ['context_uuid'], [
'slice_name', 'slice_status', 'slice_owner_uuid', 'slice_owner_string', 'created_at', 'updated_at'
]),
index_storing('topology_context_uuid_rec_idx', 'topology', ['context_uuid'], [
'topology_name', 'created_at', 'updated_at'
]),
index_storing('device_component_idx', 'device_component', ['device_uuid'], [
'name', 'type', 'attributes', 'created_at', 'updated_at'
]),
]
def callback(session : Session) -> bool:
for stmt in statements: session.execute(stmt)
run_transaction(sessionmaker(bind=db_engine), callback)
def rebuild_database(db_engine : sqlalchemy.engine.Engine, drop_if_exists : bool = False):
if drop_if_exists: _Base.metadata.drop_all(db_engine)
_Base.metadata.create_all(db_engine)
# create_performance_enhancers(db_engine)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -15,6 +15,8 @@
import pytest
from qos_profile.client.QoSProfileClient import QoSProfileClient
from common.proto.context_pb2 import Uuid, QoSProfileValueUnitPair, QoSProfileId, QoSProfile
from common.proto.context_pb2 import Uuid
from common.proto.qos_profile_pb2 import QoSProfileValueUnitPair, QoSProfileId, QoSProfile
@pytest.fixture(scope='function')
def qos_profile_client():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment