AnalyzerHandlers.py
    # Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import logging
    from enum import Enum
    import pandas as pd
    
    logger = logging.getLogger(__name__)
    
    
    class Handlers(Enum):
        AGGREGATION_HANDLER = "AggregationHandler"
        UNSUPPORTED_HANDLER = "UnsupportedHandler"
    
        @classmethod
        def is_valid_handler(cls, handler_name):
            return handler_name in cls._value2member_map_
    
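    # Illustrative usage sketch (not part of the original module): is_valid_handler
    # matches a name against the enum *values*; "NoSuchHandler" is a hypothetical
    # name used only to exercise the negative case.
    def _demo_is_valid_handler():
        assert Handlers.is_valid_handler("AggregationHandler") is True
        assert Handlers.is_valid_handler("NoSuchHandler") is False
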
    # This function is defined at module level (rather than as a class method) due to serialization issues.
    def threshold_handler(key, aggregated_df, thresholds):
        """
        Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
        on the aggregated DataFrame.
    
        Args:
            key (str): Key for the aggregated DataFrame.
            aggregated_df (pd.DataFrame): DataFrame with aggregated metrics.
            thresholds (dict): Thresholds dictionary with keys in the format '<metricName>' and values as (fail_th, raise_th).
    
        Returns:
            pd.DataFrame: DataFrame with additional threshold columns.
        """
        for metric_name, threshold_values in thresholds.items():
            # Ensure the metric column exists in the DataFrame
            if metric_name not in aggregated_df.columns:
                logger.warning(f"Metric '{metric_name}' does not exist in the DataFrame for key: {key}. Skipping threshold application.")
                continue
            
            # Ensure the threshold values form a pair (list or tuple of length 2)
            if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
                fall_th, raise_th = threshold_values

                # Add boolean threshold-crossing columns
                aggregated_df[f"{metric_name}_TH_RAISE"] = aggregated_df[metric_name] > raise_th
                aggregated_df[f"{metric_name}_TH_FALL"]  = aggregated_df[metric_name] < fall_th
            else:
                logger.warning(f"Threshold values for '{metric_name}' ({threshold_values}) are not a pair of length 2. Skipping threshold application.")
        return aggregated_df
    
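    # Illustrative sketch (not part of the original module): applies threshold_handler
    # to a hand-built aggregated DataFrame. The column name 'avg', the KPI id, and the
    # key are assumptions chosen for demonstration only.
    def _demo_threshold_handler():
        agg_df = pd.DataFrame({'kpi_id': ['kpi-1'], 'avg': [0.75]})
        thresholds = {'avg': [0.2, 0.5]}  # [fall_th, raise_th]
        result = threshold_handler('demo-key', agg_df, thresholds)
        # 0.75 exceeds raise_th (0.5) and is not below fall_th (0.2)
        assert bool(result['avg_TH_RAISE'].iloc[0]) is True
        assert bool(result['avg_TH_FALL'].iloc[0]) is False
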
    def aggregation_handler(
            batch_type_name, key, batch, input_kpi_list, output_kpi_list, thresholds
        ):
        """
          Process a batch of data and calculate aggregated values for each input KPI
          and maps them to the output KPIs. """
    
        logger.info(f"({batch_type_name}) Processing batch for key: {key}")
        if not batch:
            logger.info("Empty batch received. Skipping processing.")
            return []
        else:
            logger.info(f" >>>>> Processing {len(batch)} records for key: {key}")
            
            # Convert data into a DataFrame
            df = pd.DataFrame(batch)
    
            # Filter the DataFrame to retain rows where kpi_id is in the input list (subscribed endpoints only)
            df = df[df['kpi_id'].isin(input_kpi_list)].copy()
    
            if df.empty:
                logger.warning(f"No data available for KPIs: {input_kpi_list}. Skipping processing.")
                return []
    
            # Define all possible aggregation methods
            aggregation_methods = {
                "min"     : ('kpi_value', 'min'),
                "max"     : ('kpi_value', 'max'),
                "avg"     : ('kpi_value', 'mean'),
                "first"   : ('kpi_value', lambda x: x.iloc[0]),
                "last"    : ('kpi_value', lambda x: x.iloc[-1]),
                "variance": ('kpi_value', 'var'),
                "count"   : ('kpi_value', 'count'),
                "range"   : ('kpi_value', lambda x: x.max() - x.min()),
                "sum"     : ('kpi_value', 'sum'),
            }
    
            results = []
            
            # Process each KPI-specific task parameter
            for kpi_index, kpi_id in enumerate(input_kpi_list):
    
                # logger.info(f"1.Processing KPI: {kpi_id}")
                kpi_task_parameters = thresholds["task_parameter"][kpi_index]
                
                # Get valid task parameters for this KPI
                valid_task_parameters = [
                    method for method in kpi_task_parameters.keys() 
                    if method in aggregation_methods
                ]
    
                # Select the aggregation methods based on valid task parameters
                selected_methods = {method: aggregation_methods[method] for method in valid_task_parameters}
    
                # logger.info(f"2. Processing KPI: {kpi_id} with task parameters: {kpi_task_parameters}")
                kpi_df = df[df['kpi_id'] == kpi_id]
    
                # Check if kpi_df is not empty before applying the aggregation methods
                if not kpi_df.empty:
                    agg_df = kpi_df.groupby('kpi_id').agg(**selected_methods).reset_index()
    
                    # logger.info(f"3. Aggregated DataFrame for KPI: {kpi_id}: {agg_df}")
    
                    agg_df['kpi_id'] = output_kpi_list[kpi_index]
    
                    # logger.info(f"4. Applying thresholds for df: {agg_df['kpi_id']}")
                    record = threshold_handler(key, agg_df, kpi_task_parameters)
    
                    results.extend(record.to_dict(orient='records'))
                else:
                    logger.warning(f"No data available for KPIs: {kpi_id}. Skipping aggregation.")
                    continue
            return results
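
    # Illustrative end-to-end sketch (not part of the original module). The record layout
    # (dicts with 'kpi_id' and 'kpi_value') is inferred from the DataFrame usage above;
    # the KPI ids, batch name, and threshold values are assumptions for demonstration.
    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        sample_batch = [
            {'kpi_id': 'in-kpi-1', 'kpi_value': 10.0},
            {'kpi_id': 'in-kpi-1', 'kpi_value': 30.0},
            {'kpi_id': 'ignored-kpi', 'kpi_value': 99.0},  # dropped: not a subscribed KPI
        ]
        thresholds = {
            'task_parameter': [
                # For 'in-kpi-1': aggregate with avg and max, then flag threshold crossings
                {'avg': [5.0, 15.0], 'max': [0.0, 25.0]},
            ]
        }
        results = aggregation_handler(
            'demo-batch', 'demo-key', sample_batch,
            ['in-kpi-1'], ['out-kpi-1'], thresholds,
        )
        # Expected: one record with avg=20.0, max=30.0 and the *_TH_RAISE flags set
        print(results)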