Commit 64ee5cce authored by Waleed Akbar's avatar Waleed Akbar
Browse files

feat: Update allowed links configuration and implement special triggering...

feat: Update allowed links configuration and implement special triggering rules for L6 in trans-pkt domain
parent d9750f58
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -13,9 +13,9 @@
# limitations under the License.

ALLOWED_LINKS_PER_CONTROLLER = {
    'e2e'      : { 'L1', 'L2', 'L3', 'L4'   },
    'agg'      : { 'L13', 'L14'             },
    'trans-pkt': { 'L5', 'L6', 'L9',  'L10' },
    'e2e'      : { 'L1',  'L2'                          },
    'agg'      : { 'L14'                                },
    'trans-pkt': { 'L3',  'L5', 'L6', 'L9', 'L10', 'L13' },
    # The remaining can not be monitored therefore they are not included in the allowed links for the controllers
    # 'agg'      : { 'L7ab',  'L7ba',  'L8ab',  'L8ba', 'L11ab', 'L11ba', 'L12ab', 'L12ba',  },
}
+1 −1
Original line number Diff line number Diff line
@@ -194,7 +194,7 @@ def set_simap_network(context_client: ContextClient, simap_client: SimapClient,
            link.update(
                'sdp1', endpoints[0], 'sdp2', endpoints[1],
                supporting_link_ids=[
                    ('trans-pkt', 'Trans-L1'), ('admin', 'L13'), ('admin', 'L3')
                    ('trans-pkt', 'Trans-L1'), ('trans-pkt', 'L13'), ('trans-pkt', 'L3')
                ]
            )
        except (KeyError, IndexError, ValueError) as e:
+97 −5
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@


import logging, queue, threading, uuid
from typing import Any, Optional, Set
from typing import Any, List, Optional, Set, Tuple
from common.Constants import DEFAULT_TOPOLOGY_NAME
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import (
@@ -674,6 +674,21 @@ class EventDispatcher(BaseEventDispatcher):
                LOGGER.info('Connection {:s} uses allowed link: {:s} (uuid: {:s})'.format(connection_uuid, link_name, link_uuid))
                worker_name = '{:s}:{:s}'.format(link_topology_name, link_name)

                # --- TEMPORTYY: Check for special triggering rules for L6 in trans-pkt domain ---
                if link_name == "L6":
                    # Check for special triggering rules (e.g., L6 triggers L3 and L13)
                    triggered_links = self._check_and_trigger_additional_links(
                                                link_topology_name, active_conn_count)
                                # Update the cached mapping to include triggered links
                    if triggered_links:
                        mapping = self._object_cache.get(CachedEntities.CONNECTION, connection_uuid, auto_retrieve=False)
                        if mapping and isinstance(mapping, dict):
                            mapping['triggered_links'] = triggered_links
                            self._object_cache.set(CachedEntities.CONNECTION, mapping, connection_uuid)
                            LOGGER.debug('Updated connection {:s} mapping with {:d} triggered links'.format(
                                connection_uuid, len(triggered_links)))
                # --- END OF TEMPORARY LOGIC ---

                # Worker should already exist from _dispatch_link_set (link creation event)
                if not self._telemetry_pool.has_worker(WorkerTypeEnum.SYNTHESIZER, worker_name):
                    LOGGER.warning('Worker not found for link {:s}, creating and starting new worker'.format(link_name))
@@ -687,7 +702,7 @@ class EventDispatcher(BaseEventDispatcher):
                            connection_count = active_conn_count,
                            link_capacity    = LINKS_CAPACITY.get(link_name, 100.0)
                        ),
                        related_service_ids=[],
                        related_service_ids=[],     # TODO: populate with actual related services if needed (later)
                    ))
                    sampling_interval = 1.0
                    self._telemetry_pool.start_synthesizer(worker_name, resources, sampling_interval)
