Skip to content
Snippets Groups Projects
AutomationServiceServicerImpl.py 11.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import grpc , logging, os, grpc
    
    from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
    from common.method_wrappers.Decorator import MetricsPool
    
    from common.proto.automation_pb2_grpc import AutomationServiceServicer
    
    from common.proto.automation_pb2 import ( ZSMCreateRequest , ZSMService ,ZSMServiceID ,ZSMServiceState,ZSMCreateUpdate , ZSMServiceStateEnum)
    from common.proto.context_pb2 import ( ServiceId , ContextId , Uuid , Empty)
    
    from common.proto.telemetry_frontend_pb2 import ( Collector , CollectorId )
    
    from common.proto.policy_pb2 import ( PolicyRuleList)
    
    from context.client.ContextClient import ContextClient
    from automation.client.PolicyClient import PolicyClient
    from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
    from kpi_manager.client.KpiManagerClient import KpiManagerClient
    from common.proto.context_pb2 import ( Service )
    
    from common.proto.kpi_manager_pb2 import (KpiId, KpiDescriptor)
    
    from common.proto.kpi_value_api_pb2 import (KpiAlarms)
    
    
    from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
    from common.proto.policy_action_pb2 import PolicyRuleAction , PolicyRuleActionConfig
    from common.proto.policy_condition_pb2 import PolicyRuleCondition
    from uuid import uuid4
    
    import json
    
    from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
    from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
    from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId
    
    from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
    
    from common.method_wrappers.ServiceExceptions import InvalidArgumentException
    
    LOGGER = logging.getLogger(__name__)
    METRICS_POOL = MetricsPool('Automation', 'RPC')
    
    
    class AutomationServiceServicerImpl(AutomationServiceServicer):
    
        def __init__(self):
            LOGGER.info('Init AutomationService')
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService:
    
    
            # check that service does not exist
            context_client = ContextClient()
            kpi_manager_client = KpiManagerClient()
            policy_client = PolicyClient()
            telemetry_frontend_client = TelemetryFrontendClient()
    
            analytics_frontend_client = AnalyticsFrontendClient()
            analytic_frontend_service = AnalyticsFrontendServiceServicerImpl()
    
    
            LOGGER.info('Trying to get the service ')
            LOGGER.info('request.serviceId.service_uuid.uuid({:s})'.format(str(request.serviceId.service_uuid.uuid)))
            LOGGER.info('request.serviceId.service_uuid({:s})'.format(str(request.serviceId.service_uuid)))
            LOGGER.info('request.serviceId({:s})'.format(str(request.serviceId)))
            LOGGER.info('Request({:s})'.format(str(request)))
    
            try:
    
                ####### GET Context #######################
                service: Service = context_client.GetService(request.serviceId)
                LOGGER.info('service({:s})'.format(str(service)))
                ###########################################
    
                ####### SET Kpi Descriptor LAT ################
    
                # if(len(service.service_constraints) == 0):
                #     raise InvalidArgumentException("argument_name" , "argument_value",  []);
    
                    # KPI Descriptor
                kpi_descriptor_lat = KpiDescriptor()
                kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS'  #static service.service_constraints[].sla_latency.e2e_latency_ms
                kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
                kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())
    
                kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
                LOGGER.info('kpi_id_lat({:s})'.format(str(kpi_id_lat)))
                ###########################################
    
                ####### SET Kpi Descriptor TX ################
                kpi_descriptor_tx = KpiDescriptor()
                kpi_descriptor_tx.kpi_sample_type = 101  # static KPISAMPLETYPE_PACKETS_TRANSMITTED
                kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
                kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())
    
                kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
                LOGGER.info('kpi_id_tx({:s})'.format(str(kpi_id_tx)))
                ###########################################
    
                ####### SET Kpi Descriptor RX ################
                kpi_descriptor_rx = KpiDescriptor()
                kpi_descriptor_rx.kpi_sample_type = 102  # static KPISAMPLETYPE_PACKETS_RECEIVED
                kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
                kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4())
    
                kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx)
                LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
                ###########################################
    
    
    
    
                ####### START Collector TX #################
                collect_tx = Collector()
                collect_tx.collector_id.collector_id.uuid = str(uuid4())
                collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
                collect_tx.duration_s = 2000  # static
                collect_tx.interval_s = 1  # static
                LOGGER.info('Start Collector TX'.format(str(collect_tx)))
    
                collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
                LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
                #############################################
    
                ####### START Collector RX ##################
                collect_rx = Collector()
                collect_rx.collector_id.collector_id.uuid = str(uuid4())
                collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
                collect_rx.duration_s = 2000  # static
                collect_rx.interval_s = 1  # static
                LOGGER.info('Start Collector RX'.format(str(collect_rx)))
    
                collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
                LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
                ###############################################
    
    
                ####### SET Policy LAT ################
                policy_lat = PolicyRuleService()
                policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
                policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid
    
                # PolicyRuleBasic
                policy_lat.policyRuleBasic.priority = 0
                policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
                policy_lat.policyRuleBasic.booleanOperator = 2
    
                # PolicyRuleAction
                policyRuleActionConfig = PolicyRuleActionConfig()
                policyRuleActionConfig.action_key = ""
                policyRuleActionConfig.action_value = ""
    
                policyRuleAction = PolicyRuleAction()
                policyRuleAction.action = 5
                policyRuleAction.action_config.append(policyRuleActionConfig)
                policy_lat.policyRuleBasic.actionList.append(policyRuleAction)
    
                # for constraint in service.service_constraints:
    
                    # PolicyRuleCondition
                policyRuleCondition = PolicyRuleCondition()
                policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
                policyRuleCondition.numericalOperator = 5
                policyRuleCondition.kpiValue.floatVal = 300 #constraint.sla_latency.e2e_latency_ms
    
                policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)
    
    
                policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
                LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))
    
    
    
                ####### START Analyzer LAT ################
                analyzer = Analyzer()
                analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
                analyzer.algorithm_name = 'Test_Aggergate_and_Threshold'  # static
                analyzer.operation_mode = 2
                analyzer.input_kpi_ids.append(kpi_id_rx)
                analyzer.input_kpi_ids.append(kpi_id_tx)
                analyzer.output_kpi_ids.append(kpi_id_lat)
    
                _threshold_dict = {'min_latency_E2E': (2, 105)}
                analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
                analyzer.parameters['window_size'] = "60s"
                analyzer.parameters['window_slider'] = "30s"
    
                analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
                LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
    
                kpi_value_api_client = KpiValueApiClient()
                stream: KpiAlarms = kpi_value_api_client.GetKpiAlarms(kpi_id_lat.kpi_id.uuid)
                for response in stream:
                    if response is None:
                        LOGGER.debug('NO message')
                    else:
                        LOGGER.debug(str(response))
                ###########################################
    
                # for response in analytic_frontend_service.StartResponseListener( analyzer_id_lat.analyzer_id.uuid):
                #     LOGGER.info("response.value {:s}",response)
    
    
    
            except grpc.RpcError as e:
                if e.code() != grpc.StatusCode.NOT_FOUND: raise  # pylint: disable=no-member
                LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
                context_client.close()
                kpi_manager_client.close()
                policy_client.close()
                telemetry_frontend_client.close()
                return None
    
            LOGGER.info('Here is the service')
    
            context_client.close()
            kpi_manager_client.close()
            policy_client.close()
            telemetry_frontend_client.close()
    
            return ZSMService()
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMUpdate(self, request : ZSMCreateUpdate, context : grpc.ServicerContext) -> ZSMService:
    
            LOGGER.info('NOT IMPLEMENTED ZSMUpdate')
    
            return ZSMService()
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMDelete(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMServiceState:
    
            LOGGER.info('NOT IMPLEMENTED ZSMDelete')
    
            return ZSMServiceState()
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMGetById(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMService:
    
            LOGGER.info('NOT IMPLEMENTED ZSMGetById')
    
            return ZSMService()
    
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService:
    
            LOGGER.info('NOT IMPLEMENTED ZSMGetByService')
    
            return ZSMService()