diff --git a/manifests/metallb.yaml b/manifests/metallb.yaml index 162768aca24df3a0f1b37362c65687e7f2da565b..60b82297e43934fca7f599eee64df61d035be7dc 100644 --- a/manifests/metallb.yaml +++ b/manifests/metallb.yaml @@ -17,7 +17,7 @@ apiVersion: metallb.io/v1beta1 kind: IPAddressPool metadata: - name: my-ip-pool + name: metallb-address-pool namespace: metallb-system spec: addresses: diff --git a/src/telemetry/backend/service/HelperMethods.py b/src/telemetry/backend/service/HelperMethods.py index 332b3f10d7ea9ec357897dcce62e856545d7f37c..2d57917c1e89ede7bc04ad53944066f8016c1209 100644 --- a/src/telemetry/backend/service/HelperMethods.py +++ b/src/telemetry/backend/service/HelperMethods.py @@ -16,6 +16,7 @@ import uuid import logging from .collector_api._Collector import _Collector from .collector_api.DriverInstanceCache import get_driver +from .collectors.int_collector.INTCollector import INTCollector from common.proto.kpi_manager_pb2 import KpiId from common.tools.context_queries.Device import get_device from common.tools.context_queries.EndPoint import get_endpoint_names @@ -61,19 +62,18 @@ def get_subscription_parameters( raise Exception(f"KPI ID: {kpi_id} - Device not found for KPI descriptor.") endpoints = device.device_endpoints - # LOGGER.info(f"Device for KPI ID: {kpi_id} - {endpoints}") - # LOGGER.info(f"--------------------") + LOGGER.debug(f"Device for KPI ID: {kpi_id} - {endpoints}") endpointsIds = [endpoint_id.endpoint_id for endpoint_id in endpoints] - # for endpoint_id in endpoints: - # LOGGER.info(f"Endpoint UUID: {endpoint_id.endpoint_id}") + for endpoint_id in endpoints: + LOGGER.debug(f"Endpoint UUID: {endpoint_id.endpoint_id}") # Getting endpoint names device_names, endpoint_data = get_endpoint_names( context_client = context_client, endpoint_ids = endpointsIds ) - # LOGGER.info(f"Device names: {device_names}") - # LOGGER.info(f"Endpoint data: {endpoint_data}") + LOGGER.debug(f"Device names: {device_names}") + LOGGER.debug(f"Endpoint data: {endpoint_data}") subscriptions = [] sub_id = None @@ -94,7 +94,6 @@ def get_subscription_parameters( ) return subscriptions - def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, driver_instance_cache ) -> Optional[_Collector]: """ @@ -110,21 +109,67 @@ def get_collector_by_kpi_id(kpi_id: str, kpi_manager_client, context_client, dri LOGGER.info(f"Getting collector for KPI ID: {kpi_id}") kpi_id_obj = KpiId() kpi_id_obj.kpi_id.uuid = kpi_id # pyright: ignore[reportAttributeAccessIssue] - kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj) - # LOGGER.info(f"KPI Descriptor: {kpi_descriptor}") + kpi_descriptor = kpi_manager_client.GetKpiDescriptor(kpi_id_obj) if not kpi_descriptor: raise Exception(f"KPI ID: {kpi_id} - Descriptor not found.") - # device_uuid = kpi_descriptor.device_id.device_uuid.uuid - device = get_device( context_client = context_client, - device_uuid = kpi_descriptor.device_id.device_uuid.uuid, - include_config_rules = True, - include_components = False, - ) + device_uuid = kpi_descriptor.device_id.device_uuid.uuid + device = get_device( + context_client = context_client, + device_uuid = device_uuid, + include_config_rules = True, + include_components = False, + ) # Getting device collector (testing) collector : _Collector = get_driver(driver_instance_cache, device) if collector is None: raise Exception(f"KPI ID: {kpi_id} - Collector not found for device {device.device_uuid.uuid}.") - # LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}") return collector + +def get_node_level_int_collector(collector_id: str, kpi_id: str, address: str, interface: str, port: int, + service_id: str, context_id: str) -> Optional[_Collector]: + """ + Method to instantiate an in-band network telemetry collector at a node level. + Such a collector binds to a physical/virtual interface of a node, expecting + packets from one or more switches. + Every packet contains multiple KPIs, therefore this collector is not bound to + a single KPI. + Returns: + - Collector instance if found, otherwise None. + Raises: + - Exception if the KPI ID is not found or the collector cannot be created. + """ + + LOGGER.debug(f"INT collector ID: {collector_id}") + LOGGER.debug(f"INT collector address: {address}") + LOGGER.debug(f"INT collector port: {port}") + LOGGER.debug(f"INT collector interface: {interface}") + LOGGER.debug(f"INT collector kpi_id: {kpi_id}") + LOGGER.debug(f"INT collector service_id: {service_id}") + LOGGER.debug(f"INT collector context_id: {context_id}") + # Initialize an INT collector + try: + collector : _Collector = INTCollector( + address=address, + port=port, + collector_id=collector_id, + interface=interface, + kpi_id=kpi_id, + service_id=service_id, + context_id=context_id + ) + except Exception as ex: + LOGGER.exception(f"Failed to create INT Collector object on node {address}, {interface}:{port}") + + connected = False + if not collector: + return None + LOGGER.info(f"Collector for KPI ID: {kpi_id} - {collector.__class__.__name__}") + + try: + connected = collector.Connect() + except Exception as ex: + LOGGER.exception(f"Failed to connect INT Collector on node {address}, {interface}:{port}") + + return collector if connected else None diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 28b0c39890f4e9a677d4218a4d06a920ec375cc4..d4e99f300b2450259e7f64c2745a96d88513e57b 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -17,7 +17,7 @@ import time import logging import threading -from .HelperMethods import get_collector_by_kpi_id, get_subscription_parameters +from .HelperMethods import get_collector_by_kpi_id, get_subscription_parameters, get_node_level_int_collector from common.Constants import ServiceNameEnum from common.Settings import get_service_port_grpc from confluent_kafka import Consumer as KafkaConsumer @@ -26,12 +26,8 @@ from confluent_kafka import Producer as KafkaProducer from datetime import datetime, timezone from typing import Any, Dict -from .collector_api._Collector import _Collector -from .collector_api.DriverInstanceCache import DriverInstanceCache, get_driver -from .collectors.emulated.EmulatedCollector import EmulatedCollector +from .collector_api.DriverInstanceCache import DriverInstanceCache from common.method_wrappers.Decorator import MetricsPool -from common.proto.kpi_manager_pb2 import KpiId -from common.tools.context_queries.Device import get_device from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService from context.client.ContextClient import ContextClient @@ -56,7 +52,6 @@ class TelemetryBackendService(GenericGrpcService): 'auto.offset.reset' : 'latest'}) self.driver_instance_cache = driver_instance_cache self.device_collector = None - self.collector = None # This should be replaced with device_collector (later to be removed) self.context_client = ContextClient() self.kpi_manager_client = KpiManagerClient() self.active_jobs = {} @@ -84,40 +79,40 @@ class TelemetryBackendService(GenericGrpcService): f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") continue else: - LOGGER.error("Consumer error: {}".format(receive_msg.error())) + LOGGER.error(f"Consumer error: {receive_msg.error()}") break try: collector = json.loads(receive_msg.value().decode('utf-8')) collector_id = receive_msg.key().decode('utf-8') - LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) + LOGGER.debug(f"Received Collector: {collector_id} - {collector}") duration = collector.get('duration', 0) if duration == -1 and collector['interval'] == -1: self.TerminateCollector(collector_id) else: - LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id)) + LOGGER.info(f"Received Collector ID: {collector_id} - Scheduling...") if collector_id not in self.active_jobs: stop_event = threading.Event() self.active_jobs[collector_id] = stop_event threading.Thread(target=self.GenericCollectorHandler, - args=( - collector_id, - collector['kpi_id'], - duration, - collector['interval'], - collector['interface'], # for INT collector - collector['transport_port'], # for INT collector - collector['service_id'], # for INT collector - collector['context_id'], # for INT collector - stop_event - )).start() + args=( + collector_id, + collector['kpi_id'], + duration, + collector['interval'], + collector['interface'], # for INT collector + collector['transport_port'], # for INT collector + collector['service_id'], # for INT collector + collector['context_id'], # for INT collector + stop_event + )).start() # Stop the Collector after the given duration if duration > 0: def stop_after_duration(completion_time, stop_event): time.sleep(completion_time) if not stop_event.is_set(): LOGGER.warning( - f"Execution duration ({completion_time}) completed of Collector: {collector_id}") + f"Execution duration ({completion_time}) completed for Collector: {collector_id}") self.TerminateCollector(collector_id) duration_thread = threading.Thread( @@ -126,66 +121,60 @@ class TelemetryBackendService(GenericGrpcService): ) duration_thread.start() else: - LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id)) + LOGGER.warning(f"Collector ID: {collector_id} - Already scheduled or running") except Exception as e: LOGGER.warning( - "Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) + f"Unable to consume message from topic: {KafkaTopic.TELEMETRY_REQUEST.value}. ERROR: {e}") - def GenericCollectorHandler(self, collector_id, kpi_id, duration, interval, interface , port , service_id , context_id , stop_event ): + def GenericCollectorHandler(self, collector_id, kpi_id, duration, interval, interface, port, service_id, context_id, stop_event): """ Method to handle collector request. """ - # CONFIRM: The method (get_collector_by_kpi_id) is working correctly. testcase in integration tests. - self.device_collector = get_collector_by_kpi_id( - kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache) + + # INT collector invocation + if interface: + self.device_collector = get_node_level_int_collector( + collector_id=collector_id, + kpi_id=kpi_id, + address="127.0.0.1", + interface=interface, + port=port, + service_id=service_id, + context_id=context_id + ) + return + # Rest of the collectors + else: + self.device_collector = get_collector_by_kpi_id( + kpi_id, self.kpi_manager_client, self.context_client, self.driver_instance_cache) if not self.device_collector: LOGGER.warning(f"KPI ID: {kpi_id} - Collector not found. Skipping...") raise Exception(f"KPI ID: {kpi_id} - Collector not found.") - LOGGER.info(("----- Number done -----")) - # CONFIRM: The method (get_subscription_parameters) is working correctly. testcase in telemetery backend tests. + # CONFIRM: The method (get_subscription_parameters) is working correctly. testcase in telemetry backend tests resource_to_subscribe = get_subscription_parameters( kpi_id, self.kpi_manager_client, self.context_client, duration, interval ) if not resource_to_subscribe: LOGGER.warning(f"KPI ID: {kpi_id} - Resource to subscribe not found. Skipping...") raise Exception(f"KPI ID: {kpi_id} - Resource to subscribe not found.") - LOGGER.info("------ Number done 2 -----") responses = self.device_collector.SubscribeState(resource_to_subscribe) - for status in responses: if isinstance(status, Exception): LOGGER.error(f"Subscription failed for KPI ID: {kpi_id} - Error: {status}") raise status else: LOGGER.info(f"Subscription successful for KPI ID: {kpi_id} - Status: {status}") - LOGGER.info("------ Number done 3 -----") for samples in self.device_collector.GetState(duration=duration, blocking=True): LOGGER.info(f"KPI ID: {kpi_id} - Samples: {samples}") self.GenerateKpiValue(collector_id, kpi_id, samples) - LOGGER.info("------ Number done 4 -----") - self.device_collector.Disconnect() - # TODO: Stop_event should be managed here is this method because there will be no more specific collector. - -# --- START: Kept for INT compatibility, to be removed later --- - def INTCollectorHandler(self, subscription, duration, collector_id, kpi_id, interface, port, service_id, context_id , stop_event): - self.collector = INTCollector( - collector_id=collector_id , - address="127.0.0.1", - interface=interface, - port=port, - kpi_id=kpi_id, - service_id=service_id, - context_id=context_id - ) - self.collector.Connect() + # TODO: Stop_event should be managed in this method because there will be no more specific collector if stop_event.is_set(): - self.collector.Disconnect() -# --- END: Kept for INT compatibility, to be removed later --- + self.device_collector.Disconnect() def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ @@ -207,25 +196,24 @@ class TelemetryBackendService(GenericGrpcService): def delivery_callback(self, err, msg): if err: - LOGGER.error('Message delivery failed: {:s}'.format(str(err))) + LOGGER.error(f"Message delivery failed: {str(err)}") def TerminateCollector(self, job_id): LOGGER.debug("Terminating collector backend...") try: if job_id not in self.active_jobs: - self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.") + self.logger.warning(f"No active jobs found for {job_id}. It might have already been terminated.") else: LOGGER.info(f"Terminating job: {job_id}") stop_event = self.active_jobs.pop(job_id, None) if stop_event: stop_event.set() LOGGER.info(f"Job {job_id} terminated.") - if self.collector.UnsubscribeState(job_id): + if self.device_collector.UnsubscribeState(job_id): LOGGER.info(f"Unsubscribed from collector: {job_id}") else: LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}") else: LOGGER.warning(f"Job {job_id} not found in active jobs.") except: - LOGGER.exception("Error terminating job: {:}".format(job_id)) - + LOGGER.exception(f"Error terminating job: {job_id}") diff --git a/src/telemetry/backend/service/collectors/__init__.py b/src/telemetry/backend/service/collectors/__init__.py index 6148703631f83d59ef6837eb933671fbb9524a93..6b45dcd41c19bb64d4ff4b26ba1ca5972d5b26f6 100644 --- a/src/telemetry/backend/service/collectors/__init__.py +++ b/src/telemetry/backend/service/collectors/__init__.py @@ -34,8 +34,6 @@ COLLECTORS.append( }, ])) -# TODO: Import for gNMI OpenConfig Collector ... - if LOAD_ALL_DEVICE_DRIVERS: from .gnmi_oc.GnmiOpenConfigCollector import GNMIOpenConfigCollector # pylint: disable=wrong-import-position COLLECTORS.append( @@ -49,7 +47,7 @@ if LOAD_ALL_DEVICE_DRIVERS: ) if LOAD_ALL_DEVICE_DRIVERS: - from .intcollector.INTCollector import INTCollector # pylint: disable=wrong-import-position + from .int_collector.INTCollector import INTCollector # pylint: disable=wrong-import-position COLLECTORS.append( (INTCollector, [ { diff --git a/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py b/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py index c383e1cabe717b4b68756560f1fc5023dc969868..31cdce39a0c079bb3c853276421a390d9947e7ab 100644 --- a/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py +++ b/src/telemetry/backend/service/collectors/gnmi_oc/GnmiOpenConfigCollector.py @@ -33,7 +33,7 @@ class GNMIOpenConfigCollector(_Collector): ========================= Lightweight wrapper around *pygnmi* with subscribe / get / unsubscribe helpers. """ - def __init__(self, address: str = '', port: int = -1, **setting ) -> None: + def __init__(self, address: str = '', port: int = -1, **setting) -> None: super().__init__('gNMI_openconfig_collector', address, port, **setting) self._subscriptions : Dict[str, Subscription] = {} diff --git a/src/telemetry/backend/service/collectors/intcollector/INTCollector.py b/src/telemetry/backend/service/collectors/int_collector/INTCollector.py similarity index 57% rename from src/telemetry/backend/service/collectors/intcollector/INTCollector.py rename to src/telemetry/backend/service/collectors/int_collector/INTCollector.py index 5931f33f34b1d097c435c0a8483379026e819213..7f945efe9a5deedaf1cb44a112d50764fa015188 100644 --- a/src/telemetry/backend/service/collectors/intcollector/INTCollector.py +++ b/src/telemetry/backend/service/collectors/int_collector/INTCollector.py @@ -27,17 +27,16 @@ import struct import socket import ipaddress -from .INTCollectorCommon import IntDropReport, IntLocalReport, IntFixedReport, FlowInfo +from .INTCollectorCommon import IntDropReport, IntLocalReport, IntFixedReport, FlowInfo, IPPacket, UDPPacket from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor from confluent_kafka import Producer as KafkaProducer from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from uuid import uuid4 -from typing import Dict +from typing import Dict, List, Tuple from datetime import datetime, timezone import json from kpi_manager.client.KpiManagerClient import KpiManagerClient -from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId from context.client.ContextClient import ContextClient from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient from common.proto.kpi_sample_types_pb2 import KpiSampleType @@ -46,50 +45,68 @@ import logging LOGGER = logging.getLogger(__name__) -class INTCollector(_Collector): +DEF_SW_NUM = 10 - last_packet_time = time.time() # Track last packet time +class INTCollector(_Collector): - max_idle_time = 5 # for how long we tolerate inactivity - sniff_timeout = 3 # how often we stop sniffing to check for inactivity + last_packet_time = 0.0 # Track the timestamp of the last packet + max_idle_time = 5 # For how long we tolerate inactivity + sniff_timeout = 3 # How often we stop sniffing to check for inactivity """ - INTCollector is a class that simulates a network collector for testing purposes. - It provides functionalities to manage configurations, state subscriptions, and synthetic data generation. + INTCollector spawns a packet sniffer at the interface of the Telemetry service, + which is mapped to the interface of the TFS host. + INT packets arriving there are: + - picked up by the INT collector + - parsed, and + - telemetry KPI metric values are extracted and reported to Kafka as KPIDescriptors """ - def __init__(self, collector_id: str , address: str, interface: str, port: str, kpi_id: str, service_id: str, context_id: str, **settings): - super().__init__('int_collector', address, port, **settings) - self._out_samples = queue.Queue() # Queue to hold synthetic state samples - self._scheduler = BackgroundScheduler(daemon=True) + + def __init__(self, address: str, port: int, **settings) -> None: + super().__init__('INTCollector', address, port, **settings) + self.collector_id = settings.pop('collector_id', None) + self.interface = settings.pop('interface', None) + self.kpi_id = settings.pop('kpi_id', None) + self.service_id = settings.pop('service_id', None) + self.context_id = settings.pop('context_id', None) + + if any(item is None for item in [ + self.collector_id, self.interface, self.kpi_id, self.service_id, self.context_id]): + LOGGER.error("INT collector not instantiated properly: Bad input") + return + + self._out_samples = queue.Queue() + self._scheduler = BackgroundScheduler(daemon=True) self._scheduler.configure( jobstores = {'default': MemoryJobStore()}, executors = {'default': ThreadPoolExecutor(max_workers=1)}, timezone = pytz.utc ) self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) - self.collector_id = collector_id - self.interface = interface self.kpi_manager_client = KpiManagerClient() self.analytics_frontend_client = AnalyticsFrontendClient() self.context_client = ContextClient() - self.kpi_id = kpi_id - self.service_id = service_id - self.context_id = context_id self.table = {} - self.connected = False # To track connection state - LOGGER.info("INT Collector initialized") + self.connected = False + LOGGER.info("=== INT Collector initialized") def Connect(self) -> bool: + LOGGER.info("=== INT Collector Connect()") LOGGER.info(f"Connecting to {self.interface}:{self.port}") - self.connected = True - self._scheduler.add_job(self.sniff_with_restarts_on_idle, id=self.kpi_id ,args=[self.interface , self.port , self.service_id, self.context_id]) + self._scheduler.add_job( + self.sniff_with_restarts_on_idle, + id=self.kpi_id, + args=[self.interface, self.port, self.service_id] + ) self._scheduler.start() + self.connected = True LOGGER.info(f"Successfully connected to {self.interface}:{self.port}") return True def Disconnect(self) -> bool: + LOGGER.info("=== INT Collector Disconnect()") LOGGER.info(f"Disconnecting from {self.interface}:{self.port}") if not self.connected: LOGGER.warning("INT Collector is not connected. Nothing to disconnect.") @@ -102,14 +119,54 @@ class INTCollector(_Collector): LOGGER.info(f"Successfully disconnected from {self.interface}:{self.port}") return True + def require_connection(self): + if not self.connected: + raise RuntimeError("INT collector is not connected. Please connect before performing operations.") + + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float, str, int, str, str]]) -> bool: + LOGGER.info("=== INT Collector SubscribeState()") + self.require_connection() + try: + _, _, _, _, interface, port, service_id, _ = subscriptions + except: + LOGGER.exception(f"Invalid subscription format: {subscriptions}") + return False + + if self.kpi_id: + self._scheduler.add_job( + self.sniff_with_restarts_on_idle, + id=self.kpi_id, + args=[interface, port, service_id] + ) + + return True + + def UnsubscribeState(self, resource_key: str) -> bool: + LOGGER.info("=== INT Collector UnsubscribeState()") + self.require_connection() + try: + # Check if job exists + job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] + if not job_ids: + LOGGER.warning(f"No active jobs found for {resource_key}. It might have already been terminated.") + return False + for job_id in job_ids: + self._scheduler.remove_job(job_id) + LOGGER.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") + return True + except: + LOGGER.exception(f"Failed to unsubscribe from {resource_key}") + return False + def on_idle_timeout(self): - LOGGER.info(f"Sniffer idle for more than {self.max_idle_time} seconds.") + LOGGER.info(f"=== INT Collector IDLE() - No INT packets arrived during the last {self.max_idle_time}") LOGGER.debug(f"last_packet_time {self.last_packet_time} seconds.") + # Report a zero value for the P4 switch KPIs values = [0] - for sw_id in range(1, 6): + for sw_id in range(1, DEF_SW_NUM+1): sw = self.table.get(sw_id) - self.overwrite_switch_values(sw , values) + self.overwrite_switch_values(sw, values) def overwrite_switch_values(self, switch, values): if not switch: @@ -122,38 +179,52 @@ class INTCollector(_Collector): for key, value in switch.items(): self.send_message_to_kafka(key, value) - def process_packet(self , packet, port, service_id , context_id): - # global last_packet_time + def process_packet(self, packet, port, service_id): + LOGGER.debug("=== INT Collector Packet-In()") + LOGGER.debug(packet) - # Check for IP layer + # Check for IP header if IP not in packet: return None ip_layer = packet[IP] - # ip_pkt = IPPacket(ip_layer[:20]) - # ip_pkt.show() - # Check for UDP + # IP parsing + try: + ihl = ip_layer.ihl * 4 + raw_ip = bytes(ip_layer) + ip_pkt = IPPacket(raw_ip[:ihl]) # exclude options if any + src_ip_str = str(ipaddress.IPv4Address(ip_pkt.ip_src)) + dst_ip_str = str(ipaddress.IPv4Address(ip_pkt.ip_dst)) + # ip_pkt.show() + except Exception as ex: + LOGGER.exception(f"Failed to parse IP packet: {ex}") + return None + + LOGGER.debug(f"ip src: {src_ip_str}") + LOGGER.debug(f"ip dst: {dst_ip_str}") + LOGGER.debug(f"ip-proto: {ip_pkt.ip_proto}") + + # Check for UDP header if UDP not in ip_layer: return None udp_layer = ip_layer[UDP] - # Only the INT port + # We care about datagrams arriving on the INT port if udp_layer.dport != port: + LOGGER.warning(f"Expected UDP INT packet on port {udp_layer.dport}. Received packet on port {port}") return None - # udp_dgram = UDPPacket(bytes(udp_layer)) - # udp_dgram.show() - src_ip = socket.ntohl(struct.unpack(' 0, "Egress timestamp must be > ingress timestamp" + assert lat > 0, f"Egress timestamp must be > ingress timestamp. Got a diff: {lat}" # local_report.show() # Create flow info flow_info = FlowInfo( - src_ip=src_ip, - dst_ip=dst_ip, + src_ip=ip_pkt.ip_src, + dst_ip=ip_pkt.ip_dst, src_port=udp_layer.sport, dst_port=udp_layer.dport, ip_proto=ip_layer.proto, @@ -201,25 +274,26 @@ class INTCollector(_Collector): ) LOGGER.debug(f"Flow info: {flow_info}") - self.create_descriptors_and_send_to_kafka(flow_info , service_id , context_id) - + self.create_descriptors_and_send_to_kafka(flow_info, service_id) self.last_packet_time = time.time() + return flow_info - def set_kpi_descriptor(self , kpi_uuid , service_id , device_id , endpoint_id , sample_type): + def set_kpi_descriptor(self, kpi_uuid, service_id, sample_type): kpi_descriptor = KpiDescriptor() kpi_descriptor.kpi_sample_type = sample_type kpi_descriptor.service_id.service_uuid.uuid = service_id - # kpi_descriptor.device_id.device_uuid.uuid = device_id - # kpi_descriptor.endpoint_id.endpoint_uuid.uuid = endpoint_id kpi_descriptor.kpi_id.kpi_id.uuid = kpi_uuid - kpi_id: KpiId = self.kpi_manager_client.SetKpiDescriptor(kpi_descriptor) + try: + kpi_id: KpiId = self.kpi_manager_client.SetKpiDescriptor(kpi_descriptor) + except Exception as ex: + LOGGER.exception(f"Failed to set KPI descriptor {kpi_uuid}: {ex}") return kpi_id - def create_descriptors_and_send_to_kafka(self, flow_info , service_id , context_id): - LOGGER.debug(f"PACKET FROM SWITCH: {flow_info.switch_id} LATENCY: {flow_info.hop_latency}") + def create_descriptors_and_send_to_kafka(self, flow_info, service_id): + LOGGER.debug(f"Packet from switch: {flow_info.switch_id} with latency: {flow_info.hop_latency}") if(self.table.get(flow_info.switch_id) == None): seq_num_kpi_id = str(uuid4()) ingress_ts_kpi_id = str(uuid4()) @@ -241,14 +315,14 @@ class INTCollector(_Collector): LOGGER.debug(f"is_drop_kpi_id for switch {flow_info.switch_id}: {is_drop_kpi_id}") LOGGER.debug(f"sw_lat_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}") - seq_num_kpi = self.set_kpi_descriptor(seq_num_kpi_id, service_id ,'', '', KpiSampleType.KPISAMPLETYPE_INT_SEQ_NUM) - ingress_timestamp_kpi = self.set_kpi_descriptor(ingress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_ING) - egress_timestamp_kpi = self.set_kpi_descriptor(egress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_EGR) - hop_latency_kpi = self.set_kpi_descriptor(hop_lat_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT) - ingress_port_id_kpi = self.set_kpi_descriptor(ing_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_ING) - egress_port_id_kpi = self.set_kpi_descriptor(egr_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_EGR) - queue_occup_kpi = self.set_kpi_descriptor(queue_occup_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_QUEUE_OCCUP) - is_drop_kpi = self.set_kpi_descriptor(is_drop_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_IS_DROP) + seq_num_kpi = self.set_kpi_descriptor(seq_num_kpi_id, service_id ,KpiSampleType.KPISAMPLETYPE_INT_SEQ_NUM) + ingress_timestamp_kpi = self.set_kpi_descriptor(ingress_ts_kpi_id, service_id, KpiSampleType.KPISAMPLETYPE_INT_TS_ING) + egress_timestamp_kpi = self.set_kpi_descriptor(egress_ts_kpi_id, service_id, KpiSampleType.KPISAMPLETYPE_INT_TS_EGR) + hop_latency_kpi = self.set_kpi_descriptor(hop_lat_kpi_id, service_id, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT) + ingress_port_id_kpi = self.set_kpi_descriptor(ing_port_id_kpi_id, service_id, KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_ING) + egress_port_id_kpi = self.set_kpi_descriptor(egr_port_id_kpi_id, service_id, KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_EGR) + queue_occup_kpi = self.set_kpi_descriptor(queue_occup_kpi_id, service_id, KpiSampleType.KPISAMPLETYPE_INT_QUEUE_OCCUP) + is_drop_kpi = self.set_kpi_descriptor(is_drop_kpi_id, service_id, KpiSampleType.KPISAMPLETYPE_INT_IS_DROP) # Set a dedicated KPI descriptor for every switch sw_lat_kpi = None @@ -259,10 +333,10 @@ class INTCollector(_Collector): KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW07, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW08, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW09, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW10 ] - for i, sw_id in enumerate(range(1, 11)): + for i, sw_id in enumerate(range(1, DEF_SW_NUM+1)): if flow_info.switch_id == sw_id: - LOGGER.debug(f"SET KPI : seq_num_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}") - sw_lat_kpi = self.set_kpi_descriptor(sw_lat_kpi_id, service_id, '', '', sw_sample_types[i]) + LOGGER.debug(f"Set latency KPI for switch {flow_info.switch_id}: {sw_lat_kpi_id}") + sw_lat_kpi = self.set_kpi_descriptor(sw_lat_kpi_id, service_id, sw_sample_types[i]) # Gather keys and values keys = [ @@ -310,48 +384,67 @@ class INTCollector(_Collector): switch = self.table.get(flow_info.switch_id) # Overwrite values using zip - self.overwrite_switch_values(switch , values) + self.overwrite_switch_values(switch, values) - def send_message_to_kafka(self , kpi_id , measured_kpi_value): + def send_message_to_kafka(self, kpi_id, measured_kpi_value): + LOGGER.debug("=== INT Collector Kafka Writer()") producer = self.kafka_producer kpi_value: Dict = { "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "kpi_id": kpi_id, "kpi_value": measured_kpi_value } - producer.produce( - KafkaTopic.VALUE.value, - key=self.collector_id, - value=json.dumps(kpi_value), - callback=self.delivery_callback - ) - producer.flush() - LOGGER.debug(f"Message with kpi_id: {kpi_id} was send to kafka!") + try: + producer.produce( + KafkaTopic.VALUE.value, + key=self.collector_id, + value=json.dumps(kpi_value), + callback=self.delivery_callback + ) + producer.flush() + except Exception as ex: + LOGGER.error(f"Message with kpi_id: {kpi_id} is NOT sent to kafka!") + LOGGER.exception(f"{ex}") + return + LOGGER.debug(f"Message with kpi_id: {kpi_id} is sent to kafka!") - def packet_callback(self, packet, port , service_id,context_id): - flow_info = self.process_packet(packet , port , service_id, context_id) + def packet_callback(self, packet, port, service_id): + flow_info = self.process_packet(packet, port, service_id) if flow_info: LOGGER.debug(f"Flow info: {flow_info}") - def sniff_with_restarts_on_idle(self, interface, port, service_id , context_id): + def sniff_with_restarts_on_idle(self, interface, port, service_id): + LOGGER.info("=== INT Collector Sniffer Start") while True: # Run sniff for a short period to periodically check for idle timeout - sniff( - iface=interface, - filter=f"udp port {port}", - prn=lambda pkt: self.packet_callback(pkt, port, service_id , context_id), - timeout=self.sniff_timeout - ) + try: + sniff( # type: ignore + iface=interface, + filter=f"udp port {port}", + prn=lambda pkt: self.packet_callback(pkt, port, service_id), + timeout=self.sniff_timeout + ) + except Exception as ex: + LOGGER.exception(ex) + self.Disconnect() if not self.connected: break # Check if idle period has been exceeded now = time.time() + LOGGER.debug(f"Time now: {self.epoch_to_day_time(now)}") + LOGGER.debug(f"Time last pkt: {self.epoch_to_day_time(self.last_packet_time)}") + diff = now - self.last_packet_time + assert diff > 0, f"Time diff: {diff} sec must be positive" if (now - self.last_packet_time) > self.max_idle_time: self.on_idle_timeout() self.last_packet_time = now # Reset timer after action + LOGGER.info("=== INT Collector Sniffer End") def delivery_callback(self, err, msg): if err: - LOGGER.error('Message delivery failed: {:s}'.format(str(err))) + LOGGER.error(f"Kafka message delivery failed: {str(err)}") + + def epoch_to_day_time(self, ep_time : float): + return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ep_time)) diff --git a/src/telemetry/backend/service/collectors/intcollector/INTCollectorCommon.py b/src/telemetry/backend/service/collectors/int_collector/INTCollectorCommon.py similarity index 74% rename from src/telemetry/backend/service/collectors/intcollector/INTCollectorCommon.py rename to src/telemetry/backend/service/collectors/int_collector/INTCollectorCommon.py index baed12fd29f4707f187c9e3d2377fe29b4f034ad..a8ce410680aabb87192e8d907cbaf25c12436245 100644 --- a/src/telemetry/backend/service/collectors/intcollector/INTCollectorCommon.py +++ b/src/telemetry/backend/service/collectors/int_collector/INTCollectorCommon.py @@ -35,7 +35,7 @@ class IPPacket(Packet): ] class UDPPacket(Packet): - name = "UDPPacket" + name = "UDPDatagram" fields_desc = [ BitField("udp_port_src", 0, 16), BitField("udp_port_dst", 0, 16), @@ -43,37 +43,6 @@ class UDPPacket(Packet): BitField("udp_csum", 0, 16) ] -""" - Private source repo: https://github.com/SNVLab-WUT/int-collector/blob/main/collector/BPFCollector_v0_5.c - fabric-tna repo: https://github.com/stratum/fabric-tna/blob/main/p4src/shared/header.p4 - - Little Endian order - BitField("nproto", 0, 4), - BitField("ver", 0, 4), - BitField("rsvd1", 0, 5), - BitField("f", 0, 1), - BitField("q", 0, 1), - BitField("d", 0, 1), - BitField("hw_id", 0, 6), - BitField("rsvd2", 0, 10), - - BitField("queue_id", 0, 8), - BitField("queue_occupancy", 0, 24), - - Big Endian order - BitField("ver", 0, 4), - BitField("nproto", 0, 4), - BitField("d", 0, 1), - BitField("q", 0, 1), - BitField("f", 0, 1), - BitField("rsvd1", 0, 5), - BitField("rsvd2", 0, 10), - BitField("hw_id", 0, 6), - - BitField("queue_occupancy", 0, 24), - BitField("queue_id", 0, 8), -""" - class IntFixedReport(Packet): name = "IntFixedReport" fields_desc = [ diff --git a/src/telemetry/backend/service/collectors/intcollector/__init__.py b/src/telemetry/backend/service/collectors/int_collector/__init__.py similarity index 100% rename from src/telemetry/backend/service/collectors/intcollector/__init__.py rename to src/telemetry/backend/service/collectors/int_collector/__init__.py