From dd69461b39086fb64cef05a192faa71def767378 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 30 Jan 2026 10:48:49 +0000 Subject: [PATCH 1/4] feat: Enhance AI Analytics Engine with new connection event handling and forecasting capabilities --- src/simap_connector/service/__main__.py | 2 +- .../service/simap_updater/SimapUpdater.py | 65 ++++++- .../ai_model/ai_processor.py | 110 +++++++++-- .../ai_model/sla_policy.py | 16 +- .../AI_analytics_engine/api/api_blueprint.py | 23 +-- .../clients/decision_client.py | 2 +- .../clients/influxdb_fetcher.py | 164 +++++++++++++--- .../AI_analytics_engine/config/Config.py | 3 +- .../AI_analytics_engine/tests/test_api.py | 176 ++++++++---------- .../simap_server/simap_client/SimapClient.py | 22 +-- .../simap_server/simap_client/__main__.py | 11 +- 11 files changed, 412 insertions(+), 182 deletions(-) diff --git a/src/simap_connector/service/__main__.py b/src/simap_connector/service/__main__.py index 1e8e8d88a..01c36f717 100644 --- a/src/simap_connector/service/__main__.py +++ b/src/simap_connector/service/__main__.py @@ -73,7 +73,7 @@ def main(): except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url))) - rebuild_database(db_engine) + rebuild_database(db_engine) # drop and recreate all tables restconf_client = RestConfClient( scheme = SIMAP_SERVER_SCHEME, address = SIMAP_SERVER_ADDRESS, diff --git a/src/simap_connector/service/simap_updater/SimapUpdater.py b/src/simap_connector/service/simap_updater/SimapUpdater.py index 343b373c7..573085ac9 100644 --- a/src/simap_connector/service/simap_updater/SimapUpdater.py +++ b/src/simap_connector/service/simap_updater/SimapUpdater.py @@ -18,7 +18,8 @@ from typing import Any, Optional, Set from common.Constants import DEFAULT_TOPOLOGY_NAME from common.DeviceTypes import DeviceTypeEnum from common.proto.context_pb2 import ( - ContextEvent, DeviceEvent, Empty, LinkEvent, ServiceEvent, SliceEvent, TopologyEvent + ContextEvent, DeviceEvent, Empty, LinkEvent, ServiceEvent, + SliceEvent, TopologyEvent, ConnectionEvent ) from common.tools.grpc.BaseEventCollector import BaseEventCollector from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher @@ -87,6 +88,7 @@ class EventDispatcher(BaseEventDispatcher): MSG = 'Skipping Slice Event: {:s}' LOGGER.debug(MSG.format(grpc_message_to_json_string(slice_event))) +# ----- Topology Events ----- def _dispatch_topology_set(self, topology_event : TopologyEvent) -> bool: MSG = 'Processing Topology Event: {:s}' @@ -137,6 +139,7 @@ class EventDispatcher(BaseEventDispatcher): MSG = 'Topology Remove: {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(topology_event))) +# ----- Device Events ----- def _dispatch_device_set(self, device_event : DeviceEvent) -> bool: MSG = 'Processing Device Event: {:s}' @@ -270,6 +273,7 @@ class EventDispatcher(BaseEventDispatcher): MSG = 'Device Remove: {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) +# ----- Link Events ----- def _dispatch_link_set(self, link_event : LinkEvent) -> bool: MSG = 'Processing Link Event: {:s}' @@ -450,6 +454,7 @@ class EventDispatcher(BaseEventDispatcher): MSG = 'Link Removed: {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(link_event))) +# ----- Service Events ----- def _dispatch_service_set(self, service_event : ServiceEvent) -> bool: MSG = 'Processing Service Event: {:s}' @@ -652,6 +657,64 @@ class EventDispatcher(BaseEventDispatcher): MSG = 'Logical Link Removed for Service: {:s}' LOGGER.info(MSG.format(grpc_message_to_json_string(service_event))) +# ----- Connection Events ----- + + def dispatch_connection_set(self, connection_event : ConnectionEvent) -> bool: + MSG = 'Processing Connection Event: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(connection_event))) + + # Here a connection object from context is received in connection_event. + # Here is gRPC message definition: message Connection { ConnectionId connection_id = 1; ServiceId service_id = 2; repeated EndPointId path_hops_endpoint_ids = 3; repeated ServiceId sub_service_ids = 4; ConnectionSettings settings = 5;} + # discard sub_service_ids and settings for now, as not used in SIMAP population. + # Extract service_id, endpoint_ids from connection_event to identify the connection. + # Get all links using gRPC ListLinkIds() from context, and find which link(s) correspond to the connection's endpoint_ids. + # Then update SIMAP accordingly. + # Then, do this only for connections that correspond to links that this controller is allowed to manage, as per ALLOWED_LINKS_PER_CONTROLLER. + # Then, do something like this (pseudocode): + # worker_name = '{:s}:{:s}'.format(topology_name, link_name) + # resources = Resources() + # resources.links.append(ResourceLink( + # domain_name=topology_name, link_name=link_name, + # bandwidth_utilization_sampler=SyntheticSampler.create_random( + # amplitude_scale = 25.0, + # phase_scale = 1e-7, + # period_scale = 86_400, + # offset_scale = 25, + # noise_ratio = 0.05, + # min_value = 0.0, + # max_value = 100.0, + # ), + # latency_sampler=SyntheticSampler.create_random( + # amplitude_scale = 0.5, + # phase_scale = 1e-7, + # period_scale = 60.0, + # offset_scale = 10.0, + # noise_ratio = 0.05, + # min_value = 0.0, + # ), + # related_service_ids=[], + # )) + # sampling_interval = 1.0 + # self._telemetry_pool.start_synthesizer(worker_name, resources, sampling_interval) + + return True + + def dispatch_connection_create(self, connection_event : ConnectionEvent) -> None: + if not self.dispatch_connection_set(connection_event): return + + MSG = 'Skipping Connection Create Event: {:s}' + LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event))) + + def dispatch_connection_update(self, connection_event : ConnectionEvent) -> None: + if not self.dispatch_connection_set(connection_event): return + + MSG = 'Skipping Connection Update Event: {:s}' + LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event))) + + def dispatch_connection_remove(self, connection_event : ConnectionEvent) -> None: + MSG = 'Skipping Connection Remove Event: {:s}' + LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event))) + class SimapUpdater: def __init__( diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/ai_processor.py b/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/ai_processor.py index ace48a1bf..5395e38ac 100644 --- a/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/ai_processor.py +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/ai_processor.py @@ -20,7 +20,11 @@ Provides AI/ML processing functionality for SLA analysis. import logging from datetime import datetime, UTC from random import Random -from typing import Any, Dict +from typing import Any, Dict, Optional + +from numpy import average +import pandas as pd +from statsmodels.tsa.holtwinters import ExponentialSmoothing from .sla_policy import SLAPolicyConfig @@ -46,37 +50,115 @@ class AIModelProcessor: # TODO: Load AI/ML models here # Example: self.model = load_model('sla_violation_detector.h5') + + def ai_model_processor( + self, + metric_values: list[float] + ) -> Optional[list[float]]: + """ + Process device and performance data through AI models. + + Args: + metric_values: List of performance metric values. + Returns: + List of 3 forecasted values, or None if insufficient data. + + """ + LOGGER.debug("Processing data through AI models") + # LOGGER.debug(f"Number of performance data points: {len(metric_values)}") + + if not metric_values or len(metric_values) < 4: + LOGGER.warning("Insufficient data for forecasting") + return None + + try: + # Convert metric_values to pandas Series + data = pd.Series(metric_values) + + # Create and fit Exponential Smoothing model + model = ExponentialSmoothing( + data, + trend="add", + seasonal=None # No seasonal component for this data + ) + + fit = model.fit() + + # Forecast next 3 values + forecast = fit.forecast(steps=3) + + forecasted_values = forecast.tolist() + + # Calculate confidence score based on model fit quality + # Using residual standard error as inverse confidence metric + residuals = fit.resid + mse = (residuals ** 2).mean() + rmse = mse ** 0.5 + + # Normalize confidence: lower RMSE = higher confidence + # Use data scale (std dev) to normalize RMSE + data_std = data.std() + if data_std > 0: + normalized_error = rmse / data_std + # Convert to confidence score (0-1 range, higher is better) + confidence = max(0, min(1, 1 - normalized_error)) + else: + confidence = 0.5 # Default if std dev is 0 + + # LOGGER.info(f"Model RMSE: {rmse:.4f}, Confidence: {confidence:.4f}") + LOGGER.info(f"Forecasted next 3 values: {forecasted_values}") + + # return forecasted_values + return [confidence] + + except Exception as e: + LOGGER.error(f"Error during forecasting: {e}") + return None + + def process_data( self, - device_data: Dict[str, Any], performance_data: Dict[str, Any], sla_policy: SLAPolicyConfig ) -> Dict[str, Any]: """ Process device and performance data through AI models. - Analyzes the collected data against the SLA policy thresholds, - detects violations, and generates recommendations for remediation. - Args: - device_data: Device and topology data from SIMAP. performance_data: Performance metrics from InfluxDB. sla_policy: The SLA policy configuration with thresholds. Returns: Dictionary containing: - - 'violations': List of detected SLA violations with details. - - 'recommendations': List of recommended actions. + - 'confidence_scores': AI model confidence scores. - 'summary': Dictionary with analysis summary statistics. """ LOGGER.debug("Processing data through AI models") - # TODO: Implement actual AI/ML processing logic - # Example: - # violations = self.model.predict(features) - # recommendations = self.generate_recommendations(violations) + + metric_values = performance_data.get('metric_values', []) + LOGGER.debug(f"Number of performance data points: {len(metric_values)}") + # LOGGER.debug(f"Performance data values: {metric_values}") + + # forecasted_values = self.ai_model_processor(metric_values) + # if forecasted_values is None: + # LOGGER.warning("AI model processing failed or insufficient data") + + # if forecasted_values: + # # Exponential weights: more weight on earlier (starting) values + # # Example: for 3 values -> weights = [0.5, 0.33, 0.17] (exponential decay) + # weights = [2**(-i) for i in range(len(forecasted_values))] + # # Normalize weights to sum to 1 + # total_weight = sum(weights) + # weights = [w / total_weight for w in weights] + # score = average(forecasted_values, weights=weights) + # # LOGGER.debug(f"Weighted average with exponential weights: {score}, weights: {weights}") + # else: + # score = None + + score = self.ai_model_processor(metric_values) + return { - 'violations': [], - 'confidence_scores': round(Random().random(), 2), + 'confidence_scores': score, 'summary': { 'sla_policy': sla_policy.to_dict(), 'timestamp': datetime.now(UTC).isoformat() diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/sla_policy.py b/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/sla_policy.py index 478725823..ff957ad0a 100644 --- a/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/sla_policy.py +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/ai_model/sla_policy.py @@ -17,7 +17,7 @@ SLA Policy Configuration dataclass. Defines the structure for SLA policy configurations used in AI analysis. """ - +from __future__ import annotations from dataclasses import asdict, dataclass from typing import Any, Dict @@ -35,12 +35,12 @@ class SLAPolicyConfig: time_window_seconds: Time window in seconds for data analysis. """ simap_id: str - latency_threshold_ms: float - bandwidth_utilization_threshold_pct: float + latency_threshold_ms: float|None + bandwidth_utilization_threshold_pct: float|None time_window_seconds: int @classmethod - def from_dict(cls, data: Dict[str, Any]) -> 'SLAPolicyConfig': + def from_dict(cls, data: Dict[str, Any]) -> SLAPolicyConfig: """ Create an SLAPolicyConfig instance from a dictionary. @@ -59,11 +59,11 @@ class SLAPolicyConfig: ValueError: If a field has an invalid value. """ try: - simap_id = str(data['simap_id']) - metrics = data['sla_metrics'] + simap_id = str(data['simap_id']) + metrics = data['sla_metrics'] latency_threshold_ms = float(metrics['latency_threshold_ms']) - bandwidth_threshold = float(metrics.get('bandwidth_utilization_threshold_pct')) - time_window = int(data.get('window_size_sec', 300)) + bandwidth_threshold = float(metrics.get('bandwidth_utilization_threshold_pct', 0.0)) + time_window = int(data.get('window_size_sec', 300)) return cls( simap_id = simap_id, diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/api/api_blueprint.py b/src/tests/mwc26-f5ga/AI_analytics_engine/api/api_blueprint.py index a21fcc40b..ea28ccfb1 100644 --- a/src/tests/mwc26-f5ga/AI_analytics_engine/api/api_blueprint.py +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/api/api_blueprint.py @@ -66,7 +66,7 @@ def create_ai_analytics_blueprint( Run SLA policy analysis. Expects JSON payload with SLA policy configuration. - Orchestrates the full analysis workflow: fetch data from SIMAP, + Orchestrates the full analysis workflow: fetch metrics from InfluxDB, process through AI models, and send results to Decision Engine. @@ -110,24 +110,25 @@ def create_ai_analytics_blueprint( # Execute analysis workflow try: - # Step 1: Fetch device data from SIMAP - LOGGER.debug("Step 1: Fetching device data from SIMAP") - device_data = simap_fetcher.fetch_device_data(sla_policy) + # Step 1: Fetch device data from SIMAP + # (At the moment, leaving it as it is. No more needed, to be removed in future) + # LOGGER.debug("Step 1: Fetching device data from SIMAP") + # device_data = simap_fetcher.fetch_device_data(sla_policy) # Step 2: Fetch performance data from InfluxDB - LOGGER.debug("Step 2: Fetching performance data from InfluxDB") + LOGGER.debug(">>> Step 2: Fetching performance data from InfluxDB") performance_data = influxdb_fetcher.fetch_performance_data( - sla_policy, device_data + sla_policy ) - # Step 3: Process data through AI models - LOGGER.debug("Step 3: Processing data through AI models") + # >>> Step 3: Process data through AI models + LOGGER.debug(">>> Step 3: Processing data through AI models") results = ai_processor.process_data( - device_data, performance_data, sla_policy + performance_data, sla_policy ) - # Step 4: Send results to Decision Engine - LOGGER.debug("Step 4: Sending results to Decision Engine") + # >>> Step 4: Send results to Decision Engine + LOGGER.debug(">>> Step 4: Sending results to Decision Engine") if not decision_client.send_results(results): LOGGER.error("Failed to send results to Decision Engine") return jsonify({ diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/clients/decision_client.py b/src/tests/mwc26-f5ga/AI_analytics_engine/clients/decision_client.py index 51eaa676b..7797b96ce 100644 --- a/src/tests/mwc26-f5ga/AI_analytics_engine/clients/decision_client.py +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/clients/decision_client.py @@ -64,7 +64,7 @@ class DecisionEngineClient: """ LOGGER.info("Sending results to Decision Engine") try: - print(json.dumps(results, indent=2)) + # print(json.dumps(results, indent=2)) return True except Exception as e: LOGGER.error(f"Failed to send results to Decision Engine: {e}") diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/clients/influxdb_fetcher.py b/src/tests/mwc26-f5ga/AI_analytics_engine/clients/influxdb_fetcher.py index d7c05cc28..d989011dc 100644 --- a/src/tests/mwc26-f5ga/AI_analytics_engine/clients/influxdb_fetcher.py +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/clients/influxdb_fetcher.py @@ -18,12 +18,14 @@ InfluxDB Fetcher module. Provides functionality to fetch performance metrics from InfluxDB. """ +from urllib import response +import pandas as pd import logging -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict from common.tools.client.RetryDecorator import delay_exponential, retry - +from influxdb_client_3 import InfluxDBClient3 from ..ai_model.sla_policy import SLAPolicyConfig LOGGER = logging.getLogger(__name__) @@ -63,16 +65,98 @@ class InfluxDBFetcher: self.influxdb_port = influxdb_port self.influxdb_token = influxdb_token self.influxdb_database = influxdb_database + + # Construct full URL with port for InfluxDB v3 + self.influxdb_url = f"http://{influxdb_host}:{influxdb_port}" + LOGGER.info( f"InfluxDBFetcher initialized for database '{influxdb_database}' " - f"at {influxdb_host}:{influxdb_port}" + f"at {self.influxdb_url}" ) + self._client = InfluxDBClient3( + host=self.influxdb_url, + token=self.influxdb_token, + database=self.influxdb_database + ) + + def is_connected(self) -> bool: + """ + Check if InfluxDB client is initialized and ready. + + Returns: + True if client is ready, False otherwise. + """ + try: + if isinstance(self._client, InfluxDBClient3): + LOGGER.debug("InfluxDB client is initialized") + return True + else: + LOGGER.warning("InfluxDB client is not initialized") + return False + except Exception as e: + LOGGER.error(f"Error checking InfluxDB connection: {e}") + return False + + def process_response_table( + self, + table: Any, + metric_to_process: str + ) -> Dict[str, Any]: + """ + Process InfluxDB response table into structured data. + + Args: + table: Raw response table from InfluxDB query. + + Returns: + Dictionary containing processed performance metrics and values. + """ + metrics = [] + if table is not None and isinstance(table, pd.DataFrame): + metrics = table.to_dict('records') + else: + LOGGER.warning("No data returned from InfluxDB query ") + + keys_to_check = [ 'time', 'link_id'] + sla_metric = 'bandwidth_utilization' + keys_to_check.append(sla_metric) + + # if metric_to_process == 'latency_threshold_ms': + # sla_metric = 'latency' + # keys_to_check.append(sla_metric) + # elif metric_to_process == 'bandwidth_utilization_threshold_pct': + # sla_metric = 'bandwidth_utilization' + # keys_to_check.append(sla_metric) + # else: + # sla_metric = None + # LOGGER.warning(f"Unknown metric to process: {metric_to_process}") + + + LOGGER.debug(f"Processed {len(metrics)} metric records from InfluxDB response") + data = [] + metric_value = [] + if sla_metric is not None: + for row in metrics: + # LOGGER.debug(f"Metric record: {row}") + new_row = {} + for key, value in row.items(): + if key in keys_to_check: + new_row[key] = value + if key == sla_metric: + metric_value.append(value) + data.append(new_row) + LOGGER.debug(f">>> Processed metric values: {metric_value}") + + return { + 'metrics': data, + 'metric_values': metric_value + } + @RETRY_DECORATOR def fetch_performance_data( self, - sla_policy: SLAPolicyConfig, - device_data: Dict[str, Any] + sla_policy: SLAPolicyConfig ) -> Dict[str, Any]: """ Fetch performance metrics from InfluxDB. @@ -84,8 +168,6 @@ class InfluxDBFetcher: Args: sla_policy: The SLA policy configuration containing time window and threshold parameters. - device_data: Device and topology data from SIMAP used to - identify which metrics to query. Returns: Dictionary containing: @@ -97,30 +179,56 @@ class InfluxDBFetcher: Exception: If InfluxDB is unavailable after all retries, or if the query fails. """ + + if not self.is_connected(): + raise ConnectionError("Unable to connect to InfluxDB") + + if sla_policy.latency_threshold_ms is None: + raise ValueError("SLA policy missing latency threshold for data fetch") + + metric_to_process = sla_policy.latency_threshold_ms + LOGGER.debug( - f"Fetching performance data for time window: " - f"{sla_policy.time_window_seconds}s" + f"Fetching performance data for simap_id={sla_policy.simap_id}, " + f"time_window={sla_policy.time_window_seconds}s " + f"for metric={metric_to_process} " ) - # TODO: Implement actual InfluxDB query - # Example implementation: - # from influxdb_client_3 import InfluxDBClient3 - # client = InfluxDBClient3( - # host=self.influxdb_host, - # token=self.influxdb_token, - # database=self.influxdb_database - # ) - # query = f''' - # SELECT * FROM metrics - # WHERE time >= now() - {sla_policy.time_window_seconds}s - # ''' - # return client.query(query) - return { - 'metrics': [], - 'timestamp_range': { - 'start': datetime.utcnow().isoformat(), - 'end': datetime.utcnow().isoformat() + + try: + query = ( + f"SELECT * FROM link_telemetry " + f"WHERE link_id = '{sla_policy.simap_id}' " + f"AND time >= now() - INTERVAL '{sla_policy.time_window_seconds} seconds' " + f"ORDER BY time DESC" + ) + + LOGGER.debug(f"Executing query: {query}") + + table = self._client.query(query=query, language="sql", mode="pandas") + + result = self.process_response_table(table, metric_to_process) + # metrics = result.get('metrics', []) + + # start_time = datetime.now(timezone.utc) + # end_time = datetime.now(timezone.utc) + # if metrics: + # times = [m.get('time') for m in metrics if m.get('time')] + # if times is not None: + # start_time = min(times) + # end_time = max(times) + + LOGGER.info(f"Fetched {len(result.get('metrics', []))} metric records for simap_id={sla_policy.simap_id}") + + return { + 'metrics': result.get('metrics', []), + 'metric_values': result.get('metric_values', []), + 'timestamp_range': { + # 'start': start_time.isoformat() if isinstance(start_time, datetime) else str(start_time), + # 'end': end_time.isoformat() if isinstance(end_time, datetime) else str(end_time) + } } - } + finally: + self._client.close() @RETRY_DECORATOR def notify_telemetry_update( diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/config/Config.py b/src/tests/mwc26-f5ga/AI_analytics_engine/config/Config.py index 5738bd5cc..f2633797e 100644 --- a/src/tests/mwc26-f5ga/AI_analytics_engine/config/Config.py +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/config/Config.py @@ -29,7 +29,8 @@ SIMAP_SERVER_USERNAME = get_setting('SIMAP_SERVER_USERNAME', default='admin') SIMAP_SERVER_PASSWORD = get_setting('SIMAP_SERVER_PASSWORD', default='admin') # InfluxDB Configuration -INFLUXDB_HOST = get_setting('INFLUXDB_HOST', default='localhost') +# INFLUXDB_HOST = get_setting('INFLUXDB_HOST', default='localhost') +INFLUXDB_HOST = 'localhost' INFLUXDB_PORT = int(get_setting('INFLUXDB_PORT', default='8181')) INFLUXDB_TOKEN = get_setting('INFLUXDB_TOKEN', default='') INFLUXDB_DATABASE = get_setting('INFLUXDB_DATABASE', default='simap_telemetry') diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.py b/src/tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.py index 0f63948e4..1866cd97a 100644 --- a/src/tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.py +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.py @@ -15,14 +15,30 @@ """ Test suite for AI Analytics Engine REST API. -This module tests the /api/v1/config endpoint sending HTTP requests. +This module tests the /api/v1/analyze endpoint by starting the server +and sending HTTP requests. """ +from csv import Error import logging +import os +import sys +import threading +import time import pytest import requests +# Add TFS src directory to path for 'common' module +# Must be done before any imports from the AI Analytics Engine +current_dir = os.path.dirname(os.path.abspath(__file__)) +tfs_src_dir = os.path.abspath(os.path.join(current_dir, '../../../..')) +if tfs_src_dir not in sys.path: + sys.path.insert(0, tfs_src_dir) + +# Now we can import from the AI Analytics Engine package +from AI_analytics_engine import AIAnalyticsEngineAPI + # Configure logging for tests logging.basicConfig( level=logging.DEBUG, @@ -32,64 +48,62 @@ LOGGER = logging.getLogger(__name__) # Test server configuration TEST_HOST = '127.0.0.1' -TEST_PORT = 8084 # 18080 port for manual testing +TEST_PORT = 8085 # 18080 port for manual testing BASE_URL = f'http://{TEST_HOST}:{TEST_PORT}' -def test_config_endpoint(): -# def test_config_endpoint(ai_engine_server): +@pytest.fixture(scope='module') +def ai_engine_server(): """ - Test GET /api/v1/config returns valid configuration. - - Validates that the config endpoint returns: - - HTTP 200 status code - - JSON response with 'simap', 'influxdb', and 'api' sections - - Expected configuration values + Fixture to start the AI Analytics Engine server in a background thread. + + Yields control to tests once server is ready, then performs cleanup. """ - LOGGER.info(">>>>>> Starting test_case test_config_endpoint: /api/v1/config endpoint") - # Send request to config endpoint - response = requests.get(f'{BASE_URL}/api/v1/config', timeout=5) + LOGGER.info("Starting AI Analytics Engine server fixture") + + # Override config for testing + os.environ['AI_ENGINE_REST_HOST'] = TEST_HOST + os.environ['AI_ENGINE_REST_PORT'] = str(TEST_PORT) + + # Create engine instance + engine = AIAnalyticsEngineAPI() + + # Start server in background thread + server_thread = threading.Thread( + target=lambda: engine.app.run( + host=TEST_HOST, + port=TEST_PORT, + debug=False, + use_reloader=False + ), + daemon=True + ) + server_thread.start() + + # Wait for server to be ready + max_retries = 15 + for i in range(max_retries): + try: + LOGGER.debug(f"Waiting for server to start... ({i+1}/{max_retries})") + response = requests.get(f'{BASE_URL}/api/v1/config', timeout=1) + if response.status_code == 200: + LOGGER.info("AI Analytics Engine server is ready") + break + except requests.exceptions.RequestException as e: + # LOGGER.debug(f"Server not ready yet: {e}") + if i < max_retries - 1: + time.sleep(5) + else: + raise RuntimeError("Failed to start AI Analytics Engine server") + + yield engine + + LOGGER.info("AI Analytics Engine server fixture cleanup complete") - # Validate response status - assert response.status_code == 200, f"Expected 200, got {response.status_code}" - # Parse JSON response - data = response.json() - LOGGER.info(f"Config response: {data}") - # Validate response structure - assert 'simap' in data, "Response missing 'simap' section" - assert 'influxdb' in data, "Response missing 'influxdb' section" - assert 'api' in data, "Response missing 'api' section" - - # Validate SIMAP configuration fields - simap = data['simap'] - assert 'scheme' in simap, "SIMAP config missing 'scheme'" - assert 'address' in simap, "SIMAP config missing 'address'" - assert 'port' in simap, "SIMAP config missing 'port'" - assert 'username' in simap, "SIMAP config missing 'username'" - assert 'password' in simap, "SIMAP config missing 'password'" - - # Validate InfluxDB configuration fields - influxdb = data['influxdb'] - assert 'host' in influxdb, "InfluxDB config missing 'host'" - assert 'port' in influxdb, "InfluxDB config missing 'port'" - assert 'token' in influxdb, "InfluxDB config missing 'token'" - assert 'database' in influxdb, "InfluxDB config missing 'database'" - - # Validate API configuration fields - api = data['api'] - assert 'host' in api, "API config missing 'host'" - assert 'port' in api, "API config missing 'port'" - assert api['host'] == '0.0.0.0', f"Expected host {TEST_HOST}, got {api['host']}" - assert api['port'] == 8080, f"Expected port {8080}, got {api['port']}" - - LOGGER.info("Config endpoint test passed!") - LOGGER.info("<<<<<< Finished test_case test_config_endpoint") - - -def test_analyze_endpoint(): +def test_analyze_endpoint(ai_engine_server): """ Test POST /api/v1/analyze endpoint. @@ -98,16 +112,20 @@ def test_analyze_endpoint(): - Returns appropriate status codes (200 for success, 503 for service unavailable) - Returns JSON response with status and message fields """ + + if ai_engine_server is Error: + pytest.fail("AI Analytics Engine server failed to start") + LOGGER.info(">>>>>> Starting test_case test_analyze_endpoint: POST /api/v1/analyze endpoint") # Prepare test payload with SLA policy configuration payload = { - "simap_id": "test-slice-123", + "simap_id": "E2E-L1", "sla_metrics": { "latency_threshold_ms": 10, - "bandwidth_utilization_threshold_pct": 80.0 + "bandwidth_utilization_threshold_pct": 0.0 }, - "window_size_sec": 300 + "window_size_sec": 600 } LOGGER.info(f"Sending analyze request with payload: {payload}") @@ -118,6 +136,8 @@ def test_analyze_endpoint(): json=payload, timeout=10 ) + + # Add condition to validate response status code and content LOGGER.info(f"Analyze response status: {response.status_code}") @@ -136,60 +156,16 @@ def test_analyze_endpoint(): assert data['status'] == 'success', f"Expected status 'success', got '{data['status']}'" assert 'data' in data, "Successful response missing 'data' field" elif response.status_code == 503: - LOGGER.warning("External service unavailable (expected if SIMAP/InfluxDB not running)") + # LOGGER.error("External service unavailable (expected if SIMAP/InfluxDB not running)") assert data['status'] == 'error', f"Expected status 'error' for 503, got '{data['status']}'" + pytest.fail("External service unavailable (expected if SIMAP/InfluxDB not running)") elif response.status_code == 400: LOGGER.error(f"Bad request: {data['message']}") assert data['status'] == 'error', f"Expected status 'error' for 400, got '{data['status']}'" + pytest.fail(f"Bad request: {data['message']}") else: pytest.fail(f"Unexpected status code: {response.status_code}") LOGGER.info("Analyze endpoint test passed!") LOGGER.info("<<<<<< Finished test_case test_analyze_endpoint") - -def test_notify_endpoint(): - """ - Test POST /api/v1/notify endpoint with valid status-only payload. - - Validates that the notify endpoint: - - Accepts valid notification payload with only "status" key - - Returns appropriate status codes (200 for success, 503 for service unavailable) - - Returns JSON response with status and message fields - """ - LOGGER.info(">>>>>> Starting test_case test_notify_endpoint: POST /api/v1/notify endpoint") - - payload = {"status": "UPGRADE"} - LOGGER.info(f"Sending notify request with payload: {payload}") - - # Send POST request to notify endpoint - response = requests.post( - f'{BASE_URL}/api/v1/notify', - json=payload, - timeout=10 - ) - - LOGGER.info(f"Notify response status: {response.status_code}") - - # Parse JSON response - data = response.json() - LOGGER.info(f"Notify response body: {data}") - - # Validate response structure - assert 'status' in data, "Response missing 'status' field" - assert 'message' in data, "Response missing 'message' field" - - # Accept either success (200) or service unavailable (503) - # 503 is expected if InfluxDB is not running - if response.status_code == 200: - LOGGER.info("Notification processed successfully") - assert data['status'] == 'success', f"Expected status 'success', got '{data['status']}'" - elif response.status_code == 503: - LOGGER.warning("InfluxDB unavailable (expected if InfluxDB not running)") - assert data['status'] == 'error', f"Expected status 'error' for 503, got '{data['status']}'" - else: - pytest.fail(f"Unexpected status code: {response.status_code}") - - LOGGER.info("Notify endpoint test passed!") - LOGGER.info("<<<<<< Finished test_case test_notify_endpoint") - diff --git a/src/tests/tools/simap_server/simap_client/SimapClient.py b/src/tests/tools/simap_server/simap_client/SimapClient.py index 7c4834511..96237ba12 100644 --- a/src/tests/tools/simap_server/simap_client/SimapClient.py +++ b/src/tests/tools/simap_server/simap_client/SimapClient.py @@ -23,9 +23,9 @@ class TerminationPoint: def __init__(self, restconf_client : RestConfClient, network_id : str, node_id : str, tp_id : str): self._restconf_client = restconf_client - self._network_id = network_id - self._node_id = node_id - self._tp_id = tp_id + self._network_id = network_id + self._node_id = node_id + self._tp_id = tp_id def create(self, supporting_termination_point_ids : List[Tuple[str, str, str]] = []) -> None: endpoint = TerminationPoint.ENDPOINT_ID.format(self._network_id, self._node_id, self._tp_id) @@ -68,8 +68,8 @@ class NodeTelemetry: def __init__(self, restconf_client : RestConfClient, network_id : str, node_id : str): self._restconf_client = restconf_client - self._network_id = network_id - self._node_id = node_id + self._network_id = network_id + self._node_id = node_id def create( self, cpu_utilization : float, related_service_ids : List[str] = [] @@ -177,8 +177,8 @@ class LinkTelemetry: def __init__(self, restconf_client : RestConfClient, network_id : str, link_id : str): self._restconf_client = restconf_client - self._network_id = network_id - self._link_id = link_id + self._network_id = network_id + self._link_id = link_id def create( self, bandwidth_utilization : float, latency : float, @@ -226,8 +226,8 @@ class Link: def __init__(self, restconf_client : RestConfClient, network_id : str, link_id : str): self._restconf_client = restconf_client - self._network_id = network_id - self._link_id = link_id + self._network_id = network_id + self._link_id = link_id self._telemetry : Optional[LinkTelemetry] = None @property @@ -283,8 +283,8 @@ class Network: ENDPOINT_ID = ENDPOINT_NO_ID + '/network={:s}' def __init__(self, restconf_client : RestConfClient, network_id : str): - self._restconf_client = restconf_client - self._network_id = network_id + self._restconf_client = restconf_client + self._network_id = network_id self._nodes : Dict[str, Node] = dict() self._links : Dict[str, Link] = dict() diff --git a/src/tests/tools/simap_server/simap_client/__main__.py b/src/tests/tools/simap_server/simap_client/__main__.py index 6e8fdcc3d..7ff6fa3b2 100644 --- a/src/tests/tools/simap_server/simap_client/__main__.py +++ b/src/tests/tools/simap_server/simap_client/__main__.py @@ -32,13 +32,12 @@ def main() -> None: logger=logging.getLogger('RestConfClient') ) simap_client = SimapClient(restconf_client) - generator = SimapMetricsGenerator(service_count=3) + generator = SimapMetricsGenerator(service_count=5) - # ---> Only need to be created once in the lifetime of the SIMAP server <--- # - create_simap_te(simap_client) - create_simap_trans(simap_client) - create_simap_aggnet(simap_client) - create_simap_e2enet(simap_client) + # create_simap_te(simap_client) + # create_simap_trans(simap_client) + # create_simap_aggnet(simap_client) + # create_simap_e2enet(simap_client) print('networks=', json.dumps(simap_client.networks())) -- GitLab From d0d1e2aa0bab1da7d3402ba872a74c91e5060702 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Fri, 30 Jan 2026 10:50:06 +0000 Subject: [PATCH 2/4] feat: Add deployment and cleanup scripts for AI Analytics Engine and SIMAP components --- scripts/run_tests_locally-simap-connector.sh | 35 +++++++++++++++ .../AI_analytics_engine/tests/run_test.sh | 45 +++++++++++++++++++ src/tests/mwc26-f5ga/deploy.sh | 40 +++++++++++++++++ src/tests/mwc26-f5ga/destroy.sh | 9 ++++ 4 files changed, 129 insertions(+) create mode 100755 scripts/run_tests_locally-simap-connector.sh create mode 100755 src/tests/mwc26-f5ga/AI_analytics_engine/tests/run_test.sh create mode 100755 src/tests/mwc26-f5ga/deploy.sh create mode 100755 src/tests/mwc26-f5ga/destroy.sh diff --git a/scripts/run_tests_locally-simap-connector.sh b/scripts/run_tests_locally-simap-connector.sh new file mode 100755 index 000000000..c9f519e7a --- /dev/null +++ b/scripts/run_tests_locally-simap-connector.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SIMAP Connector Integration Tests +# +# This test does NOT require: +# - TFS Context service +# - SIMAP server +# - Any external services +# +# Uses in-memory context store and mock HTTP server. + +PROJECTDIR=`pwd` + +cd $PROJECTDIR/src + +export PYTHONPATH=$PROJECTDIR/src + +# Run the mocked integration test (no external dependencies) +python3 -m pytest --verbose \ + --log-cli-level=DEBUG -v -s \ + tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.py + diff --git a/src/tests/mwc26-f5ga/AI_analytics_engine/tests/run_test.sh b/src/tests/mwc26-f5ga/AI_analytics_engine/tests/run_test.sh new file mode 100755 index 000000000..1a1ee209e --- /dev/null +++ b/src/tests/mwc26-f5ga/AI_analytics_engine/tests/run_test.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Run AI Analytics Engine API tests +# +# Usage: ./run_test.sh + +# Navigate to TFS root directory +cd "$(dirname "$0")/../../../../.." + +# Set Python path to include TFS src and AI Analytics Engine +export PYTHONPATH="${PWD}/src:${PWD}/src/tests/mwc26-f5ga" + +# Activate virtual environment if not already activated +if [ -z "$VIRTUAL_ENV" ]; then + if [ -d "$HOME/.env-simap" ]; then + source "$HOME/.env-simap/bin/activate" + fi +fi + +# Define log file path +LOG_FILE="${PWD}/src/tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.log" + +# Run the test with logging enabled and capture output +pytest src/tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.py::test_analyze_endpoint \ + -v -s \ + --log-cli-level=DEBUG \ + --log-file="${LOG_FILE}" \ + --log-file-level=DEBUG \ + "$@" + +echo "" +echo "Test logs saved to: ${LOG_FILE}" diff --git a/src/tests/mwc26-f5ga/deploy.sh b/src/tests/mwc26-f5ga/deploy.sh new file mode 100755 index 000000000..a558102e1 --- /dev/null +++ b/src/tests/mwc26-f5ga/deploy.sh @@ -0,0 +1,40 @@ + +echo "Building SIMAP Server..." +cd ~/tfs-ctrl/ +docker buildx build -t simap-server:mock -f ./src/tests/tools/simap_server/Dockerfile . + +echo "Building NCE-FAN Controller..." +cd ~/tfs-ctrl/ +docker buildx build -t nce-fan-ctrl:mock -f ./src/tests/tools/mock_nce_fan_ctrl/Dockerfile . + +echo "Building NCE-T Controller..." +cd ~/tfs-ctrl/ +docker buildx build -t nce-t-ctrl:mock -f ./src/tests/tools/mock_nce_t_ctrl/Dockerfile . + +echo "Building AI Analytics Engine..." +cd ~/tfs-ctrl/ +docker buildx build -t ai-engine:latest -f ./src/tests/mwc26-f5ga/AI_analytics_engine/Dockerfile . + +# echo "Building Traffic Changer..." +# cd ~/tfs-ctrl/ +# docker buildx build -t traffic-changer:mock -f ./src/tests/tools/traffic_changer/Dockerfile . + +# echo "Cleaning up..." +# docker rm --force simap-server +# docker rm --force nce-fan-ctrl +# docker rm --force nce-t-ctrl +# docker rm --force ai-engine +# docker rm --force traffic-changer + +echo "Deploying support services..." +docker run --detach --name simap-server --publish 8080:8080 simap-server:mock +docker run --detach --name nce-fan-ctrl --publish 8081:8080 --env SIMAP_ADDRESS=172.17.0.1 --env SIMAP_PORT=8080 nce-fan-ctrl:mock +docker run --detach --name nce-t-ctrl --publish 8082:8080 --env SIMAP_ADDRESS=172.17.0.1 --env SIMAP_PORT=8080 nce-t-ctrl:mock + +echo "Deploying AI Analytics Engine..." +docker run --detach --name ai-engine --publish 8084:8080 --env SIMAP_SERVER_ADDRESS=172.17.0.1 --env SIMAP_SERVER_PORT=8080 ai-engine:latest +# docker run --detach --name traffic-changer --publish 8083:8080 traffic-changer:mock + +sleep 2 +docker ps -a +echo "Deployment complete." diff --git a/src/tests/mwc26-f5ga/destroy.sh b/src/tests/mwc26-f5ga/destroy.sh new file mode 100755 index 000000000..1e6d6953e --- /dev/null +++ b/src/tests/mwc26-f5ga/destroy.sh @@ -0,0 +1,9 @@ + +echo "Cleaning up..." +docker rm --force simap-server +docker rm --force nce-fan-ctrl +docker rm --force nce-t-ctrl +docker rm --force ai-engine +docker rm --force traffic-changer +sleep 2 +docker ps -a \ No newline at end of file -- GitLab From be0f0e0b90f7c2ff2618897df50388fcfd718cd5 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Wed, 15 Apr 2026 16:16:00 +0000 Subject: [PATCH 3/4] Tests - Tools - SIMAP AI Engine: - Fixed paths - Minor code fixes --- scripts/run_tests_locally-simap-connector.sh | 5 +- src/tests/mwc26-f5ga/deploy.sh | 40 ------------- src/tests/mwc26-f5ga/destroy.sh | 9 --- .../ai_engine/ai_model/ai_processor.py | 4 +- .../ai_engine/api/api_blueprint.py | 4 +- .../ai_engine/tests/test_api.py | 15 +++-- src/tests/tools/simap_ai_engine/deploy.sh | 59 +++++++++++++++++++ src/tests/tools/simap_ai_engine/destroy.sh | 25 ++++++++ 8 files changed, 99 insertions(+), 62 deletions(-) delete mode 100755 src/tests/mwc26-f5ga/deploy.sh delete mode 100755 src/tests/mwc26-f5ga/destroy.sh create mode 100755 src/tests/tools/simap_ai_engine/deploy.sh create mode 100755 src/tests/tools/simap_ai_engine/destroy.sh diff --git a/scripts/run_tests_locally-simap-connector.sh b/scripts/run_tests_locally-simap-connector.sh index c9f519e7a..2e6293dd9 100755 --- a/scripts/run_tests_locally-simap-connector.sh +++ b/scripts/run_tests_locally-simap-connector.sh @@ -26,10 +26,9 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src -export PYTHONPATH=$PROJECTDIR/src +export PYTHONPATH=$PROJECTDIR/src:$PROJECTDIR/src/tests/tools/simap_ai_engine # Run the mocked integration test (no external dependencies) python3 -m pytest --verbose \ --log-cli-level=DEBUG -v -s \ - tests/mwc26-f5ga/AI_analytics_engine/tests/test_api.py - + tests/tools/simap_ai_engine/ai_engine/tests/test_api.py diff --git a/src/tests/mwc26-f5ga/deploy.sh b/src/tests/mwc26-f5ga/deploy.sh deleted file mode 100755 index a558102e1..000000000 --- a/src/tests/mwc26-f5ga/deploy.sh +++ /dev/null @@ -1,40 +0,0 @@ - -echo "Building SIMAP Server..." -cd ~/tfs-ctrl/ -docker buildx build -t simap-server:mock -f ./src/tests/tools/simap_server/Dockerfile . - -echo "Building NCE-FAN Controller..." -cd ~/tfs-ctrl/ -docker buildx build -t nce-fan-ctrl:mock -f ./src/tests/tools/mock_nce_fan_ctrl/Dockerfile . - -echo "Building NCE-T Controller..." -cd ~/tfs-ctrl/ -docker buildx build -t nce-t-ctrl:mock -f ./src/tests/tools/mock_nce_t_ctrl/Dockerfile . - -echo "Building AI Analytics Engine..." -cd ~/tfs-ctrl/ -docker buildx build -t ai-engine:latest -f ./src/tests/mwc26-f5ga/AI_analytics_engine/Dockerfile . - -# echo "Building Traffic Changer..." -# cd ~/tfs-ctrl/ -# docker buildx build -t traffic-changer:mock -f ./src/tests/tools/traffic_changer/Dockerfile . - -# echo "Cleaning up..." -# docker rm --force simap-server -# docker rm --force nce-fan-ctrl -# docker rm --force nce-t-ctrl -# docker rm --force ai-engine -# docker rm --force traffic-changer - -echo "Deploying support services..." -docker run --detach --name simap-server --publish 8080:8080 simap-server:mock -docker run --detach --name nce-fan-ctrl --publish 8081:8080 --env SIMAP_ADDRESS=172.17.0.1 --env SIMAP_PORT=8080 nce-fan-ctrl:mock -docker run --detach --name nce-t-ctrl --publish 8082:8080 --env SIMAP_ADDRESS=172.17.0.1 --env SIMAP_PORT=8080 nce-t-ctrl:mock - -echo "Deploying AI Analytics Engine..." -docker run --detach --name ai-engine --publish 8084:8080 --env SIMAP_SERVER_ADDRESS=172.17.0.1 --env SIMAP_SERVER_PORT=8080 ai-engine:latest -# docker run --detach --name traffic-changer --publish 8083:8080 traffic-changer:mock - -sleep 2 -docker ps -a -echo "Deployment complete." diff --git a/src/tests/mwc26-f5ga/destroy.sh b/src/tests/mwc26-f5ga/destroy.sh deleted file mode 100755 index 1e6d6953e..000000000 --- a/src/tests/mwc26-f5ga/destroy.sh +++ /dev/null @@ -1,9 +0,0 @@ - -echo "Cleaning up..." -docker rm --force simap-server -docker rm --force nce-fan-ctrl -docker rm --force nce-t-ctrl -docker rm --force ai-engine -docker rm --force traffic-changer -sleep 2 -docker ps -a \ No newline at end of file diff --git a/src/tests/tools/simap_ai_engine/ai_engine/ai_model/ai_processor.py b/src/tests/tools/simap_ai_engine/ai_engine/ai_model/ai_processor.py index 5395e38ac..e9fa99ca2 100644 --- a/src/tests/tools/simap_ai_engine/ai_engine/ai_model/ai_processor.py +++ b/src/tests/tools/simap_ai_engine/ai_engine/ai_model/ai_processor.py @@ -18,7 +18,7 @@ Provides AI/ML processing functionality for SLA analysis. """ import logging -from datetime import datetime, UTC +from datetime import datetime, timezone from random import Random from typing import Any, Dict, Optional @@ -161,6 +161,6 @@ class AIModelProcessor: 'confidence_scores': score, 'summary': { 'sla_policy': sla_policy.to_dict(), - 'timestamp': datetime.now(UTC).isoformat() + 'timestamp': datetime.now(timezone.utc).isoformat() } } diff --git a/src/tests/tools/simap_ai_engine/ai_engine/api/api_blueprint.py b/src/tests/tools/simap_ai_engine/ai_engine/api/api_blueprint.py index bc2bd00c2..3a2865dfa 100644 --- a/src/tests/tools/simap_ai_engine/ai_engine/api/api_blueprint.py +++ b/src/tests/tools/simap_ai_engine/ai_engine/api/api_blueprint.py @@ -19,7 +19,7 @@ Defines the REST API endpoints for the AI Engine. """ import logging -from datetime import datetime, UTC +from datetime import datetime, timezone from flask import Blueprint, jsonify, request @@ -171,7 +171,7 @@ def create_ai_engine_blueprint( return jsonify({ 'status': 'healthy', 'service': 'AI Engine', - 'timestamp': datetime.now(UTC).isoformat() + 'timestamp': datetime.now(timezone.utc).isoformat() }), 200 @blueprint.route('/config', methods=['GET']) diff --git a/src/tests/tools/simap_ai_engine/ai_engine/tests/test_api.py b/src/tests/tools/simap_ai_engine/ai_engine/tests/test_api.py index c6a5ebb45..72bb1f0b2 100644 --- a/src/tests/tools/simap_ai_engine/ai_engine/tests/test_api.py +++ b/src/tests/tools/simap_ai_engine/ai_engine/tests/test_api.py @@ -29,15 +29,18 @@ import time import pytest import requests -# Add TFS src directory to path for 'common' module -# Must be done before any imports from the AI Analytics Engine +# Add TFS src directory and AI Engine package root to path. +# Must be done before any package imports. current_dir = os.path.dirname(os.path.abspath(__file__)) -tfs_src_dir = os.path.abspath(os.path.join(current_dir, '../../../..')) +tfs_src_dir = os.path.abspath(os.path.join(current_dir, '../../../../..')) +ai_engine_root_dir = os.path.abspath(os.path.join(current_dir, '../..')) if tfs_src_dir not in sys.path: sys.path.insert(0, tfs_src_dir) +if ai_engine_root_dir not in sys.path: + sys.path.insert(0, ai_engine_root_dir) -# Now we can import from the AI Analytics Engine package -from AI_analytics_engine import AIAnalyticsEngineAPI +# Now we can import from the AI Engine package +from ai_engine import AIEngineAPI # Configure logging for tests logging.basicConfig( @@ -67,7 +70,7 @@ def ai_engine_server(): os.environ['AI_ENGINE_REST_PORT'] = str(TEST_PORT) # Create engine instance - engine = AIAnalyticsEngineAPI() + engine = AIEngineAPI() # Start server in background thread server_thread = threading.Thread( diff --git a/src/tests/tools/simap_ai_engine/deploy.sh b/src/tests/tools/simap_ai_engine/deploy.sh new file mode 100755 index 000000000..cc683b1f0 --- /dev/null +++ b/src/tests/tools/simap_ai_engine/deploy.sh @@ -0,0 +1,59 @@ +#!/bin/bash +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../.." && pwd)" + +echo "Building SIMAP Server..." +cd "${REPO_ROOT}" +docker buildx build -t simap-server:mock -f ./src/tests/tools/simap_server/Dockerfile . + +echo "Building NCE-FAN Controller..." +docker buildx build -t nce-fan-ctrl:mock -f ./src/tests/tools/mock_nce_fan_ctrl/Dockerfile . + +echo "Building NCE-T Controller..." +docker buildx build -t nce-t-ctrl:mock -f ./src/tests/tools/mock_nce_t_ctrl/Dockerfile . + +echo "Building AI Engine..." +docker buildx build -t ai-engine:latest -f ./src/tests/tools/simap_ai_engine/ai_engine/Dockerfile . + +# echo "Building Traffic Changer..." +# docker buildx build -t traffic-changer:mock -f ./src/tests/tools/traffic_changer/Dockerfile . + +echo "Deploying support services..." +docker run --detach --name simap-server --publish 8080:8080 simap-server:mock +docker run --detach --name nce-fan-ctrl --publish 8081:8080 \ + --env SIMAP_ADDRESS=172.17.0.1 \ + --env SIMAP_PORT=8080 \ + nce-fan-ctrl:mock +docker run --detach --name nce-t-ctrl --publish 8082:8080 \ + --env SIMAP_ADDRESS=172.17.0.1 \ + --env SIMAP_PORT=8080 \ + nce-t-ctrl:mock + +echo "Deploying AI Engine..." +docker run --detach --name ai-engine --publish 8084:8080 \ + --env SIMAP_DATASTORE_ADDRESS=172.17.0.1 \ + --env SIMAP_DATASTORE_PORT=8080 \ + --env SIMAP_DATASTORE_USERNAME=admin \ + --env SIMAP_DATASTORE_PASSWORD=admin \ + ai-engine:latest +# docker run --detach --name traffic-changer --publish 8083:8080 traffic-changer:mock + +sleep 2 +docker ps -a +echo "Deployment complete." diff --git a/src/tests/tools/simap_ai_engine/destroy.sh b/src/tests/tools/simap_ai_engine/destroy.sh new file mode 100755 index 000000000..67408a8ef --- /dev/null +++ b/src/tests/tools/simap_ai_engine/destroy.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +echo "Cleaning up..." +docker rm --force simap-server 2>/dev/null || true +docker rm --force nce-fan-ctrl 2>/dev/null || true +docker rm --force nce-t-ctrl 2>/dev/null || true +docker rm --force ai-engine 2>/dev/null || true +docker rm --force traffic-changer 2>/dev/null || true +sleep 2 +docker ps -a -- GitLab From 9356cc45d51b4423b50459e027895540f4356f20 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Wed, 15 Apr 2026 16:20:56 +0000 Subject: [PATCH 4/4] pre-merge code cleanup --- src/tests/tools/simap_ai_engine/ai_engine/config/Config.py | 3 +-- src/tests/tools/simap_ai_engine/deploy.sh | 6 +++--- src/tests/tools/simap_ai_engine/destroy.sh | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/tests/tools/simap_ai_engine/ai_engine/config/Config.py b/src/tests/tools/simap_ai_engine/ai_engine/config/Config.py index 28656c1d8..2f7569fa4 100644 --- a/src/tests/tools/simap_ai_engine/ai_engine/config/Config.py +++ b/src/tests/tools/simap_ai_engine/ai_engine/config/Config.py @@ -29,8 +29,7 @@ SIMAP_DATASTORE_USERNAME = get_setting('SIMAP_DATASTORE_USERNAME', default='admi SIMAP_DATASTORE_PASSWORD = get_setting('SIMAP_DATASTORE_PASSWORD', default='admin') # InfluxDB Configuration -# INFLUXDB_HOST = get_setting('INFLUXDB_HOST', default='localhost') -INFLUXDB_HOST = 'localhost' +INFLUXDB_HOST = get_setting('INFLUXDB_HOST', default='localhost') INFLUXDB_PORT = int(get_setting('INFLUXDB_PORT', default='8181')) INFLUXDB_TOKEN = get_setting('INFLUXDB_TOKEN', default='') INFLUXDB_DATABASE = get_setting('INFLUXDB_DATABASE', default='simap_telemetry') diff --git a/src/tests/tools/simap_ai_engine/deploy.sh b/src/tests/tools/simap_ai_engine/deploy.sh index cc683b1f0..d5495a2fa 100755 --- a/src/tests/tools/simap_ai_engine/deploy.sh +++ b/src/tests/tools/simap_ai_engine/deploy.sh @@ -18,9 +18,9 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../.." && pwd)" -echo "Building SIMAP Server..." +echo "Building SIMAP DataStore..." cd "${REPO_ROOT}" -docker buildx build -t simap-server:mock -f ./src/tests/tools/simap_server/Dockerfile . +docker buildx build -t simap-datastore:mock -f ./src/tests/tools/simap_datastore/Dockerfile . echo "Building NCE-FAN Controller..." docker buildx build -t nce-fan-ctrl:mock -f ./src/tests/tools/mock_nce_fan_ctrl/Dockerfile . @@ -35,7 +35,7 @@ docker buildx build -t ai-engine:latest -f ./src/tests/tools/simap_ai_engine/ai_ # docker buildx build -t traffic-changer:mock -f ./src/tests/tools/traffic_changer/Dockerfile . echo "Deploying support services..." -docker run --detach --name simap-server --publish 8080:8080 simap-server:mock +docker run --detach --name simap-datastore --publish 8080:8080 simap-datastore:mock docker run --detach --name nce-fan-ctrl --publish 8081:8080 \ --env SIMAP_ADDRESS=172.17.0.1 \ --env SIMAP_PORT=8080 \ diff --git a/src/tests/tools/simap_ai_engine/destroy.sh b/src/tests/tools/simap_ai_engine/destroy.sh index 67408a8ef..6da3f9840 100755 --- a/src/tests/tools/simap_ai_engine/destroy.sh +++ b/src/tests/tools/simap_ai_engine/destroy.sh @@ -16,7 +16,7 @@ set -euo pipefail echo "Cleaning up..." -docker rm --force simap-server 2>/dev/null || true +docker rm --force simap-datastore 2>/dev/null || true docker rm --force nce-fan-ctrl 2>/dev/null || true docker rm --force nce-t-ctrl 2>/dev/null || true docker rm --force ai-engine 2>/dev/null || true -- GitLab