# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os
from uuid import uuid4

import grpc

from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.automation_pb2_grpc import AutomationServiceServicer
from common.proto.automation_pb2 import (
    ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate, ZSMServiceStateEnum)
from common.proto.context_pb2 import ServiceId, ContextId, Uuid, Empty, Service
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from common.proto.policy_pb2 import PolicyRuleList, PolicyRuleService, PolicyRuleState
from context.client.ContextClient import ContextClient
from automation.client.PolicyClient import PolicyClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor
from common.proto.kpi_value_api_pb2 import KpiAlarms
from common.proto.policy_action_pb2 import PolicyRuleAction, PolicyRuleActionConfig
from common.proto.policy_condition_pb2 import PolicyRuleCondition
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
from common.method_wrappers.ServiceExceptions import InvalidArgumentException

LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Automation', 'RPC')

# Static KPI sample types used by ZSMCreate (raw enum integers).
_KPI_SAMPLE_TYPE_SERVICE_LATENCY_MS = 701   # KPISAMPLETYPE_SERVICE_LATENCY_MS
_KPI_SAMPLE_TYPE_PACKETS_TRANSMITTED = 101  # KPISAMPLETYPE_PACKETS_TRANSMITTED
_KPI_SAMPLE_TYPE_PACKETS_RECEIVED = 102     # KPISAMPLETYPE_PACKETS_RECEIVED

# Static collector settings.
_COLLECTOR_DURATION_S = 2000
_COLLECTOR_INTERVAL_S = 1

# Static value compared against the latency KPI in the policy condition.
# TODO: derive it from service.service_constraints[].sla_latency.e2e_latency_ms.
_LATENCY_POLICY_THRESHOLD = 300


