Commit 34ebe9f0 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

feat: refactor automation for supporting zsm plugin handler logic.

parent 5f3f7d28
Loading
Loading
Loading
Loading
+76 −69
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ package automation;

import "context.proto";
import "policy.proto";
import "analytics_frontend.proto";

// Automation service RPCs
service AutomationService {
@@ -37,9 +38,15 @@ enum ZSMServiceStateEnum {
  ZSM_REMOVED = 5;       // ZSM loop is removed
}

enum ZSMTypeEnum {
  ZSMTYPE_UNKNOWN = 0;
}

message ZSMCreateRequest {
  context.ServiceId serviceId = 1;
  policy.PolicyRuleList policyList = 2;
  context.ServiceId target_service_id = 1;
  context.ServiceId telemetry_service_id = 2;
  analytics_frontend.Analyzer analyzer = 3;
  policy.PolicyRuleService policy = 4;
}

message ZSMCreateUpdate {
+40 −0
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
import logging
from enum import Enum
import pandas as pd
from collections import defaultdict

logger = logging.getLogger(__name__)

@@ -134,3 +135,42 @@ def aggregation_handler(
            return results
        else:
            return []

def find(data , type , value):
    return next((item for item in data if item[type] == value), None)


def aggregation_handler_three_to_one(
        batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
):

    # Group and sum
    # Track sum and count
    sum_dict = defaultdict(int)
    count_dict = defaultdict(int)

    for item in batch:
        kpi_id = item["kpi_id"]
        if kpi_id in input_kpi_list:
            sum_dict[kpi_id] += item["kpi_value"]
            count_dict[kpi_id] += 1

    # Compute average
    avg_dict = {kpi_id: sum_dict[kpi_id] / count_dict[kpi_id] for kpi_id in sum_dict}

    total_kpi_metric = 0
    for kpi_id, total_value in avg_dict.items():
        total_kpi_metric += total_value

    result = {
        "kpi_id": output_kpi_list[0],
        "avg": total_kpi_metric,
        "THRESHOLD_RAISE": bool(total_kpi_metric > 2600),
        "THRESHOLD_FALL": bool(total_kpi_metric < 699)
    }
    results = []

    results.append(result)
    logger.warning(f"result : {result}.")

    return results
+17 −9
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ import logging

from confluent_kafka                            import KafkaException, KafkaError
from common.tools.kafka.Variables               import KafkaTopic
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler
from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one
from analytics.backend.service.AnalyzerHelper   import AnalyzerHelper


@@ -113,6 +113,14 @@ class DaskStreamer(threading.Thread):
        logger.info(f"Batch to be processed: {self.batch}")
        if Handlers.is_valid_handler(self.thresholds["task_type"]):
            if self.client is not None and self.client.status == 'running':
                if any('total' in d for d in self.thresholds.get('task_parameter', [])):
                    try:
                        future = self.client.submit(aggregation_handler_three_to_one, "batch size", self.key,
                                                        self.batch, self.input_kpis, self.output_kpis, self.thresholds)
                        future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value))
                    except Exception as e:
                        logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}")
                else:
                    try:
                        future = self.client.submit(aggregation_handler, "batch size", self.key,
                                                        self.batch, self.input_kpis, self.output_kpis, self.thresholds)
+1 −2
Original line number Diff line number Diff line
@@ -66,8 +66,7 @@ unit_test automation:
    - sleep 5
    - docker ps -a
    - docker logs $IMAGE_NAME
    - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml"
    - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml"
    - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_automation_handlers.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml"
    - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
+35 −166
Original line number Diff line number Diff line
@@ -15,22 +15,14 @@
import grpc, json, logging
from uuid import uuid4
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.automation_pb2_grpc import AutomationServiceServicer
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate
from common.proto.automation_pb2_grpc import AutomationServiceServicer
from common.proto.context_pb2 import Service, ServiceId
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor
from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
from common.proto.policy_action_pb2 import PolicyRuleAction, PolicyRuleActionConfig
from common.proto.policy_condition_pb2 import PolicyRuleCondition
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId

from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from automation.client.PolicyClient import PolicyClient
from automation.service.zsm_handlers import ZSM_SERVICE_HANDLERS
from automation.service.zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum , TELEMETRY_SERVICE_TYPE_VALUES, TARGET_SERVICE_TYPE_VALUES , ZSM_FILTER_FIELD_ALLOWED_VALUES
from common.proto.context_pb2 import ServiceTypeEnum , DeviceDriverEnum
from context.client.ContextClient import ContextClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient

LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Automation', 'RPC')
@@ -41,165 +33,19 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):

    @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService:

        # check that service does not exist
        context_client = ContextClient()
        kpi_manager_client = KpiManagerClient()
        policy_client = PolicyClient()
        telemetry_frontend_client = TelemetryFrontendClient()
        analytics_frontend_client = AnalyticsFrontendClient()

        try:

            # TODO: Remove static variables(get them from ZSMCreateRequest)
            # TODO: Refactor policy component (remove unnecessary variables)

            ####### GET Context #######################
            LOGGER.info('Get the service from Context: ')
            service: Service = context_client.GetService(request.serviceId)
            LOGGER.info('Service ({:s}) :'.format(str(service)))
            ###########################################

            ####### SET Kpi Descriptor LAT ################
            LOGGER.info('Set Kpi Descriptor LAT: ')

            if(len(service.service_constraints) == 0):
                raise InvalidArgumentException("service_constraints" , "empty",  []);

            if(len(service.service_constraints) > 1):
                raise InvalidArgumentException("service_constraints" , ">1",  []);

            if(service.service_constraints[0].sla_latency is None ):
                raise InvalidArgumentException("sla_latency", "empty", []);

            ## Static Implementation Applied only in case of SLA Latency Constraint ##

            # KPI Descriptor
            kpi_descriptor_lat = KpiDescriptor()
            kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS'  #static service.service_constraints[].sla_latency.e2e_latency_ms
            kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
            kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
            LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat)))
            ###########################################

            ####### SET Kpi Descriptor TX ################
            LOGGER.info('Set Kpi Descriptor TX: ')

            kpi_descriptor_tx = KpiDescriptor()
            kpi_descriptor_tx.kpi_sample_type = 101  # static KPISAMPLETYPE_PACKETS_TRANSMITTED
            kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
            kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
            LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx)))
            ###########################################

            ####### SET Kpi Descriptor RX ################
            LOGGER.info('Set Kpi Descriptor RX: ')

            kpi_descriptor_rx = KpiDescriptor()
            kpi_descriptor_rx.kpi_sample_type = 102  # static KPISAMPLETYPE_PACKETS_RECEIVED
            kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
            kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx)
            LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
            ###########################################

        targetService = context_client.GetService(request.target_service_id)
        telemetryService = context_client.GetService(request.telemetry_service_id)

        handler_cls = self.get_service_handler_based_on_service_types(targetService.service_type, telemetryService.service_type, ZSM_SERVICE_HANDLERS)

            ####### START Collector TX #################
            collect_tx = Collector()
            collect_tx.collector_id.collector_id.uuid = str(uuid4())
            collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
            collect_tx.duration_s = 20000  # static
            collect_tx.interval_s = 1  # static
            LOGGER.info('Start Collector TX'.format(str(collect_tx)))
        if handler_cls:
            handler_obj = handler_cls()  # instantiate it
            handler_obj.zsmCreate(request, context)
        else:
            LOGGER.info("No matching handler found.")

            collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
            #############################################

            ####### START Collector RX ##################
            collect_rx = Collector()
            collect_rx.collector_id.collector_id.uuid = str(uuid4())
            collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
            collect_rx.duration_s = 20000  # static
            collect_rx.interval_s = 1  # static
            LOGGER.info('Start Collector RX'.format(str(collect_rx)))

            collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
            ###############################################

            ####### START Analyzer LAT ################
            analyzer = Analyzer()
            analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
            analyzer.algorithm_name = 'Test_Aggregate_and_Threshold'  # static
            analyzer.operation_mode = 2
            analyzer.input_kpi_ids.append(kpi_id_rx)
            analyzer.input_kpi_ids.append(kpi_id_tx)
            analyzer.output_kpi_ids.append(kpi_id_lat)

            thresholdStr = service.service_constraints[0].custom.constraint_type

            _threshold_dict = {thresholdStr: (0, int(service.service_constraints[0].custom.constraint_value))}
            analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
            analyzer.parameters['window_size'] = "60s"
            analyzer.parameters['window_slider'] = "30s"

            analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
            LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
            ###########################################################

            ####### SET Policy LAT ################
            policy_lat = PolicyRuleService()
            policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
            policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid

            # PolicyRuleBasic
            policy_lat.policyRuleBasic.priority = 0
            policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
            policy_lat.policyRuleBasic.booleanOperator = 2

            # PolicyRuleAction
            policyRuleActionConfig = PolicyRuleActionConfig()
            policyRuleActionConfig.action_key = ""
            policyRuleActionConfig.action_value = ""

            policyRuleAction = PolicyRuleAction()
            policyRuleAction.action = 5
            policyRuleAction.action_config.append(policyRuleActionConfig)
            policy_lat.policyRuleBasic.actionList.append(policyRuleAction)

            # for constraint in service.service_constraints:

                # PolicyRuleCondition
            policyRuleCondition = PolicyRuleCondition()
            policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
            policyRuleCondition.numericalOperator = 5
            policyRuleCondition.kpiValue.floatVal = 300

            policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)

            policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
            LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))

        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND: raise  # pylint: disable=no-member
            LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
            context_client.close()
            kpi_manager_client.close()
            policy_client.close()
            telemetry_frontend_client.close()
            return None

        context_client.close()
        kpi_manager_client.close()
        policy_client.close()
        telemetry_frontend_client.close()
        return ZSMService()

    @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
@@ -222,3 +68,26 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
    def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService:
        LOGGER.info('NOT IMPLEMENTED ZSMGetByService')
        return ZSMService()

    def get_service_handler_based_on_service_types(self, targetServiceType ,telemetryServiceType , ZSM_SERVICE_HANDLERS):
        flag = True
        for handler_cls, filters in ZSM_SERVICE_HANDLERS:
            for filter in filters:
                flag = self.check_if_requested_services_pass_filter_criteria(filter , targetServiceType, telemetryServiceType)
            if flag:
                return handler_cls
        return None

    def check_if_requested_services_pass_filter_criteria(self ,filter , targetServiceType , telemetryServiceType):
        flag = True
        for filter_key, filter_value in filter.items():
            if filter_value in ZSM_FILTER_FIELD_ALLOWED_VALUES[filter_key.value]:
                if filter_key.value == ZSMFilterFieldEnum.TARGET_SERVICE_TYPE.value:
                    if filter_value != targetServiceType:
                        flag = False
                elif filter_key.value == ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE.value:
                    if filter_value != telemetryServiceType:
                        flag = False
            else:
                flag = False
        return flag
Loading