Loading manifests/automationservice.yaml +5 −0 Original line number Diff line number Diff line Loading @@ -40,6 +40,11 @@ spec: env: - name: LOG_LEVEL value: "INFO" - name: CRDB_DATABASE value: "tfs_automation" envFrom: - secretRef: name: crdb-data startupProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30200"] Loading proto/automation.proto +70 −69 Original line number Diff line number Diff line Loading @@ -17,11 +17,11 @@ package automation; import "context.proto"; import "policy.proto"; import "analytics_frontend.proto"; // Automation service RPCs service AutomationService { rpc ZSMCreate (ZSMCreateRequest ) returns (ZSMService ) {} rpc ZSMUpdate (ZSMCreateUpdate ) returns (ZSMService ) {} rpc ZSMDelete (ZSMServiceID ) returns (ZSMServiceState) {} rpc ZSMGetById (ZSMServiceID ) returns (ZSMService ) {} rpc ZSMGetByService (context.ServiceId) returns (ZSMService ) {} Loading @@ -37,14 +37,15 @@ enum ZSMServiceStateEnum { ZSM_REMOVED = 5; // ZSM loop is removed } message ZSMCreateRequest { context.ServiceId serviceId = 1; policy.PolicyRuleList policyList = 2; enum ZSMTypeEnum { ZSMTYPE_UNKNOWN = 0; } message ZSMCreateUpdate { context.Uuid ZSMServiceID = 1; policy.PolicyRuleList policyList = 2; message ZSMCreateRequest { context.ServiceId target_service_id = 1; context.ServiceId telemetry_service_id = 2; analytics_frontend.Analyzer analyzer = 3; policy.PolicyRuleService policy = 4; } // A unique identifier per ZSM service Loading src/analytics/backend/service/AnalyzerHandlers.py +49 −0 Original line number Diff line number Diff line Loading @@ -15,18 +15,28 @@ import logging from enum import Enum import pandas as pd from collections import defaultdict logger = logging.getLogger(__name__) class Handlers(Enum): AGGREGATION_HANDLER = "AggregationHandler" AGGREGATION_HANDLER_THREE_TO_ONE = "AggregationHandlerThreeToOne" UNSUPPORTED_HANDLER = "UnsupportedHandler" @classmethod def is_valid_handler(cls, handler_name): return handler_name in cls._value2member_map_ def select_handler(handler_name): if handler_name == "AggregationHandler": return aggregation_handler elif handler_name == "AggregationHandlerThreeToOne": return aggregation_handler_three_to_one else: return "UnsupportedHandler" # This method is top-level and should not be part of the class due to serialization issues. def threshold_handler(key, aggregated_df, thresholds): """ Loading Loading @@ -134,3 +144,42 @@ def aggregation_handler( return results else: return [] def find(data , type , value): return next((item for item in data if item[type] == value), None) def aggregation_handler_three_to_one( batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds ): # Group and sum # Track sum and count sum_dict = defaultdict(int) count_dict = defaultdict(int) for item in batch: kpi_id = item["kpi_id"] if kpi_id in input_kpi_list: sum_dict[kpi_id] += item["kpi_value"] count_dict[kpi_id] += 1 # Compute average avg_dict = {kpi_id: sum_dict[kpi_id] / count_dict[kpi_id] for kpi_id in sum_dict} total_kpi_metric = 0 for kpi_id, total_value in avg_dict.items(): total_kpi_metric += total_value result = { "kpi_id": output_kpi_list[0], "avg": total_kpi_metric, "THRESHOLD_RAISE": bool(total_kpi_metric > 2600), "THRESHOLD_FALL": bool(total_kpi_metric < 699) } results = [] results.append(result) logger.warning(f"result : {result}.") return results src/analytics/backend/service/Streamer.py +8 −6 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ import logging from confluent_kafka import KafkaException, KafkaError from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one , select_handler from analytics.backend.service.AnalyzerHelper import AnalyzerHelper Loading Loading @@ -114,11 +114,13 @@ class DaskStreamer(threading.Thread): if Handlers.is_valid_handler(self.thresholds["task_type"]): if self.client is not None and self.client.status == 'running': try: future = self.client.submit(aggregation_handler, "batch size", self.key, future = self.client.submit(select_handler(self.thresholds["task_type"]), "batch size", self.key, self.batch, self.input_kpis, self.output_kpis, self.thresholds) future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) except Exception as e: logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") logger.error( f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") else: logger.warning("Dask client is not running. Skipping processing.") else: Loading src/automation/.gitlab-ci.yml +2 −3 Original line number Diff line number Diff line Loading @@ -67,8 +67,7 @@ unit_test automation: - sleep 5 - docker ps -a - docker logs $IMAGE_NAME - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml" - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml" - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_automation_handlers.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml" - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: Loading @@ -89,7 +88,7 @@ unit_test automation: artifacts: when: always reports: junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report_*.xml junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml ## Deployment of the service in Kubernetes Cluster #deploy automation: Loading Loading
manifests/automationservice.yaml +5 −0 Original line number Diff line number Diff line Loading @@ -40,6 +40,11 @@ spec: env: - name: LOG_LEVEL value: "INFO" - name: CRDB_DATABASE value: "tfs_automation" envFrom: - secretRef: name: crdb-data startupProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30200"] Loading
proto/automation.proto +70 −69 Original line number Diff line number Diff line Loading @@ -17,11 +17,11 @@ package automation; import "context.proto"; import "policy.proto"; import "analytics_frontend.proto"; // Automation service RPCs service AutomationService { rpc ZSMCreate (ZSMCreateRequest ) returns (ZSMService ) {} rpc ZSMUpdate (ZSMCreateUpdate ) returns (ZSMService ) {} rpc ZSMDelete (ZSMServiceID ) returns (ZSMServiceState) {} rpc ZSMGetById (ZSMServiceID ) returns (ZSMService ) {} rpc ZSMGetByService (context.ServiceId) returns (ZSMService ) {} Loading @@ -37,14 +37,15 @@ enum ZSMServiceStateEnum { ZSM_REMOVED = 5; // ZSM loop is removed } message ZSMCreateRequest { context.ServiceId serviceId = 1; policy.PolicyRuleList policyList = 2; enum ZSMTypeEnum { ZSMTYPE_UNKNOWN = 0; } message ZSMCreateUpdate { context.Uuid ZSMServiceID = 1; policy.PolicyRuleList policyList = 2; message ZSMCreateRequest { context.ServiceId target_service_id = 1; context.ServiceId telemetry_service_id = 2; analytics_frontend.Analyzer analyzer = 3; policy.PolicyRuleService policy = 4; } // A unique identifier per ZSM service Loading
src/analytics/backend/service/AnalyzerHandlers.py +49 −0 Original line number Diff line number Diff line Loading @@ -15,18 +15,28 @@ import logging from enum import Enum import pandas as pd from collections import defaultdict logger = logging.getLogger(__name__) class Handlers(Enum): AGGREGATION_HANDLER = "AggregationHandler" AGGREGATION_HANDLER_THREE_TO_ONE = "AggregationHandlerThreeToOne" UNSUPPORTED_HANDLER = "UnsupportedHandler" @classmethod def is_valid_handler(cls, handler_name): return handler_name in cls._value2member_map_ def select_handler(handler_name): if handler_name == "AggregationHandler": return aggregation_handler elif handler_name == "AggregationHandlerThreeToOne": return aggregation_handler_three_to_one else: return "UnsupportedHandler" # This method is top-level and should not be part of the class due to serialization issues. def threshold_handler(key, aggregated_df, thresholds): """ Loading Loading @@ -134,3 +144,42 @@ def aggregation_handler( return results else: return [] def find(data , type , value): return next((item for item in data if item[type] == value), None) def aggregation_handler_three_to_one( batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds ): # Group and sum # Track sum and count sum_dict = defaultdict(int) count_dict = defaultdict(int) for item in batch: kpi_id = item["kpi_id"] if kpi_id in input_kpi_list: sum_dict[kpi_id] += item["kpi_value"] count_dict[kpi_id] += 1 # Compute average avg_dict = {kpi_id: sum_dict[kpi_id] / count_dict[kpi_id] for kpi_id in sum_dict} total_kpi_metric = 0 for kpi_id, total_value in avg_dict.items(): total_kpi_metric += total_value result = { "kpi_id": output_kpi_list[0], "avg": total_kpi_metric, "THRESHOLD_RAISE": bool(total_kpi_metric > 2600), "THRESHOLD_FALL": bool(total_kpi_metric < 699) } results = [] results.append(result) logger.warning(f"result : {result}.") return results
src/analytics/backend/service/Streamer.py +8 −6 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ import logging from confluent_kafka import KafkaException, KafkaError from common.tools.kafka.Variables import KafkaTopic from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one , select_handler from analytics.backend.service.AnalyzerHelper import AnalyzerHelper Loading Loading @@ -114,11 +114,13 @@ class DaskStreamer(threading.Thread): if Handlers.is_valid_handler(self.thresholds["task_type"]): if self.client is not None and self.client.status == 'running': try: future = self.client.submit(aggregation_handler, "batch size", self.key, future = self.client.submit(select_handler(self.thresholds["task_type"]), "batch size", self.key, self.batch, self.input_kpis, self.output_kpis, self.thresholds) future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) except Exception as e: logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") logger.error( f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") else: logger.warning("Dask client is not running. Skipping processing.") else: Loading
src/automation/.gitlab-ci.yml +2 −3 Original line number Diff line number Diff line Loading @@ -67,8 +67,7 @@ unit_test automation: - sleep 5 - docker ps -a - docker logs $IMAGE_NAME - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml" - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml" - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_automation_handlers.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml" - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: Loading @@ -89,7 +88,7 @@ unit_test automation: artifacts: when: always reports: junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report_*.xml junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml ## Deployment of the service in Kubernetes Cluster #deploy automation: Loading