Loading src/simap_connector/service/telemetry/worker/data/AggregationCache.py +25 −3 Original line number Original line Diff line number Diff line Loading @@ -15,8 +15,8 @@ import logging, threading import logging, threading from dataclasses import dataclass, field from dataclasses import dataclass, field from datetime import datetime from datetime import datetime, timezone from typing import Dict, Set, Tuple from typing import Dict, Optional, Set, Tuple LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__) Loading @@ -43,6 +43,7 @@ class AggregationCache: def __init__(self) -> None: def __init__(self) -> None: self._lock = threading.Lock() self._lock = threading.Lock() self._samples : Dict[Tuple[str, str], LinkSample] = dict() self._samples : Dict[Tuple[str, str], LinkSample] = dict() self._last_valid_aggregation : Optional[AggregatedLinkSample] = None def update(self, link_sample : LinkSample) -> None: def update(self, link_sample : LinkSample) -> None: Loading @@ -65,7 +66,26 @@ class AggregationCache: MSG = '[aggregate] Aggregating {:d} supporting link(s)' MSG = '[aggregate] Aggregating {:d} supporting link(s)' LOGGER.info(MSG.format(num_samples)) LOGGER.info(MSG.format(num_samples)) agg = AggregatedLinkSample(timestamp=datetime.utcnow()) if num_samples == 0: if self._last_valid_aggregation is not None: MSG = '[aggregate] No samples available, reusing last valid aggregation: BW={:.2f}%, Latency={:.3f}ms' LOGGER.warning(MSG.format( self._last_valid_aggregation.bandwidth_utilization, self._last_valid_aggregation.latency )) # Return a copy with updated timestamp return AggregatedLinkSample( timestamp=datetime.now(timezone.utc).isoformat(), bandwidth_utilization=self._last_valid_aggregation.bandwidth_utilization, latency=self._last_valid_aggregation.latency, related_service_ids=self._last_valid_aggregation.related_service_ids.copy() ) else: MSG = '[aggregate] No samples available and no cached data, returning zeros' LOGGER.warning(MSG) return AggregatedLinkSample(timestamp=datetime.now(timezone.utc).isoformat()) agg = AggregatedLinkSample(timestamp=datetime.now(timezone.utc).isoformat()) for link_key, sample in self._samples.items(): for link_key, sample in self._samples.items(): network_id, link_id = link_key network_id, link_id = link_key Loading @@ -90,5 +110,7 @@ class AggregationCache: agg.bandwidth_utilization, agg.latency, agg.bandwidth_utilization, agg.latency, str(agg.related_service_ids) str(agg.related_service_ids) )) )) # Cache this valid aggregation for future use self._last_valid_aggregation = agg return agg return agg Loading
src/simap_connector/service/telemetry/worker/data/AggregationCache.py +25 −3 Original line number Original line Diff line number Diff line Loading @@ -15,8 +15,8 @@ import logging, threading import logging, threading from dataclasses import dataclass, field from dataclasses import dataclass, field from datetime import datetime from datetime import datetime, timezone from typing import Dict, Set, Tuple from typing import Dict, Optional, Set, Tuple LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__) Loading @@ -43,6 +43,7 @@ class AggregationCache: def __init__(self) -> None: def __init__(self) -> None: self._lock = threading.Lock() self._lock = threading.Lock() self._samples : Dict[Tuple[str, str], LinkSample] = dict() self._samples : Dict[Tuple[str, str], LinkSample] = dict() self._last_valid_aggregation : Optional[AggregatedLinkSample] = None def update(self, link_sample : LinkSample) -> None: def update(self, link_sample : LinkSample) -> None: Loading @@ -65,7 +66,26 @@ class AggregationCache: MSG = '[aggregate] Aggregating {:d} supporting link(s)' MSG = '[aggregate] Aggregating {:d} supporting link(s)' LOGGER.info(MSG.format(num_samples)) LOGGER.info(MSG.format(num_samples)) agg = AggregatedLinkSample(timestamp=datetime.utcnow()) if num_samples == 0: if self._last_valid_aggregation is not None: MSG = '[aggregate] No samples available, reusing last valid aggregation: BW={:.2f}%, Latency={:.3f}ms' LOGGER.warning(MSG.format( self._last_valid_aggregation.bandwidth_utilization, self._last_valid_aggregation.latency )) # Return a copy with updated timestamp return AggregatedLinkSample( timestamp=datetime.now(timezone.utc).isoformat(), bandwidth_utilization=self._last_valid_aggregation.bandwidth_utilization, latency=self._last_valid_aggregation.latency, related_service_ids=self._last_valid_aggregation.related_service_ids.copy() ) else: MSG = '[aggregate] No samples available and no cached data, returning zeros' LOGGER.warning(MSG) return AggregatedLinkSample(timestamp=datetime.now(timezone.utc).isoformat()) agg = AggregatedLinkSample(timestamp=datetime.now(timezone.utc).isoformat()) for link_key, sample in self._samples.items(): for link_key, sample in self._samples.items(): network_id, link_id = link_key network_id, link_id = link_key Loading @@ -90,5 +110,7 @@ class AggregationCache: agg.bandwidth_utilization, agg.latency, agg.bandwidth_utilization, agg.latency, str(agg.related_service_ids) str(agg.related_service_ids) )) )) # Cache this valid aggregation for future use self._last_valid_aggregation = agg return agg return agg