Compare revisions: tfs/controller

Changes are shown as if the source revision was being merged into the target revision.

Showing 705 additions and 522 deletions
@@ -16,8 +16,6 @@ import logging, sqlalchemy
 from common.Settings import get_setting
 LOGGER = logging.getLogger(__name__)
-# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
 CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
 class KpiEngine:
@@ -33,12 +31,10 @@ class KpiEngine:
         CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
         crdb_uri = CRDB_URI_TEMPLATE.format(
             CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
-        # crdb_uri = CRDB_URI_TEMPLATE.format(
-        #     CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
         try:
             engine = sqlalchemy.create_engine(crdb_uri, echo=False)
             LOGGER.info(' KpiDBmanager initalized with DB URL: {:}'.format(crdb_uri))
         except: # pylint: disable=bare-except # pragma: no cover
             LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
             return None # type: ignore
         return engine
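For reference, the new template expands into a standard in-cluster CockroachDB DSN. A minimal sketch with placeholder values (the username, password, namespace, and database name below are illustrative, not the deployment's real settings; 26257 is CockroachDB's default SQL port):

```python
# Illustrative expansion of CRDB_URI_TEMPLATE with placeholder values.
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'

# username, password, namespace, SQL port, database, sslmode (all sample values)
crdb_uri = CRDB_URI_TEMPLATE.format('tfs_user', 'tfs_pass', 'crdb', '26257', 'tfs_kpi', 'require')
# -> cockroachdb://tfs_user:tfs_pass@cockroachdb-public.crdb.svc.cluster.local:26257/tfs_kpi?sslmode=require
```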
@@ -34,14 +34,15 @@ class KpiDB:
     def create_database(self) -> None:
         if not sqlalchemy_utils.database_exists(self.db_engine.url):
-            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
             sqlalchemy_utils.create_database(self.db_engine.url)
+            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
     def drop_database(self) -> None:
         if sqlalchemy_utils.database_exists(self.db_engine.url):
             sqlalchemy_utils.drop_database(self.db_engine.url)
     def create_tables(self):
+        # TODO: use "get_tables(declatrative class obj)" method of "sqlalchemy_utils" to verify tables.
         try:
             KpiModel.metadata.create_all(self.db_engine)     # type: ignore
             LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name))
@@ -69,8 +70,7 @@ class KpiDB:
             session.rollback()
             if "psycopg2.errors.UniqueViolation" in str(e):
                 LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}")
-                raise AlreadyExistsException(row.__class__.__name__, row,
-                                             extra_details=["Unique key voilation: {:}".format(e)] )
+                raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key voilation: {:}".format(e)] )
             else:
                 LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
                 raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
@@ -89,7 +89,6 @@ class KpiDB:
                 print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
                 return None
         except Exception as e:
-            session.rollback()
             LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
             raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
         finally:
...
@@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__)
 def test_verify_databases_and_Tables():
     LOGGER.info('>>> test_verify_Tables : START <<< ')
     kpiDBobj = KpiDB()
-    kpiDBobj.drop_database()
-    kpiDBobj.verify_tables()
+    # kpiDBobj.drop_database()
+    # kpiDBobj.verify_tables()
     kpiDBobj.create_database()
     kpiDBobj.create_tables()
     kpiDBobj.verify_tables()
@@ -17,7 +17,7 @@ import os, pytest
 import logging
 from typing import Union
-#from common.proto.context_pb2 import Empty
+from common.proto.context_pb2 import Empty
 from common.Constants import ServiceNameEnum
 from common.Settings import (
     ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
@@ -26,12 +26,6 @@ from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
 from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
 from common.tools.service.GenericGrpcService import GenericGrpcService
-#from context.client.ContextClient import ContextClient
-# from device.service.driver_api.DriverFactory import DriverFactory
-# from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
-# from device.service.DeviceService import DeviceService
-# from device.client.DeviceClient import DeviceClient
 from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a
 from kpi_manager.service.KpiManagerService import KpiManagerService
@@ -39,12 +33,6 @@ from kpi_manager.client.KpiManagerClient import KpiManagerClient
 from kpi_manager.tests.test_messages import create_kpi_descriptor_request
 from kpi_manager.tests.test_messages import create_kpi_id_request
-#from monitoring.service.NameMapping import NameMapping
-#os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
-#from device.service.drivers import DRIVERS
 ###########################
 # Tests Setup
 ###########################
@@ -55,8 +43,6 @@ KPIMANAGER_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # t
 os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
 os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)
-# METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME'){}
 LOGGER = logging.getLogger(__name__)
 class MockContextService(GenericGrpcService):
@@ -70,84 +56,10 @@ class MockContextService(GenericGrpcService):
         self.context_servicer = MockServicerImpl_Context()
         add_ContextServiceServicer_to_server(self.context_servicer, self.server)
-# @pytest.fixture(scope='session')
-# def context_service():
-#     LOGGER.info('Initializing MockContextService...')
-#     _service = MockContextService(MOCKSERVICE_PORT)
-#     _service.start()
-#     LOGGER.info('Yielding MockContextService...')
-#     yield _service
-#     LOGGER.info('Terminating MockContextService...')
-#     _service.context_servicer.msg_broker.terminate()
-#     _service.stop()
-#     LOGGER.info('Terminated MockContextService...')
-# @pytest.fixture(scope='session')
-# def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
-#     LOGGER.info('Initializing ContextClient...')
-#     _client = ContextClient()
-#     LOGGER.info('Yielding ContextClient...')
-#     yield _client
-#     LOGGER.info('Closing ContextClient...')
-#     _client.close()
-#     LOGGER.info('Closed ContextClient...')
-# @pytest.fixture(scope='session')
-# def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
-#     LOGGER.info('Initializing DeviceService...')
-#     driver_factory = DriverFactory(DRIVERS)
-#     driver_instance_cache = DriverInstanceCache(driver_factory)
-#     _service = DeviceService(driver_instance_cache)
-#     _service.start()
-#     # yield the server, when test finishes, execution will resume to stop it
-#     LOGGER.info('Yielding DeviceService...')
-#     yield _service
-#     LOGGER.info('Terminating DeviceService...')
-#     _service.stop()
-#     LOGGER.info('Terminated DeviceService...')
-# @pytest.fixture(scope='session')
-# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
-#     LOGGER.info('Initializing DeviceClient...')
-#     _client = DeviceClient()
-#     LOGGER.info('Yielding DeviceClient...')
-#     yield _client
-#     LOGGER.info('Closing DeviceClient...')
-#     _client.close()
-#     LOGGER.info('Closed DeviceClient...')
-# @pytest.fixture(scope='session')
-# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
-#     LOGGER.info('Initializing DeviceClient...')
-#     _client = DeviceClient()
-#     LOGGER.info('Yielding DeviceClient...')
-#     yield _client
-#     LOGGER.info('Closing DeviceClient...')
-#     _client.close()
-#     LOGGER.info('Closed DeviceClient...')
 # This fixture will be requested by test cases and last during testing session
 @pytest.fixture(scope='session')
 def kpi_manager_service():
     LOGGER.info('Initializing KpiManagerService...')
-    #name_mapping = NameMapping()
-    # _service = MonitoringService(name_mapping)
-    # _service = KpiManagerService(name_mapping)
     _service = KpiManagerService()
     _service.start()
@@ -181,35 +93,28 @@ def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disab
 # Prepare Environment, should be the first test
 ##################################################
-# # ERROR on this test ---
-# def test_prepare_environment(
-#     context_client : ContextClient,   # pylint: disable=redefined-outer-name,unused-argument
-# ):
-#     context_id = json_context_id(DEFAULT_CONTEXT_NAME)
-#     context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME)))
-#     context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id)))
 ###########################
 # Tests Implementation of Kpi Manager
 ###########################
 # ---------- 3rd Iteration Tests ----------------
-# def test_SetKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
-#     response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-#     LOGGER.info("Response gRPC message object: {:}".format(response))
-#     assert isinstance(response, KpiId)
+def test_SetKpiDescriptor(kpi_manager_client):
+    LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
+    response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
+    LOGGER.info("Response gRPC message object: {:}".format(response))
+    assert isinstance(response, KpiId)
-# def test_DeleteKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
-#     # adding KPI
-#     response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-#     # deleting KPI
-#     del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
-#     # select KPI
-#     kpi_manager_client.GetKpiDescriptor(response_id)
-#     LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
-#     assert isinstance(del_response, Empty)
+def test_DeleteKpiDescriptor(kpi_manager_client):
+    LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
+    # adding KPI
+    response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
+    # deleting KPI
+    del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
+    # select KPI
+    kpi_manager_client.GetKpiDescriptor(response_id)
+    LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
+    assert isinstance(del_response, Empty)
 def test_GetKpiDescriptor(kpi_manager_client):
     LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
@@ -225,77 +130,18 @@ def test_GetKpiDescriptor(kpi_manager_client):
     assert isinstance(response, KpiDescriptor)
-# def test_SelectKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
-#     # adding KPI
-#     kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-#     # select KPI(s)
-#     response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
-#     LOGGER.info("Response gRPC message object: {:}".format(response))
-#     assert isinstance(response, KpiDescriptorList)
+def test_SelectKpiDescriptor(kpi_manager_client):
+    LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
+    # adding KPI
+    kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
+    # select KPI(s)
+    response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
+    LOGGER.info("Response gRPC message object: {:}".format(response))
+    assert isinstance(response, KpiDescriptorList)
-# def test_set_list_of_KPIs(kpi_manager_client):
-#     LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
-#     KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
-#     # adding KPI
-#     for kpi in KPIs_TO_SEARCH:
-#        kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
-# ---------- 2nd Iteration Tests -----------------
-# def test_SetKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
-#     with open("kpi_manager/tests/KPI_configs.json", 'r') as file:
-#         data = json.load(file)
-#         _descriptors = data.get('KPIs', [])
-#     for _descritor_name in _descriptors:
-#         response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(_descritor_name))
-#         LOGGER.info("Response gRPC message object: {:}".format(response))
-#     assert isinstance(response, KpiId)
-# def test_GetKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
-#     response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
-#     LOGGER.info("Response gRPC message object: {:}".format(response))
-#     assert isinstance(response, KpiDescriptor)
-# def test_DeleteKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
-#     response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-#     del_response = kpi_manager_client.DeleteKpiDescriptor(response)
-#     kpi_manager_client.GetKpiDescriptor(response)
-#     LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
-#     assert isinstance(del_response, Empty)
-# def test_SelectKpiDescriptor(kpi_manager_client):
-#     LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
-#     kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a())
-#     response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request_a())
-#     LOGGER.info("Response gRPC message object: {:}".format(response))
-#     assert isinstance(response, KpiDescriptorList)
-# ------------- INITIAL TESTs ----------------
-# Test case that makes use of client fixture to test server's CreateKpi method
-# def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
-#     # make call to server
-#     LOGGER.warning('test_create_kpi requesting')
-#     for i in range(3):
-#         response = kpi_manager_client.SetKpiDescriptor(create_kpi_request(str(i+1)))
-#         LOGGER.debug(str(response))
-#         assert isinstance(response, KpiId)
-# # Test case that makes use of client fixture to test server's DeleteKpi method
-# def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
-#     # make call to server
-#     LOGGER.warning('delete_kpi requesting')
-#     response = kpi_manager_client.SetKpiDescriptor(create_kpi_request('4'))
-#     response = kpi_manager_client.DeleteKpiDescriptor(response)
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, Empty)
-# # Test case that makes use of client fixture to test server's GetKpiDescriptor method
-# def test_select_kpi_descriptor(kpi_manager_client): # pylint: disable=redefined-outer-name
-#     LOGGER.warning('test_selectkpidescritor begin')
-#     response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, KpiDescriptorList)
+def test_set_list_of_KPIs(kpi_manager_client):
+    LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
+    KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
+    # adding KPI
+    for kpi in KPIs_TO_SEARCH:
+        kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
@@ -50,10 +50,30 @@ unit_test kpi-value-api:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
    - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
    - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi
+    - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
+    - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
    - docker container prune -f
  script:
    - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
-    - docker run --name $IMAGE_NAME -d -p 30020:30020 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
+    - docker pull "bitnami/zookeeper:latest"
+    - docker pull "bitnami/kafka:latest"
+    - >
+      docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
+      bitnami/zookeeper:latest
+    - sleep 10 # Wait for Zookeeper to start
+    - docker run --name kafka -d --network=teraflowbridge -p 9092:9092
+      --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      --env ALLOW_PLAINTEXT_LISTENER=yes
+      bitnami/kafka:latest
+    - sleep 20 # Wait for Kafka to start
+    - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
+    - echo $KAFKA_IP
+    - >
+      docker run --name $IMAGE_NAME -d -p 30020:30020
+      --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
+      --volume "$PWD/src/$IMAGE_NAME/tests:/opt/results"
+      --network=teraflowbridge
+      $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    - sleep 5
    - docker ps -a
    - docker logs $IMAGE_NAME
@@ -74,7 +94,7 @@ unit_test kpi-value-api:
      - src/$IMAGE_NAME/**/*.{py,in,yml}
      - src/$IMAGE_NAME/Dockerfile
      - src/$IMAGE_NAME/tests/*.py
-      - src/$IMAGE_NAME/tests/Dockerfile
+      # - src/$IMAGE_NAME/tests/Dockerfile # mayne not needed
      - manifests/${IMAGE_NAME}service.yaml
      - .gitlab-ci.yml
  artifacts:
...
@@ -63,6 +63,8 @@ RUN python3 -m pip install -r requirements.txt
 # Add component files into working directory
 WORKDIR /var/teraflow
 COPY src/kpi_value_api/. kpi_value_api/
+COPY src/kpi_manager/__init__.py kpi_manager/__init__.py
+COPY src/kpi_manager/client/. kpi_manager/client/
 # Start the service
 ENTRYPOINT ["python", "-m", "kpi_value_api.service"]
# How to locally run and test the KPI Value API micro-service

### Prerequisites
Ensure the following requirements are met before executing the KPI Value API service.

1. The KPI Manager service and Apache Kafka are running.
2. A virtual environment exists with all the required packages listed in the ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/requirements.in) file successfully installed.
3. Call the ["create_all_topics()"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/common/tools/kafka/Variables.py) function to verify the existence of all required topics on Kafka.

### Message format templates
The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/messages.py) Python file contains templates for creating gRPC messages.

### Unit test file
The ["KPI Value API tests"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_api/tests/test_kpi_value_api.py) Python file lists the tests conducted to validate functionality.

### Flow of execution (KPI Value API service functions)
1. Call the `create_new_topic_if_not_exists(<list of string>)` method to create any new topics if needed.
2. Call `StoreKpiValues(KpiValueList)` to produce a `Kpi Value` on a Kafka topic. (The `KpiValueWriter` microservice will consume and process the `Kpi Value`.)
3. Call `SelectKpiValues(KpiValueFilter) -> KpiValueList` to read metrics from the Prometheus DB, as sketched in the example below.
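The store path can be exercised end to end with a short script. The sketch below is hedged: `KpiValueApiClient` and its import path are assumed by analogy with `KpiManagerClient` (neither appears in this diff), and the UUID must match an existing `KpiDescriptor` in the KPI DB; the message fields follow the proto usage shown in this change set.

```python
# Minimal sketch; KpiValueApiClient is a hypothetical client wrapper
# analogous to KpiManagerClient.
from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient  # assumed path

kpi_value = KpiValue()
kpi_value.kpi_id.kpi_id.uuid      = "725ce3ad-ac67-4373-bd35-8cd9d6a86e09"  # existing KPI UUID
kpi_value.timestamp.timestamp     = 1718000000.0  # assumed float timestamp field
kpi_value.kpi_value_type.floatVal = 12.5          # one variant of the KpiValueType oneof

request = KpiValueList()
request.kpi_value_list.append(kpi_value)

client = KpiValueApiClient()
client.StoreKpiValues(request)  # produces the value onto the Kafka VALUE topic
```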
@@ -14,3 +14,4 @@
 confluent-kafka==2.3.*
 requests==2.27.*
+prometheus-api-client==0.5.3
\ No newline at end of file
@@ -12,98 +12,141 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging, grpc, requests
-from typing import Tuple, Any
-from datetime import datetime
+import logging, grpc, json
+from typing import Dict
 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from common.proto.context_pb2 import Empty
+from common.proto.kpi_sample_types_pb2 import KpiSampleType
+from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
 from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
 from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType
 from confluent_kafka import Producer as KafkaProducer
+from prometheus_api_client import PrometheusConnect
+from prometheus_api_client.utils import parse_datetime
+from kpi_manager.client.KpiManagerClient import KpiManagerClient
 LOGGER       = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC')
-PROM_URL     = "http://localhost:9090"
+PROM_URL     = "http://prometheus-k8s.monitoring.svc.cluster.local:9090"    # TODO: updated with the env variables
 class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
     def __init__(self):
         LOGGER.debug('Init KpiValueApiService')
+        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
                        ) -> Empty:
         LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
-        producer_obj = KafkaProducer({
-            'bootstrap.servers' : KafkaConfig.SERVER_IP.value
-        })
+        producer = self.kafka_producer
         for kpi_value in request.kpi_value_list:
-            kpi_value_to_produce : Tuple [str, Any, Any] = (
-                kpi_value.kpi_id.kpi_id,
-                kpi_value.timestamp,
-                kpi_value.kpi_value_type    # kpi_value.kpi_value_type.(many options) how?
-            )
+            kpi_value_to_produce : Dict = {
+                "kpi_uuid"       : kpi_value.kpi_id.kpi_id.uuid,
+                "timestamp"      : kpi_value.timestamp.timestamp,
+                "kpi_value_type" : self.ExtractKpiValueByType(kpi_value.kpi_value_type)
+            }
             LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce))
             msg_key = "gRPC-kpivalueapi"    # str(__class__.__name__) can be used
-            producer_obj.produce(
+            producer.produce(
                 KafkaTopic.VALUE.value,
                 key      = msg_key,
-                value    = kpi_value.SerializeToString(),    # value = json.dumps(kpi_value_to_produce),
+                value    = json.dumps(kpi_value_to_produce),
                 callback = self.delivery_callback
             )
-            producer_obj.flush()
+            producer.flush()
         return Empty()
+    def ExtractKpiValueByType(self, value):
+        attributes = [ 'floatVal' , 'int32Val' , 'uint32Val','int64Val',
+                       'uint64Val', 'stringVal', 'boolVal']
+        for attr in attributes:
+            try:
+                return getattr(value, attr)
+            except (ValueError, TypeError, AttributeError):
+                continue
+        return None
     @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
     def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext
                         ) -> KpiValueList:
         LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
         response = KpiValueList()
-        metrics          = [kpi.kpi_id for kpi in request.kpi_id]
-        start_timestamps = [timestamp for timestamp in request.start_timestamp]
-        end_timestamps   = [timestamp for timestamp in request.end_timestamp]
-        results = []
+        kpi_manager_client = KpiManagerClient()
+        prom_connect       = PrometheusConnect(url=PROM_URL)
+        metrics          = [self.GetKpiSampleType(kpi, kpi_manager_client) for kpi in request.kpi_id]
+        start_timestamps = [parse_datetime(timestamp) for timestamp in request.start_timestamp]
+        end_timestamps   = [parse_datetime(timestamp) for timestamp in request.end_timestamp]
+        prom_response = []
-        for start, end in zip(start_timestamps, end_timestamps):
-            start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z"
-            end_str   = datetime.fromtimestamp(end.seconds).isoformat() + "Z"
+        for start_time, end_time in zip(start_timestamps, end_timestamps):
             for metric in metrics:
-                url    = f'{PROM_URL}/api/v1/query_range'
-                params = {
-                    'query': metric,
-                    'start': start_str,
-                    'end'  : end_str,
-                    'step' : '30s'    # or any other step you need
-                }
-                response = requests.get(url, params=params)
-                if response.status_code == 200:
-                    data = response.json()
-                    for result in data['data']['result']:
-                        for value in result['values']:
-                            kpi_value = KpiValue(
-                                kpi_id=metric,
-                                timestamp=str(seconds=value[0]),
-                                kpi_value_type=self._convert_value_to_kpi_value_type(value[1])
-                            )
-                            results.append(kpi_value)
+                print(start_time, end_time, metric)
+                LOGGER.debug(">>> Query: {:}".format(metric))
+                prom_response.append(
+                    prom_connect.custom_query_range(
+                        query      = metric,    # this is the metric name and label config
+                        start_time = start_time,
+                        end_time   = end_time,
+                        step       = 30,        # or any other step value (missing in gRPC Filter request)
+                    )
+                )
+        for single_resposne in prom_response:
+            # print ("{:}".format(single_resposne))
+            for record in single_resposne:
+                # print("Record >>> kpi: {:} >>> time & values set: {:}".format(record['metric']['__name__'], record['values']))
+                for value in record['values']:
+                    # print("{:} - {:}".format(record['metric']['__name__'], value))
+                    kpi_value = KpiValue()
+                    kpi_value.kpi_id.kpi_id  = record['metric']['__name__'],
+                    kpi_value.timestamp      = value[0],
+                    kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1])
+                    response.kpi_value_list.append(kpi_value)
+        return response
-    def _convert_value_to_kpi_value_type(self, value):
+    def GetKpiSampleType(self, kpi_value: str, kpi_manager_client):
+        print("--- START -----")
+        kpi_id = KpiId()
+        kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
+        # print("KpiId generated: {:}".format(kpi_id))
+        try:
+            kpi_descriptor_object = KpiDescriptor()
+            kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
+            # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object???
+            if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
+                LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
+                print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
+                return KpiSampleType.Name(kpi_descriptor_object.kpi_sample_type)    # extract and return the name of KpiSampleType
+            else:
+                LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
+                print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
+        except Exception as e:
+            LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
+            print ("Unable to get KpiDescriptor. Error: {:}".format(e))
+    def ConverValueToKpiValueType(self, value):
         # Check if the value is an integer (int64)
         try:
-            int64_value = int(value)
-            return KpiValueType(int64Val=int64_value)
-        except ValueError:
+            int_value = int(value)
+            return KpiValueType(int64Val=int_value)
+        except (ValueError, TypeError):
             pass
         # Check if the value is a float
         try:
             float_value = float(value)
             return KpiValueType(floatVal=float_value)
-        except ValueError:
+        except (ValueError, TypeError):
             pass
         # Check if the value is a boolean
         if value.lower() in ['true', 'false']:
@@ -112,7 +155,6 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
         # If none of the above, treat it as a string
         return KpiValueType(stringVal=value)
     def delivery_callback(self, err, msg):
         if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
         else:   LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
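One caveat worth noting on the new `ExtractKpiValueByType`: proto3 scalar fields do not raise on access when unset; they return their default value, so probing with `getattr` inside try/except will typically just return the first attribute queried (`floatVal`). The usual way to find the populated variant of a protobuf oneof is `WhichOneof`. A hedged sketch follows; the oneof name `'value'` is an assumption, since the `.proto` definition is not part of this diff.

```python
# Sketch of oneof extraction via WhichOneof(); 'value' is an assumed oneof name.
def extract_kpi_value(kpi_value_type):
    # Returns e.g. 'floatVal', 'int64Val', ..., or None if no variant is set.
    which = kpi_value_type.WhichOneof('value')
    return getattr(kpi_value_type, which) if which is not None else None
```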
@@ -18,8 +18,9 @@ from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
 def create_kpi_value_list():
     _create_kpi_value_list = KpiValueList()
-    # To run this experiment sucessfully, already existing UUID in KPI DB in necessary.
-    # because the UUID is used to get the descriptor form KPI DB.
+    # To run this experiment sucessfully, add an existing UUID of a KPI Descriptor from the KPI DB.
+    # This UUID is used to get the descriptor form the KPI DB. If the Kpi ID does not exists,
+    # some part of the code won't execute.
     EXISTING_KPI_IDs = ["725ce3ad-ac67-4373-bd35-8cd9d6a86e09",
                         str(uuid.uuid4()),
                         str(uuid.uuid4())]
...
@@ -50,10 +50,30 @@ unit_test kpi-value-writer:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
    - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
    - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi
+    - if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
+    - if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
    - docker container prune -f
  script:
    - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
-    - docker run --name $IMAGE_NAME -d -p 30030:30030 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
+    - docker pull "bitnami/zookeeper:latest"
+    - docker pull "bitnami/kafka:latest"
+    - >
+      docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
+      bitnami/zookeeper:latest
+    - sleep 10 # Wait for Zookeeper to start
+    - docker run --name kafka -d --network=teraflowbridge -p 9092:9092
+      --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      --env ALLOW_PLAINTEXT_LISTENER=yes
+      bitnami/kafka:latest
+    - sleep 20 # Wait for Kafka to start
+    - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
+    - echo $KAFKA_IP
+    - >
+      docker run --name $IMAGE_NAME -d -p 30030:30030
+      --env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
+      --volume "$PWD/src/$IMAGE_NAME/tests:/opt/results"
+      --network=teraflowbridge
+      $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    - sleep 5
    - docker ps -a
    - docker logs $IMAGE_NAME
@@ -64,6 +84,8 @@ unit_test kpi-value-writer:
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
    - docker rm -f $IMAGE_NAME
+    - docker rm -f zookeeper
+    - docker rm -f kafka
    - docker network rm teraflowbridge
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
...
-# How to locally run and test KPI manager micro-service
-## --- File links need to be updated. ---
+# How to locally run and test the KPI Value Writer micro-service
 ### Prerequisites
-The following requirements should be fulfilled before the execuation of KPI management service.
-1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully.
-2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully.
-3. Verify the creation of required database and table.
-[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and
-[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment.
-### Messages format templates
-["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing.
-### Test file
-["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment.
-### Flow of execution (Kpi Maanager Service functions)
-1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table.
-2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types.
-3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB.
-4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types.
-## For KPI composer and KPI writer
-The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in).
+Ensure the following requirements are met before executing the KPI Value Writer service:
+1. The KPI Manager and KPI Value API services are running, and Apache Kafka is running.
+2. A virtual environment exists with all the required packages listed in the ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/requirements.in) file successfully installed.
+### Message format templates
+The ["messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_messages.py) Python file contains the templates to create gRPC messages.
+### Unit test file
+The ["KPI Value Writer tests"](https://labs.etsi.org/rep/tfs/controller/-/blob/develop/src/kpi_value_writer/tests/test_kpi_value_writer.py) Python file lists the tests conducted to validate functionality.
+### Flow of execution
+1. Call the `RunKafkaConsumer` method from the `KpiValueWriter` class to start consuming the `KPI Value` messages generated by the `KPI Value API` or `Telemetry`. For every valid `KPI Value` consumed from Kafka, it invokes the `PrometheusWriter` class to prepare and push the metric to the Prometheus DB, as sketched below.
\ No newline at end of file
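Put together, a local run of the writer mirrors the wiring this change introduces in the service's `__main__.py`; a minimal sketch:

```python
# Minimal local wiring sketch, mirroring kpi_value_writer's __main__.py.
from prometheus_client import start_http_server
from kpi_value_writer.service.KpiValueWriter import KpiValueWriter

metric_writer_service = KpiValueWriter()
metric_writer_service.start()             # start the gRPC service
metric_writer_service.RunKafkaConsumer()  # background thread on KafkaTopic.VALUE
start_http_server(10808)                  # expose cooked KPIs for Prometheus scraping
```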
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
 import logging
 import threading
 from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
@@ -33,32 +34,30 @@ from .MetricWriterToPrometheus import MetricWriterToPrometheus
 LOGGER           = logging.getLogger(__name__)
 ACTIVE_CONSUMERS = []
-METRIC_WRITER    = MetricWriterToPrometheus()
 class KpiValueWriter(GenericGrpcService):
     def __init__(self, cls_name : str = __name__) -> None:
         port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER)
         super().__init__(port, cls_name=cls_name)
+        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
+                                             'group.id'          : 'KpiValueWriter',
+                                             'auto.offset.reset' : 'latest'})
-    @staticmethod
-    def RunKafkaConsumer():
-        thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=())
+    def RunKafkaConsumer(self):
+        thread = threading.Thread(target=self.KafkaKpiConsumer, args=())
         ACTIVE_CONSUMERS.append(thread)
         thread.start()
-    @staticmethod
-    def KafkaConsumer():
-        kafka_consumer = KafkaConsumer(
-            { 'bootstrap.servers' : KafkaConfig.SERVER_IP.value,
-              'group.id'          : __class__,
-              'auto.offset.reset' : 'latest'}
-        )
+    def KafkaKpiConsumer(self):
         kpi_manager_client = KpiManagerClient()
-        kafka_consumer.subscribe([KafkaTopic.VALUE.value])
+        metric_writer      = MetricWriterToPrometheus()
+        consumer = self.kafka_consumer
+        consumer.subscribe([KafkaTopic.VALUE.value])
         LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
         print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
         while True:
-            raw_kpi = kafka_consumer.poll(1.0)
+            raw_kpi = consumer.poll(1.0)
             if raw_kpi is None:
                 continue
             elif raw_kpi.error():
@@ -68,33 +67,29 @@ class KpiValueWriter(GenericGrpcService):
                 print("Consumer error: {}".format(raw_kpi.error()))
                 continue
             try:
-                kpi_value = KpiValue()
-                kpi_value.ParseFromString(raw_kpi.value())
+                kpi_value = json.loads(raw_kpi.value().decode('utf-8'))
                 LOGGER.info("Received KPI : {:}".format(kpi_value))
                 print("Received KPI : {:}".format(kpi_value))
-                KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client)
+                self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer)
             except Exception as e:
                 print("Error detail: {:}".format(e))
                 continue
-    @staticmethod
-    def get_kpi_descriptor(kpi_value: str, kpi_manager_client ):
+    def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer):
         print("--- START -----")
         kpi_id = KpiId()
-        kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
+        kpi_id.kpi_id.uuid = kpi_value['kpi_uuid']
         print("KpiId generated: {:}".format(kpi_id))
         # print("Kpi manger client created: {:}".format(kpi_manager_client))
         try:
             kpi_descriptor_object = KpiDescriptor()
             kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
-            # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object???
             if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
-                # print("kpi descriptor received: {:}".format(kpi_descriptor_object))
-                # if isinstance (kpi_descriptor_object, KpiDescriptor):
                 LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
                 print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
-                METRIC_WRITER.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
+                metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
             else:
                 LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
                 print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
...
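With this change the VALUE topic carries JSON rather than serialized protobuf, matching what `StoreKpiValues` now produces. A sketch of the payload round-trip under that assumption (field values illustrative):

```python
import json

# What the KPI Value API producer now sends on KafkaTopic.VALUE ...
payload = {
    "kpi_uuid"       : "725ce3ad-ac67-4373-bd35-8cd9d6a86e09",  # illustrative UUID
    "timestamp"      : 1718000000.0,
    "kpi_value_type" : 12.5,  # raw value extracted from the KpiValueType oneof
}
raw = json.dumps(payload).encode('utf-8')

# ... and what KafkaKpiConsumer recovers before looking up the descriptor.
kpi_value = json.loads(raw.decode('utf-8'))
assert kpi_value['kpi_uuid'] == payload['kpi_uuid']
```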
@@ -14,11 +14,9 @@
 # read Kafka stream from Kafka topic
-import ast
-import time
-import threading
 import logging
-from prometheus_client import start_http_server, Gauge, CollectorRegistry
+from typing import Dict
+from prometheus_client import Gauge
 from common.proto.kpi_sample_types_pb2 import KpiSampleType
 from common.proto.kpi_value_api_pb2 import KpiValue
@@ -26,7 +24,6 @@ from common.proto.kpi_manager_pb2 import KpiDescriptor
 LOGGER         = logging.getLogger(__name__)
 PROM_METRICS   = {}
-PROM_REGISTERY = CollectorRegistry()
 class MetricWriterToPrometheus:
     '''
@@ -34,13 +31,7 @@ class MetricWriterToPrometheus:
     cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message)
     '''
     def __init__(self):
-        # prometheus server address and configs
-        self.start_prometheus_client()
         pass
-    def start_prometheus_client(self):
-        start_http_server(10808, registry=PROM_REGISTERY)
-        LOGGER.debug("Prometheus client is started on port 10808")
     def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value):
         # Creating a dictionary from the kpi_descriptor's attributes
@@ -54,25 +45,24 @@ class MetricWriterToPrometheus:
             'slice_id'        : kpi_descriptor.slice_id.slice_uuid.uuid,
             'connection_id'   : kpi_descriptor.connection_id.connection_uuid.uuid,
             'link_id'         : kpi_descriptor.link_id.link_uuid.uuid,
-            'time_stamp'      : kpi_value.timestamp.timestamp,
-            'kpi_value'       : kpi_value.kpi_value_type.floatVal
+            'time_stamp'      : kpi_value['timestamp'],
+            'kpi_value'       : kpi_value['kpi_value_type']
         }
         # LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
         return cooked_kpi
-    def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
+    def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: Dict):
         # merge both gRPC messages into single varible.
         cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
-        tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}    # extracted values will be used as metric tag
-        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]
+        tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
+        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude]    # These values will be used as metric tags
         metric_name = cooked_kpi['kpi_sample_type']
         try:
             if metric_name not in PROM_METRICS:    # Only register the metric, when it doesn't exists
                 PROM_METRICS[metric_name] = Gauge (
                     metric_name,
                     cooked_kpi['kpi_description'],
-                    metric_tags,
-                    registry=PROM_REGISTERY
+                    metric_tags
                 )
             LOGGER.debug("Metric is created with labels: {:}".format(metric_tags))
             PROM_METRICS[metric_name].labels(
@@ -84,7 +74,7 @@ class MetricWriterToPrometheus:
                 connection_id   = cooked_kpi['connection_id'],
                 link_id         = cooked_kpi['link_id'],
                 time_stamp      = cooked_kpi['time_stamp'],
-            ).set(float(cooked_kpi['kpi_value']))
+            ).set(cooked_kpi['kpi_value'])
             LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
         except ValueError as e:
@@ -93,4 +83,5 @@ class MetricWriterToPrometheus:
                 print("Metric {:} is already registered. Skipping.".format(metric_name))
             else:
                 LOGGER.error("Error while pushing metric: {}".format(e))
                 raise
\ No newline at end of file
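The writer now relies on `prometheus_client`'s default registry, which the `start_http_server()` call (moved to `__main__.py` in this change) serves automatically. A compact sketch of the same Gauge-with-labels pattern, with an illustrative metric name and a subset of the labels used above:

```python
from prometheus_client import Gauge

# Register once in the default registry, then set labelled samples;
# mirrors the pattern in create_and_expose_cooked_kpi().
kpi_gauge = Gauge('tfs_example_kpi', 'Example cooked KPI', ['kpi_id', 'device_id'])
kpi_gauge.labels(
    kpi_id    = '725ce3ad-ac67-4373-bd35-8cd9d6a86e09',
    device_id = 'device-1',
).set(12.5)
```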
@@ -13,6 +13,7 @@
 # limitations under the License.
 import logging, signal, sys, threading
+from prometheus_client import start_http_server
 from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
 from common.Settings import get_log_level
@@ -38,6 +39,8 @@ def main():
     grpc_service = KpiValueWriter()
     grpc_service.start()
+    start_http_server(10808)
+    LOGGER.debug("Prometheus client is started on port 10808")
     # Wait for Ctrl+C or termination signal
     while not terminate.wait(timeout=1.0): pass
...
@@ -14,31 +14,12 @@
 import logging
 from kpi_value_writer.service.KpiValueWriter import KpiValueWriter
 from common.tools.kafka.Variables import KafkaTopic
-from kpi_manager.client.KpiManagerClient import KpiManagerClient
-from kpi_manager.tests.test_messages import create_kpi_descriptor_request
-from common.proto.kpi_manager_pb2 import KpiDescriptor
-from kpi_value_writer.tests.test_messages import create_kpi_id_request
-LOGGER = logging.getLogger(__name__)
-# def test_GetKpiDescriptor():
-#     LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
-#     kpi_manager_client = KpiManagerClient()
-#     # adding KPI
-#     LOGGER.info(" --->>> calling SetKpiDescriptor ")
-#     response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
-#     # get KPI
-#     LOGGER.info(" --->>> calling GetKpiDescriptor with response ID")
-#     response = kpi_manager_client.GetKpiDescriptor(response_id)
-#     LOGGER.info("Response gRPC message object: {:}".format(response))
-#     LOGGER.info(" --->>> calling GetKpiDescriptor with random ID")
-#     rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
-#     LOGGER.info("Response gRPC message object: {:}".format(rand_response))
-#     LOGGER.info("\n------------------ TEST FINISHED ---------------------\n")
-#     assert isinstance(response, KpiDescriptor)
+LOGGER = logging.getLogger(__name__)
 # -------- Initial Test ----------------
 def test_validate_kafka_topics():
@@ -48,5 +29,5 @@ def test_validate_kafka_topics():
 def test_KafkaConsumer():
     LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
-    KpiValueWriter.RunKafkaConsumer()
+    kpi_value_writer = KpiValueWriter()
+    kpi_value_writer.RunKafkaConsumer()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build telemetry:
variables:
IMAGE_NAME: 'telemetry' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
# This first build tags the builder resulting image to prevent being removed by dangling image removal command
# - docker buildx build -t "${IMAGE_NAME}-backend:${IMAGE_TAG}-builder" --target builder -f ./src/$IMAGE_NAME/backend/Dockerfile .
- docker buildx build -t "${IMAGE_NAME}-frontend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/frontend/Dockerfile .
- docker buildx build -t "${IMAGE_NAME}-backend:$IMAGE_TAG" -f ./src/$IMAGE_NAME/backend/Dockerfile .
- docker tag "${IMAGE_NAME}-frontend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker tag "${IMAGE_NAME}-backend:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
after_script:
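# Remove dangling intermediate images left behind by the builds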
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/.gitlab-ci.yml
- src/$IMAGE_NAME/frontend/**/*.{py,in,yml}
- src/$IMAGE_NAME/frontend/Dockerfile
- src/$IMAGE_NAME/frontend/tests/*.py
- src/$IMAGE_NAME/backend/Dockerfile
- src/$IMAGE_NAME/backend/**/*.{py,in,yml}
- src/$IMAGE_NAME/backend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit tests to the component
unit_test telemetry-backend:
variables:
IMAGE_NAME: 'telemetry' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build telemetry
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
- if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
# - if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi
- if docker container ls | grep ${IMAGE_NAME}-backend; then docker rm -f ${IMAGE_NAME}-backend; else echo "${IMAGE_NAME}-backend container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG"
- docker pull "bitnami/zookeeper:latest"
- docker pull "bitnami/kafka:latest"
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
- sleep 20 # Wait for Kafka to start
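# Resolve the Kafka container's address on the test bridge network; the component under test reaches the broker through it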
- KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $KAFKA_IP
- >
docker run --name $IMAGE_NAME-backend -d -p 30060:30060
--env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
--volume "$PWD/src/$IMAGE_NAME/backend/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/${IMAGE_NAME}-backend:$IMAGE_TAG
- docker ps -a
- sleep 5
- docker logs ${IMAGE_NAME}-backend
- >
docker exec -i ${IMAGE_NAME}-backend bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-backend_report.xml $IMAGE_NAME/backend/tests/test_*.py"
- docker exec -i ${IMAGE_NAME}-backend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
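# The regex below extracts the overall percentage from the TOTAL line printed by "coverage report"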
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker network rm teraflowbridge
- docker volume prune --force
- docker image prune --force
- docker rm -f ${IMAGE_NAME}-backend
- docker rm -f zookeeper
- docker rm -f kafka
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/backend/**/*.{py,in,yml}
- src/$IMAGE_NAME/backend/Dockerfile
- src/$IMAGE_NAME/backend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/backend/tests/${IMAGE_NAME}-backend_report.xml
# Apply unit tests to the component
unit_test telemetry-frontend:
variables:
IMAGE_NAME: 'telemetry' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build telemetry
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep crdb; then docker rm -f crdb; else echo "CockroachDB container is not in the system"; fi
- if docker volume ls | grep crdb; then docker volume rm -f crdb; else echo "CockroachDB volume is not in the system"; fi
- if docker container ls | grep kafka; then docker rm -f kafka; else echo "Kafka container is not in the system"; fi
- if docker container ls | grep zookeeper; then docker rm -f zookeeper; else echo "Zookeeper container is not in the system"; fi
- if docker container ls | grep ${IMAGE_NAME}-frontend; then docker rm -f ${IMAGE_NAME}-frontend; else echo "${IMAGE_NAME}-frontend container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG"
- docker pull "bitnami/zookeeper:latest"
- docker pull "bitnami/kafka:latest"
- docker pull "cockroachdb/cockroach:latest-v22.2"
- docker volume create crdb
- >
docker run --name crdb -d --network=teraflowbridge -p 26257:26257 -p 8080:8080
--env COCKROACH_DATABASE=tfs_test --env COCKROACH_USER=tfs --env COCKROACH_PASSWORD=tfs123
--volume "crdb:/cockroach/cockroach-data"
cockroachdb/cockroach:latest-v22.2 start-single-node
- echo "Waiting for initialization..."
- while ! docker logs crdb 2>&1 | grep -q 'finished creating default user \"tfs\"'; do sleep 1; done
# - docker logs crdb
# - docker ps -a
- CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $CRDB_ADDRESS
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
-e ALLOW_ANONYMOUS_LOGIN=yes
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
- sleep 20 # Wait for Kafka to start
- KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $KAFKA_IP
# - docker logs zookeeper
# - docker logs kafka
- >
docker run --name $IMAGE_NAME-frontend -d -p 30050:30050
--env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require"
--env "KFK_SERVER_ADDRESS=${KAFKA_IP}:9092"
--volume "$PWD/src/$IMAGE_NAME/frontend/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/${IMAGE_NAME}-frontend:$IMAGE_TAG
- docker ps -a
- sleep 5
- docker logs ${IMAGE_NAME}-frontend
- >
docker exec -i ${IMAGE_NAME}-frontend bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}-frontend_report.xml $IMAGE_NAME/frontend/tests/test_*.py"
- docker exec -i ${IMAGE_NAME}-frontend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker volume rm -f crdb
- docker network rm teraflowbridge
- docker volume prune --force
- docker image prune --force
- docker rm -f ${IMAGE_NAME}-frontend
- docker rm -f zookeeper
- docker rm -f kafka
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/frontend/**/*.{py,in,yml}
- src/$IMAGE_NAME/frontend/Dockerfile
- src/$IMAGE_NAME/frontend/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/frontend/tests/${IMAGE_NAME}-frontend_report.xml
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Emit Python logs unbuffered, as they occur (any non-empty value enables this)
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
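# (typically invoked by the Kubernetes liveness/readiness probes of the service)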
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
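# Rewrite the absolute imports emitted by protoc into relative ones so the generated stubs resolve inside the common.proto package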
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/telemetry/backend
WORKDIR /var/teraflow/telemetry/backend
COPY src/telemetry/backend/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/telemetry/__init__.py telemetry/__init__.py
COPY src/telemetry/backend/. telemetry/backend/
# Start the service
ENTRYPOINT ["python", "-m", "telemetry.backend.service"]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
confluent-kafka==2.3.*
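For reference, the producer side of this pinned dependency is equally small. The following is a hypothetical counterpart to the consumer sketch above; the broker address, topic, and payload shape are illustrative assumptions, not the backend's actual schema.

# Hypothetical producer loop against the same broker; topic and payload shape
# are illustrative only.
import json, os, time
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': os.environ.get('KFK_SERVER_ADDRESS', '127.0.0.1:9092')})
for sample_value in range(5):
    sample = {'kpi_id': 'demo-kpi', 'time_stamp': time.time(), 'kpi_value': sample_value}
    producer.produce('topic_response', value=json.dumps(sample).encode('utf-8'))
producer.flush()   # block until every queued message is acknowledged by the broker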