diff --git a/manifests/automationservice.yaml b/manifests/automationservice.yaml index ed169f31deda1e3f3351ab4935ff03eb7817a191..b91c23d232e470c0f35951f4767f03c0a909e507 100644 --- a/manifests/automationservice.yaml +++ b/manifests/automationservice.yaml @@ -40,6 +40,11 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: CRDB_DATABASE + value: "tfs_automation" + envFrom: + - secretRef: + name: crdb-data startupProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:30200"] diff --git a/proto/automation.proto b/proto/automation.proto index ccd17124956f51e15ae517d736a33fdfec226bbc..3525d41b90d3f5eeb6aa64e050a8d71cd5d97812 100644 --- a/proto/automation.proto +++ b/proto/automation.proto @@ -1,69 +1,70 @@ -// Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; -package automation; - -import "context.proto"; -import "policy.proto"; - -// Automation service RPCs -service AutomationService { - rpc ZSMCreate (ZSMCreateRequest ) returns (ZSMService ) {} - rpc ZSMUpdate (ZSMCreateUpdate ) returns (ZSMService ) {} - rpc ZSMDelete (ZSMServiceID ) returns (ZSMServiceState) {} - rpc ZSMGetById (ZSMServiceID ) returns (ZSMService ) {} - rpc ZSMGetByService (context.ServiceId) returns (ZSMService ) {} -} - -// ZSM service states -enum ZSMServiceStateEnum { - ZSM_UNDEFINED = 0; // Undefined ZSM loop state - ZSM_FAILED = 1; // ZSM loop failed - ZSM_ACTIVE = 2; // ZSM loop is currently active - ZSM_INACTIVE = 3; // ZSM loop is currently inactive - ZSM_UPDATED = 4; // ZSM loop is updated - ZSM_REMOVED = 5; // ZSM loop is removed -} - -message ZSMCreateRequest { - context.ServiceId serviceId = 1; - policy.PolicyRuleList policyList = 2; -} - -message ZSMCreateUpdate { - context.Uuid ZSMServiceID = 1; - policy.PolicyRuleList policyList = 2; -} - -// A unique identifier per ZSM service -message ZSMServiceID { - context.Uuid uuid = 1; -} - -// The state of a ZSM service -message ZSMServiceState { - ZSMServiceStateEnum zsmServiceState = 1; - string zsmServiceStateMessage = 2; -} - -// Basic ZSM service attributes -message ZSMService { - ZSMServiceID zsmServiceId = 1; - - context.ServiceId serviceId = 2; - policy.PolicyRuleList policyList = 3; - - // TODO: When new Analytics and updated Monitoring are in place, add the necessary binding to them -} +// Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package automation; + +import "context.proto"; +import "policy.proto"; +import "analytics_frontend.proto"; + +// Automation service RPCs +service AutomationService { + rpc ZSMCreate (ZSMCreateRequest ) returns (ZSMService ) {} + rpc ZSMDelete (ZSMServiceID ) returns (ZSMServiceState) {} + rpc ZSMGetById (ZSMServiceID ) returns (ZSMService ) {} + rpc ZSMGetByService (context.ServiceId) returns (ZSMService ) {} +} + +// ZSM service states +enum ZSMServiceStateEnum { + ZSM_UNDEFINED = 0; // Undefined ZSM loop state + ZSM_FAILED = 1; // ZSM loop failed + ZSM_ACTIVE = 2; // ZSM loop is currently active + ZSM_INACTIVE = 3; // ZSM loop is currently inactive + ZSM_UPDATED = 4; // ZSM loop is updated + ZSM_REMOVED = 5; // ZSM loop is removed +} + +enum ZSMTypeEnum { + ZSMTYPE_UNKNOWN = 0; +} + +message ZSMCreateRequest { + context.ServiceId target_service_id = 1; + context.ServiceId telemetry_service_id = 2; + analytics_frontend.Analyzer analyzer = 3; + policy.PolicyRuleService policy = 4; +} + +// A unique identifier per ZSM service +message ZSMServiceID { + context.Uuid uuid = 1; +} + +// The state of a ZSM service +message ZSMServiceState { + ZSMServiceStateEnum zsmServiceState = 1; + string zsmServiceStateMessage = 2; +} + +// Basic ZSM service attributes +message ZSMService { + ZSMServiceID zsmServiceId = 1; + + context.ServiceId serviceId = 2; + policy.PolicyRuleList policyList = 3; + + // TODO: When new Analytics and updated Monitoring are in place, add the necessary binding to them +} diff --git a/src/analytics/backend/service/AnalyzerHandlers.py b/src/analytics/backend/service/AnalyzerHandlers.py index 75f7ed252d1dbb86a10750e7be892f2d8af8bf9c..894f3742351990e08fe9569b6aaa545821164c13 100644 --- a/src/analytics/backend/service/AnalyzerHandlers.py +++ b/src/analytics/backend/service/AnalyzerHandlers.py @@ -15,18 +15,28 @@ import logging from enum import Enum import pandas as pd +from collections import defaultdict logger = logging.getLogger(__name__) class Handlers(Enum): AGGREGATION_HANDLER = "AggregationHandler" + AGGREGATION_HANDLER_THREE_TO_ONE = "AggregationHandlerThreeToOne" UNSUPPORTED_HANDLER = "UnsupportedHandler" @classmethod def is_valid_handler(cls, handler_name): return handler_name in cls._value2member_map_ +def select_handler(handler_name): + if handler_name == "AggregationHandler": + return aggregation_handler + elif handler_name == "AggregationHandlerThreeToOne": + return aggregation_handler_three_to_one + else: + return "UnsupportedHandler" + # This method is top-level and should not be part of the class due to serialization issues. def threshold_handler(key, aggregated_df, thresholds): """ @@ -134,3 +144,42 @@ def aggregation_handler( return results else: return [] + +def find(data , type , value): + return next((item for item in data if item[type] == value), None) + + +def aggregation_handler_three_to_one( + batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds +): + + # Group and sum + # Track sum and count + sum_dict = defaultdict(int) + count_dict = defaultdict(int) + + for item in batch: + kpi_id = item["kpi_id"] + if kpi_id in input_kpi_list: + sum_dict[kpi_id] += item["kpi_value"] + count_dict[kpi_id] += 1 + + # Compute average + avg_dict = {kpi_id: sum_dict[kpi_id] / count_dict[kpi_id] for kpi_id in sum_dict} + + total_kpi_metric = 0 + for kpi_id, total_value in avg_dict.items(): + total_kpi_metric += total_value + + result = { + "kpi_id": output_kpi_list[0], + "avg": total_kpi_metric, + "THRESHOLD_RAISE": bool(total_kpi_metric > 2600), + "THRESHOLD_FALL": bool(total_kpi_metric < 699) + } + results = [] + + results.append(result) + logger.warning(f"result : {result}.") + + return results diff --git a/src/analytics/backend/service/Streamer.py b/src/analytics/backend/service/Streamer.py index 9ddf8b2364f94128d8df610e3d765cc701188b5d..ff0b10ef58fdba8cf9b93461c08a2b31d03efad8 100644 --- a/src/analytics/backend/service/Streamer.py +++ b/src/analytics/backend/service/Streamer.py @@ -19,7 +19,7 @@ import logging from confluent_kafka import KafkaException, KafkaError from common.tools.kafka.Variables import KafkaTopic -from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler +from analytics.backend.service.AnalyzerHandlers import Handlers, aggregation_handler, aggregation_handler_three_to_one , select_handler from analytics.backend.service.AnalyzerHelper import AnalyzerHelper @@ -27,8 +27,8 @@ logger = logging.getLogger(__name__) class DaskStreamer(threading.Thread): - def __init__(self, key, input_kpis, output_kpis, thresholds, - batch_size = 5, + def __init__(self, key, input_kpis, output_kpis, thresholds, + batch_size = 5, batch_duration = None, window_size = None, cluster_instance = None, @@ -114,11 +114,13 @@ class DaskStreamer(threading.Thread): if Handlers.is_valid_handler(self.thresholds["task_type"]): if self.client is not None and self.client.status == 'running': try: - future = self.client.submit(aggregation_handler, "batch size", self.key, - self.batch, self.input_kpis, self.output_kpis, self.thresholds) + future = self.client.submit(select_handler(self.thresholds["task_type"]), "batch size", + self.key, + self.batch, self.input_kpis, self.output_kpis, self.thresholds) future.add_done_callback(lambda fut: self.produce_result(fut.result(), KafkaTopic.ALARMS.value)) except Exception as e: - logger.error(f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") + logger.error( + f"Failed to submit task to Dask client or unable to process future. See error for detail: {e}") else: logger.warning("Dask client is not running. Skipping processing.") else: diff --git a/src/automation/.gitlab-ci.yml b/src/automation/.gitlab-ci.yml index 485045c12f8f33be7fbc91753c4d18fc5b7ea903..584656248b1e70df2ba68cb9949527532ab851a3 100644 --- a/src/automation/.gitlab-ci.yml +++ b/src/automation/.gitlab-ci.yml @@ -67,8 +67,7 @@ unit_test automation: - sleep 5 - docker ps -a - docker logs $IMAGE_NAME - - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml" - - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml" + - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_automation_handlers.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml" - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: @@ -89,7 +88,7 @@ unit_test automation: artifacts: when: always reports: - junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report_*.xml + junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml ## Deployment of the service in Kubernetes Cluster #deploy automation: diff --git a/src/automation/service/AutomationServiceServicerImpl.py b/src/automation/service/AutomationServiceServicerImpl.py index 1f94f572ec51e6accd2ccfedc7c32fe4381d7f1c..984c0f1f61c9b63d01877fc6aac8890c84987bf1 100644 --- a/src/automation/service/AutomationServiceServicerImpl.py +++ b/src/automation/service/AutomationServiceServicerImpl.py @@ -15,210 +15,103 @@ import grpc, json, logging from uuid import uuid4 from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.automation_pb2_grpc import AutomationServiceServicer from common.method_wrappers.ServiceExceptions import InvalidArgumentException -from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate -from common.proto.automation_pb2_grpc import AutomationServiceServicer from common.proto.context_pb2 import Service, ServiceId -from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor -from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState -from common.proto.policy_action_pb2 import PolicyRuleAction, PolicyRuleActionConfig -from common.proto.policy_condition_pb2 import PolicyRuleCondition -from common.proto.telemetry_frontend_pb2 import Collector, CollectorId - -from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient -from automation.client.PolicyClient import PolicyClient +from automation.service.zsm_handlers import ZSM_SERVICE_HANDLERS +from automation.service.zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum , TELEMETRY_SERVICE_TYPE_VALUES, TARGET_SERVICE_TYPE_VALUES , ZSM_FILTER_FIELD_ALLOWED_VALUES +from common.proto.context_pb2 import ServiceTypeEnum , DeviceDriverEnum from context.client.ContextClient import ContextClient -from kpi_manager.client.KpiManagerClient import KpiManagerClient -from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient +from automation.service.database.AutomationDB import AutomationDB +from automation.service.database.AutomationModel import AutomationModel +from common.method_wrappers.ServiceExceptions import NotFoundException LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Automation', 'RPC') class AutomationServiceServicerImpl(AutomationServiceServicer): def __init__(self): + self.automation_db_obj = AutomationDB(AutomationModel) LOGGER.info('Init AutomationService') @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService: - - # check that service does not exist + LOGGER.info("Received gRPC message object: {:}".format(request)) context_client = ContextClient() - kpi_manager_client = KpiManagerClient() - policy_client = PolicyClient() - telemetry_frontend_client = TelemetryFrontendClient() - analytics_frontend_client = AnalyticsFrontendClient() - - try: - # TODO: Remove static variables(get them from ZSMCreateRequest) - # TODO: Refactor policy component (remove unnecessary variables) + targetService = context_client.GetService(request.target_service_id) + telemetryService = context_client.GetService(request.telemetry_service_id) - ####### GET Context ####################### - LOGGER.info('Get the service from Context: ') - service: Service = context_client.GetService(request.serviceId) - LOGGER.info('Service ({:s}) :'.format(str(service))) - ########################################### - - ####### SET Kpi Descriptor LAT ################ - LOGGER.info('Set Kpi Descriptor LAT: ') - - if(len(service.service_constraints) == 0): - raise InvalidArgumentException("service_constraints" , "empty", []); - - if(len(service.service_constraints) > 1): - raise InvalidArgumentException("service_constraints" , ">1", []); - - if(service.service_constraints[0].sla_latency is None ): - raise InvalidArgumentException("sla_latency", "empty", []); - - ## Static Implementation Applied only in case of SLA Latency Constraint ## - - # KPI Descriptor - kpi_descriptor_lat = KpiDescriptor() - kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS' #static service.service_constraints[].sla_latency.e2e_latency_ms - kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid - kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4()) - - kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat) - LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat))) - ########################################### - - ####### SET Kpi Descriptor TX ################ - LOGGER.info('Set Kpi Descriptor TX: ') + handler_cls = self.get_service_handler_based_on_service_types(targetService.service_type, telemetryService.service_type, ZSM_SERVICE_HANDLERS) - kpi_descriptor_tx = KpiDescriptor() - kpi_descriptor_tx.kpi_sample_type = 101 # static KPISAMPLETYPE_PACKETS_TRANSMITTED - kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid - kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4()) - - kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx) - LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx))) - ########################################### - - ####### SET Kpi Descriptor RX ################ - LOGGER.info('Set Kpi Descriptor RX: ') - - kpi_descriptor_rx = KpiDescriptor() - kpi_descriptor_rx.kpi_sample_type = 102 # static KPISAMPLETYPE_PACKETS_RECEIVED - kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid - kpi_descriptor_rx.kpi_id.kpi_id.uuid = str(uuid4()) - - kpi_id_rx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_rx) - LOGGER.info('kpi_id_rx({:s})'.format(str(kpi_id_rx))) - ########################################### - - - - ####### START Collector TX ################# - collect_tx = Collector() - collect_tx.collector_id.collector_id.uuid = str(uuid4()) - collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid - collect_tx.duration_s = 20000 # static - collect_tx.interval_s = 1 # static - LOGGER.info('Start Collector TX'.format(str(collect_tx))) - - collect_id_tx: CollectorId = telemetry_frontend_client.StartCollector(collect_tx) - LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_tx))) - ############################################# - - ####### START Collector RX ################## - collect_rx = Collector() - collect_rx.collector_id.collector_id.uuid = str(uuid4()) - collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid - collect_rx.duration_s = 20000 # static - collect_rx.interval_s = 1 # static - LOGGER.info('Start Collector RX'.format(str(collect_rx))) - - collect_id_rx: CollectorId = telemetry_frontend_client.StartCollector(collect_rx) - LOGGER.info('collect_id_tx({:s})'.format(str(collect_id_rx))) - ############################################### - - ####### START Analyzer LAT ################ - analyzer = Analyzer() - analyzer.analyzer_id.analyzer_id.uuid = str(uuid4()) - analyzer.algorithm_name = 'Test_Aggregate_and_Threshold' # static - analyzer.operation_mode = 2 - analyzer.input_kpi_ids.append(kpi_id_rx) - analyzer.input_kpi_ids.append(kpi_id_tx) - analyzer.output_kpi_ids.append(kpi_id_lat) - - thresholdStr = service.service_constraints[0].custom.constraint_type - - _threshold_dict = {thresholdStr: (0, int(service.service_constraints[0].custom.constraint_value))} - analyzer.parameters['thresholds'] = json.dumps(_threshold_dict) - analyzer.parameters['window_size'] = "60s" - analyzer.parameters['window_slider'] = "30s" - - analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer) - LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat))) - ########################################################### - - ####### SET Policy LAT ################ - policy_lat = PolicyRuleService() - policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid - policy_lat.serviceId.context_id.context_uuid.uuid = request.serviceId.context_id.context_uuid.uuid - - # PolicyRuleBasic - policy_lat.policyRuleBasic.priority = 0 - policy_lat.policyRuleBasic.policyRuleId.uuid.uuid = str(uuid4()) - policy_lat.policyRuleBasic.booleanOperator = 2 - - # PolicyRuleAction - policyRuleActionConfig = PolicyRuleActionConfig() - policyRuleActionConfig.action_key = "" - policyRuleActionConfig.action_value = "" - - policyRuleAction = PolicyRuleAction() - policyRuleAction.action = 5 - policyRuleAction.action_config.append(policyRuleActionConfig) - policy_lat.policyRuleBasic.actionList.append(policyRuleAction) - - # for constraint in service.service_constraints: - - # PolicyRuleCondition - policyRuleCondition = PolicyRuleCondition() - policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid - policyRuleCondition.numericalOperator = 5 - policyRuleCondition.kpiValue.floatVal = 300 - - policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition) - - policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat) - LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state))) - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - LOGGER.exception('Unable to get Service({:s})'.format(str(request))) - context_client.close() - kpi_manager_client.close() - policy_client.close() - telemetry_frontend_client.close() - return None - - context_client.close() - kpi_manager_client.close() - policy_client.close() - telemetry_frontend_client.close() - return ZSMService() + if handler_cls: + handler_obj = handler_cls() # instantiate it + handler_obj.zsmCreate(request, context) + else: + LOGGER.info("No matching handler found.") - @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) - def ZSMUpdate(self, request : ZSMCreateUpdate, context : grpc.ServicerContext) -> ZSMService: - LOGGER.info('NOT IMPLEMENTED ZSMUpdate') - return ZSMService() + response = ZSMService() + automation_id = str(uuid4()) + zsm_to_insert = AutomationModel.convert_Automation_to_row(automation_id , "Test Description") + if self.automation_db_obj.add_row_to_db(zsm_to_insert): + response.zsmServiceId.uuid.uuid = automation_id + + return response @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) def ZSMDelete(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMServiceState: - LOGGER.info('NOT IMPLEMENTED ZSMDelete') - return ZSMServiceState() + LOGGER.info("Received gRPC message object: {:}".format(request)) + zsm_id_to_search = request.uuid.uuid + + row = self.automation_db_obj.search_db_row_by_id(AutomationModel, 'zsm_id', zsm_id_to_search) + if row is None: + LOGGER.info('No matching row found zsm id: {:}'.format(zsm_id_to_search)) + raise NotFoundException('ZsmID', zsm_id_to_search) + + self.automation_db_obj.delete_db_row_by_id(AutomationModel, 'zsm_id', zsm_id_to_search) + + zsmServiceState = ZSMServiceState() + zsmServiceState.zsmServiceState = 5 + zsmServiceState.zsmServiceStateMessage = "Removed id: {:}".format(request) + + return zsmServiceState @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) def ZSMGetById(self, request : ZSMServiceID, context : grpc.ServicerContext) -> ZSMService: - LOGGER.info('NOT IMPLEMENTED ZSMGetById') - return ZSMService() - + LOGGER.info("Received gRPC message object: {:}".format(request)) + zsm_id_to_search = request.uuid.uuid + row = self.automation_db_obj.search_db_row_by_id(AutomationModel, 'zsm_id', zsm_id_to_search) + if row is None: + LOGGER.info('No matching row found zsm id: {:}'.format(zsm_id_to_search)) + raise NotFoundException('ZsmID', zsm_id_to_search) + response = AutomationModel.convert_row_to_Automation(row) + return response @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) def ZSMGetByService(self, request : ServiceId, context : grpc.ServicerContext) -> ZSMService: LOGGER.info('NOT IMPLEMENTED ZSMGetByService') return ZSMService() + + def get_service_handler_based_on_service_types(self, targetServiceType ,telemetryServiceType , ZSM_SERVICE_HANDLERS): + flag = True + for handler_cls, filters in ZSM_SERVICE_HANDLERS: + for filter in filters: + flag = self.check_if_requested_services_pass_filter_criteria(filter , targetServiceType, telemetryServiceType) + if flag: + return handler_cls + return None + + def check_if_requested_services_pass_filter_criteria(self ,filter , targetServiceType , telemetryServiceType): + flag = True + for filter_key, filter_value in filter.items(): + if filter_value in ZSM_FILTER_FIELD_ALLOWED_VALUES[filter_key.value]: + if filter_key.value == ZSMFilterFieldEnum.TARGET_SERVICE_TYPE.value: + if filter_value != targetServiceType: + flag = False + elif filter_key.value == ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE.value: + if filter_value != telemetryServiceType: + flag = False + else: + flag = False + return flag diff --git a/src/automation/service/__main__.py b/src/automation/service/__main__.py index 80c9d2b044e01cc354c7c5b14f62c88ec3fd7c07..d8ea030baaa8f06841c5e701c0287f3f735c18d4 100644 --- a/src/automation/service/__main__.py +++ b/src/automation/service/__main__.py @@ -22,6 +22,10 @@ from common.Settings import ( wait_for_environment_variables ) from .AutomationService import AutomationService +from common.tools.database.GenericDatabase import Database +from automation.service.database.AutomationModel import AutomationModel +from .database.Engine import Engine +from .database.models._Base import rebuild_database LOG_LEVEL = get_log_level() logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") @@ -58,9 +62,19 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) - # Start Event Collection+Dispatching Engine - event_engine = EventEngine(terminate=terminate) - event_engine.start() + # Get Database Engine instance and initialize database, if needed + LOGGER.info('Getting SQLAlchemy DB Engine...') + db_engine = Engine.get_engine() + if db_engine is None: + LOGGER.error('Unable to get SQLAlchemy DB Engine...') + return -1 + + try: + Engine.create_database(db_engine) + except: # pylint: disable=bare-except # pragma: no cover + LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url))) + + rebuild_database(db_engine) # Starting Automation service grpc_service = AutomationService() diff --git a/src/automation/service/database/AutomationDB.py b/src/automation/service/database/AutomationDB.py new file mode 100644 index 0000000000000000000000000000000000000000..0491cb590d1789f79af8508a44aa9baf93bbeff3 --- /dev/null +++ b/src/automation/service/database/AutomationDB.py @@ -0,0 +1,26 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from common.method_wrappers.Decorator import MetricsPool +from common.tools.database.GenericDatabase import Database +from common.method_wrappers.ServiceExceptions import OperationFailedException + +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('Automation', 'Database') + +class AutomationDB(Database): + def __init__(self, model) -> None: + LOGGER.info('Init AutomationService') + super().__init__(model) diff --git a/src/automation/service/database/AutomationModel.py b/src/automation/service/database/AutomationModel.py new file mode 100644 index 0000000000000000000000000000000000000000..156acbb93615be5e00bede49154271cb158b316d --- /dev/null +++ b/src/automation/service/database/AutomationModel.py @@ -0,0 +1,56 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy import Column, Integer, String, Text +from sqlalchemy.orm import registry +from .models._Base import _Base +from common.proto.automation_pb2 import ZSMService + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + +class AutomationModel(_Base): + __tablename__ = 'automation' + + zsm_id = Column(UUID(as_uuid=False), primary_key=True) + zsm_description = Column(Text , nullable=False) + + # helps in logging the information + def __repr__(self): + return (f"") + + @classmethod + def convert_Automation_to_row(cls, uuid , zsm_description): + """ + Create an instance of Automation from a request object. + Args: request: The request object containing the data. + Returns: An instance of Automation initialized with data from the request. + """ + return cls( + zsm_id = uuid, + zsm_description = zsm_description + ) + + @classmethod + def convert_row_to_Automation(cls, row): + """ + Create and return a dictionary representation of a Automation instance. + Args: row: The Automation instance (row) containing the data. + Returns: Automation object + """ + response = ZSMService() + response.zsmServiceId.uuid.uuid = row.zsm_id + return response diff --git a/src/automation/service/database/Engine.py b/src/automation/service/database/Engine.py new file mode 100644 index 0000000000000000000000000000000000000000..43690382e513093feda4dfa89ee75ef85c08b651 --- /dev/null +++ b/src/automation/service/database/Engine.py @@ -0,0 +1,55 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, sqlalchemy, sqlalchemy_utils +from common.Settings import get_setting + +LOGGER = logging.getLogger(__name__) + +APP_NAME = 'tfs' +ECHO = False # true: dump SQL commands and transactions executed +CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}' + +class Engine: + @staticmethod + def get_engine() -> sqlalchemy.engine.Engine: + crdb_uri = get_setting('CRDB_URI', default=None) + if crdb_uri is None: + CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE') + CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT') + CRDB_DATABASE = get_setting('CRDB_DATABASE') + CRDB_USERNAME = get_setting('CRDB_USERNAME') + CRDB_PASSWORD = get_setting('CRDB_PASSWORD') + CRDB_SSLMODE = get_setting('CRDB_SSLMODE') + crdb_uri = CRDB_URI_TEMPLATE.format( + CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) + + try: + engine = sqlalchemy.create_engine( + crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True) + except: # pylint: disable=bare-except # pragma: no cover + LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) + return None + + return engine + + @staticmethod + def create_database(engine : sqlalchemy.engine.Engine) -> None: + if not sqlalchemy_utils.database_exists(engine.url): + sqlalchemy_utils.create_database(engine.url) + + @staticmethod + def drop_database(engine : sqlalchemy.engine.Engine) -> None: + if sqlalchemy_utils.database_exists(engine.url): + sqlalchemy_utils.drop_database(engine.url) diff --git a/src/automation/service/database/__init__.py b/src/automation/service/database/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ccc21c7db78aac26daa1f8c5ff8e1ffd3f35460 --- /dev/null +++ b/src/automation/service/database/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/automation/service/database/models/_Base.py b/src/automation/service/database/models/_Base.py new file mode 100644 index 0000000000000000000000000000000000000000..06c5f6b02597802e0dcb1f3a5f941c977eb126a8 --- /dev/null +++ b/src/automation/service/database/models/_Base.py @@ -0,0 +1,25 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy +from typing import Any, List +from sqlalchemy.orm import Session, sessionmaker, declarative_base +from sqlalchemy.sql import text +from sqlalchemy_cockroachdb import run_transaction + +_Base = declarative_base() + +def rebuild_database(db_engine : sqlalchemy.engine.Engine, drop_if_exists : bool = False): + if drop_if_exists: _Base.metadata.drop_all(db_engine) + _Base.metadata.create_all(db_engine) diff --git a/src/automation/service/zsm_handler_api/ZSMFilterFields.py b/src/automation/service/zsm_handler_api/ZSMFilterFields.py new file mode 100644 index 0000000000000000000000000000000000000000..7b00de5bc474f8ac4a6a711acf9f7200723ddaae --- /dev/null +++ b/src/automation/service/zsm_handler_api/ZSMFilterFields.py @@ -0,0 +1,34 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from common.proto.context_pb2 import ServiceTypeEnum + +class ZSMFilterFieldEnum(Enum): + TARGET_SERVICE_TYPE = 'target_service_type' + TELEMETRY_SERVICE_TYPE = 'telemetry_service_type' + +TARGET_SERVICE_TYPE_VALUES = { + ServiceTypeEnum.SERVICETYPE_L2NM +} + +TELEMETRY_SERVICE_TYPE_VALUES = { + ServiceTypeEnum.SERVICETYPE_INT +} + +# Maps filter fields to allowed values per Filter field. # If no restriction (free text) None is specified +ZSM_FILTER_FIELD_ALLOWED_VALUES = { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE.value : TARGET_SERVICE_TYPE_VALUES, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE.value : TELEMETRY_SERVICE_TYPE_VALUES, +} diff --git a/src/automation/service/zsm_handler_api/_ZSMHandler.py b/src/automation/service/zsm_handler_api/_ZSMHandler.py new file mode 100644 index 0000000000000000000000000000000000000000..01d2614e2c8c1225aa4225f62a9f52c66cef0a0b --- /dev/null +++ b/src/automation/service/zsm_handler_api/_ZSMHandler.py @@ -0,0 +1,35 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc , logging +from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState, ZSMCreateUpdate +from common.proto.context_pb2 import ServiceId + +LOGGER = logging.getLogger(__name__) + +class _ZSMHandler: + def __init__(self): + LOGGER.info('Init Scenario') + + def zsmCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext): + LOGGER.info('zsmCreate method') + + def zsmDelete(self, request : ZSMServiceID, context : grpc.ServicerContext): + LOGGER.info('zsmDelete method') + + def zsmGetById(self, request : ZSMServiceID, context : grpc.ServicerContext): + LOGGER.info('zsmGetById method') + + def zsmGetByService(self, request : ServiceId, context : grpc.ServicerContext): + LOGGER.info('zsmGetByService method') diff --git a/src/automation/service/zsm_handler_api/__init__.py b/src/automation/service/zsm_handler_api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7363515f07a52d996229bcbd72932ce1423258d7 --- /dev/null +++ b/src/automation/service/zsm_handler_api/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/automation/service/zsm_handlers/P4INTZSMPlugin.py b/src/automation/service/zsm_handlers/P4INTZSMPlugin.py new file mode 100644 index 0000000000000000000000000000000000000000..d1d5584bb90262f81e5982291d37c4aa553fb7e5 --- /dev/null +++ b/src/automation/service/zsm_handlers/P4INTZSMPlugin.py @@ -0,0 +1,89 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc , logging +from common.proto.analytics_frontend_pb2 import AnalyzerId +from common.proto.policy_pb2 import PolicyRuleState +from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService + +from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient +from automation.client.PolicyClient import PolicyClient +from context.client.ContextClient import ContextClient +from automation.service.zsm_handler_api._ZSMHandler import _ZSMHandler + +LOGGER = logging.getLogger(__name__) + +class P4INTZSMPlugin(_ZSMHandler): + def __init__(self): + LOGGER.info('Init P4INTZSMPlugin') + + def zsmCreate(self,request : ZSMCreateRequest, context : grpc.ServicerContext): + # check that service does not exist + context_client = ContextClient() + policy_client = PolicyClient() + analytics_frontend_client = AnalyticsFrontendClient() + + # Verify the input target service ID + try: + target_service_id = context_client.GetService(request.target_service_id) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to get target service({:s})'.format(str(target_service_id))) + context_client.close() + return None + + # Verify the input telemetry service ID + try: + telemetry_service_id = context_client.GetService(request.telemetry_service_id) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to get telemetry service({:s})'.format(str(telemetry_service_id))) + context_client.close() + return None + + # Start an analyzer + try: + analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(request.analyzer) # type: ignore + LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat))) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to start analyzer({:s})'.format(str(request.analyzer))) + context_client.close() + analytics_frontend_client.close() + return None + + # Create a policy + try: + policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(request.policy) # type: ignore + LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state))) + except grpc.RpcError as ex: + if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member + LOGGER.exception('Unable to create policy({:s})'.format(str(request.policy))) + context_client.close() + policy_client.close() + return None + + context_client.close() + analytics_frontend_client.close() + policy_client.close() + return ZSMService() + + def zsmDelete(self): + LOGGER.info('zsmDelete method') + + def zsmGetById(self): + LOGGER.info('zsmGetById method') + + def zsmGetByService(self): + LOGGER.info('zsmGetByService method') diff --git a/src/automation/service/zsm_handlers/__init__.py b/src/automation/service/zsm_handlers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ae96f895eec410ee2788e47cfedb5abf8353af3a --- /dev/null +++ b/src/automation/service/zsm_handlers/__init__.py @@ -0,0 +1,26 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.context_pb2 import ServiceTypeEnum +from ..zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum +from automation.service.zsm_handlers.P4INTZSMPlugin import P4INTZSMPlugin + +ZSM_SERVICE_HANDLERS = [ + (P4INTZSMPlugin, [ + { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + ]) +] diff --git a/src/automation/tests/test_automation_handlers.py b/src/automation/tests/test_automation_handlers.py new file mode 100644 index 0000000000000000000000000000000000000000..16fbcb255bbfb03829f48ed8901cb23c5161deaa --- /dev/null +++ b/src/automation/tests/test_automation_handlers.py @@ -0,0 +1,83 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from ..service.AutomationServiceServicerImpl import AutomationServiceServicerImpl +from common.proto.context_pb2 import ServiceTypeEnum +from ..service.zsm_handler_api.ZSMFilterFields import ZSMFilterFieldEnum +from ..service.zsm_handlers import Poc1 , Poc2 + +LOGGER = logging.getLogger(__name__) + +ZSM_SERVICE_HANDLERS = [ + (Poc1, [ + { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + ]), + (Poc2, [ + { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + } + ]) +] + +def test_get_service_handler_based_on_service_types(): + automation = AutomationServiceServicerImpl() + + handler_cls = automation.get_service_handler_based_on_service_types(ServiceTypeEnum.SERVICETYPE_INT, ServiceTypeEnum.SERVICETYPE_L2NM , ZSM_SERVICE_HANDLERS) + if handler_cls: + assert True + else: + assert False + + +def test_get_service_handler_based_on_service_types_error(): + automation = AutomationServiceServicerImpl() + + handler_cls = automation.get_service_handler_based_on_service_types(ServiceTypeEnum.SERVICETYPE_INT , ServiceTypeEnum.SERVICETYPE_L2NM , ZSM_SERVICE_HANDLERS) + if handler_cls: + assert True + else: + assert False + +def test_check_if_requested_services_pass_filter_criteria(): + filter = { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + + automation = AutomationServiceServicerImpl() + flag = automation.check_if_requested_services_pass_filter_criteria(filter , ServiceTypeEnum.SERVICETYPE_L2NM , ServiceTypeEnum.SERVICETYPE_INT) + + if flag: + assert True + else: + assert False + +def test_check_if_requested_services_pass_filter_criteria_error(): + filter = { + ZSMFilterFieldEnum.TARGET_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_L2NM, + ZSMFilterFieldEnum.TELEMETRY_SERVICE_TYPE : ServiceTypeEnum.SERVICETYPE_INT, + } + + automation = AutomationServiceServicerImpl() + flag = automation.check_if_requested_services_pass_filter_criteria(filter , ServiceTypeEnum.SERVICETYPE_INT , ServiceTypeEnum.SERVICETYPE_L2NM) + + if flag: + assert False + else: + assert True