diff --git a/proto/automation.proto b/proto/automation.proto index ccd17124956f51e15ae517d736a33fdfec226bbc..265bbb88277461f1bf567e83a21b844a0c3177af 100644 --- a/proto/automation.proto +++ b/proto/automation.proto @@ -1,69 +1,76 @@ -// Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package automation; - -import "context.proto"; -import "policy.proto"; - -// Automation service RPCs -service AutomationService { - rpc ZSMCreate (ZSMCreateRequest ) returns (ZSMService ) {} - rpc ZSMUpdate (ZSMCreateUpdate ) returns (ZSMService ) {} - rpc ZSMDelete (ZSMServiceID ) returns (ZSMServiceState) {} - rpc ZSMGetById (ZSMServiceID ) returns (ZSMService ) {} - rpc ZSMGetByService (context.ServiceId) returns (ZSMService ) {} -} - -// ZSM service states -enum ZSMServiceStateEnum { - ZSM_UNDEFINED = 0; // Undefined ZSM loop state - ZSM_FAILED = 1; // ZSM loop failed - ZSM_ACTIVE = 2; // ZSM loop is currently active - ZSM_INACTIVE = 3; // ZSM loop is currently inactive - ZSM_UPDATED = 4; // ZSM loop is updated - ZSM_REMOVED = 5; // ZSM loop is removed -} - -message ZSMCreateRequest { - context.ServiceId serviceId = 1; - policy.PolicyRuleList policyList = 2; -} - -message ZSMCreateUpdate { - context.Uuid ZSMServiceID = 1; - policy.PolicyRuleList policyList = 2; -} - -// A unique identifier per ZSM service -message ZSMServiceID { - context.Uuid uuid = 1; -} - -// The state of a ZSM service -message ZSMServiceState { - ZSMServiceStateEnum zsmServiceState = 1; - string zsmServiceStateMessage = 2; -} - -// Basic ZSM service attributes -message ZSMService { - ZSMServiceID zsmServiceId = 1; - - context.ServiceId serviceId = 2; - policy.PolicyRuleList policyList = 3; - - // TODO: When new Analytics and updated Monitoring are in place, add the necessary binding to them -} +// Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +syntax = "proto3"; +package automation; + +import "context.proto"; +import "policy.proto"; +import "analytics_frontend.proto"; + +// Automation service RPCs +service AutomationService { + rpc ZSMCreate (ZSMCreateRequest ) returns (ZSMService ) {} + rpc ZSMUpdate (ZSMCreateUpdate ) returns (ZSMService ) {} + rpc ZSMDelete (ZSMServiceID ) returns (ZSMServiceState) {} + rpc ZSMGetById (ZSMServiceID ) returns (ZSMService ) {} + rpc ZSMGetByService (context.ServiceId) returns (ZSMService ) {} +} + +// ZSM service states +enum ZSMServiceStateEnum { + ZSM_UNDEFINED = 0; // Undefined ZSM loop state + ZSM_FAILED = 1; // ZSM loop failed + ZSM_ACTIVE = 2; // ZSM loop is currently active + ZSM_INACTIVE = 3; // ZSM loop is currently inactive + ZSM_UPDATED = 4; // ZSM loop is updated + ZSM_REMOVED = 5; // ZSM loop is removed +} + +enum ZSMTypeEnum { + ZSMTYPE_UNKNOWN = 0; +} + +message ZSMCreateRequest { + context.ServiceId target_service_id = 1; + context.ServiceId telemetry_service_id = 2; + analytics_frontend.Analyzer analyzer = 3; + policy.PolicyRuleService policy = 4; +} + +message ZSMCreateUpdate { + context.Uuid ZSMServiceID = 1; + policy.PolicyRuleList policyList = 2; +} + +// A unique identifier per ZSM service +message ZSMServiceID { + context.Uuid uuid = 1; +} + +// The state of a ZSM service +message ZSMServiceState { + ZSMServiceStateEnum zsmServiceState = 1; + string zsmServiceStateMessage = 2; +} + +// Basic ZSM service attributes +message ZSMService { + ZSMServiceID zsmServiceId = 1; + + context.ServiceId serviceId = 2; + policy.PolicyRuleList policyList = 3; + + // TODO: When new Analytics and updated Monitoring are in place, add the necessary binding to them +} diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index 75f7ed252d1dbb86a10750e7be892f2d8af8bf9c..b3479abde3d699d25ca7e2ddd0e7ce7bef17253c 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -15,6 +15,7 @@ import logging from enum import Enum import pandas as pd +from collections import defaultdict logger = logging.getLogger(__name__) @@ -134,3 +135,42 @@ def aggregation_handler( return results else: return [] + +def find(data , type , value): + return next((item for item in data if item[type] == value), None) + + +def aggregation_handler_three_to_one( + batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds +): + + # Group and sum + # Track sum and count + sum_dict = defaultdict(int) + count_dict = defaultdict(int) + + for item in batch: + kpi_id = item["kpi_id"] + if kpi_id in input_kpi_list: + sum_dict[kpi_id] += item["kpi_value"] + count_dict[kpi_id] += 1 + + # Compute average + avg_dict = {kpi_id: sum_dict[kpi_id] / count_dict[kpi_id] for kpi_id in sum_dict} + + total_kpi_metric = 0 + for kpi_id, total_value in avg_dict.items(): + total_kpi_metric += total_value + + result = { + "kpi_id": output_kpi_list[0], + "avg": total_kpi_metric, + "THRESHOLD_RAISE": bool(total_kpi_metric > 2600), + "THRESHOLD_FALL": bool(total_kpi_metric < 699) + } + results = [] + + results.append(result) + logger.warning(f"result : {result}.") + + return results diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index 9ddf8b2364f94128d8df610e3d765cc701188b5d..e2aca9f1164978215b0f0fd897748c63ce6293c3 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -19,7 +19,7 @@ import logging from 
confluent_kafka import KafkaException, KafkaError from common.tools.kafka.Variables import KafkaTopic -from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler +from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one from analytics.backend.service.AnalyzerHelper import AnalyzerHelper @@ -27,8 +27,8 @@ logger = logging.getLogger(__name__) class DaskStreamer(threading.Thread): - def __init__(self, key, input_kpis, output_kpis, thresholds, - batch_size = 5, + def __init__(self, key, input_kpis, output_kpis, thresholds, + batch_size = 5, batch_duration = None, window_size = None, cluster_instance = None, @@ -113,12 +113,20 @@ class DaskStreamer(threading.Thread): logger.info(f"Batch to be processed: {self.batch}") if Handlers.is_valid_handler(self.thresholds["task_type"]): if self.client is not None and self.client.status == 'running': - try: - future = self.client.submit(aggregation_handler, "batch size", self.key, - self.batch, self.input_kpis, self.output_kpis, self.thresholds) - future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) - except Exception as e: - logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") + if any('total' in d for d in self.thresholds.get('task_parameter', [])): + try: + future = self.client.submit(aggregation_handler_three_to_one, "batch size", self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + except Exception as e: + logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") + else: + try: + future = self.client.submit(aggregation_handler, "batch size", self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) + except Exception as e: + logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") else: logger.warning("Dask client is not running. 
Skipping processing.") else: diff --git a/src/automation/.gitlab-ci.yml b/src/automation/.gitlab-ci.yml index 3343a5e917706d9d0c93f6853d1b8d458b489de5..5d18a73ba12ffb750d35a9eda70b53bb8db86878 100644 --- a/src/automation/.gitlab-ci.yml +++ b/src/automation/.gitlab-ci.yml @@ -66,8 +66,7 @@ unit_test automation: - sleep 5 - docker ps -a - docker logs $IMAGE_NAME - - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml" - - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml" + - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_automation_handlers.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml" - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: diff --git a/src/automation/service/AutomationServiceServicerImpl.py b/src/automation/service/AutomationServiceServicerImpl.py index 1f94f572ec51e6accd2ccfedc7c32fe4381d7f1c..4802db007f2bff4a86dd2539a5bd2bd8d1bc493e 100644 --- a/src/automation/service/AutomationServiceServicerImpl.py +++ b/src/automation/service/AutomationServiceServicerImpl.py @@ -15,22 +15,14 @@ import grpc, json, logging from uuid import uuid4 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.automation_pb2_grpc import AutomationServiceServicer from common.method_wrappers.ServiceExceptions import InvalidArgumentException -from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate -from common.proto.automation_pb2_grpc import AutomationServiceServicer from common.proto.context_pb2 import Service, ServiceId -from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor -from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState -from common.proto.policy_action_pb2 import PolicyRuleAction, PolicyRuleActionConfig -from common.proto.policy_condition_pb2 import PolicyRuleCondition -from common.proto.telemetry_frontend_pb2 import Collector, CollectorId - -from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient -from automation.client.PolicyClient import PolicyClient +from automation.service.zsm_handlers import ZSM_SERVICE_HANDLERS +from automation.service.zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum , TELEMETRY_SERVICE_TYPE_VALUES, TARGET_SERVICE_TYPE_VALUES , ZSM_FILTER_FIELD_ALLOWED_VALUES +from common.proto.context_pb2 import ServiceTypeEnum , DeviceDriverEnum from context.client.ContextClient import ContextClient -from kpi_manager.client.KpiManagerClient import KpiManagerClient -from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Automation', 'RPC') @@ -41,165 +33,19 @@ class AutomationServiceServicerImpl(AutomationServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService: - - # check that service does not exist context_client = ContextClient() - kpi_manager_client = KpiManagerClient() - 
policy_client = PolicyClient() - telemetry_frontend_client = TelemetryFrontendClient() - analytics_frontend_client = AnalyticsFrontendClient() - - try: - - # TODO: Remove static variables(get them from ZSMCreateRequest) - # TODO: Refactor policy component (remove unnecessary variables) - - ####### GET Context ####################### - LOGGER.info('Get the service from Context: ') - service: Service = context_client.GetService(request.serviceId) - LOGGER.info('Service ({:s}) :'.format(str(service))) - ########################################### - - ####### SET Kpi Descriptor LAT ################ - LOGGER.info('Set Kpi Descriptor LAT: ') - - if(len(service.service_constraints) == 0): - raise InvalidArgumentException("service_constraints" , "empty", []); - - if(len(service.service_constraints) > 1): - raise InvalidArgumentException("service_constraints" , ">1", []); - - if(service.service_constraints[0].sla_latency is None ): - raise InvalidArgumentException("sla_latency", "empty", []); - - ## Static Implementation Applied only in case of SLA Latency Constraint ## - - # KPI Descriptor - kpi_descriptor_lat = KpiDescriptor() - kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS' #static service.service_constraints[].sla_latency.e2e_latency_ms - kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid - kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4()) - - kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat) - LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat))) - ########################################### - - ####### SET Kpi Descriptor TX ################ - LOGGER.info('Set Kpi Descriptor TX: ') - - kpi_descriptor_tx = KpiDescriptor() - kpi_descriptor_tx.kpi_sample_type = 101 # static KPISAMPLETYPE_PACKETS_TRANSMITTED - kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid - kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4()) - - kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx) - LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx))) - ########################################### - - ####### SET Kpi Descriptor RX ################ - LOGGER.info('Set Kpi Descriptor RX: ') - - kpi_descriptor_rx = KpiDescriptor() - kpi_descriptor_rx.kpi_sample_type = 102 # static KPISAMPLETYPE_PACKETS_RECEIVED - kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid - kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4()) - kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx) - LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx))) - ########################################### + targetService = context_client.GetService(request.target_service_id) + telemetryService = context_client.GetService(request.telemetry_service_id) + handler_cls = self.get_service_handler_based_on_service_types(targetService.service_type, telemetryService.service_type, ZSM_SERVICE_HANDLERS) + if handler_cls: + handler_obj = handler_cls() # instantiate it + handler_obj.zsmCreate(request, context) + else: + LOGGER.info("No matching handler found.") - ####### START Collector TX ################# - collect_tx = Collector() - collect_tx.collector_id.collector_id.uuid = str(uuid4()) - collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid - collect_tx.duration_s = 20000 # static - collect_tx.interval_s = 1 # static - LOGGER.info('Start Collector TX'.format(str(collect_tx))) - - collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx) - 
LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx))) - ############################################# - - ####### START Collector RX ################## - collect_rx = Collector() - collect_rx.collector_id.collector_id.uuid = str(uuid4()) - collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid - collect_rx.duration_s = 20000 # static - collect_rx.interval_s = 1 # static - LOGGER.info('Start Collector RX'.format(str(collect_rx))) - - collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx) - LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx))) - ############################################### - - ####### START Analyzer LAT ################ - analyzer = Analyzer() - analyzer.analyzer_id.analyzer_id.uuid = str(uuid4()) - analyzer.algorithm_name = 'Test_Aggregate_and_Threshold' # static - analyzer.operation_mode = 2 - analyzer.input_kpi_ids.append(kpi_id_rx) - analyzer.input_kpi_ids.append(kpi_id_tx) - analyzer.output_kpi_ids.append(kpi_id_lat) - - thresholdStr = service.service_constraints[0].custom.constraint_type - - _threshold_dict = {thresholdStr: (0, int(service.service_constraints[0].custom.constraint_value))} - analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) - analyzer.parameters['window_size'] = "60s" - analyzer.parameters['window_slider'] = "30s" - - analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer) - LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat))) - ########################################################### - - ####### SET Policy LAT ################ - policy_lat = PolicyRuleService() - policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid - policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid - - # PolicyRuleBasic - policy_lat.policyRuleBasic.priority = 0 - policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4()) - policy_lat.policyRuleBasic.booleanOperator = 2 - - # PolicyRuleAction - policyRuleActionConfig = PolicyRuleActionConfig() - policyRuleActionConfig.action_key = "" - policyRuleActionConfig.action_value = "" - - policyRuleAction = PolicyRuleAction() - policyRuleAction.action = 5 - policyRuleAction.action_config.append(policyRuleActionConfig) - policy_lat.policyRuleBasic.actionList.append(policyRuleAction) - - # for constraint in service.service_constraints: - - # PolicyRuleCondition - policyRuleCondition = PolicyRuleCondition() - policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid - policyRuleCondition.numericalOperator = 5 - policyRuleCondition.kpiValue.floatVal = 300 - - policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition) - - policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat) - LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state))) - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - LOGGER.exception('Unable to get Service({:s})'.format(str(request))) - context_client.close() - kpi_manager_client.close() - policy_client.close() - telemetry_frontend_client.close() - return None - - context_client.close() - kpi_manager_client.close() - policy_client.close() - telemetry_frontend_client.close() return ZSMService() @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) @@ -222,3 +68,26 @@ class AutomationServiceServicerImpl(AutomationServiceServicer): def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService: LOGGER.info('NOT IMPLEMENTED 
ZSMGetByService') return ZSMService() + + def get_service_handler_based_on_service_types(self, targetServiceType ,telemetryServiceType , ZSM_SERVICE_HANDLERS): + flag = True + for handler_cls, filters in ZSM_SERVICE_HANDLERS: + for filter in filters: + flag = self.check_if_requested_services_pass_filter_criteria(filter , targetServiceType, telemetryServiceType) + if flag: + return handler_cls + return None + + def check_if_requested_services_pass_filter_criteria(self ,filter , targetServiceType , telemetryServiceType): + flag = True + for filter_key, filter_value in filter.items(): + if filter_value in ZSM_FILTER_FIELD_ALLOWED_VALUES[filter_key.value]: + if filter_key.value == ZSMFilterFieldEnum.TARGET_SERVICE_TYPE.value: + if filter_value != targetServiceType: + flag = False + elif filter_key.value == ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE.value: + if filter_value != telemetryServiceType: + flag = False + else: + flag = False + return flag diff --git a/src/automation/service/zsm_handler_api/ZSMFilterFields.py b/src/automation/service/zsm_handler_api/ZSMFilterFields.py new file mode 100644 index 0000000000000000000000000000000000000000..7b00de5bc474f8ac4a6a711acf9f7200723ddaae --- /dev/null +++ b/src/automation/service/zsm_handler_api/ZSMFilterFields.py @@ -0,0 +1,34 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from common.proto.context_pb2 import ServiceTypeEnum + +class ZSMFilterFieldEnum(Enum): + TARGET_SERVICE_TYPE = 'target_service_type' + TELEMETRY_SERVICE_TYPE = 'telemetry_service_type' + +TARGET_SERVICE_TYPE_VALUES = { + ServiceTypeEnum.SERVICETYPE_L2NM +} + +TELEMETRY_SERVICE_TYPE_VALUES = { + ServiceTypeEnum.SERVICETYPE_INT +} + +# Maps filter fields to allowed values per Filter field. # If no restriction (free text) None is specified +ZSM_FILTER_FIELD_ALLOWED_VALUES = { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE.value : TARGET_SERVICE_TYPE_VALUES, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE.value : TELEMETRY_SERVICE_TYPE_VALUES, +} diff --git a/src/automation/service/zsm_handler_api/_ZSMHandler.py b/src/automation/service/zsm_handler_api/_ZSMHandler.py new file mode 100644 index 0000000000000000000000000000000000000000..352f2276d3c87adce656994c1dfd2eee9c05b500 --- /dev/null +++ b/src/automation/service/zsm_handler_api/_ZSMHandler.py @@ -0,0 +1,38 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import grpc , logging +from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate +from common.proto.context_pb2 import ServiceId + +LOGGER = logging.getLogger(__name__) + +class _ZSMHandler: + def __init__(self): + LOGGER.info('Init Scenario') + + def zsmCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext): + LOGGER.info('zsmCreate method') + + def zsmUpdate(self, request : ZSMCreateUpdate, context : grpc.ServicerContext): + LOGGER.info('zsmUpdate method') + + def zsmDelete(self, request : ZSMServiceID, context : grpc.ServicerContext): + LOGGER.info('zsmDelete method') + + def zsmGetById(self, request : ZSMServiceID, context : grpc.ServicerContext): + LOGGER.info('zsmGetById method') + + def zsmGetByService(self, request : ServiceId, context : grpc.ServicerContext): + LOGGER.info('zsmGetByService method') diff --git a/src/automation/service/zsm_handler_api/__init__.py b/src/automation/service/zsm_handler_api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c985cb35bce23982bca39b434a5157be339c45d6 --- /dev/null +++ b/src/automation/service/zsm_handler_api/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/automation/service/zsm_handlers/P4INTZSMPlugin.py b/src/automation/service/zsm_handlers/P4INTZSMPlugin.py new file mode 100644 index 0000000000000000000000000000000000000000..a7d397c58411555405cc53782ff865a347f8954b --- /dev/null +++ b/src/automation/service/zsm_handlers/P4INTZSMPlugin.py @@ -0,0 +1,92 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import grpc , logging +from common.proto.analytics_frontend_pb2 import AnalyzerId +from common.proto.policy_pb2 import PolicyRuleState +from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService + +from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient +from automation.client.PolicyClient import PolicyClient +from context.client.ContextClient import ContextClient +from automation.service.zsm_handler_api._ZSMHandler import _ZSMHandler + +LOGGER = logging.getLogger(__name__) + +class P4INTZSMPlugin(_ZSMHandler): + def __init__(self): + LOGGER.info('Init P4INTZSMPlugin') + + def zsmCreate(self,request : ZSMCreateRequest, context : grpc.ServicerContext): + # check that service does not exist + context_client = ContextClient() + policy_client = PolicyClient() + analytics_frontend_client = AnalyticsFrontendClient() + + # Verify the input target service ID + try: + target_service_id = context_client.GetService(request.target_service_id) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to get target service({:s})'.format(str(target_service_id))) + context_client.close() + return None + + # Verify the input telemetry service ID + try: + telemetry_service_id = context_client.GetService(request.telemetry_service_id) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to get telemetry service({:s})'.format(str(telemetry_service_id))) + context_client.close() + return None + + # Start an analyzer + try: + analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(request.analyzer) # type: ignore + LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat))) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to start analyzer({:s})'.format(str(request.analyzer))) + context_client.close() + analytics_frontend_client.close() + return None + + # Create a policy + try: + policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(request.policy) # type: ignore + LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state))) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to create policy({:s})'.format(str(request.policy))) + context_client.close() + policy_client.close() + return None + + context_client.close() + analytics_frontend_client.close() + policy_client.close() + return ZSMService() + + def zsmUpdate(self): + LOGGER.info('zsmUpdate method') + + def zsmDelete(self): + LOGGER.info('zsmDelete method') + + def zsmGetById(self): + LOGGER.info('zsmGetById method') + + def zsmGetByService(self): + LOGGER.info('zsmGetByService method') diff --git a/src/automation/service/zsm_handlers/__init__.py b/src/automation/service/zsm_handlers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ae96f895eec410ee2788e47cfedb5abf8353af3a --- /dev/null +++ b/src/automation/service/zsm_handlers/__init__.py @@ -0,0 +1,26 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.context_pb2 import ServiceTypeEnum +from ..zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum +from automation.service.zsm_handlers.P4INTZSMPlugin import P4INTZSMPlugin + +ZSM_SERVICE_HANDLERS = [ + (P4INTZSMPlugin, [ + { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + ]) +] diff --git a/src/automation/tests/test_automation_handlers.py b/src/automation/tests/test_automation_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..16fbcb255bbfb03829f48ed8901cb23c5161deaa --- /dev/null +++ b/src/automation/tests/test_automation_handlers.py @@ -0,0 +1,83 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from ..service.AutomationServiceServicerImpl import AutomationServiceServicerImpl +from common.proto.context_pb2 import ServiceTypeEnum +from ..service.zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum +from ..service.zsm_handlers import Poc1 , Poc2 + +LOGGER = logging.getLogger(__name__) + +ZSM_SERVICE_HANDLERS = [ + (Poc1, [ + { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + ]), + (Poc2, [ + { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + } + ]) +] + +def test_get_service_handler_based_on_service_types(): + automation = AutomationServiceServicerImpl() + + handler_cls = automation.get_service_handler_based_on_service_types(ServiceTypeEnum.SERVICETYPE_INT, ServiceTypeEnum.SERVICETYPE_L2NM , ZSM_SERVICE_HANDLERS) + if handler_cls: + assert True + else: + assert False + + +def test_get_service_handler_based_on_service_types_error(): + automation = AutomationServiceServicerImpl() + + handler_cls = automation.get_service_handler_based_on_service_types(ServiceTypeEnum.SERVICETYPE_INT , ServiceTypeEnum.SERVICETYPE_L2NM , ZSM_SERVICE_HANDLERS) + if handler_cls: + assert True + else: + assert False + +def test_check_if_requested_services_pass_filter_criteria(): + filter = { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + + automation = AutomationServiceServicerImpl() + flag = automation.check_if_requested_services_pass_filter_criteria(filter , ServiceTypeEnum.SERVICETYPE_L2NM , 
ServiceTypeEnum.SERVICETYPE_INT) + + if flag: + assert True + else: + assert False + +def test_check_if_requested_services_pass_filter_criteria_error(): + filter = { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + + automation = AutomationServiceServicerImpl() + flag = automation.check_if_requested_services_pass_filter_criteria(filter , ServiceTypeEnum.SERVICETYPE_INT , ServiceTypeEnum.SERVICETYPE_L2NM) + + if flag: + assert False + else: + assert True
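
Usage sketch for the reworked ZSMCreate request. This is a minimal client-side example based on the new fields in automation.proto (target_service_id, telemetry_service_id, analyzer, policy); the endpoint, the UUID placeholders, and the analyzer/policy payloads passed in are assumptions, not values fixed by this change, and the stub name follows standard grpcio code generation for service AutomationService.

# Hypothetical client-side call of the updated ZSMCreate RPC.
# Assumes the regenerated automation_pb2 / automation_pb2_grpc modules from this change;
# the endpoint and UUIDs below are placeholders.
import grpc
from common.proto.automation_pb2 import ZSMCreateRequest
from common.proto.automation_pb2_grpc import AutomationServiceStub

def zsm_create(endpoint, target_service_uuid, telemetry_service_uuid, analyzer, policy_rule_service):
    request = ZSMCreateRequest()
    request.target_service_id.service_uuid.uuid    = target_service_uuid
    request.telemetry_service_id.service_uuid.uuid = telemetry_service_uuid
    request.analyzer.CopyFrom(analyzer)               # analytics_frontend.Analyzer
    request.policy.CopyFrom(policy_rule_service)      # policy.PolicyRuleService
    with grpc.insecure_channel(endpoint) as channel:
        stub = AutomationServiceStub(channel)
        return stub.ZSMCreate(request)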
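
Handler-resolution sketch, mirroring the logic added to AutomationServiceServicerImpl and the registry in zsm_handlers/__init__.py: the servicer looks up both services in Context and then picks the first handler whose filter matches the (target, telemetry) service-type pair. The no-argument constructor follows the new unit tests; everything else uses the classes introduced in this change.

# Minimal sketch of how a matching ZSM handler is resolved and invoked,
# using the registry and filter fields added in this change.
from common.proto.context_pb2 import ServiceTypeEnum
from automation.service.AutomationServiceServicerImpl import AutomationServiceServicerImpl
from automation.service.zsm_handlers import ZSM_SERVICE_HANDLERS

servicer = AutomationServiceServicerImpl()
handler_cls = servicer.get_service_handler_based_on_service_types(
    ServiceTypeEnum.SERVICETYPE_L2NM,   # target service type (allowed by TARGET_SERVICE_TYPE_VALUES)
    ServiceTypeEnum.SERVICETYPE_INT,    # telemetry service type (allowed by TELEMETRY_SERVICE_TYPE_VALUES)
    ZSM_SERVICE_HANDLERS,
)
if handler_cls is not None:
    handler = handler_cls()             # resolves to P4INTZSMPlugin for this pair
    # handler.zsmCreate(request, context) then starts the analyzer and installs the policy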
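
Worked example for the new aggregation_handler_three_to_one: it averages each input KPI over the batch, sums those averages into a single output KPI, and evaluates the hard-coded RAISE/FALL thresholds (2600 / 699). The KPI ids and values below are placeholders chosen for illustration.

# Illustrative input/output for aggregation_handler_three_to_one (placeholder KPI ids/values).
from analytics.backend.service.AnalyzerHandlers import aggregation_handler_three_to_one

batch = [
    {"kpi_id": "kpi-rx",  "kpi_value": 1000},
    {"kpi_id": "kpi-tx",  "kpi_value": 1200},
    {"kpi_id": "kpi-lat", "kpi_value":  500},
    {"kpi_id": "kpi-rx",  "kpi_value": 1100},
]
results = aggregation_handler_three_to_one(
    "batch size", "analyzer-key", batch,
    input_kpi_list=["kpi-rx", "kpi-tx", "kpi-lat"],
    output_kpi_list=["kpi-total"],
    thresholds={},
)
# Per-KPI averages: kpi-rx=1050, kpi-tx=1200, kpi-lat=500; their sum is 2750, so:
# results == [{"kpi_id": "kpi-total", "avg": 2750.0, "THRESHOLD_RAISE": True, "THRESHOLD_FALL": False}]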