@@ -708,6 +723,56 @@ class EventDispatcher(BaseEventDispatcher):

        return True

    # TEMPORARY: This function implements the special triggering rules for L6 in trans-pkt domain. 
    def _check_and_trigger_additional_links(
            self, link_topology_name: str, active_conn_count: int
        ) -> List[Tuple[str, str, str]]:
        """
        Check for special triggering rules and start additional workers.
        
        Rule: When L6 is processed in trans-pkt domain, also start workers for L3 and L13.
        
        Args:
            connection_uuid: UUID of the connection being processed
            domain_name: Domain name (e.g., 'trans-pkt')
            processed_links: List of (link_uuid, link_name, link_topology_name) already processed
        
        Returns:
            List of triggered links with format: (link_uuid, link_name, topology_name)
        """
        triggered_links = []
        
        # Trigger workers for L3 and L13 using same topology as L6
        for link_name in ['L3', 'L13']:
            # Generate UUID for the triggered link
            link_uuid   = str(uuid.uuid4())
            worker_name = '{:s}:{:s}'.format(link_topology_name, link_name)
            
            LOGGER.info('Triggering worker for link {:s} (generated uuid: {:s})'.format(link_name, link_uuid))
            
            # Check if worker already exists
            if not self._telemetry_pool.has_worker(WorkerTypeEnum.SYNTHESIZER, worker_name):
                # Create and start worker
                resources = Resources()
                resources.links.append(ResourceLink(
                    domain_name = link_topology_name,
                    link_name   = link_name,
                    metrics_sampler = SyntheticSampler.create_random(
                        connection_count = active_conn_count,
                        link_capacity    = LINKS_CAPACITY.get(link_name, 100.0)
                    ),
                    related_service_ids = [],
                ))
                sampling_interval = 1.0
                self._telemetry_pool.start_synthesizer(worker_name, resources, sampling_interval)
                LOGGER.info('Started triggered synthesizer worker: {:s}'.format(worker_name))
            else:
                LOGGER.info('Worker {:s} already exists, skipping creation'.format(worker_name))
            
            triggered_links.append((link_uuid, link_name, link_topology_name))
        
        return triggered_links


    def _prepare_connection_processing(self, connection_uuid: str):
        """
@@ -762,7 +827,8 @@ class EventDispatcher(BaseEventDispatcher):
        # Cache the connection-to-links mapping for later retrieval (e.g., during REMOVE events)
        mapping = {
            'domain': domain_name,
            'links': {link_uuid: {'name': link_name, 'topology': link_topo_name} for link_uuid, link_name, link_topo_name in processed_links}
            'links': {link_uuid: {'name': link_name, 'topology': link_topo_name} for link_uuid, link_name, link_topo_name in processed_links},
            'triggered_links': []  # Will store additional links triggered by special rules
        }
        self._object_cache.set(CachedEntities.CONNECTION, mapping, connection_uuid)
        LOGGER.debug('Cached connection {:s} mapping with {:d} links for domain {:s}'.format(
@@ -845,6 +911,7 @@ class EventDispatcher(BaseEventDispatcher):
            LOGGER.debug('Deleted cached mapping for connection {:s}'.format(connection_uuid))

            # Process each link: count remaining connections and stop/update worker accordingly
            all_links_stopped = True  # Track if all links have been stopped
            for link_uuid, link_name, link_topology_name in processed_links:
                worker_name = '{:s}:{:s}'.format(link_topology_name, link_name)

@@ -860,10 +927,28 @@ class EventDispatcher(BaseEventDispatcher):
                    self._telemetry_pool.stop_worker(WorkerTypeEnum.SYNTHESIZER, worker_name)
                    LOGGER.info('Stopped telemetry worker for link {:s}, no connections remain'.format(link_name))

                    delete_simap_network(self._simap_client, domain_name)
                    LOGGER.info('Deleted SIMAP network for domain {:s} after connection removal'.format(domain_name))
                    # ---- TEMPORARY: Stop triggered links (L3 and L13 when L6 is removed from trans-pkt) ----
                    if link_name == "L6":
                        try:
                            triggered_links = mapping.get('triggered_links', [])
                            if triggered_links:
                                LOGGER.info('Connection {:s} has {:d} triggered links to clean up'.format(
                                    connection_uuid, len(triggered_links)))
                                
                                for _, trig_link_name, trig_link_topology_name in triggered_links:
                                    trig_worker_name = '{:s}:{:s}'.format(trig_link_topology_name, trig_link_name)
                                    
                                    if self._telemetry_pool.has_worker(WorkerTypeEnum.SYNTHESIZER, trig_worker_name):
                                        self._telemetry_pool.stop_worker(WorkerTypeEnum.SYNTHESIZER, trig_worker_name)
                                        LOGGER.info('Stopped triggered telemetry worker for link {:s}'.format(trig_link_name))
                                    else:
                                        LOGGER.warning('Triggered worker {:s} not found during cleanup'.format(trig_worker_name))
                        except Exception as e:
                            LOGGER.exception('Failed to stop triggered links for connection {:s}: {:s}'.format(connection_uuid, str(e)))
                    # ---- END OF TEMPORARY LOGIC ----
                else:
                    # Other connections still use this link, update worker with new count
                    all_links_stopped = False
                    worker = self._telemetry_pool.get_worker(WorkerTypeEnum.SYNTHESIZER, worker_name)
                    assert isinstance(worker, SynthesizerWorker), \
                        'Expected SynthesizerWorker, got {:s}'.format(type(worker).__name__)
@@ -872,6 +957,13 @@ class EventDispatcher(BaseEventDispatcher):
                    LOGGER.info('Updated telemetry for link {:s} after connection removal, {:d} connections remain'.format(
                        link_name, remaining_conn_count))

            # Delete SIMAP network only if all links have been stopped
            if all_links_stopped:
                delete_simap_network(self._simap_client, domain_name)
                LOGGER.info('Deleted SIMAP network for domain {:s} after all links stopped'.format(domain_name))
            else:
                LOGGER.debug('SIMAP network {:s} retained, some links still have active connections'.format(domain_name))

        except Exception as e:
            LOGGER.exception('Failed to process connection removal {:s}: {:s}'.format(
                connection_uuid, str(e)))
+4 −4
Original line number Diff line number Diff line
@@ -89,10 +89,10 @@ class SyntheticSampler:
        bw_utilization = max(min_bw, min(max_bw, bw_utilization))
        self.prev_bw   = bw_utilization
        
        # Generate latency using same pattern as bandwidth (BW ranges / 10 = 0-10ms range)
        avg_lat  = avg / 10.0
        min_lat  = min_bw / 10.0
        max_lat  = max_bw / 10.0
        # Generate latency using same pattern as bandwidth (BW ranges / 9 = 0-10ms range)
        avg_lat  = avg / 9.0
        min_lat  = min_bw / 9.0
        max_lat  = max_bw / 9.0
        
        if self.prev_latency is None:
            # First sample: start at average for this connection count