diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 115b336761dd94902597c3b6e21e7d3dcf225af1..cb6ea273b144535bb3bbb425df601f77ad117cc5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -49,5 +49,6 @@ include: - local: '/src/kpi_value_api/.gitlab-ci.yml' - local: '/src/kpi_value_writer/.gitlab-ci.yml' - local: '/src/telemetry/.gitlab-ci.yml' + - local: '/src/analytics/.gitlab-ci.yml' # This should be last one: end-to-end integration tests - local: '/src/tests/.gitlab-ci.yml' diff --git a/deploy/kafka.sh b/deploy/kafka.sh index 0483bce153b457800c6f7db2ef66685e90118111..4cbcdb7014c983eeda9bab1d6655fa042751b931 100755 --- a/deploy/kafka.sh +++ b/deploy/kafka.sh @@ -47,10 +47,10 @@ function kafka_deploy() { cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}" # echo "Apache Kafka Namespace" - echo ">>> Delete Apache Kafka Namespace" + echo "Delete Apache Kafka Namespace" kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found - echo ">>> Create Apache Kafka Namespace" + echo "Create Apache Kafka Namespace" kubectl create namespace ${KFK_NAMESPACE} # echo ">>> Deplying Apache Kafka Zookeeper" @@ -76,15 +76,15 @@ function kafka_deploy() { # fi } -echo "Apache Kafka" -echo ">>> Checking if Apache Kafka is deployed ... " +echo ">>> Apache Kafka" +echo "Checking if Apache Kafka is deployed ... " if [ "$KFK_REDEPLOY" == "YES" ]; then - echo ">>> Redeploying kafka namespace" + echo "Redeploying kafka namespace" kafka_deploy elif kubectl get namespace "${KFK_NAMESPACE}" &> /dev/null; then - echo ">>> Apache Kafka already present; skipping step." + echo "Apache Kafka already present; skipping step." else - echo ">>> Kafka namespace doesn't exists. Deploying kafka namespace" + echo "Kafka namespace doesn't exists. Deploying kafka namespace" kafka_deploy fi echo diff --git a/deploy/tfs.sh b/deploy/tfs.sh index da078a4f3d9b4c441085a9ae0b8c532ca1f66032..65c1e8de28f2045b2ac78938b84d3c33e282025e 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -146,55 +146,17 @@ kubectl create namespace $TFS_K8S_NAMESPACE sleep 2 printf "\n" -echo "Create secret with CockroachDB data" +echo ">>> Create Secret with CockroachDB data..." CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_CONTEXT=${CRDB_DATABASE} # TODO: change by specific configurable environment variable kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_CONTEXT} \ --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ --from-literal=CRDB_SSLMODE=require printf "\n" -echo "Create secret with CockroachDB data for KPI Management microservices" -CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable -kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ - --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ - --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_KPI_MGMT} \ - --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ - --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ - --from-literal=CRDB_SSLMODE=require -printf "\n" - -echo "Create secret with CockroachDB data for Telemetry microservices" -CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_TELEMETRY="tfs_telemetry" # TODO: change by specific configurable environment variable -kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ - --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ - --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \ - --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ - --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ - --from-literal=CRDB_SSLMODE=require -printf "\n" - -echo "Create secret with CockroachDB data for Analytics microservices" -CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}') -CRDB_DATABASE_ANALYTICS="tfs_analytics" # TODO: change by specific configurable environment variable -kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ - --from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \ - --from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \ - --from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \ - --from-literal=CRDB_USERNAME=${CRDB_USERNAME} \ - --from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \ - --from-literal=CRDB_SSLMODE=require -printf "\n" - -echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices" +echo ">>> Create Secret with Apache Kakfa..." KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}') kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \ --from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \ diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml index 0fa3ed0be6eda8cf944e199543e3c2cd59cc98d6..61666ead951c73e4034110b00a51743d33bd4ce2 100644 --- a/manifests/analyticsservice.yaml +++ b/manifests/analyticsservice.yaml @@ -37,9 +37,13 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: CRDB_DATABASE + value: "tfs_analytics" + - name: METRICS_PORT + value: "9192" envFrom: - secretRef: - name: crdb-analytics + name: crdb-data - secretRef: name: kfk-kpi-data readinessProbe: @@ -60,10 +64,12 @@ spec: imagePullPolicy: Always ports: - containerPort: 30090 - - containerPort: 9192 + - containerPort: 9193 env: - name: LOG_LEVEL value: "INFO" + - name: METRICS_PORT + value: "9193" envFrom: - secretRef: name: kfk-kpi-data @@ -100,10 +106,14 @@ spec: protocol: TCP port: 30090 targetPort: 30090 - - name: metrics + - name: metrics-frontend protocol: TCP port: 9192 targetPort: 9192 + - name: metrics-backend + protocol: TCP + port: 9193 + targetPort: 9193 --- apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler diff --git a/manifests/contextservice.yaml b/manifests/contextservice.yaml index 3abc4f208da8b4820b589b798a328c4a971f55f0..0fc8a1c44f7358a962276ebcf38a165d2db986cd 100644 --- a/manifests/contextservice.yaml +++ b/manifests/contextservice.yaml @@ -45,6 +45,8 @@ spec: value: "FALSE" - name: ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY value: "FALSE" + - name: CRDB_DATABASE + value: "tfs_context" envFrom: - secretRef: name: crdb-data diff --git a/manifests/kpi_managerservice.yaml b/manifests/kpi_managerservice.yaml index 984d783a9de7ed3c0c02e87d82ec673dc19c9508..31eaf1284a08961adc6fe97d5e54eeaa7a98edae 100644 --- a/manifests/kpi_managerservice.yaml +++ b/manifests/kpi_managerservice.yaml @@ -39,9 +39,11 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: CRDB_DATABASE + value: "tfs_kpi" envFrom: - secretRef: - name: crdb-kpi-data + name: crdb-data readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30010"] diff --git a/manifests/servicemonitors.yaml b/manifests/servicemonitors.yaml index 716c1c6891802d7fcc55da798d06c650373fb1b5..8a8fe6f39eff87d12582f2f83734c07dc695cea3 100644 --- a/manifests/servicemonitors.yaml +++ b/manifests/servicemonitors.yaml @@ -475,3 +475,156 @@ spec: any: false matchNames: - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-analyticsservice-metric + labels: + app: analyticsservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: analyticsservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics-frontend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + - port: metrics-backend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-telemetryservice-metric + labels: + app: telemetryservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: telemetryservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics-frontend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + - port: metrics-backend # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-kpi-managerservice-metric + labels: + app: kpi-managerservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: kpi-managerservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-kpi_value_apiservice-metric + labels: + app: kpi_value_apiservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: kpi_value_apiservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-kpi_value_writerservice-metric + labels: + app: kpi_value_writerservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: kpi_value_writerservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index 2f9917499a425b95d436ffa8cdb311d29483d2ca..c3763d6a91756c2592ec819d60bc649584ef3ca9 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -37,9 +37,13 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: CRDB_DATABASE + value: "tfs_kpi" + - name: METRICS_PORT + value: "9192" envFrom: - secretRef: - name: crdb-telemetry + name: crdb-data - secretRef: name: kfk-kpi-data readinessProbe: @@ -60,10 +64,12 @@ spec: imagePullPolicy: Always ports: - containerPort: 30060 - - containerPort: 9192 + - containerPort: 9193 env: - name: LOG_LEVEL value: "INFO" + - name: METRICS_PORT + value: "9193" envFrom: - secretRef: name: kfk-kpi-data @@ -100,10 +106,14 @@ spec: protocol: TCP port: 30060 targetPort: 30060 - - name: metrics + - name: metrics-frontend protocol: TCP port: 9192 targetPort: 9192 + - name: metrics-backend + protocol: TCP + port: 9193 + targetPort: 9193 --- apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler diff --git a/scripts/run_tests_locally-kpi-DB.sh b/scripts/run_tests_locally-kpi-DB.sh index 4953b49e0a437becfda1648c722bcdcf92c58d93..29c6595102c22bc47fa221eb80459aea934cbcd9 100755 --- a/scripts/run_tests_locally-kpi-DB.sh +++ b/scripts/run_tests_locally-kpi-DB.sh @@ -24,7 +24,7 @@ cd $PROJECTDIR/src # python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc -CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}') export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_manager/tests/test_kpi_db.py diff --git a/scripts/run_tests_locally-telemetry-DB.sh b/scripts/run_tests_locally-telemetry-DB.sh index 4b9a417603cc42a4e7e8b19c7394cc38633817fa..85cb8664a7e93b63363b0bad51b52449e57d80b1 100755 --- a/scripts/run_tests_locally-telemetry-DB.sh +++ b/scripts/run_tests_locally-telemetry-DB.sh @@ -20,7 +20,8 @@ cd $PROJECTDIR/src # RCFILE=$PROJECTDIR/coverage/.coveragerc # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ # kpi_manager/tests/test_unitary.py - +CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \ telemetry/tests/test_telemetryDB.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index 79db05fcf1259365e8a909ee99395eb59dfb9437..97a06a0d6c16daf94e3e6b30bfc70eca3e7ce3a3 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -19,10 +19,9 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src # RCFILE=$PROJECTDIR/coverage/.coveragerc # coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ -# kpi_manager/tests/test_unitary.py - -# python3 kpi_manager/tests/test_unitary.py RCFILE=$PROJECTDIR/coverage/.coveragerc + + python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \ telemetry/backend/tests/test_TelemetryBackend.py diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index a2a1de52340cac527d4d1c446c76740d38ce7783..7506be5e0750b44e37368e86dbbfd00131c0d270 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -17,11 +17,9 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src -# RCFILE=$PROJECTDIR/coverage/.coveragerc -# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \ -# kpi_manager/tests/test_unitary.py -# python3 kpi_manager/tests/test_unitary.py +CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}') +export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py index 1ba68989a066e4638adc12e65289ed50b740731d..ab0b50f2ebba8e2590f1fcb4f2801f42a9c5d208 100644 --- a/src/analytics/database/Analyzer_DB.py +++ b/src/analytics/database/Analyzer_DB.py @@ -13,138 +13,44 @@ # limitations under the License. import logging -import sqlalchemy_utils +from common.method_wrappers.Decorator import MetricsPool +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException -from sqlalchemy import inspect, or_ -from sqlalchemy.orm import sessionmaker +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('KpiManager', 'Database') -from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel -from analytics.database.AnalyzerEngine import AnalyzerEngine -from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) +class AnalyzerDB(Database): + def __init__(self, model) -> None: + LOGGER.info('Init KpiManagerService') + super().__init__(model) -LOGGER = logging.getLogger(__name__) -DB_NAME = "tfs_analyzer" # TODO: export name from enviornment variable - -class AnalyzerDB: - def __init__(self): - self.db_engine = AnalyzerEngine.get_engine() - if self.db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - self.db_name = DB_NAME - self.Session = sessionmaker(bind=self.db_engine) - - def create_database(self): - if not sqlalchemy_utils.database_exists(self.db_engine.url): - LOGGER.debug("Database created. {:}".format(self.db_engine.url)) - sqlalchemy_utils.create_database(self.db_engine.url) - - def drop_database(self) -> None: - if sqlalchemy_utils.database_exists(self.db_engine.url): - sqlalchemy_utils.drop_database(self.db_engine.url) - - def create_tables(self): - try: - AnalyzerModel.metadata.create_all(self.db_engine) # type: ignore - LOGGER.debug("Tables created in the database: {:}".format(self.db_name)) - except Exception as e: - LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e))) - raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) - - def verify_tables(self): - try: - inspect_object = inspect(self.db_engine) - if(inspect_object.has_table('analyzer', None)): - LOGGER.info("Table exists in DB: {:}".format(self.db_name)) - except Exception as e: - LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) - -# ----------------- CURD OPERATIONS --------------------- - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - session.rollback() - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. + return: SQLAlchemy session, query and Model + """ session = self.Session() try: - query = session.query(AnalyzerModel) - + query = session.query(model) # Apply filters based on the filter_object if filter_object.analyzer_id: - query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) + query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id])) if filter_object.algorithm_names: - query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names)) + query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names)) if filter_object.input_kpi_ids: input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids] - query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids)) + query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids)) if filter_object.output_kpi_ids: output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids] - query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids)) - - result = query.all() - # query should be added to return all rows - if result: - LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} - else: - LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}") - return result + query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids)) except Exception as e: - LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") - raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) - finally: - session.close() + LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") + raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) + + return super().select_with_filter(query, session, model) diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 8bb6a17afb5b911e3652fdb8d1853b5b7bc6faf3..a7fc8d49248ff01a860accac1b64a29d5533069f 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -37,7 +37,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value - self.db_obj = AnalyzerDB() + self.db_obj = AnalyzerDB(AnalyzerModel) self.result_queue = queue.Queue() self.scheduler = BackgroundScheduler() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) @@ -84,7 +84,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate)) self.kafka_producer.flush() - # self.StartResponseListener(analyzer_uuid) def StartResponseListener(self, filter_key=None): """ @@ -209,6 +208,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) print ('Message delivery failed: {:}'.format(err)) - # else: - # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) + else: + LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + print('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index 6c331844f45d98095ef98951f3db43a0e2f0c69c..1df996785ec636592cf5197144d916a89257d9af 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -16,9 +16,11 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .AnalyticsFrontendService import AnalyticsFrontendService +from analytics.database.AnalyzerModel import Analyzer as Model +from common.tools.database.GenericDatabase import Database terminate = threading.Event() -LOGGER = None +LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') @@ -36,6 +38,11 @@ def main(): LOGGER.info('Starting...') + # To create DB + kpiDBobj = Database(Model) + kpiDBobj.create_database() + kpiDBobj.create_tables() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index d2428c01fb021f71a884d9a99c446bfef6e66559..44e84e4683bcdcec72e572b8e4deea903bf0de65 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -84,10 +84,10 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ########################### # --- "test_validate_kafka_topics" should be executed before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) # ----- core funtionality test ----- # def test_StartAnalytics(analyticsFrontend_client): @@ -102,27 +102,19 @@ def test_StartStopAnalyzers(analyticsFrontend_client): LOGGER.info('--> StartAnalyzer') added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer()) LOGGER.debug(str(added_analyzer_id)) - LOGGER.info(' --> Calling StartResponseListener... ') - class_obj = AnalyticsFrontendServiceServicerImpl() - response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id._uuid) - LOGGER.debug(response) - LOGGER.info("waiting for timer to comlete ...") - time.sleep(3) - LOGGER.info('--> StopAnalyzer') - response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id) - LOGGER.debug(str(response)) + assert isinstance(added_analyzer_id, AnalyzerId) -# def test_SelectAnalytics(analyticsFrontend_client): -# LOGGER.info(' >>> test_SelectAnalytics START: <<< ') -# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) -# LOGGER.debug(str(response)) -# assert isinstance(response, AnalyzerList) +def test_StopAnalytic(analyticsFrontend_client): + LOGGER.info(' >>> test_StopAnalytic START: <<< ') + response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) + LOGGER.debug(str(response)) + assert isinstance(response, Empty) -# def test_StopAnalytic(analyticsFrontend_client): -# LOGGER.info(' >>> test_StopAnalytic START: <<< ') -# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id()) -# LOGGER.debug(str(response)) -# assert isinstance(response, Empty) +def test_SelectAnalytics(analyticsFrontend_client): + LOGGER.info(' >>> test_SelectAnalytics START: <<< ') + response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter()) + LOGGER.debug(str(response)) + assert isinstance(response, AnalyzerList) # def test_ResponseListener(): # LOGGER.info(' >>> test_ResponseListener START <<< ') @@ -131,4 +123,4 @@ def test_StartStopAnalyzers(analyticsFrontend_client): # class_obj = AnalyticsFrontendServiceServicerImpl() # for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid): # LOGGER.debug(response) -# assert isinstance(response, tuple) \ No newline at end of file +# assert isinstance(response, tuple) diff --git a/src/common/Settings.py b/src/common/Settings.py index eaeb363adc1d9eadb9ddb0487abef8a0885ce380..936c0387b5ad989621680b9f6f848e69fcc00e39 100644 --- a/src/common/Settings.py +++ b/src/common/Settings.py @@ -79,12 +79,12 @@ def get_service_host(service_name : ServiceNameEnum): def get_service_port_grpc(service_name : ServiceNameEnum): envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_GRPC) default_value = DEFAULT_SERVICE_GRPC_PORTS.get(service_name.value) - return get_setting(envvar_name, default=default_value) + return int(get_setting(envvar_name, default=default_value)) def get_service_port_http(service_name : ServiceNameEnum): envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_HTTP) default_value = DEFAULT_SERVICE_HTTP_PORTS.get(service_name.value) - return get_setting(envvar_name, default=default_value) + return int(get_setting(envvar_name, default=default_value)) def get_service_baseurl_http(service_name : ServiceNameEnum): envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_BASEURL_HTTP) @@ -95,16 +95,16 @@ def get_log_level(): return get_setting(ENVVAR_LOG_LEVEL, default=DEFAULT_LOG_LEVEL) def get_metrics_port(): - return get_setting(ENVVAR_METRICS_PORT, default=DEFAULT_METRICS_PORT) + return int(get_setting(ENVVAR_METRICS_PORT, default=DEFAULT_METRICS_PORT)) def get_grpc_bind_address(): return get_setting(ENVVAR_GRPC_BIND_ADDRESS, default=DEFAULT_GRPC_BIND_ADDRESS) def get_grpc_max_workers(): - return get_setting(ENVVAR_GRPC_MAX_WORKERS, default=DEFAULT_GRPC_MAX_WORKERS) + return int(get_setting(ENVVAR_GRPC_MAX_WORKERS, default=DEFAULT_GRPC_MAX_WORKERS)) def get_grpc_grace_period(): - return get_setting(ENVVAR_GRPC_GRACE_PERIOD, default=DEFAULT_GRPC_GRACE_PERIOD) + return int(get_setting(ENVVAR_GRPC_GRACE_PERIOD, default=DEFAULT_GRPC_GRACE_PERIOD)) def get_http_bind_address(): return get_setting(ENVVAR_HTTP_BIND_ADDRESS, default=DEFAULT_HTTP_BIND_ADDRESS) diff --git a/src/kpi_manager/database/Kpi_DB.py b/src/common/tools/database/GenericDatabase.py similarity index 58% rename from src/kpi_manager/database/Kpi_DB.py rename to src/common/tools/database/GenericDatabase.py index 49ad9c9b579daa918818366a1d9505089968edc2..0cd41b9ef0c97263b56a5eda67b173f6ba61a997 100644 --- a/src/kpi_manager/database/Kpi_DB.py +++ b/src/common/tools/database/GenericDatabase.py @@ -12,52 +12,54 @@ # See the License for the specific language governing permissions and # limitations under the License. + import logging import sqlalchemy_utils +from .GenericEngine import Engine +from sqlalchemy import inspect from sqlalchemy.orm import sessionmaker -from kpi_manager.database.KpiEngine import KpiEngine -from kpi_manager.database.KpiModel import Kpi as KpiModel -from common.method_wrappers.ServiceExceptions import ( - AlreadyExistsException, OperationFailedException , NotFoundException) +from common.Settings import get_setting + +from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) LOGGER = logging.getLogger(__name__) -DB_NAME = "tfs_kpi_mgmt" -class KpiDB: - def __init__(self): - self.db_engine = KpiEngine.get_engine() +class Database: + def __init__(self, model): + self.db_engine = Engine.get_engine() if self.db_engine is None: LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - self.db_name = DB_NAME - self.Session = sessionmaker(bind=self.db_engine) - - def create_database(self) -> None: + raise Exception('Failed to initialize the database engine.') + self.db_model = model + self.db_table = model.__name__ + self.Session = sessionmaker(bind=self.db_engine) + + def create_database(self): if not sqlalchemy_utils.database_exists(self.db_engine.url): - sqlalchemy_utils.create_database(self.db_engine.url) LOGGER.debug("Database created. {:}".format(self.db_engine.url)) + sqlalchemy_utils.create_database(self.db_engine.url) def drop_database(self) -> None: if sqlalchemy_utils.database_exists(self.db_engine.url): sqlalchemy_utils.drop_database(self.db_engine.url) def create_tables(self): - # TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables. try: - KpiModel.metadata.create_all(self.db_engine) # type: ignore - LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name)) + self.db_model.metadata.create_all(self.db_engine) + LOGGER.debug("Tables created in the database: {:}".format(self.db_table)) except Exception as e: - LOGGER.debug("Tables cannot be created in the kpi database. {:s}".format(str(e))) + LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e))) raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) def verify_tables(self): try: - with self.db_engine.connect() as connection: - result = connection.execute("SHOW TABLES;") - tables = result.fetchall() # type: ignore - LOGGER.debug("Tables verified: {:}".format(tables)) + inspect_object = inspect(self.db_engine) + if(inspect_object.has_table(self.db_table , None)): + LOGGER.info("Table exists in DB: {:}".format(self.db_name)) except Exception as e: - LOGGER.debug("Unable to fetch Table names. {:s}".format(str(e))) + LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) + +# ----------------- DB OPERATIONS --------------------- def add_row_to_db(self, row): session = self.Session() @@ -70,7 +72,8 @@ class KpiDB: session.rollback() if "psycopg2.errors.UniqueViolation" in str(e): LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] ) + raise AlreadyExistsException(row.__class__.__name__, row, + extra_details=["Unique key voilation: {:}".format(e)] ) else: LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) @@ -89,6 +92,7 @@ class KpiDB: print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) return None except Exception as e: + session.rollback() LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) finally: @@ -112,43 +116,24 @@ class KpiDB: finally: session.close() - def select_with_filter(self, model, filter_object): - session = self.Session() + def select_with_filter(self, query_object, session, model): + """ + Generic method to apply filters dynamically based on filter. + params: model_name: SQLAlchemy model class name. + query_object : Object that contains query with applied filters. + session: session of the query. + return: List of filtered records. + """ try: - query = session.query(KpiModel) - # Apply filters based on the filter_object - if filter_object.kpi_id: - query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) - - if filter_object.kpi_sample_type: - query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type)) - - if filter_object.device_id: - query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) - - if filter_object.endpoint_id: - query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) - - if filter_object.service_id: - query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) - - if filter_object.slice_id: - query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) - - if filter_object.connection_id: - query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) - - if filter_object.link_id: - query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) - result = query.all() - + result = query_object.all() + # Log result and handle empty case if result: - LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} + LOGGER.debug(f"Fetched filtered rows from {model.__name__} with filters: {query_object}") else: - LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}") + LOGGER.warning(f"No matching rows found in {model.__name__} with filters: {query_object}") return result except Exception as e: - LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") - raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) + LOGGER.error(f"Error fetching filtered rows from {model.__name__} with filters {query_object} ::: {e}") + raise OperationFailedException("Select by filter", extra_details=[f"Unable to apply the filter: {e}"]) finally: session.close() diff --git a/src/analytics/database/AnalyzerEngine.py b/src/common/tools/database/GenericEngine.py similarity index 92% rename from src/analytics/database/AnalyzerEngine.py rename to src/common/tools/database/GenericEngine.py index 9294e09966ef9e13c9cfa3cab590e5d0c8b6a80e..18bb15360853524ed93606f3137972aa76aa850a 100644 --- a/src/analytics/database/AnalyzerEngine.py +++ b/src/common/tools/database/GenericEngine.py @@ -18,14 +18,14 @@ from common.Settings import get_setting LOGGER = logging.getLogger(__name__) CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' -class AnalyzerEngine: +class Engine: @staticmethod def get_engine() -> sqlalchemy.engine.Engine: crdb_uri = get_setting('CRDB_URI', default=None) if crdb_uri is None: CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') - CRDB_DATABASE = "tfs-analyzer" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT') + CRDB_DATABASE = get_setting('CRDB_DATABASE') CRDB_USERNAME = get_setting('CRDB_USERNAME') CRDB_PASSWORD = get_setting('CRDB_PASSWORD') CRDB_SSLMODE = get_setting('CRDB_SSLMODE') diff --git a/src/common/tools/database/__init__.py b/src/common/tools/database/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/common/tools/database/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py index fc43c315114e7b51c4e2604afbb14e165796e7c5..73b633e23cd55aefeed9b8075f2ad35348fc83ef 100644 --- a/src/common/tools/kafka/Variables.py +++ b/src/common/tools/kafka/Variables.py @@ -25,11 +25,11 @@ class KafkaConfig(Enum): @staticmethod def get_kafka_address() -> str: - # kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) - # if kafka_server_address is None: - KFK_NAMESPACE = get_setting('KFK_NAMESPACE') - KFK_PORT = get_setting('KFK_SERVER_PORT') - kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) + kafka_server_address = get_setting('KFK_SERVER_ADDRESS', default=None) + if kafka_server_address is None: + KFK_NAMESPACE = get_setting('KFK_NAMESPACE') + KFK_PORT = get_setting('KFK_SERVER_PORT') + kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT) return kafka_server_address @staticmethod diff --git a/src/kpi_manager/database/KpiDB.py b/src/kpi_manager/database/KpiDB.py new file mode 100644 index 0000000000000000000000000000000000000000..d503f06f4cdeb57efd4c02701803f81fd31d3eea --- /dev/null +++ b/src/kpi_manager/database/KpiDB.py @@ -0,0 +1,66 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from common.method_wrappers.Decorator import MetricsPool +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('KpiManager', 'Database') + +class KpiDB(Database): + def __init__(self, model) -> None: + LOGGER.info('Init KpiManagerService') + super().__init__(model) + + def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. + return: SQLAlchemy session, query and Model + """ + session = self.Session() + try: + query = session.query(model) + # Apply filters based on the filter_object + if filter_object.kpi_id: + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) + + if filter_object.kpi_sample_type: + query = query.filter(model.kpi_sample_type.in_(filter_object.kpi_sample_type)) + + if filter_object.device_id: + query = query.filter(model.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id])) + + if filter_object.endpoint_id: + query = query.filter(model.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id])) + + if filter_object.service_id: + query = query.filter(model.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id])) + + if filter_object.slice_id: + query = query.filter(model.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id])) + + if filter_object.connection_id: + query = query.filter(model.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id])) + + if filter_object.link_id: + query = query.filter(model.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id])) + except Exception as e: + LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") + raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) + + return super().select_with_filter(query, session, model) diff --git a/src/kpi_manager/database/KpiEngine.py b/src/kpi_manager/database/KpiEngine.py deleted file mode 100644 index 0fce7e3d36cf2f03a18f311c815719a4f17b2869..0000000000000000000000000000000000000000 --- a/src/kpi_manager/database/KpiEngine.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, sqlalchemy -from common.Settings import get_setting - -LOGGER = logging.getLogger(__name__) -CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' - -class KpiEngine: - @staticmethod - def get_engine() -> sqlalchemy.engine.Engine: - crdb_uri = get_setting('CRDB_URI', default=None) - if crdb_uri is None: - CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') - CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') - CRDB_DATABASE = 'tfs_kpi_mgmt' # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT') - CRDB_USERNAME = get_setting('CRDB_USERNAME') - CRDB_PASSWORD = get_setting('CRDB_PASSWORD') - CRDB_SSLMODE = get_setting('CRDB_SSLMODE') - crdb_uri = CRDB_URI_TEMPLATE.format( - CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) - try: - engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' KpiDBmanager initalized with DB URL: {:}'.format(crdb_uri)) - except: # pylint: disable=bare-except # pragma: no cover - LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) - return None # type: ignore - return engine diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index fd22474829ea0dfb6b1a25e70bbb4d5440c0216b..3f9ae8492380e5e11cd3cbc926a2fce07620d8a7 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -18,7 +18,8 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m from common.proto.context_pb2 import Empty from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList -from kpi_manager.database.Kpi_DB import KpiDB +# from kpi_manager.database.Kpi_DB import KpiDB +from kpi_manager.database.KpiDB import KpiDB from kpi_manager.database.KpiModel import Kpi as KpiModel LOGGER = logging.getLogger(__name__) @@ -27,7 +28,7 @@ METRICS_POOL = MetricsPool('KpiManager', 'NBIgRPC') class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): def __init__(self): LOGGER.info('Init KpiManagerService') - self.kpi_db_obj = KpiDB() + self.kpi_db_obj = KpiDB(KpiModel) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore diff --git a/src/kpi_manager/service/__main__.py b/src/kpi_manager/service/__main__.py index 244d5afa373a6462a0382a0ed26a588088a689a1..05e32bb58128975ea5d2a5f015d1e8b3977c9905 100644 --- a/src/kpi_manager/service/__main__.py +++ b/src/kpi_manager/service/__main__.py @@ -16,8 +16,11 @@ import logging, signal, sys, threading from common.Settings import get_log_level from .KpiManagerService import KpiManagerService +from kpi_manager.database.KpiModel import Kpi as Model +from common.tools.database.GenericDatabase import Database + terminate = threading.Event() -LOGGER = None +LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') @@ -35,6 +38,11 @@ def main(): LOGGER.debug('Starting...') + # To create DB + kpiDBobj = Database(Model) + kpiDBobj.create_database() + kpiDBobj.create_tables() + grpc_service = KpiManagerService() grpc_service.start() diff --git a/src/kpi_manager/tests/__init__.py b/src/kpi_manager/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/kpi_manager/tests/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/kpi_manager/tests/test_kpi_db.py b/src/kpi_manager/tests/test_kpi_db.py index d4a57f83664f851504389b3bbe99d5c2a92542d9..b1513a83f7c6122a34553d5933aea8d8c438e2a6 100644 --- a/src/kpi_manager/tests/test_kpi_db.py +++ b/src/kpi_manager/tests/test_kpi_db.py @@ -14,7 +14,12 @@ import logging -from kpi_manager.database.Kpi_DB import KpiDB +# from kpi_manager.database.Kpi_DB import KpiDB +from common.proto.kpi_manager_pb2 import KpiDescriptorList +from .test_messages import create_kpi_filter_request +from kpi_manager.database.KpiModel import Kpi as KpiModel +from kpi_manager.database.KpiDB import KpiDB +# from common.tools.database.GenericDatabase import Database LOGGER = logging.getLogger(__name__) @@ -26,3 +31,22 @@ def test_verify_databases_and_Tables(): kpiDBobj.create_database() kpiDBobj.create_tables() kpiDBobj.verify_tables() + +# def test_generic_DB_select_method(): +# LOGGER.info("--> STARTED-test_generic_DB_select_method") +# kpi_obj = KpiDB() +# _filter = create_kpi_filter_request() +# # response = KpiDescriptorList() +# try: +# kpi_obj.select_with_filter(KpiModel, _filter) +# except Exception as e: +# LOGGER.error('Unable to apply filter on kpi descriptor. {:}'.format(e)) +# LOGGER.info("--> FINISHED-test_generic_DB_select_method") +# # try: +# # for row in rows: +# # kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row) +# # response.kpi_descriptor_list.append(kpiDescriptor_obj) +# # return response +# # except Exception as e: +# # LOGGER.info('Unable to process filter response {:}'.format(e)) +# # assert isinstance(r) diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py index 219fdadee9e2f4ca9ea9ac0be040043d4edfbdbe..06e836b70963768b375ab04e29a640591b283108 100755 --- a/src/kpi_manager/tests/test_kpi_manager.py +++ b/src/kpi_manager/tests/test_kpi_manager.py @@ -139,9 +139,9 @@ def test_SelectKpiDescriptor(kpi_manager_client): LOGGER.info("Response gRPC message object: {:}".format(response)) assert isinstance(response, KpiDescriptorList) -def test_set_list_of_KPIs(kpi_manager_client): - LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") - KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"] - # adding KPI - for kpi in KPIs_TO_SEARCH: - kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi)) +# def test_set_list_of_KPIs(kpi_manager_client): +# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") +# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"] +# # adding KPI +# for kpi in KPIs_TO_SEARCH: +# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi)) diff --git a/src/service/service/__main__.py b/src/service/service/__main__.py index 5f9f2fa3afb2b708068423f70f764ffc61d29d9c..589d3f67317643bf76e454822a9fea40dd7e01e3 100644 --- a/src/service/service/__main__.py +++ b/src/service/service/__main__.py @@ -44,8 +44,8 @@ def main(): get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) signal.signal(signal.SIGINT, signal_handler) diff --git a/src/telemetry/database/TelemetryEngine.py b/src/telemetry/database/TelemetryEngine.py deleted file mode 100644 index 7c8620faf25e695e7f971bce78be9ad208a7701b..0000000000000000000000000000000000000000 --- a/src/telemetry/database/TelemetryEngine.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging, sqlalchemy -from common.Settings import get_setting - -LOGGER = logging.getLogger(__name__) -CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' - -class TelemetryEngine: - @staticmethod - def get_engine() -> sqlalchemy.engine.Engine: - crdb_uri = get_setting('CRDB_URI', default=None) - if crdb_uri is None: - CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') - CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') - CRDB_DATABASE = "tfs-telemetry" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT') - CRDB_USERNAME = get_setting('CRDB_USERNAME') - CRDB_PASSWORD = get_setting('CRDB_PASSWORD') - CRDB_SSLMODE = get_setting('CRDB_SSLMODE') - crdb_uri = CRDB_URI_TEMPLATE.format( - CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) - try: - engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' TelemetryDB initalized with DB URL: {:}'.format(crdb_uri)) - except: # pylint: disable=bare-except # pragma: no cover - LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) - return None # type: ignore - return engine diff --git a/src/telemetry/database/Telemetry_DB.py b/src/telemetry/database/Telemetry_DB.py index 32acfd73a410a7bfddd6b487d0b1962afadb3842..110c7e80a4c36eed15417bfa05c4057ccb7fe292 100644 --- a/src/telemetry/database/Telemetry_DB.py +++ b/src/telemetry/database/Telemetry_DB.py @@ -13,125 +13,32 @@ # limitations under the License. import logging -import sqlalchemy_utils -from sqlalchemy import inspect -from sqlalchemy.orm import sessionmaker -from telemetry.database.TelemetryModel import Collector as CollectorModel -from telemetry.database.TelemetryEngine import TelemetryEngine -from common.method_wrappers.ServiceExceptions import ( - OperationFailedException, AlreadyExistsException ) +from common.method_wrappers.Decorator import MetricsPool +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException -LOGGER = logging.getLogger(__name__) -DB_NAME = "tfs_telemetry" +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('TelemteryFrontend', 'Database') -class TelemetryDB: - def __init__(self): - self.db_engine = TelemetryEngine.get_engine() - if self.db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') - return False - self.db_name = DB_NAME - self.Session = sessionmaker(bind=self.db_engine) - - def create_database(self): - if not sqlalchemy_utils.database_exists(self.db_engine.url): - LOGGER.debug("Database created. {:}".format(self.db_engine.url)) - sqlalchemy_utils.create_database(self.db_engine.url) - - def drop_database(self) -> None: - if sqlalchemy_utils.database_exists(self.db_engine.url): - sqlalchemy_utils.drop_database(self.db_engine.url) - - def create_tables(self): - try: - CollectorModel.metadata.create_all(self.db_engine) # type: ignore - LOGGER.debug("Tables created in the database: {:}".format(self.db_name)) - except Exception as e: - LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e))) - raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) - - def verify_tables(self): - try: - inspect_object = inspect(self.db_engine) - if(inspect_object.has_table('collector', None)): - LOGGER.info("Table exists in DB: {:}".format(self.db_name)) - except Exception as e: - LOGGER.info("Unable to fetch Table names. {:s}".format(str(e))) - -# ----------------- CURD METHODs --------------------- - - def add_row_to_db(self, row): - session = self.Session() - try: - session.add(row) - session.commit() - LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") - return True - except Exception as e: - session.rollback() - if "psycopg2.errors.UniqueViolation" in str(e): - LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") - raise AlreadyExistsException(row.__class__.__name__, row, - extra_details=["Unique key voilation: {:}".format(e)] ) - else: - LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() - - def search_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - entity = session.query(model).filter_by(**{col_name: id_to_search}).first() - if entity: - # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") - return entity - else: - LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") - print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) - return None - except Exception as e: - session.rollback() - LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") - raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) - finally: - session.close() - - def delete_db_row_by_id(self, model, col_name, id_to_search): - session = self.Session() - try: - record = session.query(model).filter_by(**{col_name: id_to_search}).first() - if record: - session.delete(record) - session.commit() - LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) - else: - LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) - return None - except Exception as e: - session.rollback() - LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) - raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) - finally: - session.close() +class TelemetryDB(Database): + def __init__(self, model) -> None: + LOGGER.info('Init KpiManagerService') + super().__init__(model) def select_with_filter(self, model, filter_object): + """ + Generic method to create filters dynamically based on filter_object attributes. + params: model: SQLAlchemy model class to query. + filter_object: Object that contains filtering criteria as attributes. + return: SQLAlchemy session, query and Model + """ session = self.Session() try: - query = session.query(CollectorModel) - # Apply filters based on the filter_object + query = session.query(model) if filter_object.kpi_id: - query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) - result = query.all() - # query should be added to return all rows - if result: - LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} - else: - LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}") - return result + query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) except Exception as e: - LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") - raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) - finally: - session.close() - + LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}") + raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)]) + + return super().select_with_filter(query, session, model) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index b73d9fa952ee42aeb7adb8f3c0b2e4a3ba7f3e09..c72e66bdd53f165ebae131e07f51d23e609dd8be 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -40,7 +40,7 @@ ACTIVE_COLLECTORS = [] # keep and can be populated from DB class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): def __init__(self): LOGGER.info('Init TelemetryFrontendService') - self.tele_db_obj = TelemetryDB() + self.tele_db_obj = TelemetryDB(CollectorModel) self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'frontend', diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index 2a6c5dbcf2da6b6a074c2b8ee23791bc4896442f..6697ff5f10e58b494736738e631a29a20691732d 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -16,6 +16,8 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .TelemetryFrontendService import TelemetryFrontendService +from telemetry.database.TelemetryModel import Collector as Model +from common.tools.database.GenericDatabase import Database terminate = threading.Event() LOGGER = None @@ -36,6 +38,11 @@ def main(): LOGGER.info('Starting...') + # To create DB + kpiDBobj = Database(Model) + kpiDBobj.create_database() + kpiDBobj.create_tables() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/telemetry/tests/test_telemetryDB.py b/src/telemetry/tests/test_telemetryDB.py index c4976f8c2144fcdcad43a3e25d43091010de0d18..1b122e4bca266018c01044e2eb8a1ab277b3e3c3 100644 --- a/src/telemetry/tests/test_telemetryDB.py +++ b/src/telemetry/tests/test_telemetryDB.py @@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__) def test_verify_databases_and_tables(): LOGGER.info('>>> test_verify_databases_and_tables : START <<< ') TelemetryDBobj = TelemetryDB() - TelemetryDBobj.drop_database() - TelemetryDBobj.verify_tables() + # TelemetryDBobj.drop_database() + # TelemetryDBobj.verify_tables() TelemetryDBobj.create_database() TelemetryDBobj.create_tables() - TelemetryDBobj.verify_tables() \ No newline at end of file + TelemetryDBobj.verify_tables() diff --git a/src/webui/service/__main__.py b/src/webui/service/__main__.py index 8ec8dcb64d55c7a4c4d699601fe97a7d6432ce05..3c7be34954a42edd4f1d376a8d2a431f30507b20 100644 --- a/src/webui/service/__main__.py +++ b/src/webui/service/__main__.py @@ -43,8 +43,8 @@ def main(): get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_HOST ), + # get_env_var_name(ServiceNameEnum.QKD_APP, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) logger.info('Starting...')