Newer
Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.Decorator import MetricsPool
from common.proto.automation_pb2_grpc import AutomationServiceServicer
from common.proto.automation_pb2 import ( ZSMCreateRequest , ZSMService ,ZSMServiceID ,ZSMServiceState,ZSMCreateUpdate , ZSMServiceStateEnum)
from common.proto.context_pb2 import ( ServiceId , ContextId , Uuid , Empty)
Konstantinos Poulakakis
committed
from common.proto.telemetry_frontend_pb2 import ( Collector , CollectorId )
from common.proto.policy_pb2 import ( PolicyRuleList)
Konstantinos Poulakakis
committed
from context.client.ContextClient import ContextClient
from automation.client.PolicyClient import PolicyClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from common.proto.context_pb2 import ( Service )
from common.proto.kpi_manager_pb2 import (KpiId, KpiDescriptor)
Konstantinos Poulakakis
committed
from common.proto.kpi_value_api_pb2 import (KpiAlarms)
Konstantinos Poulakakis
committed
from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState
from common.proto.policy_action_pb2 import PolicyRuleAction , PolicyRuleActionConfig
from common.proto.policy_condition_pb2 import PolicyRuleCondition
from uuid import uuid4
Konstantinos Poulakakis
committed
import json
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
Konstantinos Poulakakis
committed
Konstantinos Poulakakis
committed
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
Konstantinos Poulakakis
committed
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('Automation', 'RPC')
class AutomationServiceServicerImpl(AutomationServiceServicer):
Konstantinos Poulakakis
committed
def __init__(self):
LOGGER.info('Init AutomationService')
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService:
Konstantinos Poulakakis
committed
# check that service does not exist
context_client = ContextClient()
kpi_manager_client = KpiManagerClient()
policy_client = PolicyClient()
telemetry_frontend_client = TelemetryFrontendClient()
Konstantinos Poulakakis
committed
analytics_frontend_client = AnalyticsFrontendClient()
Konstantinos Poulakakis
committed
try:
Konstantinos Poulakakis
committed
# TODO: Remove static variables(get them from ZSMCreateRequest)
# TODO: Refactor policy component (remove unnecessary variables)
Konstantinos Poulakakis
committed
####### GET Context #######################
Konstantinos Poulakakis
committed
LOGGER.info('Get the service from Context: ')
Konstantinos Poulakakis
committed
service: Service = context_client.GetService(request.serviceId)
Konstantinos Poulakakis
committed
LOGGER.info('Service ({:s}) :'.format(str(service)))
Konstantinos Poulakakis
committed
###########################################
####### SET Kpi Descriptor LAT ################
Konstantinos Poulakakis
committed
LOGGER.info('Set Kpi Descriptor LAT: ')
if(len(service.service_constraints) == 0):
raise InvalidArgumentException("service_constraints" , "empty", []);
Konstantinos Poulakakis
committed
Konstantinos Poulakakis
committed
if(len(service.service_constraints) > 1):
raise InvalidArgumentException("service_constraints" , ">1", []);
Konstantinos Poulakakis
committed
Konstantinos Poulakakis
committed
if(service.service_constraints[0].sla_latency is None ):
raise InvalidArgumentException("sla_latency", "empty", []);
## Static Implementation Applied only in case of SLA Latency Constraint ##
# KPI Descriptor
Konstantinos Poulakakis
committed
kpi_descriptor_lat = KpiDescriptor()
kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS' #static service.service_constraints[].sla_latency.e2e_latency_ms
kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
Konstantinos Poulakakis
committed
LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat)))
Konstantinos Poulakakis
committed
###########################################
####### SET Kpi Descriptor TX ################
Konstantinos Poulakakis
committed
LOGGER.info('Set Kpi Descriptor TX: ')
Konstantinos Poulakakis
committed
kpi_descriptor_tx = KpiDescriptor()
kpi_descriptor_tx.kpi_sample_type = 101 # static KPISAMPLETYPE_PACKETS_TRANSMITTED
kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
Konstantinos Poulakakis
committed
LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx)))
Konstantinos Poulakakis
committed
###########################################
####### SET Kpi Descriptor RX ################
Konstantinos Poulakakis
committed
LOGGER.info('Set Kpi Descriptor RX: ')
Konstantinos Poulakakis
committed
kpi_descriptor_rx = KpiDescriptor()
kpi_descriptor_rx.kpi_sample_type = 102 # static KPISAMPLETYPE_PACKETS_RECEIVED
kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx)
LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx)))
###########################################
Konstantinos Poulakakis
committed
####### START Collector TX #################
collect_tx = Collector()
collect_tx.collector_id.collector_id.uuid = str(uuid4())
collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
Konstantinos Poulakakis
committed
collect_tx.duration_s = 20000 # static
Konstantinos Poulakakis
committed
collect_tx.interval_s = 1 # static
LOGGER.info('Start Collector TX'.format(str(collect_tx)))
collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx)
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx)))
#############################################
####### START Collector RX ##################
collect_rx = Collector()
collect_rx.collector_id.collector_id.uuid = str(uuid4())
collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
Konstantinos Poulakakis
committed
collect_rx.duration_s = 20000 # static
Konstantinos Poulakakis
committed
collect_rx.interval_s = 1 # static
LOGGER.info('Start Collector RX'.format(str(collect_rx)))
collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx)
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx)))
###############################################
Konstantinos Poulakakis
committed
Konstantinos Poulakakis
committed
####### START Analyzer LAT ################
analyzer = Analyzer()
analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
analyzer.algorithm_name = 'Test_Aggregate_and_Threshold' # static
analyzer.operation_mode = 2
analyzer.input_kpi_ids.append(kpi_id_rx)
analyzer.input_kpi_ids.append(kpi_id_tx)
analyzer.output_kpi_ids.append(kpi_id_lat)
thresholdStr = service.service_constraints[0].custom.constraint_type
_threshold_dict = {thresholdStr: (0, int(service.service_constraints[0].custom.constraint_value))}
Konstantinos Poulakakis
committed
analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
analyzer.parameters['window_size'] = "60s"
analyzer.parameters['window_slider'] = "30s"
analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
###########################################################
Konstantinos Poulakakis
committed
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
####### SET Policy LAT ################
policy_lat = PolicyRuleService()
policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid
# PolicyRuleBasic
policy_lat.policyRuleBasic.priority = 0
policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4())
policy_lat.policyRuleBasic.booleanOperator = 2
# PolicyRuleAction
policyRuleActionConfig = PolicyRuleActionConfig()
policyRuleActionConfig.action_key = ""
policyRuleActionConfig.action_value = ""
policyRuleAction = PolicyRuleAction()
policyRuleAction.action = 5
policyRuleAction.action_config.append(policyRuleActionConfig)
policy_lat.policyRuleBasic.actionList.append(policyRuleAction)
# for constraint in service.service_constraints:
# PolicyRuleCondition
policyRuleCondition = PolicyRuleCondition()
policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
policyRuleCondition.numericalOperator = 5
Konstantinos Poulakakis
committed
policyRuleCondition.kpiValue.floatVal = 300
Konstantinos Poulakakis
committed
policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)
policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member
LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
context_client.close()
kpi_manager_client.close()
policy_client.close()
telemetry_frontend_client.close()
return None
context_client.close()
kpi_manager_client.close()
policy_client.close()
telemetry_frontend_client.close()
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMUpdate(self, request : ZSMCreateUpdate, context : grpc.ServicerContext) -> ZSMService:
LOGGER.info('NOT IMPLEMENTED ZSMUpdate')
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMDelete(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMServiceState:
LOGGER.info('NOT IMPLEMENTED ZSMDelete')
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMGetById(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMService:
LOGGER.info('NOT IMPLEMENTED ZSMGetById')
@safe_and_metered_rpc_method(METRICS_POOL,LOGGER)
def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService:
LOGGER.info('NOT IMPLEMENTED ZSMGetByService')