Commit 8622c047 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom

Merge branch 'feat/366-cttc-simap-ai-engine-framework' into 'develop'

Resolve "(CTTC) SIMAP AI-Engine Framework"

See merge request !420
parents c09fb126 6204edc5
+34 −0
#!/bin/bash
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# SIMAP Connector Integration Tests
#
# This test does NOT require:
# - TFS Context service
# - SIMAP server
# - Any external services
#
# Uses in-memory context store and mock HTTP server.

PROJECTDIR=$(pwd)

cd "$PROJECTDIR/src" || exit 1

export PYTHONPATH=$PROJECTDIR/src:$PROJECTDIR/src/tests/tools/simap_ai_engine

# Run the mocked integration test (no external dependencies)
python3 -m pytest --verbose \
    --log-cli-level=DEBUG -s \
    tests/tools/simap_ai_engine/ai_engine/tests/test_api.py
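The mock setup itself is not shown in this script; for context, the kind of pytest fixture such a self-contained test can rely on looks like the following sketch (the fixture name, handler, and JSON payload are illustrative assumptions, not repository code):

import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

import pytest

class _MockSimapHandler(BaseHTTPRequestHandler):
    """Answers every GET with an empty JSON document; enough for connectivity tests."""
    def do_GET(self):
        body = b'{}'
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

@pytest.fixture
def mock_simap_server():
    server = HTTPServer(('127.0.0.1', 0), _MockSimapHandler)  # port 0: pick any free port
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    yield 'http://127.0.0.1:{:d}'.format(server.server_address[1])
    server.shutdown()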
+1 −1
@@ -73,7 +73,7 @@ def main():
    except: # pylint: disable=bare-except # pragma: no cover
        LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))

-    rebuild_database(db_engine)
+    rebuild_database(db_engine)         # drop and recreate all tables

    restconf_client = RestConfClient(
        scheme   = SIMAP_DATASTORE_SCHEME, address  = SIMAP_DATASTORE_ADDRESS,
+64 −1
@@ -18,7 +18,8 @@ from typing import Any, Optional, Set
from common.Constants import DEFAULT_TOPOLOGY_NAME
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import (
-    ContextEvent, DeviceEvent, Empty, LinkEvent, ServiceEvent, SliceEvent, TopologyEvent
+    ContextEvent, DeviceEvent, Empty, LinkEvent, ServiceEvent,
+    SliceEvent, TopologyEvent, ConnectionEvent
)
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
@@ -87,6 +88,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Skipping Slice Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(slice_event)))

# ----- Topology Events -----

    def _dispatch_topology_set(self, topology_event : TopologyEvent) -> bool:
        MSG = 'Processing Topology Event: {:s}'
@@ -137,6 +139,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Topology Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(topology_event)))

# ----- Device Events -----

    def _dispatch_device_set(self, device_event : DeviceEvent) -> bool:
        MSG = 'Processing Device Event: {:s}'
@@ -270,6 +273,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Device Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

# ----- Link Events -----

    def _dispatch_link_set(self, link_event : LinkEvent) -> bool:
        MSG = 'Processing Link Event: {:s}'
@@ -450,6 +454,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Link Removed: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(link_event)))

# ----- Service Events -----

    def _dispatch_service_set(self, service_event : ServiceEvent) -> bool:
        MSG = 'Processing Service Event: {:s}'
@@ -652,6 +657,64 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Logical Link Removed for Service: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))

# ----- Connection Events -----

    def dispatch_connection_set(self, connection_event : ConnectionEvent) -> bool:
        MSG = 'Processing Connection Event: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(connection_event)))

        #  Here a connection object from context is received in connection_event.
        #  gRPC message definition:
        #      message Connection {
        #          ConnectionId connection_id                 = 1;
        #          ServiceId service_id                       = 2;
        #          repeated EndPointId path_hops_endpoint_ids = 3;
        #          repeated ServiceId sub_service_ids         = 4;
        #          ConnectionSettings settings                = 5;
        #      }
        #  Discard sub_service_ids and settings for now, as they are not used in SIMAP population.
        #  Extract service_id and path_hops_endpoint_ids from connection_event to identify the connection.
        #  Get all links from context using gRPC ListLinkIds(), and find which link(s) correspond to the
        #  connection's endpoint ids (hypothetical sketch below); then update SIMAP accordingly, but only
        #  for links this controller is allowed to manage, as per ALLOWED_LINKS_PER_CONTROLLER.
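        #  Hypothetical matching sketch (context_client, ListLinks() instead of per-link
        #  ListLinkIds()+GetLink() lookups, and the event carrying the full Connection
        #  object are assumptions, not repository code):
        # connection = connection_event.connection
        # conn_ep_uuids = {
        #     ep_id.endpoint_uuid.uuid for ep_id in connection.path_hops_endpoint_ids
        # }
        # for link in context_client.ListLinks(Empty()).links:
        #     link_ep_uuids = {ep_id.endpoint_uuid.uuid for ep_id in link.link_endpoint_ids}
        #     if link_ep_uuids & conn_ep_uuids:
        #         pass  # this link carries (part of) the connection; update SIMAP for it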
        #  Then, do something like this (pseudocode):
        # worker_name = '{:s}:{:s}'.format(topology_name, link_name)
        # resources = Resources()
        # resources.links.append(ResourceLink(
        #     domain_name=topology_name, link_name=link_name,
        #     bandwidth_utilization_sampler=SyntheticSampler.create_random(
        #         amplitude_scale = 25.0,
        #         phase_scale     = 1e-7,
        #         period_scale    = 86_400,
        #         offset_scale    = 25,
        #         noise_ratio     = 0.05,
        #         min_value       = 0.0,
        #         max_value       = 100.0,
        #     ),
        #     latency_sampler=SyntheticSampler.create_random(
        #         amplitude_scale = 0.5,
        #         phase_scale     = 1e-7,
        #         period_scale    = 60.0,
        #         offset_scale    = 10.0,
        #         noise_ratio     = 0.05,
        #         min_value       = 0.0,
        #     ),
        #     related_service_ids=[],
        # ))
        # sampling_interval = 1.0
        # self._telemetry_pool.start_synthesizer(worker_name, resources, sampling_interval)

        return True

    def dispatch_connection_create(self, connection_event : ConnectionEvent) -> None:
        if self.dispatch_connection_set(connection_event): return

        MSG = 'Skipping Connection Create Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event)))
    
    def dispatch_connection_update(self, connection_event : ConnectionEvent) -> None:
        if self.dispatch_connection_set(connection_event): return

        MSG = 'Skipping Connection Update Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event)))
    
    def dispatch_connection_remove(self, connection_event : ConnectionEvent) -> None:
        MSG = 'Skipping Connection Remove Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event)))
    

class SimapUpdater:
    def __init__(
+98 −16
@@ -18,9 +18,13 @@ Provides AI/ML processing functionality for SLA analysis.
"""

import logging
-from datetime import datetime, UTC
+from datetime import datetime, timezone
from random import Random
-from typing import Any, Dict
+from typing import Any, Dict, Optional

from numpy import average
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing

from .sla_policy import SLAPolicyConfig

@@ -46,39 +50,117 @@ class AIModelProcessor:
        # TODO: Load AI/ML models here
        # Example: self.model = load_model('sla_violation_detector.h5')


    def ai_model_processor(
        self,
        metric_values: list[float]
    ) -> Optional[list[float]]:
        """
        Process device and performance data through AI models.

        Args:
            metric_values: List of performance metric values.
        Returns:
            List of 3 forecasted values, or None if insufficient data.

        """
        LOGGER.debug("Processing data through AI models")
        # LOGGER.debug(f"Number of performance data points: {len(metric_values)}")

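        # Guard: the additive-trend model must estimate both a level and a trend,
        # so a handful of samples is required; 4 is a pragmatic minimum here.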
        if not metric_values or len(metric_values) < 4:
            LOGGER.warning("Insufficient data for forecasting")
            return None

        try:
            # Convert metric_values to pandas Series
            data = pd.Series(metric_values)
            
            # Create and fit Exponential Smoothing model
            model = ExponentialSmoothing(
                data,
                trend="add",
                seasonal=None  # No seasonal component for this data
            )
            
            fit = model.fit()
            
            # Forecast next 3 values
            forecast = fit.forecast(steps=3)
            
            forecasted_values = forecast.tolist()
            
            # Calculate confidence score based on model fit quality
            # Using residual standard error as inverse confidence metric
            residuals = fit.resid
            mse = (residuals ** 2).mean()
            rmse = mse ** 0.5
            
            # Normalize confidence: lower RMSE = higher confidence
            # Use data scale (std dev) to normalize RMSE
            data_std = data.std()
            if data_std > 0:
                normalized_error = rmse / data_std
                # Convert to confidence score (0-1 range, higher is better)
                confidence = max(0, min(1, 1 - normalized_error))
            else:
                confidence = 0.5  # Default if std dev is 0
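            # Worked example: rmse = 2.0 on data with std = 8.0 gives
            # normalized_error = 0.25 and confidence = 1 - 0.25 = 0.75.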
            
            # LOGGER.info(f"Model RMSE: {rmse:.4f}, Confidence: {confidence:.4f}")
            LOGGER.info(f"Forecasted next 3 values: {forecasted_values}")
            
            # return forecasted_values
            return [confidence]
        
        except Exception as e:
            LOGGER.error(f"Error during forecasting: {e}")
            return None


    def process_data(
        self,
        device_data: Dict[str, Any],
        performance_data: Dict[str, Any],
        sla_policy: SLAPolicyConfig
    ) -> Dict[str, Any]:
        """
        Process device and performance data through AI models.

        Analyzes the collected data against the SLA policy thresholds,
        detects violations, and generates recommendations for remediation.

        Args:
            device_data: Device and topology data from SIMAP.
            performance_data: Performance metrics from InfluxDB.
            sla_policy: The SLA policy configuration with thresholds.

        Returns:
            Dictionary containing:
                - 'violations': List of detected SLA violations with details.
                - 'confidence_scores': AI model confidence scores.
                - 'summary': Dictionary with analysis summary statistics.
        """
        LOGGER.debug("Processing data through AI models")
        # TODO: Implement actual AI/ML processing logic
        # Example:
        # violations = self.model.predict(features)
        # recommendations = self.generate_recommendations(violations)

        metric_values = performance_data.get('metric_values', [])
        LOGGER.debug(f"Number of performance data points: {len(metric_values)}")
        # LOGGER.debug(f"Performance data values: {metric_values}")

        # forecasted_values = self.ai_model_processor(metric_values)
        # if forecasted_values is None:
        #     LOGGER.warning("AI model processing failed or insufficient data")
        
        # if forecasted_values:
        #     # Exponential weights: more weight on earlier (starting) values
        #     # Example: for 3 values -> normalized weights ~= [0.57, 0.29, 0.14] (exponential decay)
        #     weights = [2**(-i) for i in range(len(forecasted_values))]
        #     # Normalize weights to sum to 1
        #     total_weight = sum(weights)
        #     weights = [w / total_weight for w in weights]
        #     score = average(forecasted_values, weights=weights)
        #     # LOGGER.debug(f"Weighted average with exponential weights: {score}, weights: {weights}")
        # else:
        #     score = None

        score = self.ai_model_processor(metric_values)

        return {
            'violations': [],
-            'confidence_scores': round(Random().random(), 2),
+            'confidence_scores': score,
            'summary': {
                'sla_policy': sla_policy.to_dict(),
-                'timestamp': datetime.now(UTC).isoformat()
+                'timestamp': datetime.now(timezone.utc).isoformat()
            }
        }
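For reference, a minimal self-contained sketch of the exponential-smoothing forecast and the RMSE-based confidence computed above (the input values are made up for illustration):

import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing

values = pd.Series([10.0, 12.0, 11.5, 13.0, 14.2, 13.8, 15.1, 16.0])
fit = ExponentialSmoothing(values, trend='add', seasonal=None).fit()
print(fit.forecast(steps=3).tolist())                 # next 3 forecasted values
rmse = ((fit.resid ** 2).mean()) ** 0.5               # residual RMSE
print(max(0.0, min(1.0, 1.0 - rmse / values.std())))  # confidence score, as in the code above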
+8 −8
@@ -17,7 +17,7 @@ SLA Policy Configuration dataclass.

Defines the structure for SLA policy configurations used in AI analysis.
"""

from __future__ import annotations
from dataclasses import asdict, dataclass
from typing import Any, Dict

@@ -35,12 +35,12 @@ class SLAPolicyConfig:
        time_window_seconds: Time window in seconds for data analysis.
    """
    simap_id: str
-    latency_threshold_ms: float
-    bandwidth_utilization_threshold_pct: float
+    latency_threshold_ms: float|None
+    bandwidth_utilization_threshold_pct: float|None
    time_window_seconds: int

    @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> 'SLAPolicyConfig':
+    def from_dict(cls, data: Dict[str, Any]) -> SLAPolicyConfig:
        """
        Create an SLAPolicyConfig instance from a dictionary.

@@ -62,7 +62,7 @@ class SLAPolicyConfig:
            simap_id             = str(data['simap_id'])
            metrics              = data['sla_metrics']
            latency_threshold_ms = float(metrics['latency_threshold_ms'])
-            bandwidth_threshold  = float(metrics.get('bandwidth_utilization_threshold_pct'))
+            bandwidth_threshold  = float(metrics.get('bandwidth_utilization_threshold_pct', 0.0))
            time_window          = int(data.get('window_size_sec', 300))
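            # Hypothetical example of the expected input shape (keys as read above):
            # {
            #     'simap_id': 'simap-1',
            #     'sla_metrics': {
            #         'latency_threshold_ms': 5.0,
            #         'bandwidth_utilization_threshold_pct': 80.0,
            #     },
            #     'window_size_sec': 300,
            # }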
            
            return cls(