Commit 890dbe11 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

Refactor automation component. Add plugin mechanism for creating a ZSM Loop....

Refactor automation component. Add plugin mechanism for creating a ZSM Loop. Create some basic test for zsm_handler.
parent 5f3f7d28
Loading
Loading
Loading
Loading
+11 −2
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ package automation;

import "context.proto";
import "policy.proto";
import "analytics_frontend.proto";

// Automation service RPCs
service AutomationService {
@@ -37,9 +38,17 @@ enum ZSMServiceStateEnum {
  ZSM_REMOVED = 5;       // ZSM loop is removed
}

enum ZSMTypeEnum {
  ZSMTYPE_UNKNOWN = 0;
  ZSMTYPE_P4 = 1;
  ZSMTYPE_L2NM = 2;
}

message ZSMCreateRequest {
  context.ServiceId serviceId = 1;
  policy.PolicyRuleList policyList = 2;
  context.ServiceId targetServiceId = 1;
  context.ServiceId telemetryServiceId = 2;
  analytics_frontend.Analyzer analyzer = 3;
  policy.PolicyRuleService policy = 4;
}

message ZSMCreateUpdate {
+1 −2
Original line number Diff line number Diff line
@@ -66,8 +66,7 @@ unit_test automation:
    - sleep 5
    - docker ps -a
    - docker logs $IMAGE_NAME
    - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml"
    - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml"
    - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_automation_handlers.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml"
    - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
+22 −0
Original line number Diff line number Diff line
# Plug In A ZSM LOOP

### Initialization
The following requirements should be fulfilled before the execuation of Analytics service.

1. IN ZSMFilterFieldEnum add the new available services
2. In zsm_handlers folder , on the init file and the ZSM_SERVICE_HANDLERS. Give the name of the file and the filter criteria 
3. Add the handler with the same name as you add it in ZSM_SERVICE_HANDLERS.
4. Implement some of those methods in side your handler

```
    def zsmCreate(self,request : ZSMCreateRequest, context : grpc.ServicerContext):
       LOGGER.info('Init zsmUpdate method')
    def zsmUpdate(self):
        LOGGER.info('Init zsmUpdate method')
    def zsmDelete(self):
        LOGGER.info('Init zsmDelete method')
    def ZSMGetById(self):
        LOGGER.info('Init ZSMGetById method')
    def ZSMGetByService(self):
        LOGGER.info('Init ZSMGetByService method')
```
 No newline at end of file
+35 −166
Original line number Diff line number Diff line
@@ -15,22 +15,14 @@
import grpc, json, logging
from uuid import uuid4
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.automation_pb2_grpc import AutomationServiceServicer
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate
from common.proto.automation_pb2_grpc import AutomationServiceServicer
from common.proto.context_pb2 import Service, ServiceId
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor
from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
from common.proto.policy_action_pb2 import PolicyRuleAction, PolicyRuleActionConfig
from common.proto.policy_condition_pb2 import PolicyRuleCondition
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId

from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from automation.client.PolicyClient import PolicyClient
from automation.service.zsm_handlers import ZSM_SERVICE_HANDLERS
from automation.service.zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum , TELEMETRY_SERVICE_TYPE_VALUES, TARGET_SERVICE_TYPE_VALUES , ZSM_FILTER_FIELD_ALLOWED_VALUES
from common.proto.context_pb2 import ServiceTypeEnum , DeviceDriverEnum
from context.client.ContextClient import ContextClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient

LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Automation', 'RPC')
@@ -41,165 +33,19 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):

    @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService:

        # check that service does not exist
        context_client = ContextClient()
        kpi_manager_client = KpiManagerClient()
        policy_client = PolicyClient()
        telemetry_frontend_client = TelemetryFrontendClient()
        analytics_frontend_client = AnalyticsFrontendClient()

        try:

            # TODO: Remove static variables(get them from ZSMCreateRequest)
            # TODO: Refactor policy component (remove unnecessary variables)

            ####### GET Context #######################
            LOGGER.info('Get the service from Context: ')
            service: Service = context_client.GetService(request.serviceId)
            LOGGER.info('Service ({:s}) :'.format(str(service)))
            ###########################################

            ####### SET Kpi Descriptor LAT ################
            LOGGER.info('Set Kpi Descriptor LAT: ')

            if(len(service.service_constraints) == 0):
                raise InvalidArgumentException("service_constraints" , "empty",  []);

            if(len(service.service_constraints) > 1):
                raise InvalidArgumentException("service_constraints" , ">1",  []);

            if(service.service_constraints[0].sla_latency is None ):
                raise InvalidArgumentException("sla_latency", "empty", []);

            ## Static Implementation Applied only in case of SLA Latency Constraint ##

            # KPI Descriptor
            kpi_descriptor_lat = KpiDescriptor()
            kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS'  #static service.service_constraints[].sla_latency.e2e_latency_ms
            kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
            kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
            LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat)))
            ###########################################

            ####### SET Kpi Descriptor TX ################
            LOGGER.info('Set Kpi Descriptor TX: ')

            kpi_descriptor_tx = KpiDescriptor()
            kpi_descriptor_tx.kpi_sample_type = 101  # static KPISAMPLETYPE_PACKETS_TRANSMITTED
            kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
            kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
            LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx)))
            ###########################################

            ####### SET Kpi Descriptor RX ################
            LOGGER.info('Set Kpi Descriptor RX: ')

            kpi_descriptor_rx = KpiDescriptor()
            kpi_descriptor_rx.kpi_sample_type = 102  # static KPISAMPLETYPE_PACKETS_RECEIVED
            kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
            kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx)
            LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
            ###########################################

        targetService = context_client.GetService(request.targetServiceId)
        telemetryService = context_client.GetService(request.telemetryServiceId)

        handler_cls = self.get_service_handler_based_on_service_types(targetService.service_type , telemetryService.service_type , ZSM_SERVICE_HANDLERS)

            ####### START Collector TX #################
            collect_tx = Collector()
            collect_tx.collector_id.collector_id.uuid = str(uuid4())
            collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
            collect_tx.duration_s = 20000  # static
            collect_tx.interval_s = 1  # static
            LOGGER.info('Start Collector TX'.format(str(collect_tx)))
        if handler_cls:
            handler_obj = handler_cls()  # instantiate it
            handler_obj.zsmCreate(request , context)
        else:
            LOGGER.info("No matching handler found.")

            collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
            #############################################

            ####### START Collector RX ##################
            collect_rx = Collector()
            collect_rx.collector_id.collector_id.uuid = str(uuid4())
            collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
            collect_rx.duration_s = 20000  # static
            collect_rx.interval_s = 1  # static
            LOGGER.info('Start Collector RX'.format(str(collect_rx)))

            collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
            ###############################################

            ####### START Analyzer LAT ################
            analyzer = Analyzer()
            analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
            analyzer.algorithm_name = 'Test_Aggregate_and_Threshold'  # static
            analyzer.operation_mode = 2
            analyzer.input_kpi_ids.append(kpi_id_rx)
            analyzer.input_kpi_ids.append(kpi_id_tx)
            analyzer.output_kpi_ids.append(kpi_id_lat)

            thresholdStr = service.service_constraints[0].custom.constraint_type

            _threshold_dict = {thresholdStr: (0, int(service.service_constraints[0].custom.constraint_value))}
            analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
            analyzer.parameters['window_size'] = "60s"
            analyzer.parameters['window_slider'] = "30s"

            analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
            LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
            ###########################################################

            ####### SET Policy LAT ################
            policy_lat = PolicyRuleService()
            policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
            policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid

            # PolicyRuleBasic
            policy_lat.policyRuleBasic.priority = 0
            policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
            policy_lat.policyRuleBasic.booleanOperator = 2

            # PolicyRuleAction
            policyRuleActionConfig = PolicyRuleActionConfig()
            policyRuleActionConfig.action_key = ""
            policyRuleActionConfig.action_value = ""

            policyRuleAction = PolicyRuleAction()
            policyRuleAction.action = 5
            policyRuleAction.action_config.append(policyRuleActionConfig)
            policy_lat.policyRuleBasic.actionList.append(policyRuleAction)

            # for constraint in service.service_constraints:

                # PolicyRuleCondition
            policyRuleCondition = PolicyRuleCondition()
            policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
            policyRuleCondition.numericalOperator = 5
            policyRuleCondition.kpiValue.floatVal = 300

            policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)

            policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
            LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))

        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND: raise  # pylint: disable=no-member
            LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
            context_client.close()
            kpi_manager_client.close()
            policy_client.close()
            telemetry_frontend_client.close()
            return None

        context_client.close()
        kpi_manager_client.close()
        policy_client.close()
        telemetry_frontend_client.close()
        return ZSMService()

    @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
