Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: tfs/controller

Commits on Source (29)
Showing 258 additions and 332 deletions
......@@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# Deploy QuestDB
./deploy/qdb.sh
# Deploy Apache Kafka
./deploy/kafka.sh
# Expose Dashboard
./deploy/expose_dashboard.sh
......
......@@ -20,50 +20,69 @@
# If not already set, set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"}

# If not already set, set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"}

# If not already set, set the redeploy flag. If set to YES, Apache Kafka will be redeployed and all topics will be lost.
export KFK_REDEPLOY=${KFK_REDEPLOY:-""}

########################################################################################################################
# Automated steps start here
########################################################################################################################

# Constants
TMP_FOLDER="./tmp"
KFK_MANIFESTS_PATH="manifests/kafka"
KFK_ZOOKEEPER_MANIFEST="01-zookeeper.yaml"
KFK_MANIFEST="02-kafka.yaml"

# Create a tmp folder for files modified during the deployment
TMP_MANIFESTS_FOLDER="${TMP_FOLDER}/${KFK_NAMESPACE}/manifests"
mkdir -p ${TMP_MANIFESTS_FOLDER}

function kafka_deploy() {
    # copy zookeeper and kafka manifest files to the temporary manifest location
    cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
    cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"

    # echo "Apache Kafka Namespace"
    echo ">>> Delete Apache Kafka Namespace"
    kubectl delete namespace ${KFK_NAMESPACE} --ignore-not-found

    echo ">>> Create Apache Kafka Namespace"
    kubectl create namespace ${KFK_NAMESPACE}

    # echo ">>> Deploying Apache Kafka Zookeeper"
    # The Kafka Zookeeper service should be deployed before the Kafka service
    kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"

    KFK_ZOOKEEPER_SERVICE="zookeeper-service"    # this may be replaced with a command that extracts the service name automatically
    KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')

    # The Kafka service should be deployed after the Zookeeper service
    sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"

    # echo ">>> Deploying Apache Kafka Broker"
    kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"

    # echo ">>> Verifying Apache Kafka deployment"
    sleep 5
    # KFK_PODS_STATUS=$(kubectl --namespace ${KFK_NAMESPACE} get pods)
    # if echo "$KFK_PODS_STATUS" | grep -qEv 'STATUS|Running'; then
    #     echo "Deployment Error: \n $KFK_PODS_STATUS"
    # else
    #     echo "$KFK_PODS_STATUS"
    # fi
}

echo "Apache Kafka"
echo ">>> Checking if Apache Kafka is deployed ... "
if [ "$KFK_REDEPLOY" == "YES" ]; then
    kafka_deploy
elif kubectl get --namespace ${KFK_NAMESPACE} deployments.apps &> /dev/null; then
    echo ">>> Apache Kafka already present; skipping step."
else
    kafka_deploy
fi
echo
......@@ -115,6 +115,17 @@ export PROM_EXT_PORT_HTTP=${PROM_EXT_PORT_HTTP:-"9090"}
export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"}
# ----- Apache Kafka ------------------------------------------------------
# If not already set, set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"}
# If not already set, set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"}
# If not already set, set the redeploy flag. If set to YES, Apache Kafka will be redeployed and all topics will be lost.
export KFK_REDEPLOY=${KFK_REDEPLOY:-""}
########################################################################################################################
# Automated steps start here
########################################################################################################################
......@@ -147,7 +158,7 @@ kubectl create secret generic crdb-data --namespace ${TFS_K8S_NAMESPACE} --type=
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with CockroachDB data for KPI Management"
echo "Create secret with CockroachDB data for KPI Management microservices"
CRDB_SQL_PORT=$(kubectl --namespace ${CRDB_NAMESPACE} get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
CRDB_DATABASE_KPI_MGMT="tfs_kpi_mgmt" # TODO: change by specific configurable environment variable
kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
......@@ -159,6 +170,13 @@ kubectl create secret generic crdb-kpi-data --namespace ${TFS_K8S_NAMESPACE} --t
--from-literal=CRDB_SSLMODE=require
printf "\n"
echo "Create secret with Apache Kafka kfk-kpi-data for KPI and Telemetry microservices"
KFK_SERVER_PORT=$(kubectl --namespace ${KFK_NAMESPACE} get service kafka-service -o 'jsonpath={.spec.ports[0].port}')
kubectl create secret generic kfk-kpi-data --namespace ${TFS_K8S_NAMESPACE} --type='Opaque' \
--from-literal=KFK_NAMESPACE=${KFK_NAMESPACE} \
--from-literal=KFK_SERVER_PORT=${KFK_SERVER_PORT}
printf "\n"
echo "Create secret with NATS data"
NATS_CLIENT_PORT=$(kubectl --namespace ${NATS_NAMESPACE} get service ${NATS_NAMESPACE} -o 'jsonpath={.spec.ports[?(@.name=="client")].port}')
if [ -z "$NATS_CLIENT_PORT" ]; then
......
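The kfk-kpi-data secret is consumed by the KPI and Telemetry pods through the envFrom stanzas in the manifests that follow. A minimal consumer-side sketch, assuming the pod mounts the secret as environment variables and reusing the get_setting helper imported elsewhere in this change:

```python
# Hedged sketch (not committed code): read the Kafka coordinates injected
# by the kfk-kpi-data secret into the container environment.
from common.Settings import get_setting

kfk_namespace   = get_setting('KFK_NAMESPACE')    # e.g. "kafka"
kfk_server_port = get_setting('KFK_SERVER_PORT')  # e.g. "9092"
```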
......@@ -39,6 +39,9 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30020"]
......
......@@ -39,6 +39,9 @@ spec:
env:
- name: LOG_LEVEL
value: "INFO"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:30030"]
......
......@@ -181,3 +181,8 @@ export GRAF_EXT_PORT_HTTP="3000"
# Set the namespace where Apache Kafka will be deployed.
export KFK_NAMESPACE="kafka"
# Set the port Apache Kafka server will be exposed to.
export KFK_SERVER_PORT="9092"
# Set the flag to YES to redeploy Apache Kafka; all topics will be lost.
export KFK_REDEPLOY=""
......@@ -16,14 +16,20 @@ import logging
from enum import Enum
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
class KafkaConfig(Enum):
# SERVER_IP = "127.0.0.1:9092"
SERVER_IP = "kafka-service.kafka.svc.cluster.local:9092"
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_IP})
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
KFK_NAMESPACE = 'kafka'
# KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
KFK_PORT = '9092'
# KFK_PORT = get_setting('KFK_SERVER_PORT')
# SERVER_ADDRESS = "127.0.0.1:9092"
SERVER_ADDRESS = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS })
class KafkaTopic(Enum):
REQUEST = 'topic_request'
......@@ -38,6 +44,7 @@ class KafkaTopic(Enum):
Method to create Kafka topics defined as class members
"""
all_topics = [member.value for member in KafkaTopic]
LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.SERVER_ADDRESS.value))
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics are created sucsessfully")
return True
......
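The commented-out get_setting calls above hint at the intended end state. A minimal sketch of an environment-driven address, assuming KFK_NAMESPACE and KFK_SERVER_PORT are exposed to the process by the kfk-kpi-data secret:

```python
# Hedged sketch (not the committed enum): fall back to the current
# hard-coded defaults when the environment variables are absent.
import os

KFK_NAMESPACE  = os.environ.get('KFK_NAMESPACE', 'kafka')
KFK_PORT       = os.environ.get('KFK_SERVER_PORT', '9092')
SERVER_ADDRESS = 'kafka-service.{:s}.svc.cluster.local:{:s}'.format(KFK_NAMESPACE, KFK_PORT)
```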
# How to locally run and test KPI manager micro-service
## --- File links need to be updated. ---
### Pre-requisites
Ensure the following requirements are met before executing the KPI management service:

1. A virtual environment exists with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/requirements.in) successfully installed.
2. Verify the creation of the required database and tables. The [KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_db.py) python file lists the functions to create the database and tables, and the [KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/database/KpiEngine.py) file contains the DB string.

### Messages format templates
The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_messages.py) python file contains templates for creating gRPC messages.

### Unit test file
The ["KPI manager test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_manager/tests/test_kpi_manager.py) python file lists various tests conducted to validate functionality.

### Flow of execution (KPI Manager Service functions)
1. Call the `create_database()` and `create_tables()` functions from the `Kpi_DB` class to create the required database and tables if they do not exist. Call `verify_tables` to verify the existence of the KPI tables.
2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor to the `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types.
3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from the DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from the DB.
4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that match the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types. A client-side sketch of this flow is shown below.
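A minimal client-side sketch of the flow above, assuming a running KPI Manager service and the message builders from the test files:

```python
# Hedged sketch (not a committed test): exercise the four gRPC methods above.
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request

client = KpiManagerClient()
kpi_id      = client.SetKpiDescriptor(create_kpi_descriptor_request())  # returns a KpiId
descriptor  = client.GetKpiDescriptor(kpi_id)                           # read it back
descriptors = client.SelectKpiDescriptor(create_kpi_filter_request())   # filtered listing
client.DeleteKpiDescriptor(kpi_id)                                      # remove it
```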
......@@ -34,14 +34,15 @@ class KpiDB:
def create_database(self) -> None:
if not sqlalchemy_utils.database_exists(self.db_engine.url):
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
# TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables.
try:
KpiModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name))
......@@ -69,8 +70,7 @@ class KpiDB:
session.rollback()
if "psycopg2.errors.UniqueViolation" in str(e):
LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}")
raise AlreadyExistsException(row.__class__.__name__, row,
extra_details=["Unique key voilation: {:}".format(e)] )
raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] )
else:
LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
......@@ -89,7 +89,6 @@ class KpiDB:
print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
return None
except Exception as e:
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
......
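The TODO in create_tables above asks for a table-verification helper. A minimal sketch using SQLAlchemy's inspector, assuming KpiModel.metadata holds the expected tables:

```python
# Hedged sketch: check that every table defined in the declarative metadata
# actually exists in the database behind db_engine.
from sqlalchemy import inspect

def tables_exist(db_engine, metadata) -> bool:
    existing = set(inspect(db_engine).get_table_names())
    expected = set(metadata.tables.keys())
    return expected.issubset(existing)
```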
......@@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_Tables():
LOGGER.info('>>> test_verify_Tables : START <<< ')
kpiDBobj = KpiDB()
# kpiDBobj.drop_database()
# kpiDBobj.verify_tables()
kpiDBobj.create_database()
kpiDBobj.create_tables()
kpiDBobj.verify_tables()
......@@ -17,7 +17,7 @@ import os, pytest
import logging
from typing import Union
from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
......@@ -26,12 +26,6 @@ from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
from common.tools.service.GenericGrpcService import GenericGrpcService
from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a
from kpi_manager.service.KpiManagerService import KpiManagerService
......@@ -39,12 +33,6 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request
from kpi_manager.tests.test_messages import create_kpi_id_request
###########################
# Tests Setup
###########################
......@@ -55,8 +43,6 @@ KPIMANAGER_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # t
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
......@@ -70,84 +56,10 @@ class MockContextService(GenericGrpcService):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
# This fixture will be requested by test cases and lasts during the testing session
@pytest.fixture(scope='session')
def kpi_manager_service():
LOGGER.info('Initializing KpiManagerService...')
_service = KpiManagerService()
_service.start()
......@@ -181,35 +93,28 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
# Prepare Environment, should be the first test
##################################################
# # ERROR on this test ---
# def test_prepare_environment(
# context_client : ContextClient, # pylint: disable=redefined-outer-name,unused-argument
# ):
# context_id = json_context_id(DEFAULT_CONTEXT_NAME)
# context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME)))
# context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id)))
###########################
# Tests Implementation of Kpi Manager
###########################
def test_SetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
LOGGER.info("Response gRPC message object: {:}".format(response))
assert isinstance(response, KpiId)
def test_DeleteKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# adding KPI
response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# deleting KPI
del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
# select KPI
kpi_manager_client.GetKpiDescriptor(response_id)
LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
assert isinstance(del_response, Empty)
def test_GetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
......@@ -225,77 +130,18 @@ def test_GetKpiDescriptor(kpi_manager_client):
assert isinstance(response, KpiDescriptor)
def test_SelectKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# adding KPI
kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# select KPI(s)
response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
LOGGER.info("Response gRPC message object: {:}".format(response))
assert isinstance(response, KpiDescriptorList)
def test_set_list_of_KPIs(kpi_manager_client):
LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# adding KPI
for kpi in KPIs_TO_SEARCH:
kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
......@@ -63,6 +63,8 @@ RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/kpi_value_api/. kpi_value_api/
COPY src/kpi_manager/__init__.py kpi_manager/__init__.py
COPY src/kpi_manager/client/. kpi_manager/client/
# Start the service
ENTRYPOINT ["python", "-m", "kpi_value_api.service"]
# How to locally run and test KPI Value API micro-service
### Pre-requisites
Ensure the following requirements are met before executing the KPI Value API service:

1. The KPI Manager service and Apache Kafka are running.
2. A virtual environment exists with all the required packages listed in the ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/requirements.in) file successfully installed.
3. Call the ["create_all_topics()"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) function to verify the existence of all required topics on Kafka (a pre-flight sketch is shown below).

### Messages format templates
The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/messages.py) python file contains templates for creating gRPC messages.

### Unit test file
The ["KPI Value API test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/test_kpi_value_api.py) python file lists various tests conducted to validate functionality.

### Flow of execution (KPI Value API Service functions)
1. Call the `create_new_topic_if_not_exists(<list of string>)` method to create any new topics if needed.
2. Call `StoreKpiValues(KpiValueList)` to produce a `Kpi Value` on a Kafka topic. (The `KpiValueWriter` microservice will consume and process the `Kpi Value`.)
3. Call `SelectKpiValues(KpiValueFilter) -> KpiValueList` to read metrics from the Prometheus DB.
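A minimal pre-flight sketch for the steps above, assuming a reachable Kafka broker and the create_all_topics() method as defined in Variables.py:

```python
# Hedged sketch: verify (and create, if missing) all Kafka topics the KPI
# services rely on before exercising the KPI Value API.
from common.tools.kafka.Variables import KafkaTopic

assert KafkaTopic.create_all_topics(), "required Kafka topics could not be created"
```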
......@@ -14,3 +14,4 @@
confluent-kafka==2.3.*
requests==2.27.*
prometheus-api-client==0.5.3
\ No newline at end of file
......@@ -12,22 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc
from typing import Tuple, Any
from datetime import datetime
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from confluent_kafka import Producer as KafkaProducer
from prometheus_api_client import PrometheusConnect
from prometheus_api_client.utils import parse_datetime
from kpi_manager.client.KpiManagerClient import KpiManagerClient
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC')
PROM_URL = "http://localhost:9090"
PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TODO: updated with the env variables
class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
def __init__(self):
......@@ -38,7 +43,7 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
) -> Empty:
LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
producer_obj = KafkaProducer({
'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value
})
for kpi_value in request.kpi_value_list:
kpi_value_to_produce : Tuple [str, Any, Any] = (
......@@ -63,40 +68,68 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
) -> KpiValueList:
LOGGER.debug('SelectKpiValues: Received gRPC message object: {:}'.format(request))
response = KpiValueList()

kpi_manager_client = KpiManagerClient()
prom_connect       = PrometheusConnect(url=PROM_URL)

metrics          = [self.GetKpiSampleType(kpi, kpi_manager_client) for kpi in request.kpi_id]
start_timestamps = [parse_datetime(timestamp) for timestamp in request.start_timestamp]
end_timestamps   = [parse_datetime(timestamp) for timestamp in request.end_timestamp]

prom_response = []
for start_time, end_time in zip(start_timestamps, end_timestamps):
    for metric in metrics:
        LOGGER.debug(">>> Query: {:}".format(metric))
        prom_response.append(
            prom_connect.custom_query_range(
                query      = metric,        # this is the metric name and label config
                start_time = start_time,
                end_time   = end_time,
                step       = 30,            # or any other step value (missing in the gRPC filter request)
            )
        )

for single_response in prom_response:
    for record in single_response:
        for value in record['values']:
            kpi_value = KpiValue()
            kpi_value.kpi_id.kpi_id.uuid = record['metric']['__name__']
            kpi_value.timestamp          = value[0]
            # message-type fields must be set via CopyFrom, not plain assignment
            kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value[1]))
            response.kpi_value_list.append(kpi_value)
return response

def GetKpiSampleType(self, kpi_value: str, kpi_manager_client):
    print("--- START -----")
    kpi_id = KpiId()
    kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
    # print("KpiId generated: {:}".format(kpi_id))
    try:
        kpi_descriptor_object = KpiDescriptor()
        kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
        # TODO: why does kpi_descriptor_object receive a KpiDescriptor object and not an Empty object???
        if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
            LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
            print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
            return KpiSampleType.Name(kpi_descriptor_object.kpi_sample_type)   # extract and return the name of KpiSampleType
        else:
            LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
            print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
    except Exception as e:
        LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
        print("Unable to get KpiDescriptor. Error: {:}".format(e))

def ConverValueToKpiValueType(self, value):
    # Check if the value is an integer (int64)
    try:
        int_value = int(value)
        return KpiValueType(int64Val=int_value)
    except ValueError:
        pass
    # Check if the value is a float
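For context, custom_query_range() returns a list of records, each carrying the metric labels and a list of [timestamp, value] pairs; that is the shape the nested loop above unpacks. An illustrative (made-up) example:

```python
# Illustrative only; metric and label values are invented.
sample_response = [
    {
        'metric': {'__name__': 'PACKETS_TRANSMITTED', 'instance': 'device-1'},
        'values': [[1718000000.0, '42'], [1718000030.0, '57']],  # [unix_ts, value_str]
    },
]
```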
......@@ -112,7 +145,6 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
# If none of the above, treat it as a string
return KpiValueType(stringVal=value)
def delivery_callback(self, err, msg):
if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
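ConverValueToKpiValueType tries int first, then float, then falls back to string. Illustratively (the floatVal field name is an assumption by analogy with int64Val and stringVal; servicer stands for an instance of the class above):

```python
servicer.ConverValueToKpiValueType('42')    # -> KpiValueType(int64Val=42)
servicer.ConverValueToKpiValueType('3.14')  # -> KpiValueType(floatVal=3.14)  (field name assumed)
servicer.ConverValueToKpiValueType('up')    # -> KpiValueType(stringVal='up')
```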
......@@ -18,8 +18,9 @@ from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
def create_kpi_value_list():
_create_kpi_value_list = KpiValueList()
# To run this experiment successfully, add an existing UUID of a KpiDescriptor from the KPI DB.
# This UUID is used to get the descriptor from the KPI DB. If the KPI ID does not exist,
# some part of the code won't execute.
EXISTING_KPI_IDs = ["725ce3ad-ac67-4373-bd35-8cd9d6a86e09",
str(uuid.uuid4()),
str(uuid.uuid4())]
......
# How to locally run and test the KPI Value Writer micro-service

## --- File links need to be updated. ---

### Pre-requisites
Ensure the following requirements are met before executing the KPI Value Writer service:

1. The KPI Manager and KPI Value API services are running and Apache Kafka is running.
2. A virtual environment exists with all the required packages listed in the ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/requirements.in) file successfully installed.

### Messages format templates
The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_messages.py) python file contains the templates to create gRPC messages.

### Unit test file
The ["KPI Value Writer test"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_kpi_value_writer.py) python file lists various tests conducted to validate functionality.

### Flow of execution
1. Call the `RunKafkaConsumer` method from the `KpiValueWriter` class to start consuming the `KPI Value` generated by the `KPI Value API` or `Telemetry`. For every valid `KPI Value` consumed from Kafka, it invokes the `PrometheusWriter` class to prepare and push the metric to the Prometheus DB. A launcher sketch is shown below.
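A minimal launcher sketch for the flow above, assuming Kafka and the KPI Manager service are reachable:

```python
# Hedged sketch: start the writer's Kafka consumer; it listens on
# KafkaTopic.VALUE and pushes cooked KPIs to Prometheus as they arrive.
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter

KpiValueWriter.RunKafkaConsumer()
```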
......@@ -33,7 +33,6 @@ from .MetricWriterToPrometheus import MetricWriterToPrometheus
LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = []
class KpiValueWriter(GenericGrpcService):
def __init__(self, cls_name : str = __name__) -> None:
......@@ -48,12 +47,14 @@ class KpiValueWriter(GenericGrpcService):
@staticmethod
def KafkaConsumer():
kpi_manager_client = KpiManagerClient()
metric_writer = MetricWriterToPrometheus()
kafka_consumer = KafkaConsumer(
{ 'bootstrap.servers' : KafkaConfig.SERVER_ADDRESS.value,
'group.id' : __class__,
'auto.offset.reset' : 'latest'}
)
kafka_consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
......@@ -72,13 +73,13 @@ class KpiValueWriter(GenericGrpcService):
kpi_value.ParseFromString(raw_kpi.value())
LOGGER.info("Received KPI : {:}".format(kpi_value))
print("Received KPI : {:}".format(kpi_value))
KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
except Exception as e:
print("Error detail: {:}".format(e))
continue
@staticmethod
def get_kpi_descriptor(kpi_value: str, kpi_manager_client, metric_writer):
print("--- START -----")
kpi_id = KpiId()
......@@ -89,12 +90,11 @@ class KpiValueWriter(GenericGrpcService):
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
# print("kpi descriptor received: {:}".format(kpi_descriptor_object))
# if isinstance (kpi_descriptor_object, KpiDescriptor):
LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
else:
LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
......
......@@ -63,8 +63,8 @@ class MetricWriterToPrometheus:
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
# merge both gRPC messages into a single variable.
cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags
metric_name = cooked_kpi['kpi_sample_type']
try:
if metric_name not in PROM_METRICS: # Only register the metric when it doesn't exist
......@@ -93,4 +93,5 @@ class MetricWriterToPrometheus:
print("Metric {:} is already registered. Skipping.".format(metric_name))
else:
LOGGER.error("Error while pushing metric: {}".format(e))
raise
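The register-once pattern above keeps a process-wide registry of Prometheus metrics. A minimal sketch of the idea, assuming prometheus_client Gauges keyed by the KPI sample-type name (the helper name is hypothetical):

```python
# Hedged sketch of the register-once pattern used by the writer.
from prometheus_client import Gauge

PROM_METRICS = {}

def get_or_create_gauge(metric_name, metric_tags):
    if metric_name not in PROM_METRICS:  # only register a metric the first time it is seen
        PROM_METRICS[metric_name] = Gauge(metric_name, 'TFS cooked KPI', metric_tags)
    return PROM_METRICS[metric_name]
```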
......@@ -14,31 +14,12 @@
import logging
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
from common.tools.kafka.Variables import KafkaTopic
LOGGER = logging.getLogger(__name__)
# -------- Initial Test ----------------
def test_validate_kafka_topics():
......@@ -49,4 +30,3 @@ def test_validate_kafka_topics():
def test_KafkaConsumer():
LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
KpiValueWriter.RunKafkaConsumer()