Newer
Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, AnalyzerId,
Analyzer, AnalyzerFilter )
def create_analyzer_id():
_create_analyzer_id = AnalyzerId()
_create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer_id.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
# _create_analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
return _create_analyzer_id
def create_analyzer():
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
# _create_analyzer.analyzer_id.analyzer_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.algorithm_name = "Test_new_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId()
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.input_kpi_ids.append(_kpi_id)
_kpi_id = KpiId()
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf181818"
_create_analyzer.output_kpi_ids.append(_kpi_id)
# _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf441616"
_create_analyzer.output_kpi_ids.append(_kpi_id)
_create_analyzer.output_kpi_ids.append(_kpi_id)
# _threshold_dict = {
# 'mean_value' :[20, 30], 'min_value' :[00, 10], 'max_value' :[45, 50],
# 'first_value' :[00, 10], 'last_value' :[40, 50], 'std_value' :[00, 10]
# }
"task_type": Handlers.AGGREGATION_HANDLER.value,
"task_parameter": [
{"last": [40, 80], "variance": [300, 500]},
{"count": [2, 4], "max": [70, 100]},
{"min": [10, 20], "avg": [50, 70]},
],
}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "0" # slider window size in seconds (Total time for aggeration processing)
_create_analyzer.parameters['window_slider'] = "0" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet
# duration of the analyzer
_create_analyzer.duration_s = 90
# batch window size
_create_analyzer.batch_min_duration_s = 20
_create_analyzer.batch_max_duration_s = 50
# batch size
_create_analyzer.batch_min_size = 5
_create_analyzer.batch_max_size = 10
return _create_analyzer
def create_analyzer_filter():
_create_analyzer_filter = AnalyzerFilter()
_analyzer_id_obj = AnalyzerId()
# _analyzer_id_obj.analyzer_id.uuid = str(uuid.uuid4())
_analyzer_id_obj.analyzer_id.uuid = "efef4d95-1cf1-43c4-9742-95c283ddd7a6"
_create_analyzer_filter.analyzer_id.append(_analyzer_id_obj)
_create_analyzer_filter.algorithm_names.append('Test_Aggergate_and_Threshold')
# _input_kpi_id_obj = KpiId()
# _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
# _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
# another input kpi Id
# _input_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
# _create_analyzer_filter.input_kpi_ids.append(_input_kpi_id_obj)
# _output_kpi_id_obj = KpiId()
# _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
# _create_analyzer_filter.output_kpi_ids.append(_output_kpi_id_obj)
# # another output kpi Id
# _output_kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
# _create_analyzer_filter.input_kpi_ids.append(_output_kpi_id_obj)
return _create_analyzer_filter
# Added for testing to remove the dependency on the backend service
from enum import Enum
class Handlers(Enum):
AGGREGATION_HANDLER = "AggregationHandler"
UNSUPPORTED_HANDLER = "UnsupportedHandler"