@@ -222,3 +68,26 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
    def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService:
        LOGGER.info('NOT IMPLEMENTED ZSMGetByService')
        return ZSMService()

    def get_service_handler_based_on_service_types(self, targetServiceType ,telemetryServiceType , ZSM_SERVICE_HANDLERS):
        flag = True
        for handler_cls, filters in ZSM_SERVICE_HANDLERS:
            for filter in filters:
                flag = self.check_if_requested_services_pass_filter_criteria(filter , targetServiceType, telemetryServiceType)
            if flag:
                return handler_cls
        return None

    def check_if_requested_services_pass_filter_criteria(self ,filter , targetServiceType , telemetryServiceType):
        flag = True
        for filter_key, filter_value in filter.items():
            if filter_value in ZSM_FILTER_FIELD_ALLOWED_VALUES[filter_key.value]:
                if filter_key.value == ZSMFilterFieldEnum.TARGET_SERVICE_TYPE.value:
                    if filter_value != targetServiceType:
                        flag = False
                elif filter_key.value == ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE.value:
                    if filter_value != telemetryServiceType:
                        flag = False
            else:
                flag = False
        return flag
+37 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from common.proto.context_pb2 import ServiceTypeEnum

class ZSMFilterFieldEnum(Enum):
    TARGET_SERVICE_TYPE  = 'target_service_type'
    TELEMETRY_SERVICE_TYPE = 'telemetry_service_driver'


TARGET_SERVICE_TYPE_VALUES = {
    ServiceTypeEnum.SERVICETYPE_L2NM,
    ServiceTypeEnum.SERVICETYPE_INT
}

TELEMETRY_SERVICE_TYPE_VALUES = {
    ServiceTypeEnum.SERVICETYPE_INT,
    ServiceTypeEnum.SERVICETYPE_L2NM
}

# Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified
ZSM_FILTER_FIELD_ALLOWED_VALUES = {
    ZSMFilterFieldEnum.TARGET_SERVICE_TYPE.value  : TARGET_SERVICE_TYPE_VALUES,
    ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE.value : TELEMETRY_SERVICE_TYPE_VALUES,
}
Loading