diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 62f36a2c138c99b1ee666c8c5397083266ad699d..f720674559081b523e8774ae38db52cd87955750 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -159,6 +159,17 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t --from-literal=CRDB_SSLMODE=require printf "\n" +echo "Create secret with CockroachDB data for QoSProfile" +CRDB_DATABASE_QoSProfile="tfs_qos_profile" # TODO: change by specific configurable environment variable +kubectl create secret generic crdb-qos-profile-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ + --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ + --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ + --from-literal=CRDB_DATABASE=${CRDB_DATABASE_QoSProfile} \ + --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ + --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ + --from-literal=CRDB_SSLMODE=require +printf "\n" + echo "Create secret with NATS data" NATS_CLIENT_PORT=$(kubectl --namespace ${NATS_NAMESPACE} get service ${NATS_NAMESPACE} -o 'jsonpath={.spec.ports[?(@.name=="client")].port}') if [ -z "$NATS_CLIENT_PORT" ]; then diff --git a/manifests/qos_profileservice.yaml b/manifests/qos_profileservice.yaml index 1bcaa500fd204cef1bdcde0db35306113f9cacd1..0a27787f3477a23acd4eea1087fa14b0e6bd9fb4 100644 --- a/manifests/qos_profileservice.yaml +++ b/manifests/qos_profileservice.yaml @@ -37,6 +37,9 @@ spec: env: - name: LOG_LEVEL value: "INFO" + envFrom: + - secretRef: + name: crdb-qos-profile-data readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30060"] @@ -69,4 +72,4 @@ spec: - name: metrics protocol: TCP port: 9192 - targetPort: 9192 + targetPort: 9192 \ No newline at end of file diff --git a/my_deploy.sh b/my_deploy.sh index b89df7481ebd17edf2b966eb818598d1a04a596f..922e72ba6a2769c644b5b7ab4d68046fc54001c5 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -20,7 +20,8 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator" +# export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator qos_profile" +export TFS_COMPONENTS="context device pathcomp service nbi qos_profile" # Uncomment to activate Monitoring (old) #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" diff --git a/proto/context.proto b/proto/context.proto index 7f99c4525d8ce2e7c3d4e3b4b2f82fed6d861e96..65a2344c64618524257665b838e0b3ba69bb1979 100644 --- a/proto/context.proto +++ b/proto/context.proto @@ -68,12 +68,6 @@ service ContextService { rpc GetSliceEvents (Empty ) returns (stream SliceEvent ) {} rpc SelectSlice (SliceFilter ) returns ( SliceList ) {} - rpc CreateQoSProfile (QoSProfile ) returns ( QoSProfile ) {} - rpc UpdateQoSProfile (QoSProfile ) returns ( QoSProfile ) {} - rpc DeleteQoSProfile (QoSProfileId ) returns ( Empty ) {} - rpc GetQoSProfile (QoSProfileId ) returns ( QoSProfile ) {} - rpc GetQoSProfiles (Empty ) returns (stream QoSProfile ) {} - rpc ListConnectionIds (ServiceId ) returns ( ConnectionIdList) {} rpc ListConnections (ServiceId ) returns ( ConnectionList ) {} rpc GetConnection (ConnectionId ) returns ( Connection ) {} @@ -408,43 +402,6 @@ message SliceEvent { SliceId slice_id = 2; } - -// ----- QoSProfile ---------------------------------------------------------------------------------------------------- -message QoSProfileId { - Uuid qos_profile_id = 1; -} - -message QoSProfileValueUnitPair { - int32 value = 1; - string unit = 2; -} - -message QoDConstraintsRequest { - QoSProfileId qos_profile_id = 1; - double start_timestamp = 2; - float duration = 3; -} - -message QoSProfile { - QoSProfileId qos_profile_id = 1; - string name = 2; - string description = 3; - string status = 4; - QoSProfileValueUnitPair targetMinUpstreamRate = 5; - QoSProfileValueUnitPair maxUpstreamRate = 6; - QoSProfileValueUnitPair maxUpstreamBurstRate = 7; - QoSProfileValueUnitPair targetMinDownstreamRate = 8; - QoSProfileValueUnitPair maxDownstreamRate = 9; - QoSProfileValueUnitPair maxDownstreamBurstRate = 10; - QoSProfileValueUnitPair minDuration = 11; - QoSProfileValueUnitPair maxDuration = 12; - int32 priority = 13; - QoSProfileValueUnitPair packetDelayBudget = 14; - QoSProfileValueUnitPair jitter = 15; - int32 packetErrorLossRate = 16; -} - - // ----- Connection ---------------------------------------------------------------------------------------------------- message ConnectionId { Uuid connection_uuid = 1; @@ -640,20 +597,14 @@ message Constraint_Exclusions { repeated LinkId link_ids = 4; } -message Constraint_QoSProfile { - QoSProfileValueUnitPair target_min_upstream_rate = 1; - QoSProfileValueUnitPair max_upstream_rate = 2; - QoSProfileValueUnitPair max_upstream_burst_rate = 3; - QoSProfileValueUnitPair target_min_downstream_rate = 4; - QoSProfileValueUnitPair max_downstream_rate = 5; - QoSProfileValueUnitPair max_downstream_burst_rate = 6; - QoSProfileValueUnitPair min_duration = 7; - QoSProfileValueUnitPair max_duration = 8; - int32 priority = 9; - QoSProfileValueUnitPair packet_delay_budget = 10; - QoSProfileValueUnitPair jitter = 11; - int32 packet_error_loss_rate = 12; +message QoSProfileId { + context.Uuid qos_profile_id = 1; +} + +message Constraint_QoSProfile { + QoSProfileId qos_profile_id = 1; + string qos_profile_name = 2; } message Constraint { diff --git a/proto/qos_profile.proto b/proto/qos_profile.proto index 557cef5391077043e3367054768ce87ef5f32d97..1237314e17f4cb1b530896fe4de171122d20f01f 100644 --- a/proto/qos_profile.proto +++ b/proto/qos_profile.proto @@ -17,11 +17,47 @@ package qos_profile; import "context.proto"; + +message QoSProfileId { + context.Uuid qos_profile_id = 1; +} + +message QoSProfileValueUnitPair { + int32 value = 1; + string unit = 2; +} + +message QoDConstraintsRequest { + QoSProfileId qos_profile_id = 1; + double start_timestamp = 2; + float duration = 3; +} + +message QoSProfile { + QoSProfileId qos_profile_id = 1; + string name = 2; + string description = 3; + string status = 4; + QoSProfileValueUnitPair targetMinUpstreamRate = 5; + QoSProfileValueUnitPair maxUpstreamRate = 6; + QoSProfileValueUnitPair maxUpstreamBurstRate = 7; + QoSProfileValueUnitPair targetMinDownstreamRate = 8; + QoSProfileValueUnitPair maxDownstreamRate = 9; + QoSProfileValueUnitPair maxDownstreamBurstRate = 10; + QoSProfileValueUnitPair minDuration = 11; + QoSProfileValueUnitPair maxDuration = 12; + int32 priority = 13; + QoSProfileValueUnitPair packetDelayBudget = 14; + QoSProfileValueUnitPair jitter = 15; + int32 packetErrorLossRate = 16; +} + + service QoSProfileService { - rpc CreateQoSProfile (context.QoSProfile ) returns (context.QoSProfile ) {} - rpc UpdateQoSProfile (context.QoSProfile ) returns (context.QoSProfile ) {} - rpc DeleteQoSProfile (context.QoSProfileId ) returns (context.Empty ) {} - rpc GetQoSProfile (context.QoSProfileId ) returns (context.QoSProfile ) {} - rpc GetQoSProfiles (context.Empty ) returns (stream context.QoSProfile ) {} - rpc GetConstraintListFromQoSProfile (context.QoDConstraintsRequest) returns (stream context.Constraint ) {} + rpc CreateQoSProfile (QoSProfile ) returns (QoSProfile ) {} + rpc UpdateQoSProfile (QoSProfile ) returns (QoSProfile ) {} + rpc DeleteQoSProfile (QoSProfileId ) returns (context.Empty ) {} + rpc GetQoSProfile (QoSProfileId ) returns (QoSProfile ) {} + rpc GetQoSProfiles (context.Empty ) returns (stream QoSProfile ) {} + rpc GetConstraintListFromQoSProfile (QoDConstraintsRequest) returns (stream context.Constraint ) {} } diff --git a/src/context/client/ContextClient.py b/src/context/client/ContextClient.py index 024381c54a8de6ec9c305f8ac6ecd8e584e33983..2776a0d294e9a9ee7b00e46bfd3fbb068133741f 100644 --- a/src/context/client/ContextClient.py +++ b/src/context/client/ContextClient.py @@ -27,7 +27,7 @@ from common.proto.context_pb2 import ( Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, - OpticalConfig, OpticalConfigId, OpticalConfigList, QoSProfileId, QoSProfile + OpticalConfig, OpticalConfigId, OpticalConfigList ) from common.proto.context_pb2_grpc import ContextServiceStub from common.proto.context_policy_pb2_grpc import ContextPolicyServiceStub @@ -362,41 +362,6 @@ class ContextClient: LOGGER.debug('GetSliceEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response - @RETRY_DECORATOR - def CreateQoSProfile(self, request : QoSProfile) -> QoSProfile: - LOGGER.debug('CreateQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.CreateQoSProfile(request) - LOGGER.debug('CreateQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - def UpdateQoSProfile(self, request : QoSProfile) -> QoSProfile: - LOGGER.debug('UpdateQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.UpdateQoSProfile(request) - LOGGER.debug('UpdateQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - def DeleteQoSProfile(self, request : QoSProfileId) -> Empty: - LOGGER.debug('DeleteQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.DeleteQoSProfile(request) - LOGGER.debug('DeleteQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - def GetQoSProfile(self, request : QoSProfileId) -> QoSProfile: - LOGGER.debug('GetQoSProfile request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetQoSProfile(request) - LOGGER.debug('GetQoSProfile result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - def GetQoSProfiles(self, request : Empty) -> Iterator[QoSProfile]: - LOGGER.debug('GetQoSProfiles request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetQoSProfiles(request) - LOGGER.debug('GetQoSProfiles result: {:s}'.format(grpc_message_to_json_string(response))) - return response - @RETRY_DECORATOR def ListConnectionIds(self, request: ServiceId) -> ConnectionIdList: LOGGER.debug('ListConnectionIds request: {:s}'.format(grpc_message_to_json_string(request))) diff --git a/src/context/service/ContextServiceServicerImpl.py b/src/context/service/ContextServiceServicerImpl.py index e6c305f2f68b2638047a3f945da26929a0d1c48e..be32372108e059625801d14c660d18cbe0df677f 100644 --- a/src/context/service/ContextServiceServicerImpl.py +++ b/src/context/service/ContextServiceServicerImpl.py @@ -24,7 +24,7 @@ from common.proto.context_pb2 import ( Service, ServiceEvent, ServiceFilter, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, SliceFilter, SliceId, SliceIdList, SliceList, Topology, TopologyDetails, TopologyEvent, TopologyId, TopologyIdList, TopologyList, - OpticalConfigList, OpticalConfigId, OpticalConfig, QoSProfileId, QoSProfile + OpticalConfigList, OpticalConfigId, OpticalConfig ) from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer @@ -46,7 +46,6 @@ from .database.Slice import ( from .database.Topology import ( topology_delete, topology_get, topology_get_details, topology_list_ids, topology_list_objs, topology_set) from .database.OpticalConfig import set_opticalconfig, select_opticalconfig, get_opticalconfig -from .database.QoSProfile import set_qos_profile, delete_qos_profile, get_qos_profile, get_qos_profiles LOGGER = logging.getLogger(__name__) @@ -252,34 +251,6 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer for message in consume_events(self.messagebroker, {EventTopicEnum.SLICE}): yield message - # ----- QoSProfile ----------------------------------------------------------------------------------------------- - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def CreateQoSProfile(self, request : QoSProfile, context : grpc.ServicerContext) -> QoSProfile: - return set_qos_profile(self.db_engine, request) - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def UpdateQoSProfile(self, request : QoSProfile, context : grpc.ServicerContext) -> QoSProfile: - return set_qos_profile(self.db_engine, request) - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def DeleteQoSProfile(self, request : QoSProfileId, context : grpc.ServicerContext) -> Empty: - return delete_qos_profile(self.db_engine, request.qos_profile_id.uuid) - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def GetQoSProfile(self, request : QoSProfileId, context : grpc.ServicerContext) -> QoSProfile: - qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid) - if qos_profile is None: - context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found') - context.set_code(grpc.StatusCode.NOT_FOUND) - return QoSProfile() - return qos_profile - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def GetQoSProfiles(self, request : Empty, context : grpc.ServicerContext) -> Iterator[QoSProfile]: - yield from get_qos_profiles(self.db_engine, request) - - # ----- Connection ------------------------------------------------------------------------------------------------- @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) diff --git a/src/qos_profile/Dockerfile b/src/qos_profile/Dockerfile index a35c2e741c8db372421bc8525f50e77f47e28be3..d8576ac49d7f92e7dbb23c97f4a1e1a4597f6651 100644 --- a/src/qos_profile/Dockerfile +++ b/src/qos_profile/Dockerfile @@ -56,7 +56,7 @@ RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/qos_profile WORKDIR /var/teraflow/qos_profile -COPY src/service/requirements.in requirements.in +COPY src/qos_profile/requirements.in requirements.in RUN pip-compile --quiet --output-file=requirements.txt requirements.in RUN python3 -m pip install -r requirements.txt diff --git a/src/qos_profile/client/QoSProfileClient.py b/src/qos_profile/client/QoSProfileClient.py index e7bec8739ca1b374d79694c14b3c6fa511d7079a..d243323a13b017d37c9509844db7ecfd779df889 100644 --- a/src/qos_profile/client/QoSProfileClient.py +++ b/src/qos_profile/client/QoSProfileClient.py @@ -16,7 +16,8 @@ from typing import Iterator import grpc, logging from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc -from common.proto.context_pb2 import Empty, QoSProfileId, QoSProfile, QoDConstraintsRequest, Constraint +from common.proto.context_pb2 import Empty +from common.proto.qos_profile_pb2 import QoSProfile, QoSProfileId, QoDConstraintsRequest from common.proto.qos_profile_pb2_grpc import QoSProfileServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string diff --git a/src/qos_profile/requirements.in b/src/qos_profile/requirements.in index 5cf553eaaec41de7599b6723e31e4ca3f82cbcae..9ea7059c4508029762700a2d523b2d25844d7a41 100644 --- a/src/qos_profile/requirements.in +++ b/src/qos_profile/requirements.in @@ -13,3 +13,8 @@ # limitations under the License. + +psycopg2-binary==2.9.* +SQLAlchemy==1.4.* +sqlalchemy-cockroachdb==1.4.* +SQLAlchemy-Utils==0.38.* \ No newline at end of file diff --git a/src/qos_profile/service/QoSProfileService.py b/src/qos_profile/service/QoSProfileService.py index bdc90f5bc0a12f5a1afbe59cf651439ee1f3219a..ce5c5591b498787240c5390bbe5575822bc9da91 100644 --- a/src/qos_profile/service/QoSProfileService.py +++ b/src/qos_profile/service/QoSProfileService.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sqlalchemy from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from common.proto.qos_profile_pb2_grpc import add_QoSProfileServiceServicer_to_server @@ -19,10 +20,10 @@ from common.tools.service.GenericGrpcService import GenericGrpcService from .QoSProfileServiceServicerImpl import QoSProfileServiceServicerImpl class QoSProfileService(GenericGrpcService): - def __init__(self, cls_name: str = __name__) -> None: + def __init__(self, db_engine: sqlalchemy.engine.Engine, cls_name: str = __name__) -> None: port = get_service_port_grpc(ServiceNameEnum.QOSPROFILE) super().__init__(port, cls_name=cls_name) - self.qos_profile_servicer = QoSProfileServiceServicerImpl() + self.qos_profile_servicer = QoSProfileServiceServicerImpl(db_engine) def install_servicers(self): add_QoSProfileServiceServicer_to_server(self.qos_profile_servicer, self.server) diff --git a/src/qos_profile/service/QoSProfileServiceServicerImpl.py b/src/qos_profile/service/QoSProfileServiceServicerImpl.py index bdcc3e8c3c17544b1c8e05ecbf64d6107e208b2b..860ee2ee3269d31e4682ceb175b0eac4e0af7730 100644 --- a/src/qos_profile/service/QoSProfileServiceServicerImpl.py +++ b/src/qos_profile/service/QoSProfileServiceServicerImpl.py @@ -12,14 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import grpc, logging, sqlalchemy from typing import Iterator import grpc._channel from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method -from common.proto.context_pb2 import QoDConstraintsRequest, Constraint, ConstraintActionEnum, Constraint_QoSProfile, Constraint_Schedule, Empty, QoSProfileId, QoSProfile +from common.proto.context_pb2 import Constraint, ConstraintActionEnum, Constraint_QoSProfile, Constraint_Schedule, Empty +from common.proto.qos_profile_pb2 import QoSProfile, QoSProfileId, QoDConstraintsRequest from common.proto.qos_profile_pb2_grpc import QoSProfileServiceServicer from context.client.ContextClient import ContextClient +from .database.QoSProfile import set_qos_profile, delete_qos_profile, get_qos_profile, get_qos_profiles LOGGER = logging.getLogger(__name__) @@ -27,65 +29,49 @@ LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('QoSProfile', 'RPC') class QoSProfileServiceServicerImpl(QoSProfileServiceServicer): - def __init__(self ) -> None: + def __init__(self, db_engine: sqlalchemy.engine.Engine) -> None: LOGGER.debug('Servicer Created') + self.db_engine = db_engine @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def CreateQoSProfile(self, request: QoSProfile, context: grpc.ServicerContext) -> QoSProfile: - context_client = ContextClient() - try: - qos_profile_get = context_client.GetQoSProfile(request.qos_profile_id) + qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid) + if qos_profile is not None: context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} already exists') context.set_code(grpc.StatusCode.ALREADY_EXISTS) return QoSProfile() - except grpc._channel._InactiveRpcError as exc: - if exc.code() != grpc.StatusCode.NOT_FOUND: - raise exc - qos_profile = context_client.CreateQoSProfile(request) - return qos_profile + return set_qos_profile(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def UpdateQoSProfile(self, request: QoSProfile, context: grpc.ServicerContext) -> QoSProfile: - context_client = ContextClient() - try: - _ = context_client.GetQoSProfile(request.qos_profile_id) - except grpc._channel._InactiveRpcError as exc: - if exc.code() == grpc.StatusCode.NOT_FOUND: - context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} not found') - context.set_code(grpc.StatusCode.NOT_FOUND) - return QoSProfile() - qos_profile = context_client.UpdateQoSProfile(request) - return qos_profile + qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid) + if qos_profile is None: + context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} not found') + context.set_code(grpc.StatusCode.NOT_FOUND) + return QoSProfile() + return set_qos_profile(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def DeleteQoSProfile(self, request: QoSProfileId, context: grpc.ServicerContext) -> Empty: - context_client = ContextClient() - try: - _ = context_client.GetQoSProfile(request) - except grpc._channel._InactiveRpcError as exc: - if exc.code() == grpc.StatusCode.NOT_FOUND: - context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found') - context.set_code(grpc.StatusCode.NOT_FOUND) - return QoSProfile() - empty = context_client.DeleteQoSProfile(request) - return empty + qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid) + if qos_profile is None: + context.set_details(f'QoSProfile {request.qos_profile_id.qos_profile_id.uuid} not found') + context.set_code(grpc.StatusCode.NOT_FOUND) + return QoSProfile() + return delete_qos_profile(self.db_engine, request.qos_profile_id.uuid) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetQoSProfile(self, request: QoSProfileId, context: grpc.ServicerContext) -> QoSProfile: - context_client = ContextClient() - try: - qos_profile = context_client.GetQoSProfile(request) - except grpc._channel._InactiveRpcError as exc: - if exc.code() == grpc.StatusCode.NOT_FOUND: - context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found') - context.set_code(grpc.StatusCode.NOT_FOUND) - return QoSProfile() + qos_profile = get_qos_profile(self.db_engine, request.qos_profile_id.uuid) + if qos_profile is None: + context.set_details(f'QoSProfile {request.qos_profile_id.uuid} not found') + context.set_code(grpc.StatusCode.NOT_FOUND) + return QoSProfile() return qos_profile @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetQoSProfiles(self, request: Empty, context: grpc.ServicerContext) -> Iterator[QoSProfile]: - context_client = ContextClient() - yield from context_client.GetQoSProfiles(request) + yield from get_qos_profiles(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) diff --git a/src/qos_profile/service/__main__.py b/src/qos_profile/service/__main__.py index ccd1ca23ffa4504569b44e6f2250ce583eafcf81..6666d9f193ff46d801299ab8eeef3102a1abeb31 100644 --- a/src/qos_profile/service/__main__.py +++ b/src/qos_profile/service/__main__.py @@ -19,6 +19,7 @@ from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables ) +from .database.Engine import Engine from .QoSProfileService import QoSProfileService terminate = threading.Event() @@ -49,8 +50,20 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) + # Get Database Engine instance and initialize database, if needed + LOGGER.info('Getting SQLAlchemy DB Engine...') + db_engine = Engine.get_engine() + if db_engine is None: + LOGGER.error('Unable to get SQLAlchemy DB Engine...') + return -1 + + try: + Engine.create_database(db_engine) + except: # pylint: disable=bare-except # pragma: no cover + LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url))) + # Starting service service - grpc_service = QoSProfileService() + grpc_service = QoSProfileService(db_engine) grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/qos_profile/service/database/Engine.py b/src/qos_profile/service/database/Engine.py new file mode 100644 index 0000000000000000000000000000000000000000..6ba1a82d0b5790deded242ecde682020a0c785f8 --- /dev/null +++ b/src/qos_profile/service/database/Engine.py @@ -0,0 +1,55 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, sqlalchemy, sqlalchemy_utils +from common.Settings import get_setting + +LOGGER = logging.getLogger(__name__) + +APP_NAME = 'tfs' +ECHO = False # true: dump SQL commands and transactions executed +CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@qos-profileservice.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' + +class Engine: + @staticmethod + def get_engine() -> sqlalchemy.engine.Engine: + crdb_uri = get_setting('CRDB_URI', default=None) + if crdb_uri is None: + CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') + CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') + CRDB_DATABASE = get_setting('CRDB_DATABASE') + CRDB_USERNAME = get_setting('CRDB_USERNAME') + CRDB_PASSWORD = get_setting('CRDB_PASSWORD') + CRDB_SSLMODE = get_setting('CRDB_SSLMODE') + crdb_uri = CRDB_URI_TEMPLATE.format( + CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) + + try: + engine = sqlalchemy.create_engine( + crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True) + except: # pylint: disable=bare-except # pragma: no cover + LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) + return None + + return engine + + @staticmethod + def create_database(engine : sqlalchemy.engine.Engine) -> None: + if not sqlalchemy_utils.database_exists(engine.url): + sqlalchemy_utils.create_database(engine.url) + + @staticmethod + def drop_database(engine : sqlalchemy.engine.Engine) -> None: + if sqlalchemy_utils.database_exists(engine.url): + sqlalchemy_utils.drop_database(engine.url) diff --git a/src/context/service/database/QoSProfile.py b/src/qos_profile/service/database/QoSProfile.py similarity index 100% rename from src/context/service/database/QoSProfile.py rename to src/qos_profile/service/database/QoSProfile.py diff --git a/src/qos_profile/service/database/__init__.py b/src/qos_profile/service/database/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/qos_profile/service/database/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/context/service/database/models/QoSProfile.py b/src/qos_profile/service/database/models/QoSProfile.py similarity index 93% rename from src/context/service/database/models/QoSProfile.py rename to src/qos_profile/service/database/models/QoSProfile.py index 431d0f503ce7c5224bff108d570c953ff6f538c6..bfbdeef0a35490b1a62b80bddb098fd2bf90c2e4 100644 --- a/src/context/service/database/models/QoSProfile.py +++ b/src/qos_profile/service/database/models/QoSProfile.py @@ -13,13 +13,14 @@ # limitations under the License. from sqlalchemy import Column, Integer, String, JSON +from sqlalchemy.dialects.postgresql import UUID from ._Base import _Base class QoSProfileModel(_Base): __tablename__ = 'qos_profile' - qos_profile_id = Column(String, primary_key=True) + qos_profile_id = Column(UUID(as_uuid=False), primary_key=True) name = Column(String, nullable=False) description = Column(String, nullable=False) status = Column(String, nullable=False) diff --git a/src/qos_profile/service/database/models/_Base.py b/src/qos_profile/service/database/models/_Base.py new file mode 100644 index 0000000000000000000000000000000000000000..6e71b3c0f94d68fa718a8ac11210fc03f5ed9ff9 --- /dev/null +++ b/src/qos_profile/service/database/models/_Base.py @@ -0,0 +1,74 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy +from typing import Any, List +from sqlalchemy.orm import Session, sessionmaker, declarative_base +from sqlalchemy.sql import text +from sqlalchemy_cockroachdb import run_transaction + +_Base = declarative_base() + +def create_performance_enhancers(db_engine : sqlalchemy.engine.Engine) -> None: + def index_storing( + index_name : str, table_name : str, index_fields : List[str], storing_fields : List[str] + ) -> Any: + str_index_fields = ','.join(['"{:s}"'.format(index_field) for index_field in index_fields]) + str_storing_fields = ','.join(['"{:s}"'.format(storing_field) for storing_field in storing_fields]) + INDEX_STORING = 'CREATE INDEX IF NOT EXISTS {:s} ON "{:s}" ({:s}) STORING ({:s});' + return text(INDEX_STORING.format(index_name, table_name, str_index_fields, str_storing_fields)) + + statements = [ + index_storing('device_configrule_device_uuid_rec_idx', 'device_configrule', ['device_uuid'], [ + 'position', 'kind', 'action', 'data', 'created_at', 'updated_at' + ]), + index_storing('service_configrule_service_uuid_rec_idx', 'service_configrule', ['service_uuid'], [ + 'position', 'kind', 'action', 'data', 'created_at', 'updated_at' + ]), + index_storing('slice_configrule_slice_uuid_rec_idx', 'slice_configrule', ['slice_uuid'], [ + 'position', 'kind', 'action', 'data', 'created_at', 'updated_at' + ]), + index_storing('connection_service_uuid_rec_idx', 'connection', ['service_uuid'], [ + 'settings', 'created_at', 'updated_at' + ]), + index_storing('service_constraint_service_uuid_rec_idx', 'service_constraint', ['service_uuid'], [ + 'position', 'kind', 'data', 'created_at', 'updated_at' + ]), + index_storing('slice_constraint_slice_uuid_rec_idx', 'slice_constraint', ['slice_uuid'], [ + 'position', 'kind', 'data', 'created_at', 'updated_at' + ]), + index_storing('endpoint_device_uuid_rec_idx', 'endpoint', ['device_uuid'], [ + 'topology_uuid', 'name', 'endpoint_type', 'kpi_sample_types', 'created_at', 'updated_at' + ]), + index_storing('qos_profile_context_uuid_rec_idx', 'qos_profile', ['context_uuid'], [ + 'service_name', 'service_type', 'service_status', 'created_at', 'updated_at' + ]), + index_storing('slice_context_uuid_rec_idx', 'slice', ['context_uuid'], [ + 'slice_name', 'slice_status', 'slice_owner_uuid', 'slice_owner_string', 'created_at', 'updated_at' + ]), + index_storing('topology_context_uuid_rec_idx', 'topology', ['context_uuid'], [ + 'topology_name', 'created_at', 'updated_at' + ]), + index_storing('device_component_idx', 'device_component', ['device_uuid'], [ + 'name', 'type', 'attributes', 'created_at', 'updated_at' + ]), + ] + def callback(session : Session) -> bool: + for stmt in statements: session.execute(stmt) + run_transaction(sessionmaker(bind=db_engine), callback) + +def rebuild_database(db_engine : sqlalchemy.engine.Engine, drop_if_exists : bool = False): + if drop_if_exists: _Base.metadata.drop_all(db_engine) + _Base.metadata.create_all(db_engine) + # create_performance_enhancers(db_engine) diff --git a/src/qos_profile/service/database/models/__init__.py b/src/qos_profile/service/database/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..bbfc943b68af13a11e562abbc8680ade71db8f02 --- /dev/null +++ b/src/qos_profile/service/database/models/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/qos_profile/tests/conftest.py b/src/qos_profile/tests/conftest.py index 8d8e455f2ac1f72faf3e34835c860fa69e9a9417..e0ae3490c00ca8057a0f13ad3a3ee1908943b11b 100644 --- a/src/qos_profile/tests/conftest.py +++ b/src/qos_profile/tests/conftest.py @@ -15,6 +15,8 @@ import pytest from qos_profile.client.QoSProfileClient import QoSProfileClient from common.proto.context_pb2 import Uuid, QoSProfileValueUnitPair, QoSProfileId, QoSProfile +from common.proto.context_pb2 import Uuid +from common.proto.qos_profile_pb2 import QoSProfileValueUnitPair, QoSProfileId, QoSProfile @pytest.fixture(scope='function') def qos_profile_client(): diff --git a/src/qos_profile/tests/test_crud.py b/src/qos_profile/tests/test_crud.py index b98351ce9ae0d7da5151702d5b76c436b4844928..e60f34933c8eefc508f33234f7e399b2ecd6aaee 100644 --- a/src/qos_profile/tests/test_crud.py +++ b/src/qos_profile/tests/test_crud.py @@ -16,6 +16,7 @@ from grpc import RpcError, StatusCode import logging, pytest from .conftest import create_qos_profile_from_json from common.proto.context_pb2 import Empty, Uuid, QoSProfileId +from common.proto.qos_profile_pb2 import QoSProfileId from common.tools.grpc.Tools import grpc_message_to_json_string from qos_profile.client.QoSProfileClient import QoSProfileClient