Skip to content
Snippets Groups Projects
Commit 485d3178 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Initial Version of Analytics Component

- Added the Analytics Frontend client and service with no logic implemented yet.
- Added enums and ports for the Analytics frontend and backend in the constants.
- Added test files and messages.
- Added test execution scripts.
parent bf4dd447
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
Showing
with 542 additions and 0 deletions
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
analytics/frontend/tests/test_frontend.py
# How to locally run and test Analytic service (To be added soon)
### Pre-requisets
The following requirements should be fulfilled before the execuation of Telemetry service.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/analytics/frontend
WORKDIR /var/analyticstelemetry/frontend
COPY src/analytics/frontend/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/frontend/. analytics/frontend/
COPY src/analytics/database/. analytics/database/
# Start the service
ENTRYPOINT ["python", "-m", "analytics.frontend.service"]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import Empty
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceStub
from common.proto.analytics_frontend_pb2 import AnalyzerId, Analyzer, AnalyzerFilter, AnalyzerList
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.client.RetryDecorator import retry, delay_exponential
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class AnalyticsFrontendClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.ANALYTICSFRONTEND)
if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = AnalyticsFrontendServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def StartAnalyzer (self, request: Analyzer) -> AnalyzerId: #type: ignore
LOGGER.debug('StartAnalyzer: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StartAnalyzer(request)
LOGGER.debug('StartAnalyzer result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def StopAnalyzer(self, request : AnalyzerId) -> Empty: # type: ignore
LOGGER.debug('StopAnalyzer: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StopAnalyzer(request)
LOGGER.debug('StopAnalyzer result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def SelectAnalyzers(self, request : AnalyzerFilter) -> AnalyzerList: # type: ignore
LOGGER.debug('SelectAnalyzers: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SelectAnalyzers(request)
LOGGER.debug('SelectAnalyzers result: {:s}'.format(grpc_message_to_json_string(response)))
return response
\ No newline at end of file
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
java==11.0.*
pyspark==3.5.2
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
\ No newline at end of file
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.proto.analytics_frontend_pb2_grpc import add_AnalyticsFrontendServiceServicer_to_server
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
class AnalyticsFrontendService(GenericGrpcService):
def __init__(self, cls_name: str = __name__):
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)
super().__init__(port, cls_name=cls_name)
self.analytics_frontend_servicer = AnalyticsFrontendServiceServicerImpl()
def install_servicers(self):
add_AnalyticsFrontendServiceServicer_to_server(self.analytics_frontend_servicer, self.server)
\ No newline at end of file
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc
from common.proto.context_pb2 import Empty
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def __init__(self):
LOGGER.info('Init AnalyticsFrontendService')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self,
request : Analyzer, grpc_context: grpc.ServicerContext # type: ignore
) -> AnalyzerId: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request))
response = AnalyzerId()
return response
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopAnalyzer(self,
request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
) -> Empty: # type: ignore
LOGGER.info ("At Service gRPC message: {:}".format(request))
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self,
request : AnalyzerFilter, contextgrpc_context: grpc.ServicerContext # type: ignore
) -> AnalyzerList: # type: ignore
LOGGER.info("At Service gRPC message: {:}".format(request))
response = AnalyzerList()
return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from common.Settings import get_log_level
from .AnalyticsFrontendService import AnalyticsFrontendService
terminate = threading.Event()
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.debug('Starting...')
grpc_service = AnalyticsFrontendService()
grpc_service.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.debug('Terminating...')
grpc_service.stop()
LOGGER.debug('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, AnalyzerId,
Analyzer, AnalyzerFilter )
def create_analyzer_id():
_create_analyzer_id = AnalyzerId()
_create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
return _create_analyzer_id
def create_analyzer():
_create_analyzer = Analyzer()
_create_analyzer.algorithm_name = "some_algo_name"
_kpi_id = KpiId()
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) # input IDs to analyze
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) # output IDs after analysis
_create_analyzer.output_kpi_ids.append(_kpi_id)
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
return _create_analyzer
def create_analyzer_filter():
_create_analyzer_filter = AnalyzerFilter()
_analyzer_id_obj = AnalyzerId()
_analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer_filter.analyzer_id.append(_analyzer_id_obj)
_create_analyzer_filter.algorithm_names.append('Algorithum1')
_input_kpi_id_obj = KpiId()
_input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
_output_kpi_id_obj = KpiId()
_output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj)
return _create_analyzer_filter
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pytest
import logging
from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import Empty
from common.Settings import ( get_service_port_grpc, get_env_var_name,
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
create_analyzer_filter )
###########################
# Tests Setup
###########################
LOCAL_HOST = '127.0.0.1'
ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND))
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__)
@pytest.fixture(scope='session')
def analyticsFrontend_service():
LOGGER.info('Initializing AnalyticsFrontendService...')
_service = AnalyticsFrontendService()
_service.start()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding AnalyticsFrontendService...')
yield _service
LOGGER.info('Terminating AnalyticsFrontendService...')
_service.stop()
LOGGER.info('Terminated AnalyticsFrontendService...')
@pytest.fixture(scope='session')
def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendService):
LOGGER.info('Initializing AnalyticsFrontendClient...')
_client = AnalyticsFrontendClient()
# yield the server, when test finishes, execution will resume to stop it
LOGGER.info('Yielding AnalyticsFrontendClient...')
yield _client
LOGGER.info('Closing AnalyticsFrontendClient...')
_client.close()
LOGGER.info('Closed AnalyticsFrontendClient...')
###########################
# Tests Implementation of Analytics Frontend
###########################
# ----- core funtionality test -----
def test_StartAnalytic(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalytic START: <<< ')
response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerId)
def test_StopAnalytic(analyticsFrontend_client):
LOGGER.info(' >>> test_StopAnalytic START: <<< ')
response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
LOGGER.debug(str(response))
assert isinstance(response, Empty)
def test_SelectAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerList)
\ No newline at end of file
...@@ -65,6 +65,9 @@ class ServiceNameEnum(Enum): ...@@ -65,6 +65,9 @@ class ServiceNameEnum(Enum):
KPIVALUEAPI = 'kpi-value-api' KPIVALUEAPI = 'kpi-value-api'
KPIVALUEWRITER = 'kpi-value-writer' KPIVALUEWRITER = 'kpi-value-writer'
TELEMETRYFRONTEND = 'telemetry-frontend' TELEMETRYFRONTEND = 'telemetry-frontend'
TELEMETRYBACKEND = 'telemetry-backend'
ANALYTICSFRONTEND = 'analytics-frontend'
ANALYTICSBACKEND = 'analytics-backend'
# Used for test and debugging only # Used for test and debugging only
DLT_GATEWAY = 'dltgateway' DLT_GATEWAY = 'dltgateway'
...@@ -98,6 +101,9 @@ DEFAULT_SERVICE_GRPC_PORTS = { ...@@ -98,6 +101,9 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.KPIVALUEAPI .value : 30020, ServiceNameEnum.KPIVALUEAPI .value : 30020,
ServiceNameEnum.KPIVALUEWRITER .value : 30030, ServiceNameEnum.KPIVALUEWRITER .value : 30030,
ServiceNameEnum.TELEMETRYFRONTEND .value : 30050, ServiceNameEnum.TELEMETRYFRONTEND .value : 30050,
ServiceNameEnum.TELEMETRYBACKEND .value : 30060,
ServiceNameEnum.ANALYTICSFRONTEND .value : 30080,
ServiceNameEnum.ANALYTICSBACKEND .value : 30090,
# Used for test and debugging only # Used for test and debugging only
ServiceNameEnum.DLT_GATEWAY .value : 50051, ServiceNameEnum.DLT_GATEWAY .value : 50051,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment