Commit 18a6c2e6 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

Complete Automation Service implementation. Add correct error messages. Add analyzer before policy.

parent e5baccf9
Loading
Loading
Loading
Loading
+44 −50
Original line number Diff line number Diff line
@@ -58,25 +58,31 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
        policy_client = PolicyClient()
        telemetry_frontend_client = TelemetryFrontendClient()
        analytics_frontend_client = AnalyticsFrontendClient()
        analytic_frontend_service = AnalyticsFrontendServiceServicerImpl()

        LOGGER.info('Trying to get the service ')
        LOGGER.info('request.serviceId.service_uuid.uuid({:s})'.format(str(request.serviceId.service_uuid.uuid)))
        LOGGER.info('request.serviceId.service_uuid({:s})'.format(str(request.serviceId.service_uuid)))
        LOGGER.info('request.serviceId({:s})'.format(str(request.serviceId)))
        LOGGER.info('Request({:s})'.format(str(request)))

        try:

            # TODO: Remove static variables(get them from ZSMCreateRequest)
            # TODO: Refactor policy component (remove unnecessary variables)

            ####### GET Context #######################
            LOGGER.info('Get the service from Context: ')
            service: Service = context_client.GetService(request.serviceId)
            LOGGER.info('service({:s})'.format(str(service)))
            LOGGER.info('Service ({:s}) :'.format(str(service)))
            ###########################################

            ####### SET Kpi Descriptor LAT ################
            LOGGER.info('Set Kpi Descriptor LAT: ')

            if(len(service.service_constraints) == 0):
                raise InvalidArgumentException("service_constraints" , "empty",  []);

            # if(len(service.service_constraints) == 0):
            #     raise InvalidArgumentException("argument_name" , "argument_value",  []);
            if(len(service.service_constraints) > 1):
                raise InvalidArgumentException("service_constraints" , ">1",  []);

            if(service.service_constraints[0].sla_latency is None ):
                raise InvalidArgumentException("sla_latency", "empty", []);

            ## Static Implementation Applied only in case of SLA Latency Constraint ##

            # KPI Descriptor
            kpi_descriptor_lat = KpiDescriptor()
@@ -85,20 +91,24 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
            kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
            LOGGER.info('kpi_id_lat({:s})'.format(str(kpi_id_lat)))
            LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat)))
            ###########################################

            ####### SET Kpi Descriptor TX ################
            LOGGER.info('Set Kpi Descriptor TX: ')

            kpi_descriptor_tx = KpiDescriptor()
            kpi_descriptor_tx.kpi_sample_type = 101  # static KPISAMPLETYPE_PACKETS_TRANSMITTED
            kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
            kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())

            kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
            LOGGER.info('kpi_id_tx({:s})'.format(str(kpi_id_tx)))
            LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx)))
            ###########################################

            ####### SET Kpi Descriptor RX ################
            LOGGER.info('Set Kpi Descriptor RX: ')

            kpi_descriptor_rx = KpiDescriptor()
            kpi_descriptor_rx.kpi_sample_type = 102  # static KPISAMPLETYPE_PACKETS_RECEIVED
            kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
@@ -114,7 +124,7 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
            collect_tx = Collector()
            collect_tx.collector_id.collector_id.uuid = str(uuid4())
            collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
            collect_tx.duration_s = 2000  # static
            collect_tx.duration_s = 20000  # static
            collect_tx.interval_s = 1  # static
            LOGGER.info('Start Collector TX'.format(str(collect_tx)))

@@ -126,7 +136,7 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
            collect_rx = Collector()
            collect_rx.collector_id.collector_id.uuid = str(uuid4())
            collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
            collect_rx.duration_s = 2000  # static
            collect_rx.duration_s = 20000  # static
            collect_rx.interval_s = 1  # static
            LOGGER.info('Start Collector RX'.format(str(collect_rx)))

@@ -134,6 +144,24 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
            LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
            ###############################################

            ####### START Analyzer LAT ################
            analyzer = Analyzer()
            analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
            analyzer.algorithm_name = 'Test_Aggregate_and_Threshold'  # static
            analyzer.operation_mode = 2
            analyzer.input_kpi_ids.append(kpi_id_rx)
            analyzer.input_kpi_ids.append(kpi_id_tx)
            analyzer.output_kpi_ids.append(kpi_id_lat)

            _threshold_dict = {'min_latency_E2E': (0, service.service_constraints[0].sla_latency.e2e_latency_ms)}
            analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
            analyzer.parameters['window_size'] = "60s"
            analyzer.parameters['window_slider'] = "30s"

            analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
            LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
            ###########################################################

            ####### SET Policy LAT ################
            policy_lat = PolicyRuleService()
            policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
@@ -160,45 +188,13 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
            policyRuleCondition = PolicyRuleCondition()
            policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
            policyRuleCondition.numericalOperator = 5
            policyRuleCondition.kpiValue.floatVal = 300 #constraint.sla_latency.e2e_latency_ms
            policyRuleCondition.kpiValue.floatVal = 300

            policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)


            policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
            LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))


            ####### START Analyzer LAT ################
            analyzer = Analyzer()
            analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
            analyzer.algorithm_name = 'Test_Aggergate_and_Threshold'  # static
            analyzer.operation_mode = 2
            analyzer.input_kpi_ids.append(kpi_id_rx)
            analyzer.input_kpi_ids.append(kpi_id_tx)
            analyzer.output_kpi_ids.append(kpi_id_lat)

            _threshold_dict = {'min_latency_E2E': (2, 105)}
            analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
            analyzer.parameters['window_size'] = "60s"
            analyzer.parameters['window_slider'] = "30s"

            analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
            LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))

            kpi_value_api_client = KpiValueApiClient()
            stream: KpiAlarms = kpi_value_api_client.GetKpiAlarms(kpi_id_lat.kpi_id.uuid)
            for response in stream:
                if response is None:
                    LOGGER.debug('NO message')
                else:
                    LOGGER.debug(str(response))
            ###########################################

            # for response in analytic_frontend_service.StartResponseListener( analyzer_id_lat.analyzer_id.uuid):
            #     LOGGER.info("response.value {:s}",response)


        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND: raise  # pylint: disable=no-member
            LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
@@ -208,8 +204,6 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
            telemetry_frontend_client.close()
            return None

        LOGGER.info('Here is the service')

        context_client.close()
        kpi_manager_client.close()
        policy_client.close()
+2 −0
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ from service.client.ServiceClient import ServiceClient
from .Objects import CONTEXT_ID, CONTEXTS, DEVICES, LINKS, TOPOLOGIES, SERVICES
from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId,\
    DeviceOperationalStatusEnum
from common.tools.object_factory.Constraint import json_constraint_sla_latency

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@@ -62,4 +63,5 @@ def test_rules_entry(
        service_p4 = copy.deepcopy(service)
        service_client.CreateService(Service(**service_p4))
        service_p4['service_endpoint_ids'].extend(endpoints)
        service_p4['service_constraints'].extend([json_constraint_sla_latency(3)])
        service_client.UpdateService(Service(**service_p4))