Commit dd69461b authored by Waleed Akbar's avatar Waleed Akbar
Browse files

feat: Enhance AI Analytics Engine with new connection event handling and forecasting capabilities

parent 983c0282
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -73,7 +73,7 @@ def main():
    except: # pylint: disable=bare-except # pragma: no cover
        LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))

    rebuild_database(db_engine)
    rebuild_database(db_engine)         # drop and recreate all tables

    restconf_client = RestConfClient(
        scheme   = SIMAP_SERVER_SCHEME, address  = SIMAP_SERVER_ADDRESS,
+64 −1
Original line number Diff line number Diff line
@@ -18,7 +18,8 @@ from typing import Any, Optional, Set
from common.Constants import DEFAULT_TOPOLOGY_NAME
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import (
    ContextEvent, DeviceEvent, Empty, LinkEvent, ServiceEvent, SliceEvent, TopologyEvent
    ContextEvent, DeviceEvent, Empty, LinkEvent, ServiceEvent,
    SliceEvent, TopologyEvent, ConnectionEvent
)
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
@@ -87,6 +88,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Skipping Slice Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(slice_event)))

#  ----- Topology Events -----

    def _dispatch_topology_set(self, topology_event : TopologyEvent) -> bool:
        MSG = 'Processing Topology Event: {:s}'
@@ -137,6 +139,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Topology Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(topology_event)))

#  ----- Device Events -----

    def _dispatch_device_set(self, device_event : DeviceEvent) -> bool:
        MSG = 'Processing Device Event: {:s}'
@@ -270,6 +273,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Device Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

# ----- Link Events -----

    def _dispatch_link_set(self, link_event : LinkEvent) -> bool:
        MSG = 'Processing Link Event: {:s}'
@@ -450,6 +454,7 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Link Removed: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(link_event)))

#  ----- Service Events -----

    def _dispatch_service_set(self, service_event : ServiceEvent) -> bool:
        MSG = 'Processing Service Event: {:s}'
@@ -652,6 +657,64 @@ class EventDispatcher(BaseEventDispatcher):
        MSG = 'Logical Link Removed for Service: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))

#   ----- Connection Events -----

    def dispatch_connection_set(self, connection_event : ConnectionEvent) -> bool:
        MSG = 'Processing Connection Event: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(connection_event)))

        #  Here a connection object from context is received in connection_event.
        #  Here is gRPC message definition: message Connection {  ConnectionId connection_id = 1;  ServiceId service_id = 2;  repeated EndPointId path_hops_endpoint_ids = 3;  repeated ServiceId sub_service_ids = 4;  ConnectionSettings settings = 5;}
        #  discard sub_service_ids and settings for now, as not used in SIMAP population.
        #  Extract service_id, endpoint_ids from connection_event to identify the connection.
        #  Get all links using gRPC ListLinkIds() from context,  and find which link(s) correspond to the connection's endpoint_ids.
        #  Then update SIMAP accordingly.
        #  Then, do this only for connections that correspond to links that this controller is allowed to manage, as per ALLOWED_LINKS_PER_CONTROLLER.
        #  Then, do something like this (pseudocode):
        # worker_name = '{:s}:{:s}'.format(topology_name, link_name)
        # resources = Resources()
        # resources.links.append(ResourceLink(
        #     domain_name=topology_name, link_name=link_name,
        #     bandwidth_utilization_sampler=SyntheticSampler.create_random(
        #         amplitude_scale = 25.0,
        #         phase_scale     = 1e-7,
        #         period_scale    = 86_400,
        #         offset_scale    = 25,
        #         noise_ratio     = 0.05,
        #         min_value       = 0.0,
        #         max_value       = 100.0,
        #     ),
        #     latency_sampler=SyntheticSampler.create_random(
        #         amplitude_scale = 0.5,
        #         phase_scale     = 1e-7,
        #         period_scale    = 60.0,
        #         offset_scale    = 10.0,
        #         noise_ratio     = 0.05,
        #         min_value       = 0.0,
        #     ),
        #     related_service_ids=[],
        # ))
        # sampling_interval = 1.0
        # self._telemetry_pool.start_synthesizer(worker_name, resources, sampling_interval)

        return True

    def dispatch_connection_create(self, connection_event : ConnectionEvent) -> None:
        if not self.dispatch_connection_set(connection_event): return

        MSG = 'Skipping Connection Create Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event)))
    
    def dispatch_connection_update(self, connection_event : ConnectionEvent) -> None:
        if not self.dispatch_connection_set(connection_event): return

        MSG = 'Skipping Connection Update Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event)))
    
    def dispatch_connection_remove(self, connection_event : ConnectionEvent) -> None:
        MSG = 'Skipping Connection Remove Event: {:s}'
        LOGGER.debug(MSG.format(grpc_message_to_json_string(connection_event)))
    