class AutomationServiceServicerImpl(AutomationServiceServicer):
    """gRPC servicer implementing the Automation (ZSM) service RPCs.

    Only ZSMCreate performs work; the remaining RPCs log that they are not
    implemented and return an empty response message.
    """

    def __init__(self):
        LOGGER.info('Init AutomationService')

    @staticmethod
    def _register_kpi(
        kpi_manager_client: KpiManagerClient, service_uuid: str, kpi_sample_type: int, label: str
    ) -> KpiId:
        """Create a KPI descriptor for the service and register it in the KPI manager.

        Args:
            kpi_manager_client: client used to call SetKpiDescriptor.
            service_uuid: UUID of the service the KPI is attached to.
            kpi_sample_type: raw KPI sample type enum value.
            label: short suffix ('lat', 'tx', 'rx') used only in the log message.

        Returns:
            The KpiId returned by SetKpiDescriptor.
        """
        kpi_descriptor = KpiDescriptor()
        kpi_descriptor.kpi_sample_type = kpi_sample_type
        kpi_descriptor.service_id.service_uuid.uuid = service_uuid
        kpi_descriptor.kpi_id.kpi_id.uuid = str(uuid4())

        kpi_id: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
        LOGGER.info('kpi_id_{:s}({:s})'.format(label, str(kpi_id)))
        return kpi_id

    @staticmethod
    def _start_collector(
        telemetry_frontend_client: TelemetryFrontendClient, kpi_id: KpiId, label: str
    ) -> CollectorId:
        """Start a telemetry collector for the given KPI using the static settings.

        Args:
            telemetry_frontend_client: client used to call StartCollector.
            kpi_id: KPI the collector feeds.
            label: short suffix ('tx', 'rx') used only in the log messages.

        Returns:
            The CollectorId returned by StartCollector.
        """
        collector = Collector()
        collector.collector_id.collector_id.uuid = str(uuid4())
        collector.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid
        collector.duration_s = _COLLECTOR_DURATION_S
        collector.interval_s = _COLLECTOR_INTERVAL_S

        LOGGER.info('Start Collector {:s}({:s})'.format(label.upper(), str(collector)))
        collector_id: CollectorId = telemetry_frontend_client.StartCollector(collector)
        LOGGER.info('collect_id_{:s}({:s})'.format(label, str(collector_id)))
        return collector_id

    @staticmethod
    def _build_latency_policy(request: ZSMCreateRequest, kpi_id_lat: KpiId) -> PolicyRuleService:
        """Build the service policy rule whose single condition watches the latency KPI.

        Args:
            request: incoming request; its serviceId identifies the target service.
            kpi_id_lat: KPI referenced by the policy condition.

        Returns:
            A populated PolicyRuleService (not yet submitted).
        """
        policy_lat = PolicyRuleService()
        policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
        policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid

        # PolicyRuleBasic
        policy_lat.policyRuleBasic.priority = 0
        policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
        # NOTE(review): raw enum integer; its symbolic name is not visible here - confirm.
        policy_lat.policyRuleBasic.booleanOperator = 2

        # PolicyRuleAction (with an empty key/value configuration entry)
        action_config = PolicyRuleActionConfig()
        action_config.action_key = ""
        action_config.action_value = ""

        action = PolicyRuleAction()
        # NOTE(review): raw enum integer; its symbolic name is not visible here - confirm.
        action.action = 5
        action.action_config.append(action_config)
        policy_lat.policyRuleBasic.actionList.append(action)

        # PolicyRuleCondition
        condition = PolicyRuleCondition()
        condition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
        # NOTE(review): raw enum integer; its symbolic name is not visible here - confirm.
        condition.numericalOperator = 5
        condition.kpiValue.floatVal = _LATENCY_POLICY_THRESHOLD
        policy_lat.policyRuleBasic.conditionList.append(condition)

        return policy_lat

    @staticmethod
    def _build_latency_analyzer(kpi_id_lat: KpiId, kpi_id_tx: KpiId, kpi_id_rx: KpiId) -> Analyzer:
        """Build the analyzer that takes the RX/TX KPIs as input and outputs the latency KPI.

        Args:
            kpi_id_lat: output KPI.
            kpi_id_tx: input KPI (appended second).
            kpi_id_rx: input KPI (appended first).

        Returns:
            A populated Analyzer (not yet started).
        """
        analyzer = Analyzer()
        analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
        analyzer.algorithm_name = 'Test_Aggergate_and_Threshold'  # static
        # NOTE(review): raw enum integer; its symbolic name is not visible here - confirm.
        analyzer.operation_mode = 2
        analyzer.input_kpi_ids.append(kpi_id_rx)
        analyzer.input_kpi_ids.append(kpi_id_tx)
        analyzer.output_kpi_ids.append(kpi_id_lat)

        thresholds = {'min_latency_E2E': (2, 105)}
        analyzer.parameters['thresholds'] = json.dumps(thresholds)
        analyzer.parameters['window_size'] = "60s"
        analyzer.parameters['window_slider'] = "30s"
        return analyzer

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ZSMCreate(self, request: ZSMCreateRequest, context: grpc.ServicerContext) -> ZSMService:
        """Set up monitoring, policy and analytics for the service named in the request.

        In order: fetches the service from Context, registers three KPI
        descriptors (latency, TX, RX), starts TX and RX collectors, adds a
        service policy on the latency KPI, starts an analyzer, and then
        iterates the KPI alarm stream for the latency KPI until it ends.

        Args:
            request: ZSM creation request; request.serviceId selects the service.
            context: gRPC servicer context (unused here).

        Returns:
            An empty ZSMService on success. If any call in the sequence raises
            a grpc.RpcError with status NOT_FOUND, the error is logged and None
            is returned.
            NOTE(review): None does not match the declared return type and may
            not be serializable as an RPC response - confirm intended handling.

        Raises:
            grpc.RpcError: any RPC error whose status is not NOT_FOUND.
        """
        context_client = ContextClient()
        kpi_manager_client = KpiManagerClient()
        policy_client = PolicyClient()
        telemetry_frontend_client = TelemetryFrontendClient()
        analytics_frontend_client = AnalyticsFrontendClient()

        # Every client opened for this call; all are closed in the finally
        # clause, whichever way the handler exits.
        open_clients = [
            context_client,
            kpi_manager_client,
            policy_client,
            telemetry_frontend_client,
            analytics_frontend_client,
        ]

        LOGGER.info('Trying to get the service ')
        LOGGER.info('request.serviceId.service_uuid.uuid({:s})'.format(str(request.serviceId.service_uuid.uuid)))
        LOGGER.info('request.serviceId.service_uuid({:s})'.format(str(request.serviceId.service_uuid)))
        LOGGER.info('request.serviceId({:s})'.format(str(request.serviceId)))
        LOGGER.info('Request({:s})'.format(str(request)))

        try:
            ####### GET Context #######################
            service: Service = context_client.GetService(request.serviceId)
            LOGGER.info('service({:s})'.format(str(service)))

            ####### SET Kpi Descriptors (LAT, TX, RX) #
            service_uuid = request.serviceId.service_uuid.uuid
            kpi_id_lat = self._register_kpi(
                kpi_manager_client, service_uuid, _KPI_SAMPLE_TYPE_SERVICE_LATENCY_MS, 'lat')
            kpi_id_tx = self._register_kpi(
                kpi_manager_client, service_uuid, _KPI_SAMPLE_TYPE_PACKETS_TRANSMITTED, 'tx')
            kpi_id_rx = self._register_kpi(
                kpi_manager_client, service_uuid, _KPI_SAMPLE_TYPE_PACKETS_RECEIVED, 'rx')

            ####### START Collectors (TX, RX) #########
            self._start_collector(telemetry_frontend_client, kpi_id_tx, 'tx')
            self._start_collector(telemetry_frontend_client, kpi_id_rx, 'rx')

            ####### SET Policy LAT ####################
            policy_lat = self._build_latency_policy(request, kpi_id_lat)
            policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
            LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))

            ####### START Analyzer LAT ################
            analyzer = self._build_latency_analyzer(kpi_id_lat, kpi_id_tx, kpi_id_rx)
            analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
            LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))

            ####### Consume KPI alarms ################
            kpi_value_api_client = KpiValueApiClient()
            open_clients.append(kpi_value_api_client)
            # NOTE(review): the UUID string is passed rather than a KpiId message -
            # confirm this matches what GetKpiAlarms expects.
            stream = kpi_value_api_client.GetKpiAlarms(kpi_id_lat.kpi_id.uuid)
            for response in stream:
                if response is None:
                    LOGGER.debug('NO message')
                else:
                    LOGGER.debug(str(response))
        except grpc.RpcError as e:
            # This handler covers every RPC in the sequence above, not only GetService.
            if e.code() != grpc.StatusCode.NOT_FOUND: raise  # pylint: disable=no-member
            LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
            return None
        else:
            LOGGER.info('Here is the service')
        finally:
            for client in open_clients:
                client.close()

        return ZSMService()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ZSMUpdate(self, request: ZSMCreateUpdate, context: grpc.ServicerContext) -> ZSMService:
        """Not implemented: logs a notice and returns an empty ZSMService."""
        LOGGER.info('NOT IMPLEMENTED ZSMUpdate')
        return ZSMService()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ZSMDelete(self, request: ZSMServiceID, context: grpc.ServicerContext) -> ZSMServiceState:
        """Not implemented: logs a notice and returns an empty ZSMServiceState."""
        LOGGER.info('NOT IMPLEMENTED ZSMDelete')
        return ZSMServiceState()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ZSMGetById(self, request: ZSMServiceID, context: grpc.ServicerContext) -> ZSMService:
        """Not implemented: logs a notice and returns an empty ZSMService."""
        LOGGER.info('NOT IMPLEMENTED ZSMGetById')
        return ZSMService()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ZSMGetByService(self, request: ServiceId, context: grpc.ServicerContext) -> ZSMService:
        """Not implemented: logs a notice and returns an empty ZSMService."""
        LOGGER.info('NOT IMPLEMENTED ZSMGetByService')
        return ZSMService()