Compare revisions (target project: tfs/controller)

Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (69): 419 additions and 188 deletions
......@@ -49,5 +49,6 @@ include:
- local: '/src/kpi_value_api/.gitlab-ci.yml'
- local: '/src/kpi_value_writer/.gitlab-ci.yml'
- local: '/src/telemetry/.gitlab-ci.yml'
- local: '/src/analytics/.gitlab-ci.yml'
# This should be last one: end-to-end integration tests
- local: '/src/tests/.gitlab-ci.yml'
......@@ -47,10 +47,10 @@ function kafka_deploy() {
cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"
# echo "Apache Kafka Namespace"
echo ">>> Delete Apache Kafka Namespace"
echo "Delete Apache Kafka Namespace"
kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found
echo ">>> Create Apache Kafka Namespace"
echo "Create Apache Kafka Namespace"
kubectl create namespace ${KFK_NAMESPACE}
# echo ">>> Deplying Apache Kafka Zookeeper"
......@@ -76,15 +76,15 @@ function kafka_deploy() {
# fi
}
echo "Apache Kafka"
echo ">>> Checking if Apache Kafka is deployed ... "
echo ">>> Apache Kafka"
echo "Checking if Apache Kafka is deployed ... "
if [ "$KFK_REDEPLOY" == "YES" ]; then
echo ">>> Redeploying kafka namespace"
echo "Redeploying kafka namespace"
kafka_deploy
elif kubectl get namespace "${KFK_NAMESPACE}" &> /dev/null; then
echo ">>> Apache Kafka already present; skipping step."
echo "Apache Kafka already present; skipping step."
else
echo ">>> Kafka namespace doesn't exists. Deploying kafka namespace"
echo "Kafka namespace doesn't exists. Deploying kafka namespace"
kafka_deploy
fi
echo
......@@ -146,55 +146,17 @@ kubectl create namespace $TFS_K8S_NAMESPACE
sleep 2
printf "\n"
echo "Create secret with CockroachDB data"
echo ">>> Create Secret with CockroachDB data..."
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_CONTEXT=${CRDB_DATABASE} # TODO: replace with a specific configurable environment variable
kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
--from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
--from-literal=CRDB_DATABASE=${CRDB_DATABASE_CONTEXT} \
--from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
--from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for KPI Management microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: replace with a specific configurable environment variable
kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
--from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
--from-literal=CRDB_DATABASE=${CRDB_DATABASE_KPI_MGMT} \
--from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
--from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for Telemetry microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_TELEMETRY="tfs_telemetry" # TODO: replace with a specific configurable environment variable
kubectl create secret generic crdb-telemetry --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
--from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
--from-literal=CRDB_DATABASE=${CRDB_DATABASE_TELEMETRY} \
--from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
--from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for Analytics microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_ANALYTICS="tfs_analytics" # TODO: replace with a specific configurable environment variable
kubectl create secret generic crdb-analytics --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=CRDB_NAMESPACE=${CRDB_NAMESPACE} \
--from-literal=CRDB_SQL_PORT=${CRDB_SQL_PORT} \
--from-literal=CRDB_DATABASE=${CRDB_DATABASE_ANALYTICS} \
--from-literal=CRDB_USERNAME=${CRDB_USERNAME} \
--from-literal=CRDB_PASSWORD=${CRDB_PASSWORD} \
--from-literal=CRDB_SSLMODE=require
printf "\n"
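This changeset collapses the per-component secrets (crdb-kpi-data, crdb-telemetry, crdb-analytics) into the single crdb-data secret above; each Deployment now selects its own database through a per-pod CRDB_DATABASE env var, as the manifests below show. A minimal Python sketch of how a service could compose its connection URI from those variables; the in-cluster host name is an assumption following the usual <service>.<namespace>.svc.cluster.local convention:

import os

def get_crdb_uri() -> str:
    # Variable names match the crdb-data secret created above;
    # CRDB_DATABASE is injected per Deployment (see manifests below).
    host = 'cockroachdb-public.{:s}.svc.cluster.local'.format(os.environ['CRDB_NAMESPACE'])
    return 'cockroachdb://{:s}:{:s}@{:s}:{:s}/{:s}?sslmode={:s}'.format(
        os.environ['CRDB_USERNAME'], os.environ['CRDB_PASSWORD'], host,
        os.environ['CRDB_SQL_PORT'], os.environ['CRDB_DATABASE'],
        os.environ['CRDB_SSLMODE'])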
echo "Create secret with Apache Kafka data for KPI, Telemetry and Analytics microservices"
echo ">>> Create Secret with Apache Kakfa..."
KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
......@@ -669,6 +631,10 @@ if [[ "$TFS_COMPONENTS" == *"monitoring"* ]] && [[ "$TFS_COMPONENTS" == *"webui"
printf "\n\n"
fi
echo "Pruning Docker Images..."
docker image prune --force
printf "\n\n"
if [ "$DOCKER_BUILD" == "docker buildx build" ]; then
echo "Pruning Docker Buildx Cache..."
docker buildx prune --force
......
......@@ -37,9 +37,13 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
- name: CRDB_DATABASE
value: "tfs_analytics"
- name: METRICS_PORT
value: "9192"
envFrom:
- secretRef:
name: crdb-analytics
name: crdb-data
- secretRef:
name: kfk-kpi-data
readinessProbe:
......@@ -60,10 +64,12 @@ spec:
imagePullPolicy: Always
ports:
- containerPort: 30090
- containerPort: 9192
- containerPort: 9193
env:
- name: LOG_LEVEL
value: "INFO"
- name: METRICS_PORT
value: "9193"
envFrom:
- secretRef:
name: kfk-kpi-data
......@@ -100,10 +106,14 @@ spec:
protocol: TCP
port: 30090
targetPort: 30090
- name: metrics
- name: metrics-frontend
protocol: TCP
port: 9192
targetPort: 9192
- name: metrics-backend
protocol: TCP
port: 9193
targetPort: 9193
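The frontend and backend containers of the analytics pod now expose separate Prometheus endpoints (9192 and 9193), so the Service needs two named metrics ports. A sketch of the per-container side, assuming each container receives its own METRICS_PORT as in the env sections above (the actual services resolve the port via common.Settings.get_metrics_port()):

import os
from prometheus_client import start_http_server

# Serve /metrics on this container's port: 9192 (frontend) or 9193 (backend),
# matching the named Service ports metrics-frontend / metrics-backend above.
start_http_server(int(os.environ.get('METRICS_PORT', '9192')))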
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
......
......@@ -45,6 +45,8 @@ spec:
value: "FALSE"
- name: ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY
value: "FALSE"
- name: CRDB_DATABASE
value: "tfs_context"
envFrom:
- secretRef:
name: crdb-data
......
......@@ -39,9 +39,11 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
- name: CRDB_DATABASE
value: "tfs_kpi"
envFrom:
- secretRef:
name: crdb-kpi-data
name: crdb-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30010"]
......
......@@ -62,3 +62,10 @@ spec:
name: nbiservice
port:
number: 8080
- path: /()(qkd_app/.*)
pathType: Prefix
backend:
service:
name: qkd-appservice
port:
number: 8005
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: Deployment
metadata:
name: qkd-appservice
spec:
selector:
matchLabels:
app: qkd-appservice
#replicas: 1
template:
metadata:
labels:
app: qkd-appservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: server
image: labs.etsi.org:5050/tfs/controller/qkd_app:latest
imagePullPolicy: Always
ports:
- containerPort: 10060
- containerPort: 9192
- containerPort: 8005
env:
- name: LOG_LEVEL
value: "DEBUG"
- name: CRDB_DATABASE_APP
value: "qkd_app"
envFrom:
- secretRef:
name: crdb-data
- secretRef:
name: nats-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10060"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10060"]
resources:
requests:
cpu: 150m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
---
apiVersion: v1
kind: Service
metadata:
name: qkd-appservice
labels:
app: qkd-appservice
spec:
type: ClusterIP
selector:
app: qkd-appservice
ports:
- name: grpc
protocol: TCP
port: 10060
targetPort: 10060
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
- name: http
port: 8005
targetPort: 8005
......@@ -475,3 +475,156 @@ spec:
any: false
matchNames:
- tfs # namespace where the app is running
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
namespace: monitoring # namespace where prometheus is running
name: tfs-analyticsservice-metric
labels:
app: analyticsservice
#release: prometheus
#release: prom # name of the release
# (VERY IMPORTANT: check the ServiceMonitor of Prometheus itself to find
#  the correct release name; without it, Prometheus cannot identify the
#  metrics of the Flask app as a scrape target.)
spec:
selector:
matchLabels:
# Target app service
#namespace: tfs
app: analyticsservice # same as above
#release: prometheus # same as above
endpoints:
- port: metrics-frontend # named port in target app
scheme: http
path: /metrics # path to scrape
interval: 5s # scrape interval
- port: metrics-backend # named port in target app
scheme: http
path: /metrics # path to scrape
interval: 5s # scrape interval
namespaceSelector:
any: false
matchNames:
- tfs # namespace where the app is running
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
namespace: monitoring # namespace where prometheus is running
name: tfs-telemetryservice-metric
labels:
app: telemetryservice
#release: prometheus
#release: prom # name of the release
# (VERY IMPORTANT: check the ServiceMonitor of Prometheus itself to find
#  the correct release name; without it, Prometheus cannot identify the
#  metrics of the Flask app as a scrape target.)
spec:
selector:
matchLabels:
# Target app service
#namespace: tfs
app: telemetryservice # same as above
#release: prometheus # same as above
endpoints:
- port: metrics-frontend # named port in target app
scheme: http
path: /metrics # path to scrape
interval: 5s # scrape interval
- port: metrics-backend # named port in target app
scheme: http
path: /metrics # path to scrape
interval: 5s # scrape interval
namespaceSelector:
any: false
matchNames:
- tfs # namespace where the app is running
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
namespace: monitoring # namespace where prometheus is running
name: tfs-kpi-managerservice-metric
labels:
app: kpi-managerservice
#release: prometheus
#release: prom # name of the release
# (VERY IMPORTANT: check the ServiceMonitor of Prometheus itself to find
#  the correct release name; without it, Prometheus cannot identify the
#  metrics of the Flask app as a scrape target.)
spec:
selector:
matchLabels:
# Target app service
#namespace: tfs
app: kpi-managerservice # same as above
#release: prometheus # same as above
endpoints:
- port: metrics # named port in target app
scheme: http
path: /metrics # path to scrape
interval: 5s # scrape interval
namespaceSelector:
any: false
matchNames:
- tfs # namespace where the app is running
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
namespace: monitoring # namespace where prometheus is running
name: tfs-kpi-value-apiservice-metric
labels:
app: kpi_value_apiservice
#release: prometheus
#release: prom # name of the release
# (VERY IMPORTANT: check the ServiceMonitor of Prometheus itself to find
#  the correct release name; without it, Prometheus cannot identify the
#  metrics of the Flask app as a scrape target.)
spec:
selector:
matchLabels:
# Target app service
#namespace: tfs
app: kpi_value_apiservice # same as above
#release: prometheus # same as above
endpoints:
- port: metrics # named port in target app
scheme: http
path: /metrics # path to scrape
interval: 5s # scrape interval
namespaceSelector:
any: false
matchNames:
- tfs # namespace where the app is running
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
namespace: monitoring # namespace where prometheus is running
name: tfs-kpi-value-writerservice-metric
labels:
app: kpi_value_writerservice
#release: prometheus
#release: prom # name of the release
# (VERY IMPORTANT: check the ServiceMonitor of Prometheus itself to find
#  the correct release name; without it, Prometheus cannot identify the
#  metrics of the Flask app as a scrape target.)
spec:
selector:
matchLabels:
# Target app service
#namespace: tfs
app: kpi_value_writerservice # same as above
#release: prometheus # same as above
endpoints:
- port: metrics # named port in target app
scheme: http
path: /metrics # path to scrape
interval: 5s # scrape interval
namespaceSelector:
any: false
matchNames:
- tfs # namespace where the app is running
......@@ -37,9 +37,13 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
- name: CRDB_DATABASE
value: "tfs_kpi"
- name: METRICS_PORT
value: "9192"
envFrom:
- secretRef:
name: crdb-telemetry
name: crdb-data
- secretRef:
name: kfk-kpi-data
readinessProbe:
......@@ -60,10 +64,12 @@ spec:
imagePullPolicy: Always
ports:
- containerPort: 30060
- containerPort: 9192
- containerPort: 9193
env:
- name: LOG_LEVEL
value: "INFO"
- name: METRICS_PORT
value: "9193"
envFrom:
- secretRef:
name: kfk-kpi-data
......@@ -100,10 +106,14 @@ spec:
protocol: TCP
port: 30060
targetPort: 30060
- name: metrics
- name: metrics-frontend
protocol: TCP
port: 9192
targetPort: 9192
- name: metrics-backend
protocol: TCP
port: 9193
targetPort: 9193
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
......
......@@ -71,7 +71,14 @@ export TFS_COMPONENTS="context device pathcomp opticalcontroller service slice
#fi
# Uncomment to activate QKD App
#export TFS_COMPONENTS="${TFS_COMPONENTS} app"
# To manage QKD Apps, "qkd_app" must be deployed before "service"; thus we
# "hack" the TFS_COMPONENTS environment variable by prepending "qkd_app"
# (only if "service" is already in TFS_COMPONENTS) and re-exporting it, as
# sketched in Python after this block.
#if [[ "$TFS_COMPONENTS" == *"service"* ]]; then
# BEFORE="${TFS_COMPONENTS% service*}"
# AFTER="${TFS_COMPONENTS#* service}"
# export TFS_COMPONENTS="${BEFORE} qkd_app service ${AFTER}"
#fi
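An illustrative Python equivalent of the parameter expansions in the commented block above: ${TFS_COMPONENTS% service*} keeps everything before the last " service", while ${TFS_COMPONENTS#* service} keeps everything after the first one. A sketch only; the deploy scripts themselves stay in bash:

components = "context device pathcomp opticalcontroller service slice"
if " service" in components:
    before = components.rsplit(" service", 1)[0]  # ${TFS_COMPONENTS% service*}
    after  = components.split(" service", 1)[1]   # ${TFS_COMPONENTS#* service}
    components = "{:s} qkd_app service {:s}".format(before, after)
# -> "context device pathcomp opticalcontroller qkd_app service  slice"
# (the doubled space is harmless when the list is word-split)
print(components)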
# Set the tag you want to use for your images.
......
syntax = "proto3";
package qkd_app;
import "context.proto";
// Optare: Change this if you want to change the App's structure or enums.
// Optare: If a message (structure) is changed, it must also be changed in src/app/service/database
enum QKDAppStatusEnum {
QKDAPPSTATUS_ON = 0;
QKDAPPSTATUS_DISCONNECTED = 1;
QKDAPPSTATUS_OUT_OF_TIME = 2;
QKDAPPSTATUS_ZOMBIE = 3;
}
enum QKDAppTypesEnum {
QKDAPPTYPES_INTERNAL = 0;
QKDAPPTYPES_CLIENT = 1;
}
message QKDLId {
context.Uuid qkdl_uuid = 1;
}
message App {
AppId app_id = 1;
QKDAppStatusEnum app_status = 2;
QKDAppTypesEnum app_type = 3;
string server_app_id = 4;
repeated string client_app_id = 5;
repeated QKDLId backing_qkdl_id = 6;
context.DeviceId local_device_id = 7;
context.DeviceId remote_device_id = 8;
}
message AppId {
context.ContextId context_id = 1;
context.Uuid app_uuid = 2;
}
service AppService {
rpc RegisterApp(App) returns (context.Empty) {}
rpc ListApps (context.ContextId) returns (AppList) {}
}
message AppList {
repeated App apps = 1;
}
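For reference, a hypothetical Python client for the AppService defined above. It assumes the protoc-generated modules (qkd_app_pb2, qkd_app_pb2_grpc, context_pb2), the usual context.proto shape (ContextId wrapping a Uuid), and the qkd-appservice gRPC port 10060 from the manifest earlier in this diff; all identifiers are illustrative:

import grpc
import qkd_app_pb2, qkd_app_pb2_grpc
from context_pb2 import ContextId, Uuid

channel = grpc.insecure_channel('qkd-appservice:10060')
stub    = qkd_app_pb2_grpc.AppServiceStub(channel)

# Register a client-type App (identifiers are illustrative).
app = qkd_app_pb2.App()
app.app_type      = qkd_app_pb2.QKDAPPTYPES_CLIENT
app.server_app_id = 'server-app-1'
stub.RegisterApp(app)  # returns context.Empty

# List the Apps registered under a given context.
reply = stub.ListApps(ContextId(context_uuid=Uuid(uuid='admin')))
for registered in reply.apps:
    print(registered.app_id.app_uuid.uuid)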
......@@ -24,7 +24,7 @@ cd $PROJECTDIR/src
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
kpi_manager/tests/test_kpi_db.py
......@@ -20,7 +20,8 @@ cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=debug --verbose \
telemetry/tests/test_telemetryDB.py
......@@ -19,10 +19,9 @@ PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
# python3 kpi_manager/tests/test_unitary.py
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
telemetry/backend/tests/test_TelemetryBackend.py
......@@ -17,11 +17,9 @@
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc
# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
# kpi_manager/tests/test_unitary.py
# python3 kpi_manager/tests/test_unitary.py
CRDB_SQL_ADDRESS=$(kubectl get service --namespace ${CRDB_NAMESPACE} cockroachdb-public -o 'jsonpath={.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
......
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/qkd-appservice -c server
......@@ -13,138 +13,44 @@
# limitations under the License.
import logging
import sqlalchemy_utils
from common.method_wrappers.Decorator import MetricsPool
from common.tools.database.GenericDatabase import Database
from common.method_wrappers.ServiceExceptions import OperationFailedException
from sqlalchemy import inspect, or_
from sqlalchemy.orm import sessionmaker
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiManager', 'Database')
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from analytics.database.AnalyzerEngine import AnalyzerEngine
from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
class AnalyzerDB(Database):
def __init__(self, model) -> None:
LOGGER.info('Init AnalyzerDB')
super().__init__(model)
LOGGER = logging.getLogger(__name__)
DB_NAME = "tfs_analyzer" # TODO: export name from enviornment variable
class AnalyzerDB:
def __init__(self):
self.db_engine = AnalyzerEngine.get_engine()
if self.db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return False
self.db_name = DB_NAME
self.Session = sessionmaker(bind=self.db_engine)
def create_database(self):
if not sqlalchemy_utils.database_exists(self.db_engine.url):
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
try:
AnalyzerModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
except Exception as e:
LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
def verify_tables(self):
try:
inspect_object = inspect(self.db_engine)
if(inspect_object.has_table('analyzer', None)):
LOGGER.info("Table exists in DB: {:}".format(self.db_name))
except Exception as e:
LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
# ----------------- CRUD OPERATIONS ---------------------
def add_row_to_db(self, row):
session = self.Session()
try:
session.add(row)
session.commit()
LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
return True
except Exception as e:
session.rollback()
if "psycopg2.errors.UniqueViolation" in str(e):
LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}")
raise AlreadyExistsException(row.__class__.__name__, row,
extra_details=["Unique key voilation: {:}".format(e)] )
else:
LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
finally:
session.close()
def search_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
if entity:
# LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
return entity
else:
LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}")
print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
return None
except Exception as e:
session.rollback()
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
session.close()
def delete_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
record = session.query(model).filter_by(**{col_name: id_to_search}).first()
if record:
session.delete(record)
session.commit()
LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
else:
LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
return None
except Exception as e:
session.rollback()
LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
finally:
session.close()
def select_with_filter(self, model, filter_object):
"""
Generic method to create filters dynamically based on filter_object attributes.
params: model: SQLAlchemy model class to query.
filter_object: Object that contains filtering criteria as attributes.
return: SQLAlchemy session, query and Model
"""
session = self.Session()
try:
query = session.query(AnalyzerModel)
query = session.query(model)
# Apply filters based on the filter_object
if filter_object.analyzer_id:
query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
if filter_object.algorithm_names:
query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names))
query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names))
if filter_object.input_kpi_ids:
input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids))
query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids))
if filter_object.output_kpi_ids:
output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids))
result = query.all()
# query should be added to return all rows
if result:
LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
else:
LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
return result
query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids))
except Exception as e:
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
finally:
session.close()
LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
return super().select_with_filter(query, session, model)
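Net effect of the refactor: AnalyzerDB keeps only the filter construction, while engine setup, table creation, and session handling move to the shared GenericDatabase base class. A usage sketch matching the call sites later in this changeset; the Analyzer_DB module path is an assumption:

from analytics.database.Analyzer_DB import AnalyzerDB   # module path assumed
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel

db_obj = AnalyzerDB(AnalyzerModel)   # base class binds the engine + sessionmaker
db_obj.create_database()             # creates the database if it does not exist
db_obj.create_tables()               # creates the analyzer table from the model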
......@@ -37,7 +37,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def __init__(self):
LOGGER.info('Init AnalyticsFrontendService')
self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
self.db_obj = AnalyzerDB()
self.db_obj = AnalyzerDB(AnalyzerModel)
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
......@@ -84,7 +84,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
self.kafka_producer.flush()
# self.StartResponseListener(analyzer_uuid)
def StartResponseListener(self, filter_key=None):
"""
......@@ -209,6 +208,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print ('Message delivery failed: {:}'.format(err))
# else:
# LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
else:
LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
print('Message delivered to topic {:}'.format(msg.topic()))
......@@ -16,9 +16,11 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .AnalyticsFrontendService import AnalyticsFrontendService
from analytics.database.AnalyzerModel import Analyzer as Model
from common.tools.database.GenericDatabase import Database
terminate = threading.Event()
LOGGER = None
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
......@@ -36,6 +38,11 @@ def main():
LOGGER.info('Starting...')
# To create DB
kpiDBobj = Database(Model)
kpiDBobj.create_database()
kpiDBobj.create_tables()
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
......