class SimapUpdater:
    def __init__(
+96 −14
Original line number Diff line number Diff line
@@ -20,7 +20,11 @@ Provides AI/ML processing functionality for SLA analysis.
import logging
from datetime import datetime, UTC
from random import Random
from typing import Any, Dict
from typing import Any, Dict, Optional

from numpy import average
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing

from .sla_policy import SLAPolicyConfig

@@ -46,37 +50,115 @@ class AIModelProcessor:
        # TODO: Load AI/ML models here
        # Example: self.model = load_model('sla_violation_detector.h5')


    def ai_model_processor(
        self,
        metric_values: list[float]
    ) -> Optional[list[float]]:
        """
        Process device and performance data through AI models.

        Args:
            metric_values: List of performance metric values.
        Returns:
            List of 3 forecasted values, or None if insufficient data.

        """
        LOGGER.debug("Processing data through AI models")
        # LOGGER.debug(f"Number of performance data points: {len(metric_values)}")

        if not metric_values or len(metric_values) < 4:
            LOGGER.warning("Insufficient data for forecasting")
            return None

        try:
            # Convert metric_values to pandas Series
            data = pd.Series(metric_values)
            
            # Create and fit Exponential Smoothing model
            model = ExponentialSmoothing(
                data,
                trend="add",
                seasonal=None  # No seasonal component for this data
            )
            
            fit = model.fit()
            
            # Forecast next 3 values
            forecast = fit.forecast(steps=3)
            
            forecasted_values = forecast.tolist()
            
            # Calculate confidence score based on model fit quality
            # Using residual standard error as inverse confidence metric
            residuals = fit.resid
            mse = (residuals ** 2).mean()
            rmse = mse ** 0.5
            
            # Normalize confidence: lower RMSE = higher confidence
            # Use data scale (std dev) to normalize RMSE
            data_std = data.std()
            if data_std > 0:
                normalized_error = rmse / data_std
                # Convert to confidence score (0-1 range, higher is better)
                confidence = max(0, min(1, 1 - normalized_error))
            else:
                confidence = 0.5  # Default if std dev is 0
            
            # LOGGER.info(f"Model RMSE: {rmse:.4f}, Confidence: {confidence:.4f}")
            LOGGER.info(f"Forecasted next 3 values: {forecasted_values}")
            
            # return forecasted_values
            return [confidence]
        
        except Exception as e:
            LOGGER.error(f"Error during forecasting: {e}")
            return None


    def process_data(
        self,
        device_data: Dict[str, Any],
        performance_data: Dict[str, Any],
        sla_policy: SLAPolicyConfig
    ) -> Dict[str, Any]:
        """
        Process device and performance data through AI models.

        Analyzes the collected data against the SLA policy thresholds,
        detects violations, and generates recommendations for remediation.

        Args:
            device_data: Device and topology data from SIMAP.
            performance_data: Performance metrics from InfluxDB.
            sla_policy: The SLA policy configuration with thresholds.

        Returns:
            Dictionary containing:
                - 'violations': List of detected SLA violations with details.
                - 'recommendations': List of recommended actions.
                - 'confidence_scores': AI model confidence scores.
                - 'summary': Dictionary with analysis summary statistics.
        """
        LOGGER.debug("Processing data through AI models")
        # TODO: Implement actual AI/ML processing logic
        # Example:
        # violations = self.model.predict(features)
        # recommendations = self.generate_recommendations(violations)

        metric_values = performance_data.get('metric_values', [])
        LOGGER.debug(f"Number of performance data points: {len(metric_values)}")
        # LOGGER.debug(f"Performance data values: {metric_values}")

        # forecasted_values = self.ai_model_processor(metric_values)
        # if forecasted_values is None:
        #     LOGGER.warning("AI model processing failed or insufficient data")
        
        # if forecasted_values:
        #     # Exponential weights: more weight on earlier (starting) values
        #     # Example: for 3 values -> weights = [0.5, 0.33, 0.17] (exponential decay)
        #     weights = [2**(-i) for i in range(len(forecasted_values))]
        #     # Normalize weights to sum to 1
        #     total_weight = sum(weights)
        #     weights = [w / total_weight for w in weights]
        #     score = average(forecasted_values, weights=weights)
        #     # LOGGER.debug(f"Weighted average with exponential weights: {score}, weights: {weights}")
        # else:
        #     score = None

        score = self.ai_model_processor(metric_values)

        return {
            'violations': [],
            'confidence_scores': round(Random().random(), 2),  
            'confidence_scores': score,  
            'summary': {
            'sla_policy': sla_policy.to_dict(),
            'timestamp': datetime.now(UTC).isoformat()
+8 −8
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@ SLA Policy Configuration dataclass.

Defines the structure for SLA policy configurations used in AI analysis.
"""

from __future__ import annotations
from dataclasses import asdict, dataclass
from typing import Any, Dict

@@ -35,12 +35,12 @@ class SLAPolicyConfig:
        time_window_seconds: Time window in seconds for data analysis.
    """
    simap_id: str
    latency_threshold_ms: float
    bandwidth_utilization_threshold_pct: float
    latency_threshold_ms: float|None
    bandwidth_utilization_threshold_pct: float|None
    time_window_seconds: int

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SLAPolicyConfig':
    def from_dict(cls, data: Dict[str, Any]) -> SLAPolicyConfig:
        """
        Create an SLAPolicyConfig instance from a dictionary.

@@ -62,7 +62,7 @@ class SLAPolicyConfig:
            simap_id             = str(data['simap_id'])
            metrics              = data['sla_metrics']
            latency_threshold_ms = float(metrics['latency_threshold_ms'])
            bandwidth_threshold  = float(metrics.get('bandwidth_utilization_threshold_pct'))
            bandwidth_threshold  = float(metrics.get('bandwidth_utilization_threshold_pct', 0.0))
            time_window          = int(data.get('window_size_sec', 300))
            
            return cls(
+12 −11
Original line number Diff line number Diff line
@@ -66,7 +66,7 @@ def create_ai_analytics_blueprint(
        Run SLA policy analysis.

        Expects JSON payload with SLA policy configuration.
        Orchestrates the full analysis workflow: fetch data from SIMAP,
        Orchestrates the full analysis workflow: 
        fetch metrics from InfluxDB, process through AI models, and
        send results to Decision Engine.

@@ -111,23 +111,24 @@ def create_ai_analytics_blueprint(
        # Execute analysis workflow
        try:
            # Step 1: Fetch device data from SIMAP 
            LOGGER.debug("Step 1: Fetching device data from SIMAP")
            device_data = simap_fetcher.fetch_device_data(sla_policy)
            # (At the moment, leaving it as it is. No more needed, to be removed in future)
            # LOGGER.debug("Step 1: Fetching device data from SIMAP")
            # device_data = simap_fetcher.fetch_device_data(sla_policy)

            # Step 2: Fetch performance data from InfluxDB
            LOGGER.debug("Step 2: Fetching performance data from InfluxDB")
            LOGGER.debug(">>> Step 2: Fetching performance data from InfluxDB")
            performance_data = influxdb_fetcher.fetch_performance_data(
                sla_policy, device_data
                sla_policy
            )

            # Step 3: Process data through AI models
            LOGGER.debug("Step 3: Processing data through AI models")
            # >>> Step 3: Process data through AI models
            LOGGER.debug(">>> Step 3: Processing data through AI models")
            results = ai_processor.process_data(
                device_data, performance_data, sla_policy
                performance_data, sla_policy
            )

            # Step 4: Send results to Decision Engine
            LOGGER.debug("Step 4: Sending results to Decision Engine")
            # >>> Step 4: Send results to Decision Engine
            LOGGER.debug(">>> Step 4: Sending results to Decision Engine")
            if not decision_client.send_results(results):
                LOGGER.error("Failed to send results to Decision Engine")
                return jsonify({
Loading