diff --git a/proto/automation.proto b/proto/automation.proto index 3525d41b90d3f5eeb6aa64e050a8d71cd5d97812..4c628b76fc131868f17ba50c8c49330183852dba 100644 --- a/proto/automation.proto +++ b/proto/automation.proto @@ -37,15 +37,11 @@ enum ZSMServiceStateEnum { ZSM_REMOVED = 5; // ZSM loop is removed } -enum ZSMTypeEnum { - ZSMTYPE_UNKNOWN = 0; -} - message ZSMCreateRequest { - context.ServiceId target_service_id = 1; - context.ServiceId telemetry_service_id = 2; + context.ServiceId target_service_id = 1; + context.ServiceId telemetry_service_id = 2; analytics_frontend.Analyzer analyzer = 3; - policy.PolicyRuleService policy = 4; + policy.PolicyRuleService policy = 4; } // A unique identifier per ZSM service @@ -62,9 +58,6 @@ message ZSMServiceState { // Basic ZSM service attributes message ZSMService { ZSMServiceID zsmServiceId = 1; - context.ServiceId serviceId = 2; policy.PolicyRuleList policyList = 3; - - // TODO: When new Analytics and updated Monitoring are in place, add the necessary binding to them } diff --git a/src/automation/README.md b/src/automation/README.md new file mode 100644 index 0000000000000000000000000000000000000000..b946732e1815581d2430be1e86f5bd84014255c3 --- /dev/null +++ b/src/automation/README.md @@ -0,0 +1,116 @@ +# ETSI TFS Automation Service + +The Automation service exposes a gRPC API with the following methods: +- ZSMCreate +- ZSMDelete +- ZSMGetById +- ZSMGetByService + +To invoke this API, follow the steps below: + +## Install grpcurl + +```bash +curl -sSL "https://github.com/fullstorydev/grpcurl/releases/download/v1.8.7/grpcurl_1.8.7_linux_x86_64.tar.gz" | sudo tar -xz -C /usr/local/bin +``` + +## Fetch available methods around Automation + +```bash +cd ~/tfs-ctrl/ +grpcurl -import-path ./proto -proto automation.proto list +grpcurl -import-path ./proto -proto automation.proto describe automation.AutomationService +grpcurl -import-path ./proto -proto automation.proto describe automation.AutomationService.ZSMCreate +``` + +## Try an 
example ZSMCreate

TFS tests are now augmented with a new Automation example that shows the way to trigger a ZSM loop creation.
The following script invokes Automation on top of a specific example topology.

```bash
bash src/tests/automation/run_test_automation.sh
```

More details are provided in `src/tests/automation/README.md`

## Important Services

WebUI

```
http://10.10.10.41/webui/
```

Grafana

```
http://10.10.10.41/grafana
```

Prometheus

```
http://10.10.10.41:30090/
```

## Check Kafka topics

The following commands may help you debug a closed loop that involves Telemetry, Analytics, Policy, all managed by Automation.
Kafka is a key element for Automation as the KPIs managed by Analyzer create alarms that propagate to Policy via a dedicated Kafka topic.
Checking this topic is key for ensuring proper communication between Analytics and Policy.

Get all pods in Kafka namespace:

```bash
kubectl get pods -n kafka

NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 2 (90d ago) 104d
kafka-broker-5f9656cc68-d8fzz 1/1 Running 13 (44h ago) 429d
zookeeper-8664c6774d-nvbgg 1/1 Running 6 (90d ago) 429d
```

Query Kafka from within the Kafka broker's pod:

```bash
kubectl exec -it kafka-0 -n kafka -- \
  kafka-consumer-groups.sh \
  --bootstrap-server kafka-service.kafka.svc.cluster.local:9092 \
  --list

Output>
 backend
 KpiValueWriter
 analytics-backend
 policy
```

Now let's see the consumer groups. 
+ +```bash +kubectl exec -it kafka-0 -n kafka -- \ + kafka-consumer-groups.sh \ + --bootstrap-server kafka-service.kafka.svc.cluster.local:9092 \ + --list + +Output> + backend + KpiValueWriter + analytics-backend + policy +``` + +Let's describe the policy group: + +```bash +kubectl exec -it kafka-0 -n kafka -- \ + kafka-consumer-groups.sh \ + --bootstrap-server kafka-service.kafka.svc.cluster.local:9092 \ + --describe \ + --group policy + +Output> +GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID +policy topic_alarms 0 40 41 1 kafka-consumer-topic_alarms-c8f09ac5-8c44-42ec-a13b-1af2c47fb86c /10.1.181.228 kafka-consumer-topic_alarms +``` diff --git a/src/automation/client/AutomationClient.py b/src/automation/client/AutomationClient.py new file mode 100644 index 0000000000000000000000000000000000000000..09a39f71e191a89106d0635d227e9e827774a088 --- /dev/null +++ b/src/automation/client/AutomationClient.py @@ -0,0 +1,54 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc +from common.proto.automation_pb2_grpc import AutomationServiceStub +from common.proto.automation_pb2 import ZSMCreateRequest, ZSMService, ZSMServiceID, ZSMServiceState +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class AutomationClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.AUTOMATION) + if not port: port = get_service_port_grpc(ServiceNameEnum.AUTOMATION) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.info('Creating channel to {:s}...'.format(str(self.endpoint))) + self.channel = None + self.stub = None + self.openconfig_stub = None + self.connect() + LOGGER.info('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = AutomationServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def ZSMCreate(self, request : ZSMCreateRequest) -> ZSMService: # type: ignore + LOGGER.info('ZSMCreate request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.ZSMCreate(request) + LOGGER.info('ZSMCreate result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/automation/client/PolicyClient.py b/src/automation/client/PolicyClient.py index 7b66049398fa7c6f5b4684245b8e1a18ad81bca7..e364bf17bc5e76c4986af9ba316c5e8f1916139f 100644 --- a/src/automation/client/PolicyClient.py +++ 
b/src/automation/client/PolicyClient.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import grpc, logging, uuid from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.proto.policy_pb2 import PolicyRuleService, PolicyRuleState @@ -47,8 +47,8 @@ class PolicyClient: self.stub = None @RETRY_DECORATOR - def PolicyAddService(self, request : PolicyRuleService) -> PolicyRuleState: - LOGGER.debug('AddPolicy request: {:s}'.format(grpc_message_to_json_string(request))) + def PolicyAddService(self, request : PolicyRuleService) -> PolicyRuleState: # type: ignore + LOGGER.info('AddPolicy request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.PolicyAddService(request) - LOGGER.debug('AddPolicy result: {:s}'.format(grpc_message_to_json_string(response))) + LOGGER.info('AddPolicy result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/automation/service/AutomationServiceServicerImpl.py b/src/automation/service/AutomationServiceServicerImpl.py index 187eba055cdfd614abd680cc8ef34ebe5911fe3a..7113c268b73a40899151ff4bb3b69154d85a2977 100644 --- a/src/automation/service/AutomationServiceServicerImpl.py +++ b/src/automation/service/AutomationServiceServicerImpl.py @@ -38,7 +38,7 @@ class AutomationServiceServicerImpl(AutomationServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL,LOGGER) def ZSMCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext) -> ZSMService: - LOGGER.info("Received gRPC message object: {:}".format(request)) + LOGGER.info("Received gRPC message object:\n{:}".format(request)) context_client = ContextClient() targetService = context_client.GetService(request.target_service_id) diff --git a/src/automation/service/__main__.py b/src/automation/service/__main__.py index 
d8ea030baaa8f06841c5e701c0287f3f735c18d4..9d7a2aceebf86e43c40c04d8a33db6b498cf83e0 100644 --- a/src/automation/service/__main__.py +++ b/src/automation/service/__main__.py @@ -14,7 +14,7 @@ import logging, signal, sys, threading from prometheus_client import start_http_server -from automation.service.EventEngine import EventEngine +# from automation.service.EventEngine import EventEngine from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, @@ -22,8 +22,8 @@ from common.Settings import ( wait_for_environment_variables ) from .AutomationService import AutomationService -from common.tools.database.GenericDatabase import Database -from automation.service.database.AutomationModel import AutomationModel +# from common.tools.database.GenericDatabase import Database +# from automation.service.database.AutomationModel import AutomationModel from .database.Engine import Engine from .database.models._Base import rebuild_database @@ -34,11 +34,11 @@ LOGGER = logging.getLogger(__name__) terminate = threading.Event() def signal_handler(signal, frame): # pylint: disable=redefined-outer-name,unused-argument - LOGGER.warning('Terminate signal received') + LOGGER.warning("Terminate signal received") terminate.set() def main(): - LOGGER.info('Starting...') + LOGGER.info("Starting...") wait_for_environment_variables([ get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), @@ -63,16 +63,17 @@ def main(): start_http_server(metrics_port) # Get Database Engine instance and initialize database, if needed - LOGGER.info('Getting SQLAlchemy DB Engine...') + LOGGER.info("Getting SQLAlchemy DB Engine...") db_engine = Engine.get_engine() if db_engine is None: - LOGGER.error('Unable to get SQLAlchemy DB Engine...') + LOGGER.error("Unable to get SQLAlchemy DB Engine...") return -1 + # Create the database try: Engine.create_database(db_engine) - except: # pylint: disable=bare-except # pragma: no cover - 
LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url))) + except Exception as ex: + LOGGER.exception(f"Failed to check/create the database: {str(db_engine.url)}") rebuild_database(db_engine) @@ -83,12 +84,18 @@ def main(): # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass - LOGGER.info('Terminating...') + LOGGER.info("Terminating...") grpc_service.stop() - event_engine.stop() + # event_engine.stop() - LOGGER.info('Bye') + # Drop the database + try: + Engine.drop_database(db_engine) + except Exception as ex: + LOGGER.exception(f"Failed to check/create the database: {str(db_engine.url)}") + + LOGGER.info("Bye") return 0 -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/src/automation/service/database/AutomationDB.py b/src/automation/service/database/AutomationDB.py index 0491cb590d1789f79af8508a44aa9baf93bbeff3..9b13ebd03a362e2c569312dfbc8c1251b1299955 100644 --- a/src/automation/service/database/AutomationDB.py +++ b/src/automation/service/database/AutomationDB.py @@ -17,10 +17,10 @@ from common.method_wrappers.Decorator import MetricsPool from common.tools.database.GenericDatabase import Database from common.method_wrappers.ServiceExceptions import OperationFailedException -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Automation', 'Database') class AutomationDB(Database): def __init__(self, model) -> None: - LOGGER.info('Init AutomationService') + LOGGER.info('Init Automation database') super().__init__(model) diff --git a/src/automation/service/database/AutomationModel.py b/src/automation/service/database/AutomationModel.py index 156acbb93615be5e00bede49154271cb158b316d..bbf4d5f1f88a8a21e5b23f7b6cb3f48cc00e42a1 100644 --- a/src/automation/service/database/AutomationModel.py +++ b/src/automation/service/database/AutomationModel.py @@ -19,7 +19,6 @@ from sqlalchemy.orm import registry from .models._Base 
import _Base from common.proto.automation_pb2 import ZSMService -logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) class AutomationModel(_Base): @@ -47,10 +46,10 @@ class AutomationModel(_Base): @classmethod def convert_row_to_Automation(cls, row): """ - Create and return a dictionary representation of a Automation instance. + Create and return a dictionary representation of an Automation instance. Args: row: The Automation instance (row) containing the data. Returns: Automation object """ response = ZSMService() - response.zsmServiceId.uuid.uuid = row.zsm_id + response.zsmServiceId.uuid.uuid = row.zsm_id return response diff --git a/src/automation/service/zsm_handler_api/_ZSMHandler.py b/src/automation/service/zsm_handler_api/_ZSMHandler.py index e79601cf755ced11f734bdfa7f12f63d857b25cd..aaf5f9cc5927e7010539b94922e6496ea06dfffd 100644 --- a/src/automation/service/zsm_handler_api/_ZSMHandler.py +++ b/src/automation/service/zsm_handler_api/_ZSMHandler.py @@ -23,7 +23,7 @@ LOGGER = logging.getLogger(__name__) class _ZSMHandler: def __init__(self): - LOGGER.info('Init Scenario') + LOGGER.info('ZSM init') def zsmCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext): LOGGER.info('zsmCreate method') diff --git a/src/automation/service/zsm_handlers/P4INTZSMPlugin.py b/src/automation/service/zsm_handlers/P4INTZSMPlugin.py index e71e2e6e3e87c43ee6b9b29997f7f674d3029227..05d866ce812257751cbfb3c5c232bc1f6a2b1df1 100644 --- a/src/automation/service/zsm_handlers/P4INTZSMPlugin.py +++ b/src/automation/service/zsm_handlers/P4INTZSMPlugin.py @@ -21,7 +21,6 @@ from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendC from automation.client.PolicyClient import PolicyClient from context.client.ContextClient import ContextClient from automation.service.zsm_handler_api._ZSMHandler import _ZSMHandler -from common.proto.policy_condition_pb2 import PolicyRuleCondition LOGGER = logging.getLogger(__name__) @@ -29,8 +28,7 
@@ class P4INTZSMPlugin(_ZSMHandler): def __init__(self): LOGGER.info('Init P4INTZSMPlugin') - def zsmCreate(self,request : ZSMCreateRequest, context : grpc.ServicerContext): - # check that service does not exist + def zsmCreate(self, request : ZSMCreateRequest, context : grpc.ServicerContext): # type: ignore context_client = ContextClient() policy_client = PolicyClient() analytics_frontend_client = AnalyticsFrontendClient() @@ -39,8 +37,8 @@ class P4INTZSMPlugin(_ZSMHandler): try: target_service_id = context_client.GetService(request.target_service_id) except grpc.RpcError as ex: + LOGGER.exception(f'Unable to get target service:\n{str(target_service_id)}') if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - LOGGER.exception('Unable to get target service({:s})'.format(str(target_service_id))) context_client.close() return None @@ -48,38 +46,30 @@ class P4INTZSMPlugin(_ZSMHandler): try: telemetry_service_id = context_client.GetService(request.telemetry_service_id) except grpc.RpcError as ex: + LOGGER.exception(f'Unable to get telemetry service:\n{str(telemetry_service_id)}') if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - LOGGER.exception('Unable to get telemetry service({:s})'.format(str(telemetry_service_id))) context_client.close() return None # Start an analyzer try: - analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(request.analyzer) # type: ignore - LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat))) + analyzer_id: AnalyzerId = analytics_frontend_client.StartAnalyzer(request.analyzer) # type: ignore + LOGGER.info('Analyzer_id:\n{:s}'.format(str(analyzer_id))) except grpc.RpcError as ex: + LOGGER.exception(f'Unable to start Analyzer:\n{str(request.analyzer)}') if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - LOGGER.exception('Unable to start analyzer({:s})'.format(str(request.analyzer))) context_client.close() 
analytics_frontend_client.close() return None # Create a policy try: - LOGGER.info('policy({:s})'.format(str(request.policy))) - # PolicyRuleCondition - policyRuleCondition = PolicyRuleCondition() - # policyRuleCondition.kpiId.kpi_id.uuid = request.analyzer.output_kpi_ids[0].kpi_id.uuid - # policyRuleCondition.numericalOperator = 5 - # policyRuleCondition.kpiValue.floatVal = 300 - # request.policy.policyRuleBasic.conditionList.append(policyRuleCondition) - LOGGER.info('policy after({:s})'.format(str(request.policy))) - + LOGGER.info(f'Policy:\n{str(request.policy)}') policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(request.policy) # type: ignore - LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state))) - except grpc.RpcError as ex: + LOGGER.info(f'Policy rule state:\n{policy_rule_state}') + except Exception as ex: + LOGGER.exception(f'Unable to create policy:\n{str(request.policy)}') if ex.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member - LOGGER.exception('Unable to create policy({:s})'.format(str(request.policy))) context_client.close() policy_client.close() return None diff --git a/src/tests/Fixtures.py b/src/tests/Fixtures.py index 687642762e6a1faf3cc61282ade13099171dde9a..9aa9a1c6a83ab7ea7125fd3564748830c07af18d 100644 --- a/src/tests/Fixtures.py +++ b/src/tests/Fixtures.py @@ -15,9 +15,10 @@ import pytest from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient -from monitoring.client.MonitoringClient import MonitoringClient from e2e_orchestrator.client.E2EOrchestratorClient import E2EOrchestratorClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient from service.client.ServiceClient import ServiceClient +from automation.client.AutomationClient import AutomationClient @pytest.fixture(scope='session') @@ -39,13 +40,19 @@ def device_client(): _client.close() @pytest.fixture(scope='session') -def monitoring_client(): - _client = 
MonitoringClient() +def e2eorchestrator_client(): + _client = E2EOrchestratorClient() yield _client _client.close() @pytest.fixture(scope='session') -def e2eorchestrator_client(): - _client = E2EOrchestratorClient() +def automation_client(): + _client = AutomationClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def kpi_manager_client(): + _client = KpiManagerClient() yield _client _client.close()