Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Showing
with 1336 additions and 0 deletions
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, pytest
import logging
from typing import Union
#from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.tests.MockServicerImpl_Context import MockServicerImpl_Context
from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
from common.tools.service.GenericGrpcService import GenericGrpcService
#from context.client.ContextClient import ContextClient
# from device.service.driver_api.DriverFactory import DriverFactory
# from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
# from device.service.DeviceService import DeviceService
# from device.client.DeviceClient import DeviceClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request, create_kpi_filter_request, create_kpi_descriptor_request_a
from kpi_manager.service.KpiManagerService import KpiManagerService
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from kpi_manager.tests.test_messages import create_kpi_descriptor_request
from kpi_manager.tests.test_messages import create_kpi_id_request
#from monitoring.service.NameMapping import NameMapping
#os.environ['DEVICE_EMULATED_ONLY'] = 'TRUE'
#from device.service.drivers import DRIVERS
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
KPIMANAGER_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIMANAGER) # type: ignore
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIMANAGER_SERVICE_PORT)
# METRICSDB_HOSTNAME = os.environ.get('METRICSDB_HOSTNAME'){}
LOGGER = logging.getLogger(__name__)
class MockContextService(GenericGrpcService):
# Mock Service implementing Context to simplify unitary tests of Monitoring
def __init__(self, bind_port: Union[str, int]) -> None:
super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService')
# pylint: disable=attribute-defined-outside-init
def install_servicers(self):
self.context_servicer = MockServicerImpl_Context()
add_ContextServiceServicer_to_server(self.context_servicer, self.server)
# @pytest.fixture(scope='session')
# def context_service():
# LOGGER.info('Initializing MockContextService...')
# _service = MockContextService(MOCKSERVICE_PORT)
# _service.start()
# LOGGER.info('Yielding MockContextService...')
# yield _service
# LOGGER.info('Terminating MockContextService...')
# _service.context_servicer.msg_broker.terminate()
# _service.stop()
# LOGGER.info('Terminated MockContextService...')
# @pytest.fixture(scope='session')
# def context_client(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing ContextClient...')
# _client = ContextClient()
# LOGGER.info('Yielding ContextClient...')
# yield _client
# LOGGER.info('Closing ContextClient...')
# _client.close()
# LOGGER.info('Closed ContextClient...')
# @pytest.fixture(scope='session')
# def device_service(context_service : MockContextService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceService...')
# driver_factory = DriverFactory(DRIVERS)
# driver_instance_cache = DriverInstanceCache(driver_factory)
# _service = DeviceService(driver_instance_cache)
# _service.start()
# # yield the server, when test finishes, execution will resume to stop it
# LOGGER.info('Yielding DeviceService...')
# yield _service
# LOGGER.info('Terminating DeviceService...')
# _service.stop()
# LOGGER.info('Terminated DeviceService...')
# @pytest.fixture(scope='session')
# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceClient...')
# _client = DeviceClient()
# LOGGER.info('Yielding DeviceClient...')
# yield _client
# LOGGER.info('Closing DeviceClient...')
# _client.close()
# LOGGER.info('Closed DeviceClient...')
# @pytest.fixture(scope='session')
# def device_client(device_service : DeviceService): # pylint: disable=redefined-outer-name,unused-argument
# LOGGER.info('Initializing DeviceClient...')
# _client = DeviceClient()
# LOGGER.info('Yielding DeviceClient...')
# yield _client
# LOGGER.info('Closing DeviceClient...')
# _client.close()
# LOGGER.info('Closed DeviceClient...')
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def kpi_manager_service():
LOGGER.info('Initializing KpiManagerService...')
#name_mapping = NameMapping()
# _service = MonitoringService(name_mapping)
# _service = KpiManagerService(name_mapping)
_service = KpiManagerService()
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding KpiManagerService...')
yield _service
LOGGER.info('Terminating KpiManagerService...')
_service.stop()
LOGGER.info('Terminated KpiManagerService...')
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
# def monitoring_client(monitoring_service : MonitoringService): (Add for better understanding)
@pytest.fixture(scope='session')
def kpi_manager_client(kpi_manager_service : KpiManagerService): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.info('Initializing KpiManagerClient...')
_client = KpiManagerClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding KpiManagerClient...')
yield _client
LOGGER.info('Closing KpiManagerClient...')
_client.close()
LOGGER.info('Closed KpiManagerClient...')
##################################################
# Prepare Environment, should be the first test
##################################################
# # ERROR on this test ---
# def test_prepare_environment(
# context_client : ContextClient, # pylint: disable=redefined-outer-name,unused-argument
# ):
# context_id = json_context_id(DEFAULT_CONTEXT_NAME)
# context_client.SetContext(Context(**json_context(DEFAULT_CONTEXT_NAME)))
# context_client.SetTopology(Topology(**json_topology(DEFAULT_TOPOLOGY_NAME, context_id=context_id)))
###########################
# Tests Implementation of Kpi Manager
###########################
# ---------- 3rd Iteration Tests ----------------
# def test_SetKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiId)
# def test_DeleteKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# # adding KPI
# response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# # deleting KPI
# del_response = kpi_manager_client.DeleteKpiDescriptor(response_id)
# # select KPI
# kpi_manager_client.GetKpiDescriptor(response_id)
# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
# assert isinstance(del_response, Empty)
def test_GetKpiDescriptor(kpi_manager_client):
LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
# adding KPI
response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# get KPI
response = kpi_manager_client.GetKpiDescriptor(response_id)
LOGGER.info("Response gRPC message object: {:}".format(response))
LOGGER.info(" >>> calling GetKpiDescriptor with random ID")
rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
LOGGER.info("Response gRPC message object: {:}".format(rand_response))
assert isinstance(response, KpiDescriptor)
# def test_SelectKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# # adding KPI
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# # select KPI(s)
# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiDescriptorList)
# def test_set_list_of_KPIs(kpi_manager_client):
# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# # adding KPI
# for kpi in KPIs_TO_SEARCH:
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
# ---------- 2nd Iteration Tests -----------------
# def test_SetKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SetKpiDescriptor: START <<< ")
# with open("kpi_manager/tests/KPI_configs.json", 'r') as file:
# data = json.load(file)
# _descriptors = data.get('KPIs', [])
# for _descritor_name in _descriptors:
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(_descritor_name))
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiId)
# def test_GetKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
# response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiDescriptor)
# def test_DeleteKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ")
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# del_response = kpi_manager_client.DeleteKpiDescriptor(response)
# kpi_manager_client.GetKpiDescriptor(response)
# LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response))
# assert isinstance(del_response, Empty)
# def test_SelectKpiDescriptor(kpi_manager_client):
# LOGGER.info(" >>> test_SelectKpiDescriptor: START <<< ")
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a())
# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request_a())
# LOGGER.info("Response gRPC message object: {:}".format(response))
# assert isinstance(response, KpiDescriptorList)
# ------------- INITIAL TESTs ----------------
# Test case that makes use of client fixture to test server's CreateKpi method
# def test_set_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
# # make call to server
# LOGGER.warning('test_create_kpi requesting')
# for i in range(3):
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request(str(i+1)))
# LOGGER.debug(str(response))
# assert isinstance(response, KpiId)
# # Test case that makes use of client fixture to test server's DeleteKpi method
# def test_delete_kpi(kpi_manager_client): # pylint: disable=redefined-outer-name
# # make call to server
# LOGGER.warning('delete_kpi requesting')
# response = kpi_manager_client.SetKpiDescriptor(create_kpi_request('4'))
# response = kpi_manager_client.DeleteKpiDescriptor(response)
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# # Test case that makes use of client fixture to test server's GetKpiDescriptor method
# def test_select_kpi_descriptor(kpi_manager_client): # pylint: disable=redefined-outer-name
# LOGGER.warning('test_selectkpidescritor begin')
# response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
# LOGGER.debug(str(response))
# assert isinstance(response, KpiDescriptorList)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from common.proto import kpi_manager_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.context_pb2 import DeviceId, LinkId, ServiceId, SliceId,\
ConnectionId, EndPointId
def create_kpi_id_request():
_create_kpi_id = kpi_manager_pb2.KpiId()
_create_kpi_id.kpi_id.uuid = str(uuid.uuid4())
return _create_kpi_id
def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_kpi_request.kpi_description = descriptor_name
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV2'
_create_kpi_request.service_id.service_uuid.uuid = 'SERV2'
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1'
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1'
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON1'
_create_kpi_request.link_id.link_uuid.uuid = 'LNK1'
return _create_kpi_request
def create_kpi_descriptor_request_a(description: str = "Test Description"):
_create_kpi_request = kpi_manager_pb2.KpiDescriptor()
_create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_kpi_request.kpi_description = description
_create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
_create_kpi_request.device_id.device_uuid.uuid = 'DEV4'
_create_kpi_request.service_id.service_uuid.uuid = 'SERV3'
_create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3'
_create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END2'
_create_kpi_request.connection_id.connection_uuid.uuid = 'CON2'
_create_kpi_request.link_id.link_uuid.uuid = 'LNK2'
return _create_kpi_request
def create_kpi_filter_request():
_create_kpi_filter_request = kpi_manager_pb2.KpiDescriptorFilter()
_create_kpi_filter_request.kpi_sample_type.append(KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED)
device_id_obj = DeviceId()
service_id_obj = ServiceId()
slice_id_obj = SliceId()
endpoint_id_obj = EndPointId()
connection_id_obj = ConnectionId()
link_id_obj = LinkId()
device_id_obj.device_uuid.uuid = "DEV2"
service_id_obj.service_uuid.uuid = "SERV2"
slice_id_obj.slice_uuid.uuid = "SLC1"
endpoint_id_obj.endpoint_uuid.uuid = "END1"
connection_id_obj.connection_uuid.uuid = "CON1"
link_id_obj.link_uuid.uuid = "LNK1"
_create_kpi_filter_request.device_id.append(device_id_obj)
_create_kpi_filter_request.service_id.append(service_id_obj)
_create_kpi_filter_request.slice_id.append(slice_id_obj)
_create_kpi_filter_request.endpoint_id.append(endpoint_id_obj)
_create_kpi_filter_request.connection_id.append(connection_id_obj)
_create_kpi_filter_request.link_id.append(link_id_obj)
return _create_kpi_filter_request
\ No newline at end of file
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build kpi-value-api:
variables:
IMAGE_NAME: 'kpi_value_api' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test kpi-value-api:
variables:
IMAGE_NAME: 'kpi_value_api' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build kpi-value-api
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 30020:30020 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- >
docker exec -i $IMAGE_NAME bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}_report.xml $IMAGE_NAME/tests/test_*.py"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f $IMAGE_NAME
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
## Deployment of the service in Kubernetes Cluster
#deploy context:
# variables:
# IMAGE_NAME: 'context' # name of the microservice
# IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
# stage: deploy
# needs:
# - unit test context
# # - integ_test execute
# script:
# - 'sed -i "s/$IMAGE_NAME:.*/$IMAGE_NAME:$IMAGE_TAG/" manifests/${IMAGE_NAME}service.yaml'
# - kubectl version
# - kubectl get all
# - kubectl apply -f "manifests/${IMAGE_NAME}service.yaml"
# - kubectl get all
# # environment:
# # name: test
# # url: https://example.com
# # kubernetes:
# # namespace: test
# rules:
# - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
# when: manual
# - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
# when: manual
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/kpi_value_api
WORKDIR /var/teraflow/kpi_value_api
COPY src/kpi_value_api/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/kpi_value_api/. kpi_value_api/
# Start the service
ENTRYPOINT ["python", "-m", "kpi_value_api.service"]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class KpiValueApiClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.KPIVALUEAPI)
if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = KpiValueAPIServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def StoreKpiValues(self, request: KpiValueList) -> Empty:
LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StoreKpiValues(request)
LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList:
LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SelectKpiValues(request)
LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
confluent-kafka==2.3.*
requests==2.27.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from .KpiValueApiServiceServicerImpl import KpiValueApiServiceServicerImpl
from common.proto.kpi_value_api_pb2_grpc import add_KpiValueAPIServiceServicer_to_server
class KpiValueApiService(GenericGrpcService):
def __init__(self, cls_name : str = __name__ ) -> None:
port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)
super().__init__(port, cls_name=cls_name)
self.kpiValueApiService_servicer = KpiValueApiServiceServicerImpl()
def install_servicers(self):
add_KpiValueAPIServiceServicer_to_server(self.kpiValueApiService_servicer, self.server)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, requests
from typing import Tuple, Any
from datetime import datetime
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from confluent_kafka import Producer as KafkaProducer
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC')
PROM_URL = "http://localhost:9090"
class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
def __init__(self):
LOGGER.debug('Init KpiValueApiService')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
) -> Empty:
LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
producer_obj = KafkaProducer({
'bootstrap.servers' : KafkaConfig.SERVER_IP.value
})
for kpi_value in request.kpi_value_list:
kpi_value_to_produce : Tuple [str, Any, Any] = (
kpi_value.kpi_id.kpi_id,
kpi_value.timestamp,
kpi_value.kpi_value_type # kpi_value.kpi_value_type.(many options) how?
)
LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce))
msg_key = "gRPC-kpivalueapi" # str(__class__.__name__) can be used
producer_obj.produce(
KafkaTopic.VALUE.value,
key = msg_key,
value = kpi_value.SerializeToString(), # value = json.dumps(kpi_value_to_produce),
callback = self.delivery_callback
)
producer_obj.flush()
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext
) -> KpiValueList:
LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
response = KpiValueList()
metrics = [kpi.kpi_id for kpi in request.kpi_id]
start_timestamps = [timestamp for timestamp in request.start_timestamp]
end_timestamps = [timestamp for timestamp in request.end_timestamp]
results = []
for start, end in zip(start_timestamps, end_timestamps):
start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z"
end_str = datetime.fromtimestamp(end.seconds).isoformat() + "Z"
for metric in metrics:
url = f'{PROM_URL}/api/v1/query_range'
params = {
'query': metric,
'start': start_str,
'end' : end_str,
'step' : '30s' # or any other step you need
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
for result in data['data']['result']:
for value in result['values']:
kpi_value = KpiValue(
kpi_id=metric,
timestamp=str(seconds=value[0]),
kpi_value_type=self._convert_value_to_kpi_value_type(value[1])
)
results.append(kpi_value)
def _convert_value_to_kpi_value_type(self, value):
# Check if the value is an integer (int64)
try:
int64_value = int(value)
return KpiValueType(int64Val=int64_value)
except ValueError:
pass
# Check if the value is a float
try:
float_value = float(value)
return KpiValueType(floatVal=float_value)
except ValueError:
pass
# Check if the value is a boolean
if value.lower() in ['true', 'false']:
bool_value = value.lower() == 'true'
return KpiValueType(boolVal=bool_value)
# If none of the above, treat it as a string
return KpiValueType(stringVal=value)
def delivery_callback(self, err, msg):
if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from common.Settings import get_log_level
from .KpiValueApiService import KpiValueApiService
terminate = threading.Event()
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.debug('Starting...')
grpc_service = KpiValueApiService()
grpc_service.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.debug('Terminating...')
grpc_service.stop()
LOGGER.debug('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid, time
from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
def create_kpi_value_list():
_create_kpi_value_list = KpiValueList()
# To run this experiment sucessfully, already existing UUID in KPI DB in necessary.
# because the UUID is used to get the descriptor form KPI DB.
EXISTING_KPI_IDs = ["725ce3ad-ac67-4373-bd35-8cd9d6a86e09",
str(uuid.uuid4()),
str(uuid.uuid4())]
for kpi_id_uuid in EXISTING_KPI_IDs:
kpi_value_object = KpiValue()
kpi_value_object.kpi_id.kpi_id.uuid = kpi_id_uuid
kpi_value_object.timestamp.timestamp = float(time.time())
kpi_value_object.kpi_value_type.floatVal = 100
_create_kpi_value_list.kpi_value_list.append(kpi_value_object)
return _create_kpi_value_list
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os, logging, pytest
from common.proto.context_pb2 import Empty
from common.Constants import ServiceNameEnum
from common.tools.kafka.Variables import KafkaTopic
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from kpi_value_api.service.KpiValueApiService import KpiValueApiService
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
from kpi_value_api.tests.messages import create_kpi_value_list
LOCAL_HOST = '127.0.0.1'
KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore
os.environ[get_env_var_name(ServiceNameEnum.KPIVALUEAPI, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.KPIVALUEAPI, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(KPIVALUEAPI_SERVICE_PORT)
LOGGER = logging.getLogger(__name__)
# This fixture will be requested by test cases and last during testing session
@pytest.fixture(scope='session')
def kpi_value_api_service():
LOGGER.info('Initializing KpiValueApiService...')
# _service = MonitoringService(name_mapping)
_service = KpiValueApiService()
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding KpiValueApiService...')
yield _service
LOGGER.info('Terminating KpiValueApiService...')
_service.stop()
LOGGER.info('Terminated KpiValueApiService...')
# This fixture will be requested by test cases and last during testing session.
# The client requires the server, so client fixture has the server as dependency.
@pytest.fixture(scope='session')
def kpi_value_api_client(kpi_value_api_service : KpiValueApiService ):
LOGGER.info('Initializing KpiValueApiClient...')
_client = KpiValueApiClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding KpiValueApiClient...')
yield _client
LOGGER.info('Closing KpiValueApiClient...')
_client.close()
LOGGER.info('Closed KpiValueApiClient...')
##################################################
# Prepare Environment, should be the first test
##################################################
# To be added here
###########################
# Tests Implementation of Kpi Value Api
###########################
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
def test_store_kpi_values(kpi_value_api_client):
LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
assert isinstance(response, Empty)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build kpi-value-writer:
variables:
IMAGE_NAME: 'kpi_value_writer' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test kpi-value-writer:
variables:
IMAGE_NAME: 'kpi_value_writer' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build kpi-value-writer
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 30030:30030 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- >
docker exec -i $IMAGE_NAME bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}_report.xml $IMAGE_NAME/tests/test_*.py"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f $IMAGE_NAME
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
## Deployment of the service in Kubernetes Cluster
#deploy context:
# variables:
# IMAGE_NAME: 'context' # name of the microservice
# IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
# stage: deploy
# needs:
# - unit test context
# # - integ_test execute
# script:
# - 'sed -i "s/$IMAGE_NAME:.*/$IMAGE_NAME:$IMAGE_TAG/" manifests/${IMAGE_NAME}service.yaml'
# - kubectl version
# - kubectl get all
# - kubectl apply -f "manifests/${IMAGE_NAME}service.yaml"
# - kubectl get all
# # environment:
# # name: test
# # url: https://example.com
# # kubernetes:
# # namespace: test
# rules:
# - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
# when: manual
# - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
# when: manual
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/kpi_value_writer
WORKDIR /var/teraflow/kpi_value_writer
COPY src/kpi_value_writer/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/kpi_value_writer/. kpi_value_writer/
COPY src/kpi_manager/__init__.py kpi_manager/__init__.py
COPY src/kpi_manager/client/. kpi_manager/client/
# Start the service
ENTRYPOINT ["python", "-m", "kpi_value_writer.service"]
# How to locally run and test KPI manager micro-service
## --- File links need to be updated. ---
### Pre-requisets
The following requirements should be fulfilled before the execuation of KPI management service.
1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully.
2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully.
3. Verify the creation of required database and table.
[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and
[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment.
### Messages format templates
["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing.
### Test file
["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment.
### Flow of execution (Kpi Maanager Service functions)
1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table.
2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types.
3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB.
4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types.
## For KPI composer and KPI writer
The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in).
\ No newline at end of file
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
confluent-kafka==2.3.*
requests==2.27.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.Settings import get_service_port_grpc
from common.Constants import ServiceNameEnum
from common.tools.service.GenericGrpcService import GenericGrpcService
from confluent_kafka import KafkaError
from confluent_kafka import Consumer as KafkaConsumer
from kpi_manager.client.KpiManagerClient import KpiManagerClient
# -- test import --
# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request
from .MetricWriterToPrometheus import MetricWriterToPrometheus
LOGGER = logging.getLogger(__name__)
ACTIVE_CONSUMERS = []
METRIC_WRITER = MetricWriterToPrometheus()
class KpiValueWriter(GenericGrpcService):
def __init__(self, cls_name : str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.KPIVALUEWRITER)
super().__init__(port, cls_name=cls_name)
@staticmethod
def RunKafkaConsumer():
thread = threading.Thread(target=KpiValueWriter.KafkaConsumer, args=())
ACTIVE_CONSUMERS.append(thread)
thread.start()
@staticmethod
def KafkaConsumer():
kafka_consumer = KafkaConsumer(
{ 'bootstrap.servers' : KafkaConfig.SERVER_IP.value,
'group.id' : __class__,
'auto.offset.reset' : 'latest'}
)
kpi_manager_client = KpiManagerClient()
kafka_consumer.subscribe([KafkaTopic.VALUE.value])
LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value))
while True:
raw_kpi = kafka_consumer.poll(1.0)
if raw_kpi is None:
continue
elif raw_kpi.error():
if raw_kpi.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(raw_kpi.error()))
continue
try:
kpi_value = KpiValue()
kpi_value.ParseFromString(raw_kpi.value())
LOGGER.info("Received KPI : {:}".format(kpi_value))
print("Received KPI : {:}".format(kpi_value))
KpiValueWriter.get_kpi_descriptor(kpi_value, kpi_manager_client)
except Exception as e:
print("Error detail: {:}".format(e))
continue
@staticmethod
def get_kpi_descriptor(kpi_value: str, kpi_manager_client ):
print("--- START -----")
kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
print("KpiId generated: {:}".format(kpi_id))
# print("Kpi manger client created: {:}".format(kpi_manager_client))
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
# print("kpi descriptor received: {:}".format(kpi_descriptor_object))
# if isinstance (kpi_descriptor_object, KpiDescriptor):
LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
METRIC_WRITER.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value)
else:
LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
except Exception as e:
LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
print ("Unable to get KpiDescriptor. Error: {:}".format(e))