Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
AutomationServiceServicerImpl.py 11.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import grpc , logging, os, grpc
    
    from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
    from common.method_wrappers.Decorator import MetricsPool
    
    from common.proto.automation_pb2_grpc import AutomationServiceServicer
    
    from common.proto.automation_pb2 import ( ZSMCreateRequest , ZSMService ,ZSMServiceID ,ZSMServiceState,ZSMCreateUpdate , ZSMServiceStateEnum)
    from common.proto.context_pb2 import ( ServiceId , ContextId , Uuid , Empty)
    
    from common.proto.telemetry_frontend_pb2 import ( Collector , CollectorId )
    
    from common.proto.policy_pb2 import ( PolicyRuleList)
    
    from context.client.ContextClient import ContextClient
    from automation.client.PolicyClient import PolicyClient
    from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
    from kpi_manager.client.KpiManagerClient import KpiManagerClient
    from common.proto.context_pb2 import ( Service )
    
    from common.proto.kpi_manager_pb2 import (KpiId, KpiDescriptor)
    
    from common.proto.kpi_value_api_pb2 import (KpiAlarms)
    
    
    from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
    from common.proto.policy_action_pb2 import PolicyRuleAction , PolicyRuleActionConfig
    from common.proto.policy_condition_pb2 import PolicyRuleCondition
    from uuid import uuid4
    
    import json
    
    from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
    from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
    from common.proto.analytics_frontend_pb2      import Analyzer, AnalyzerId
    
    from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
    
    from common.method_wrappers.ServiceExceptions import InvalidArgumentException
    
    LOGGER = logging.getLogger(__name__)
    METRICS_POOL = MetricsPool('Automation', 'RPC')
    
    
    class AutomationServiceServicerImpl(AutomationServiceServicer):
    
        def __init__(self):
            LOGGER.info('Init AutomationService')
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService:
    
    
            # check that service does not exist
            context_client = ContextClient()
            kpi_manager_client = KpiManagerClient()
            policy_client = PolicyClient()
            telemetry_frontend_client = TelemetryFrontendClient()
    
            analytics_frontend_client = AnalyticsFrontendClient()
    
                # TODO: Remove static variables(get them from ZSMCreateRequest)
                # TODO: Refactor policy component (remove unnecessary variables)
    
    
                ####### GET Context #######################
    
                service: Service = context_client.GetService(request.serviceId)
    
                ###########################################
    
                ####### SET Kpi Descriptor LAT ################
    
                LOGGER.info('Set Kpi Descriptor LAT: ')
    
                if(len(service.service_constraints) == 0):
                    raise InvalidArgumentException("service_constraints" , "empty",  []);
    
                if(len(service.service_constraints) > 1):
                    raise InvalidArgumentException("service_constraints" , ">1",  []);
    
                if(service.service_constraints[0].sla_latency is None ):
                    raise InvalidArgumentException("sla_latency", "empty", []);
    
                ## Static Implementation Applied only in case of SLA Latency Constraint ##
    
                # KPI Descriptor
    
                kpi_descriptor_lat = KpiDescriptor()
                kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS'  #static service.service_constraints[].sla_latency.e2e_latency_ms
                kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
                kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())
    
                kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
    
                LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat)))
    
                ###########################################
    
                ####### SET Kpi Descriptor TX ################
    
                kpi_descriptor_tx = KpiDescriptor()
                kpi_descriptor_tx.kpi_sample_type = 101  # static KPISAMPLETYPE_PACKETS_TRANSMITTED
                kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
                kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())
    
                kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
    
                ###########################################
    
                ####### SET Kpi Descriptor RX ################
    
                kpi_descriptor_rx = KpiDescriptor()
                kpi_descriptor_rx.kpi_sample_type = 102  # static KPISAMPLETYPE_PACKETS_RECEIVED
                kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
                kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4())
    
                kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx)
                LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
                ###########################################
    
    
    
    
                ####### START Collector TX #################
                collect_tx = Collector()
                collect_tx.collector_id.collector_id.uuid = str(uuid4())
                collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
    
                collect_tx.interval_s = 1  # static
                LOGGER.info('Start Collector TX'.format(str(collect_tx)))
    
                collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
                LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
                #############################################
    
                ####### START Collector RX ##################
                collect_rx = Collector()
                collect_rx.collector_id.collector_id.uuid = str(uuid4())
                collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
    
                collect_rx.interval_s = 1  # static
                LOGGER.info('Start Collector RX'.format(str(collect_rx)))
    
                collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
                LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
                ###############################################
    
                ####### START Analyzer LAT ################
                analyzer = Analyzer()
                analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
                analyzer.algorithm_name = 'Test_Aggregate_and_Threshold'  # static
                analyzer.operation_mode = 2
                analyzer.input_kpi_ids.append(kpi_id_rx)
                analyzer.input_kpi_ids.append(kpi_id_tx)
                analyzer.output_kpi_ids.append(kpi_id_lat)
    
    
                thresholdStr = service.service_constraints[0].custom.constraint_type
    
                _threshold_dict = {thresholdStr: (0, int(service.service_constraints[0].custom.constraint_value))}
    
                analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
                analyzer.parameters['window_size'] = "60s"
                analyzer.parameters['window_slider'] = "30s"
    
                analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
                LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
                ###########################################################
    
    
                ####### SET Policy LAT ################
                policy_lat = PolicyRuleService()
                policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
                policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid
    
                # PolicyRuleBasic
                policy_lat.policyRuleBasic.priority = 0
                policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
                policy_lat.policyRuleBasic.booleanOperator = 2
    
                # PolicyRuleAction
                policyRuleActionConfig = PolicyRuleActionConfig()
                policyRuleActionConfig.action_key = ""
                policyRuleActionConfig.action_value = ""
    
                policyRuleAction = PolicyRuleAction()
                policyRuleAction.action = 5
                policyRuleAction.action_config.append(policyRuleActionConfig)
                policy_lat.policyRuleBasic.actionList.append(policyRuleAction)
    
                # for constraint in service.service_constraints:
    
                    # PolicyRuleCondition
                policyRuleCondition = PolicyRuleCondition()
                policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
                policyRuleCondition.numericalOperator = 5
    
    
                policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)
    
                policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
                LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))
    
            except grpc.RpcError as e:
                if e.code() != grpc.StatusCode.NOT_FOUND: raise  # pylint: disable=no-member
                LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
                context_client.close()
                kpi_manager_client.close()
                policy_client.close()
                telemetry_frontend_client.close()
                return None
    
            context_client.close()
            kpi_manager_client.close()
            policy_client.close()
            telemetry_frontend_client.close()
    
            return ZSMService()
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMUpdate(self, request : ZSMCreateUpdate, context : grpc.ServicerContext) -> ZSMService:
    
            LOGGER.info('NOT IMPLEMENTED ZSMUpdate')
    
            return ZSMService()
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMDelete(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMServiceState:
    
            LOGGER.info('NOT IMPLEMENTED ZSMDelete')
    
            return ZSMServiceState()
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMGetById(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMService:
    
            LOGGER.info('NOT IMPLEMENTED ZSMGetById')
    
            return ZSMService()
    
    
        @safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
    
        def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService:
    
            LOGGER.info('NOT IMPLEMENTED ZSMGetByService')
    
            return ZSMService()