Compare revisions — tfs/controller
Showing 908 additions and 50 deletions
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
apscheduler==3.10.4
apscheduler==3.10.1
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
......
@@ -20,7 +20,7 @@ from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import Anal
class AnalyticsFrontendService(GenericGrpcService):
def __init__(self, cls_name: str = __name__):
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)
port = get_service_port_grpc(ServiceNameEnum.ANALYTICS)
super().__init__(port, cls_name=cls_name)
self.analytics_frontend_servicer = AnalyticsFrontendServiceServicerImpl()
......
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, json, queue
from typing import Dict
@@ -102,10 +101,8 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
key, value = self.result_queue.get()
yield key, value
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
@@ -135,8 +132,6 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
self.result_queue.put((key, value))
else:
LOGGER.info(f"Skipping message with unmatched key: {key}")
# value = json.loads(msg.value().decode('utf-8')) # Added for debugging
# self.result_queue.put((filter_key, value)) # Added for debugging
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
@@ -207,7 +202,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def delivery_callback(self, err, msg):
if err:
LOGGER.debug('Message delivery failed: {:}'.format(err))
print ('Message delivery failed: {:}'.format(err))
# print ('Message delivery failed: {:}'.format(err))
else:
LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
print('Message delivered to topic {:}'.format(msg.topic()))
# print('Message delivered to topic {:}'.format(msg.topic()))
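For context, delivery_callback above follows the standard confluent-kafka producer contract: the client invokes it from poll() or flush() once the broker acknowledges or rejects each message. A minimal, self-contained sketch of that wiring; the broker address and topic name here are assumptions, not values from this MR:

from confluent_kafka import Producer

def delivery_callback(err, msg):
    # Same contract as the method above: err is set on failure, msg on success.
    if err:
        print('Message delivery failed: {:}'.format(err))
    else:
        print('Message delivered to topic {:}'.format(msg.topic()))

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # assumed broker
producer.produce('analytics_response', key='analyzer-uuid', value='{"state": 1}',
                 callback=delivery_callback)
producer.flush()  # blocks until delivery_callback has run for queued messages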
@@ -21,13 +21,14 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
def create_analyzer_id():
_create_analyzer_id = AnalyzerId()
# _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
# _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
return _create_analyzer_id
def create_analyzer():
_create_analyzer = Analyzer()
# _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.algorithm_name = "Test_Aggregate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
@@ -48,11 +49,12 @@ def create_analyzer():
_create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
# 'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stdev_value':(00, 10)}
'mean_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'std_value':(00, 10)
}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['window_size'] = "10s" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "5s" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # 'True' to store. Not implemented yet
return _create_analyzer
......
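The test above encodes thresholds as a JSON map of metric name to a (lower, upper) pair, alongside window_size and window_slider. As a hedged sketch only (the streaming backend that consumes parameters['thresholds'] is not part of this diff), this is how such a map might be evaluated against one aggregated sample:

import json

# Tuples serialize to JSON arrays, so each bound comes back as a two-item list.
thresholds = json.loads('{"mean_value": [20, 30], "min_value": [0, 10]}')

def violates(metric_name, value):
    # True when the value falls outside the configured (lower, upper) range.
    if metric_name not in thresholds:
        return False
    lower, upper = thresholds[metric_name]
    return value < lower or value > upper

print(violates('mean_value', 35.0))  # True: above the upper bound of 30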
@@ -41,9 +41,9 @@ from apscheduler.triggers.interval import IntervalTrigger
LOCAL_HOST = '127.0.0.1'
ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND))
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT)
ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICS))
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__)
@@ -84,10 +84,10 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
###########################
# --- "test_validate_kafka_topics" should be executed before the functionality tests ---
# def test_validate_kafka_topics():
# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
def test_validate_kafka_topics():
LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
# ----- core functionality test -----
# def test_StartAnalytics(analyticsFrontend_client):
@@ -97,24 +97,31 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
# assert isinstance(response, AnalyzerId)
# To test start and stop listener together
def test_StartStopAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
LOGGER.info('--> StartAnalyzer')
def test_StartAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalyzers START: <<< ')
added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(added_analyzer_id))
assert isinstance(added_analyzer_id, AnalyzerId)
def test_StopAnalytic(analyticsFrontend_client):
LOGGER.info(' >>> test_StopAnalytic START: <<< ')
response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
LOGGER.info(' --> Calling StartResponseListener... ')
class_obj = AnalyticsFrontendServiceServicerImpl()
response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid)
LOGGER.debug(response)
LOGGER.info("waiting for timer to comlete ...")
time.sleep(3)
LOGGER.info('--> StopAnalyzer')
response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
def test_SelectAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerList)
# def test_SelectAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerList)
# def test_StopAnalytic(analyticsFrontend_client):
# LOGGER.info(' >>> test_StopAnalytic START: <<< ')
# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# def test_ResponseListener():
# LOGGER.info(' >>> test_ResponseListener START <<< ')
......
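The reworked test drives StartResponseListener, which (as shown earlier in this diff) is a generator backed by a queue that a Kafka listener thread fills. A small self-contained sketch of that pattern, with a plain thread standing in for the real Kafka consumer:

import queue, threading, time

result_queue = queue.Queue()

def fake_kafka_listener():
    # Stand-in for the real consumer: pushes (key, value) pairs as they arrive.
    for i in range(3):
        result_queue.put(('analyzer-uuid', {'seq': i}))
        time.sleep(0.1)

def response_stream():
    while True:
        key, value = result_queue.get()  # blocks until a result is available
        yield key, value

threading.Thread(target=fake_kafka_listener, daemon=True).start()
stream = response_stream()
for _ in range(3):
    print(next(stream))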
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
java==11.0.*
pyspark==3.5.2
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
......
@@ -15,12 +15,13 @@
import logging
from analytics.database.Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer
LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_tables():
LOGGER.info('>>> test_verify_databases_and_tables : START <<< ')
AnalyzerDBobj = AnalyzerDB()
AnalyzerDBobj = AnalyzerDB(Analyzer)
# AnalyzerDBobj.drop_database()
# AnalyzerDBobj.verify_tables()
AnalyzerDBobj.create_database()
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build automation:
variables:
IMAGE_NAME: 'automation' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit tests to the component
unit_test automation:
variables:
IMAGE_NAME: 'automation' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build automation
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- >
if docker network list | grep teraflowbridge; then
echo "teraflowbridge is already created";
else
docker network create -d bridge teraflowbridge;
fi
- >
if docker container ls | grep $IMAGE_NAME; then
docker rm -f $IMAGE_NAME;
else
echo "$IMAGE_NAME image is not in the system";
fi
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 2020:2020 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f $IMAGE_NAME
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report_*.xml
## Deployment of the service in Kubernetes Cluster
#deploy automation:
# variables:
# IMAGE_NAME: 'automation' # name of the microservice
# IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
# stage: deploy
# needs:
# - unit test automation
# # - integ_test execute
# script:
# - 'sed -i "s/$IMAGE_NAME:.*/$IMAGE_NAME:$IMAGE_TAG/" manifests/${IMAGE_NAME}service.yaml'
# - kubectl version
# - kubectl get all
# - kubectl apply -f "manifests/${IMAGE_NAME}service.yaml"
# - kubectl get all
# # environment:
# # name: test
# # url: https://example.com
# # kubernetes:
# # namespace: test
# rules:
# - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
# when: manual
# - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
# when: manual
@@ -12,12 +12,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http, get_service_port_http
from common.tools.service.GenericRestServer import GenericRestServer
class RestServer(GenericRestServer):
def __init__(self, cls_name: str = __name__) -> None:
bind_port = get_service_port_http(ServiceNameEnum.QKD_APP)
base_url = get_service_baseurl_http(ServiceNameEnum.QKD_APP)
super().__init__(bind_port, base_url, cls_name=cls_name)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/automation
WORKDIR /var/teraflow/automation
COPY src/automation/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/context/__init__.py context/__init__.py
COPY src/context/client/. context/client/
COPY src/kpi_manager/__init__.py kpi_manager/__init__.py
COPY src/kpi_manager/client/. kpi_manager/client/
COPY src/telemetry/__init__.py telemetry/__init__.py
COPY src/telemetry/frontend/__init__.py telemetry/frontend/__init__.py
COPY src/telemetry/frontend/client/. telemetry/frontend/client/
COPY src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py
COPY src/analytics/frontend/client/. analytics/frontend/client/
COPY src/automation/. automation/
# Start the service
ENTRYPOINT ["python", "-m", "automation.service"]
@@ -11,3 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
from common.proto.policy_pb2_grpc import PolicyServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.openconfig_device_pb2_grpc import OpenConfigServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class PolicyClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.POLICY)
if not port: port = get_service_port_grpc(ServiceNameEnum.POLICY)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.info('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.openconfig_stub=None
self.connect()
LOGGER.info('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = PolicyServiceStub(self.channel)
self.openconfig_stub=OpenConfigServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def PolicyAddService(self, request : PolicyRuleService) -> PolicyRuleState:
LOGGER.debug('AddPolicy request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.PolicyAddService(request)
LOGGER.debug('AddPolicy result: {:s}'.format(grpc_message_to_json_string(response)))
return response
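A hedged usage sketch of the PolicyClient defined above. The rule fields mirror the ZSMCreate code later in this diff; the UUIDs are placeholders, and the POLICY host/port are resolved from the environment exactly as in __init__:

from uuid import uuid4
from common.proto.policy_pb2 import PolicyRuleService

client = PolicyClient()  # falls back to ServiceNameEnum.POLICY host/port
rule = PolicyRuleService()
rule.serviceId.service_uuid.uuid = str(uuid4())
rule.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
state = client.PolicyAddService(rule)  # retried per RETRY_DECORATOR on failure
client.close()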
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apscheduler==3.10.4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.proto.automation_pb2_grpc import add_AutomationServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from automation.service.AutomationServiceServicerImpl import AutomationServiceServicerImpl
class AutomationService(GenericGrpcService):
def __init__(self, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.AUTOMATION)
super().__init__(port, cls_name=cls_name)
self.automation_servicer = AutomationServiceServicerImpl()
def install_servicers(self):
add_AutomationServiceServicer_to_server(self.automation_servicer, self.server)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging
from uuid import uuid4
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate
from common.proto.automation_pb2_grpc import AutomationServiceServicer
from common.proto.context_pb2 import Service, ServiceId
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor
from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
from common.proto.policy_action_pb2 import PolicyRuleAction, PolicyRuleActionConfig
from common.proto.policy_condition_pb2 import PolicyRuleCondition
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from automation.client.PolicyClient import PolicyClient
from context.client.ContextClient import ContextClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Automation', 'RPC')
class AutomationServiceServicerImpl(AutomationServiceServicer):
def __init__(self):
LOGGER.info('Init AutomationService')
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService:
# check that service does not exist
context_client = ContextClient()
kpi_manager_client = KpiManagerClient()
policy_client = PolicyClient()
telemetry_frontend_client = TelemetryFrontendClient()
analytics_frontend_client = AnalyticsFrontendClient()
try:
# TODO: Remove static variables (get them from ZSMCreateRequest)
# TODO: Refactor policy component (remove unnecessary variables)
####### GET Context #######################
LOGGER.info('Get the service from Context: ')
service: Service = context_client.GetService(request.serviceId)
LOGGER.info('Service ({:s}) :'.format(str(service)))
###########################################
####### SET Kpi Descriptor LAT ################
LOGGER.info('Set Kpi Descriptor LAT: ')
if len(service.service_constraints) == 0:
raise InvalidArgumentException("service_constraints", "empty", [])
if len(service.service_constraints) > 1:
raise InvalidArgumentException("service_constraints", ">1", [])
if service.service_constraints[0].sla_latency is None:
raise InvalidArgumentException("sla_latency", "empty", [])
## Static Implementation Applied only in case of SLA Latency Constraint ##
# KPI Descriptor
kpi_descriptor_lat = KpiDescriptor()
kpi_descriptor_lat.kpi_sample_type = 701 # static: KPISAMPLETYPE_SERVICE_LATENCY_MS (service.service_constraints[].sla_latency.e2e_latency_ms)
kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat)))
###########################################
####### SET Kpi Descriptor TX ################
LOGGER.info('Set Kpi Descriptor TX: ')
kpi_descriptor_tx = KpiDescriptor()
kpi_descriptor_tx.kpi_sample_type = 101 # static KPISAMPLETYPE_PACKETS_TRANSMITTED
kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx)))
###########################################
####### SET Kpi Descriptor RX ################
LOGGER.info('Set Kpi Descriptor RX: ')
kpi_descriptor_rx = KpiDescriptor()
kpi_descriptor_rx.kpi_sample_type = 102 # static KPISAMPLETYPE_PACKETS_RECEIVED
kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx)
LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
###########################################
####### START Collector TX #################
collect_tx = Collector()
collect_tx.collector_id.collector_id.uuid = str(uuid4())
collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
collect_tx.duration_s = 20000 # static
collect_tx.interval_s = 1 # static
LOGGER.info('Start Collector TX: {:s}'.format(str(collect_tx)))
collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
#############################################
####### START Collector RX ##################
collect_rx = Collector()
collect_rx.collector_id.collector_id.uuid = str(uuid4())
collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
collect_rx.duration_s = 20000 # static
collect_rx.interval_s = 1 # static
LOGGER.info('Start Collector RX: {:s}'.format(str(collect_rx)))
collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
LOGGER.info('collect_id_rx({:s})'.format(str(collect_id_rx)))
###############################################
####### START Analyzer LAT ################
analyzer = Analyzer()
analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
analyzer.algorithm_name = 'Test_Aggregate_and_Threshold' # static
analyzer.operation_mode = 2
analyzer.input_kpi_ids.append(kpi_id_rx)
analyzer.input_kpi_ids.append(kpi_id_tx)
analyzer.output_kpi_ids.append(kpi_id_lat)
thresholdStr = service.service_constraints[0].custom.constraint_type
_threshold_dict = {thresholdStr: (0, int(service.service_constraints[0].custom.constraint_value))}
analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
analyzer.parameters['window_size'] = "60s"
analyzer.parameters['window_slider'] = "30s"
analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
###########################################################
####### SET Policy LAT ################
policy_lat = PolicyRuleService()
policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid
# PolicyRuleBasic
policy_lat.policyRuleBasic.priority = 0
policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
policy_lat.policyRuleBasic.booleanOperator = 2
# PolicyRuleAction
policyRuleActionConfig = PolicyRuleActionConfig()
policyRuleActionConfig.action_key = ""
policyRuleActionConfig.action_value = ""
policyRuleAction = PolicyRuleAction()
policyRuleAction.action = 5
policyRuleAction.action_config.append(policyRuleActionConfig)
policy_lat.policyRuleBasic.actionList.append(policyRuleAction)
# for constraint in service.service_constraints:
# PolicyRuleCondition
policyRuleCondition = PolicyRuleCondition()
policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
policyRuleCondition.numericalOperator = 5
policyRuleCondition.kpiValue.floatVal = 300
policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)
policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member
LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
context_client.close()
kpi_manager_client.close()
policy_client.close()
telemetry_frontend_client.close()
return None
context_client.close()
kpi_manager_client.close()
policy_client.close()
telemetry_frontend_client.close()
return ZSMService()
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMUpdate(self, request : ZSMCreateUpdate, context : grpc.ServicerContext) -> ZSMService:
LOGGER.info('NOT IMPLEMENTED ZSMUpdate')
return ZSMService()
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMDelete(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMServiceState:
LOGGER.info('NOT IMPLEMENTED ZSMDelete')
return ZSMServiceState()
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMGetById(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMService:
LOGGER.info('NOT IMPLEMENTED ZSMGetById')
return ZSMService()
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService:
LOGGER.info('NOT IMPLEMENTED ZSMGetByService')
return ZSMService()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, queue, threading
from typing import Dict, Optional
from automation.service.Tools import create_kpi_descriptor, start_collector
from common.proto.context_pb2 import (
ConfigActionEnum, DeviceEvent, DeviceOperationalStatusEnum, Empty, ServiceEvent
)
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
LOGGER = logging.getLogger(__name__)
DEVICE_OP_STATUS_UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DEVICE_OP_STATUS_DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
KPISAMPLETYPE_UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN
class EventCollector(BaseEventCollector):
pass
class EventDispatcher(BaseEventDispatcher):
def __init__(
self, events_queue : queue.PriorityQueue,
terminate : Optional[threading.Event] = None
) -> None:
super().__init__(events_queue, terminate)
self._context_client = ContextClient()
self._kpi_manager_client = KpiManagerClient()
self._telemetry_client = TelemetryFrontendClient()
self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict()
def dispatch_device_create(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
self._device_activate_monitoring(device_event)
def dispatch_device_update(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
self._device_activate_monitoring(device_event)
def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_service_create(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_update(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_remove(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def _device_activate_monitoring(self, device_event : DeviceEvent) -> None:
device_id = device_event.device_id
device_uuid = device_id.device_uuid.uuid
device = self._context_client.GetDevice(device_id)
device_op_status = device.device_operational_status
if device_op_status != DEVICE_OP_STATUS_ENABLED:
LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device)))
return
enabled_endpoint_names = set()
for config_rule in device.device_config.config_rules:
if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
if config_rule.WhichOneof('config_rule') != 'custom': continue
str_resource_key = str(config_rule.custom.resource_key)
if not str_resource_key.startswith('/interface['): continue
json_resource_value = json.loads(config_rule.custom.resource_value)
if 'name' not in json_resource_value: continue
if 'enabled' not in json_resource_value: continue
if not json_resource_value['enabled']: continue
enabled_endpoint_names.add(json_resource_value['name'])
endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict())
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_name_or_uuid = endpoint.name
if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0:
endpoint_name_or_uuid = endpoint_uuid
endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False)
endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names)
if not endpoint_was_monitored and endpoint_is_enabled:
# activate
for kpi_sample_type in endpoint.kpi_sample_types:
if kpi_sample_type == KPISAMPLETYPE_UNKNOWN: continue
kpi_id = create_kpi_descriptor(
self._kpi_manager_client, kpi_sample_type,
device_id=device.device_id,
endpoint_id=endpoint.endpoint_id,
)
duration_seconds = 86400
interval_seconds = 10
collector_id = start_collector(
self._telemetry_client, kpi_id,
duration_seconds, interval_seconds
)
endpoints_monitored[endpoint_uuid] = True
else:
MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \
' endpoint_was_monitored={:s} endpoint_is_enabled={:s}'
LOGGER.warning(MSG.format(
grpc_message_to_json_string(device_event), grpc_message_to_json_string(device),
grpc_message_to_json_string(endpoint), str(endpoint_was_monitored),
str(endpoint_is_enabled)
))
class EventEngine:
def __init__(
self, terminate : Optional[threading.Event] = None
) -> None:
self._terminate = threading.Event() if terminate is None else terminate
self._context_client = ContextClient()
self._event_collector = EventCollector(terminate=self._terminate)
self._event_collector.install_collector(
self._context_client.GetDeviceEvents, Empty(),
log_events_received=True
)
self._event_collector.install_collector(
self._context_client.GetServiceEvents, Empty(),
log_events_received=True
)
self._event_dispatcher = EventDispatcher(
self._event_collector.get_events_queue(),
terminate=self._terminate
)
def start(self) -> None:
self._context_client.connect()
self._event_collector.start()
self._event_dispatcher.start()
def stop(self) -> None:
self._terminate.set()
self._event_dispatcher.stop()
self._event_collector.stop()
self._context_client.close()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, uuid
from typing import Optional
from common.proto.context_pb2 import ConnectionId, DeviceId, EndPointId, LinkId, ServiceId, SliceId
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
LOGGER = logging.getLogger(__name__)
def create_kpi_descriptor(
kpi_manager_client : KpiManagerClient,
kpi_sample_type : KpiSampleType,
device_id : Optional[DeviceId ] = None,
endpoint_id : Optional[EndPointId ] = None,
service_id : Optional[ServiceId ] = None,
slice_id : Optional[SliceId ] = None,
connection_id : Optional[ConnectionId] = None,
link_id : Optional[LinkId ] = None,
) -> KpiId:
kpi_descriptor = KpiDescriptor()
kpi_descriptor.kpi_id.kpi_id.uuid = str(uuid.uuid4())
kpi_descriptor.kpi_description = ''
kpi_descriptor.kpi_sample_type = kpi_sample_type
if device_id is not None: kpi_descriptor.device_id .CopyFrom(device_id )
if endpoint_id is not None: kpi_descriptor.endpoint_id .CopyFrom(endpoint_id )
if service_id is not None: kpi_descriptor.service_id .CopyFrom(service_id )
if slice_id is not None: kpi_descriptor.slice_id .CopyFrom(slice_id )
if connection_id is not None: kpi_descriptor.connection_id.CopyFrom(connection_id)
if link_id is not None: kpi_descriptor.link_id .CopyFrom(link_id )
kpi_id : KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
return kpi_id
def start_collector(
telemetry_client : TelemetryFrontendClient,
kpi_id : KpiId,
duration_seconds : float,
interval_seconds : float
) -> CollectorId:
collector = Collector()
collector.collector_id.collector_id.uuid = str(uuid.uuid4())
collector.kpi_id.CopyFrom(kpi_id)
collector.duration_s = duration_seconds
collector.interval_s = interval_seconds
collector_id : CollectorId = telemetry_client.StartCollector(collector)
return collector_id
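A hedged usage sketch of the two helpers above, mirroring how _device_activate_monitoring chains them (KPI descriptor first, then a collector for the returned KpiId); it assumes the KPI Manager and Telemetry services are reachable:

from common.proto.kpi_sample_types_pb2 import KpiSampleType
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient

kpi_manager_client = KpiManagerClient()
telemetry_client = TelemetryFrontendClient()

kpi_id = create_kpi_descriptor(
    kpi_manager_client, KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED)
collector_id = start_collector(
    telemetry_client, kpi_id,
    duration_seconds=86400, interval_seconds=10)  # values the dispatcher uses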
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
from automation.service.EventEngine import EventEngine
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables
)
from .AutomationService import AutomationService
LOG_LEVEL = get_log_level()
logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
terminate = threading.Event()
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name,unused-argument
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
LOGGER.info('Starting...')
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Start Event Collection+Dispatching Engine
event_engine = EventEngine(terminate=terminate)
event_engine.start()
# Starting Automation service
grpc_service = AutomationService()
grpc_service.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...')
grpc_service.stop()
event_engine.stop()
LOGGER.info('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())