From fa1babd60263f8ddbca404ffd9e426e9e3f984f0 Mon Sep 17 00:00:00 2001 From: kpoulakakis Date: Tue, 17 Jun 2025 15:25:46 +0300 Subject: [PATCH 1/6] feat: new in-band network telemetry collector plugin --- manifests/telemetryservice.yaml | 8 +- proto/kpi_sample_types.proto | 43 ++- proto/telemetry_frontend.proto | 22 +- .../service/MetricWriterToPrometheus.py | 14 +- src/telemetry/backend/Dockerfile | 6 +- .../backend/collector_api/_Collector.py | 2 +- .../collectors/intcollector/INTCollector.py | 356 ++++++++++++++++++ .../intcollector/INTCollectorCommon.py | 103 +++++ .../service/TelemetryBackendService.py | 163 ++++---- src/telemetry/frontend/Dockerfile | 3 + .../TelemetryFrontendServiceServicerImpl.py | 6 +- 11 files changed, 631 insertions(+), 95 deletions(-) create mode 100644 src/telemetry/backend/collectors/intcollector/INTCollector.py create mode 100644 src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index 6d2d3087b..2add96516 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -94,10 +94,16 @@ metadata: labels: app: telemetryservice spec: - type: ClusterIP + type: LoadBalancer + loadBalancerIP: 192.168.5.250 + externalTrafficPolicy: Local selector: app: telemetryservice ports: + - name: dnsudp + protocol: UDP + port: 12345 + targetPort: 12345 - name: grpc protocol: TCP port: 30050 diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 142336950..5fcda6df9 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -40,13 +40,38 @@ enum KpiSampleType { KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; -// output KPIs - KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101; - KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102; - KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103; - KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201; - KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202; - KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203; - - KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701; + // output KPIs + KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101; + KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102; + KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103; + KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201; + KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202; + KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203; + + KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701; + + // INT KPIs + KPISAMPLETYPE_INT_SEQ_NUM = 2001; + KPISAMPLETYPE_INT_TS_ING = 2002; + KPISAMPLETYPE_INT_TS_EGR = 2003; + KPISAMPLETYPE_INT_HOP_LAT = 2004; + KPISAMPLETYPE_INT_PORT_ID_ING = 2005; + KPISAMPLETYPE_INT_PORT_ID_EGR = 2006; + KPISAMPLETYPE_INT_QUEUE_OCCUP = 2007; + KPISAMPLETYPE_INT_QUEUE_ID = 2008; + + KPISAMPLETYPE_INT_HOP_LAT_SW01 = 2101; + KPISAMPLETYPE_INT_HOP_LAT_SW02 = 2102; + KPISAMPLETYPE_INT_HOP_LAT_SW03 = 2103; + KPISAMPLETYPE_INT_HOP_LAT_SW04 = 2104; + KPISAMPLETYPE_INT_HOP_LAT_SW05 = 2105; + KPISAMPLETYPE_INT_HOP_LAT_SW06 = 2106; + KPISAMPLETYPE_INT_HOP_LAT_SW07 = 2107; + KPISAMPLETYPE_INT_HOP_LAT_SW08 = 2108; + KPISAMPLETYPE_INT_HOP_LAT_SW09 = 2109; + KPISAMPLETYPE_INT_HOP_LAT_SW10 = 2110; + KPISAMPLETYPE_INT_LAT_ON_TOTAL = 2120; + + KPISAMPLETYPE_INT_IS_DROP = 2201; + KPISAMPLETYPE_INT_DROP_REASON = 2202; } diff --git a/proto/telemetry_frontend.proto b/proto/telemetry_frontend.proto index d1c0420d9..cb92506f0 100644 --- a/proto/telemetry_frontend.proto +++ b/proto/telemetry_frontend.proto @@ -28,13 +28,21 @@ message CollectorId { context.Uuid 
collector_id = 1;
}

-message Collector {
-  CollectorId collector_id = 1; // The Collector ID
-  kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples
-  float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely
-  float interval_s = 4; // Interval between collected samples
-  context.Timestamp start_time = 5; // Timestamp when Collector start execution
-  context.Timestamp end_time = 6; // Timestamp when Collector stop execution
+message Collector {
+  CollectorId collector_id = 1; // The Collector ID
+  kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated with the collected samples
+  float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means collect indefinitely
+  float interval_s = 4; // Interval between collected samples
+  context.Timestamp start_time = 5; // Timestamp when the Collector starts execution
+  context.Timestamp end_time = 6; // Timestamp when the Collector stops execution
+  INTCollector int_collector = 7; // Extra optional information about INT collectors
+}
+
+message INTCollector {
+  int32 transport_port = 1; // The port where the collector listens for packets
+  string interface = 2; // Network interface to collect data from
+  string service_id = 3; // Service identifier related to this collector
+  string context_id = 4; // Context identifier related to this collector
 }
 
 message CollectorFilter {
diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py
index e649e4531..290f7b477 100644
--- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py
+++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py
@@ -51,10 +51,10 @@ class MetricWriterToPrometheus:
             'slice_id'      : kpi_descriptor.slice_id.slice_uuid.uuid,
             'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
             'link_id'       : kpi_descriptor.link_id.link_uuid.uuid,
-            'time_stamp'    : kpi_value.timestamp.timestamp,
-            #'time_stamp'   : kpi_value["time_stamp"],
-            'kpi_value'     : kpi_value.kpi_value_type.floatVal
-            #'kpi_value'    : kpi_value["kpi_value"]
+            # 'time_stamp'  : kpi_value.timestamp.timestamp,
+            'time_stamp'    : kpi_value["time_stamp"],
+            # 'kpi_value'   : kpi_value.kpi_value_type.floatVal
+            'kpi_value'     : kpi_value["kpi_value"]
         }
         LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
         return cooked_kpi
@@ -62,12 +62,12 @@ class MetricWriterToPrometheus:
     def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
         # merge both gRPC messages into a single variable.
        cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
-       tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
+       tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
        metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags
        metric_name = cooked_kpi['kpi_sample_type']
        try:
            if metric_name not in PROM_METRICS: # Only register the metric when it doesn't exist
-               PROM_METRICS[metric_name] = Gauge (
+               PROM_METRICS[metric_name] = Gauge (
                    metric_name,
                    cooked_kpi['kpi_description'],
                    metric_tags,
@@ -89,7 +89,6 @@ class MetricWriterToPrometheus:
            # Push to the Prometheus Gateway; Prometheus is preconfigured to scrape the metrics from the gateway
            push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry)
            LOGGER.debug("Metric pushed to Prometheus Gateway.")
-
        except ValueError as e:
            if 'Duplicated timeseries' in str(e):
                LOGGER.debug("Metric {:} is already registered. Skipping.".format(metric_name))
            else:
                LOGGER.error("Error while pushing metric: {}".format(e))
                raise
-
diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile
index bbb41c95d..a619be74c 100644
--- a/src/telemetry/backend/Dockerfile
+++ b/src/telemetry/backend/Dockerfile
@@ -16,7 +16,7 @@ FROM python:3.9-slim

 # Install dependencies
 RUN apt-get --yes --quiet --quiet update && \
-    apt-get --yes --quiet --quiet install wget g++ git && \
+    apt-get --yes --quiet --quiet install wget g++ git libpcap-dev && \
     rm -rf /var/lib/apt/lists/*

 # Set Python to show logs as they occur
@@ -31,6 +31,7 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
 RUN python3 -m pip install --upgrade pip
 RUN python3 -m pip install --upgrade setuptools wheel
 RUN python3 -m pip install --upgrade pip-tools
+RUN python3 -m pip install --upgrade scapy

 # Get common Python packages
 # Note: this step enables sharing the previous Docker build steps among all the Python components
@@ -76,6 +77,9 @@ COPY src/vnt_manager/__init__.py vnt_manager/__init__.py
 COPY src/vnt_manager/client/. vnt_manager/client/
 COPY src/telemetry/__init__.py telemetry/__init__.py
 COPY src/telemetry/backend/. telemetry/backend/
+COPY src/analytics/__init__.py analytics/__init__.py
+COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py
+COPY src/analytics/frontend/client/. analytics/frontend/client/

 # Start the service
 ENTRYPOINT ["python", "-m", "telemetry.backend.service"]
diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py
index 6b0d2de08..cf984427f 100644
--- a/src/telemetry/backend/collector_api/_Collector.py
+++ b/src/telemetry/backend/collector_api/_Collector.py
@@ -149,7 +149,7 @@ class _Collector:
           - True if a resource is successfully subscribed.
           - Exception if an error occurs during the subscription process.
         List[Union[bool, Exception]]:
-        """
+        """
         raise NotImplementedError()

     def UnsubscribeState(self, resource_key: str) \
diff --git a/src/telemetry/backend/collectors/intcollector/INTCollector.py b/src/telemetry/backend/collectors/intcollector/INTCollector.py
new file mode 100644
index 000000000..678e9cdbd
--- /dev/null
+++ b/src/telemetry/backend/collectors/intcollector/INTCollector.py
@@ -0,0 +1,356 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytz
+import queue
+import logging
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.jobstores.memory import MemoryJobStore
+from apscheduler.executors.pool import ThreadPoolExecutor
+from telemetry.backend.collector_api._Collector import _Collector
+
+from scapy.all import *
+import struct
+import socket
+import ipaddress
+
+from .INTCollectorCommon import IntDropReport, IntLocalReport, IntFixedReport, FlowInfo
+from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor
+from confluent_kafka import Producer as KafkaProducer
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
+from uuid import uuid4
+from typing import Dict
+from datetime import datetime, timezone
+import json
+
+from kpi_manager.client.KpiManagerClient import KpiManagerClient
+from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
+from context.client.ContextClient import ContextClient
+from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
+from common.proto.kpi_sample_types_pb2 import KpiSampleType
+import time
+
+LOGGER = logging.getLogger(__name__)
+
+class INTCollector(_Collector):
+
+    last_packet_time = time.time() # Track the arrival time of the last INT report
+
+    max_idle_time = 5  # how long (seconds) inactivity is tolerated
+    sniff_timeout = 3  # how often (seconds) sniffing stops to check for inactivity
+
+    """
+    INTCollector sniffs In-band Network Telemetry (INT) report packets on a
+    network interface, parses their fixed, local and drop reports, registers
+    the corresponding KPI descriptors, and publishes the measured KPI values
+    to Kafka.
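+
+    The constructor arguments wire the collector into TeraFlowSDN: collector_id
+    keys the Kafka messages, interface and port select the sniffed INT traffic,
+    and kpi_id, service_id and context_id tie the collected samples to the
+    monitored service.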
+    """
+    def __init__(self, collector_id: str, address: str, interface: str, port: int, kpi_id: str, service_id: str, context_id: str, **settings):
+        super().__init__('int_collector', address, port, **settings)
+        self._out_samples = queue.Queue() # Queue to hold collected state samples
+        self._scheduler = BackgroundScheduler(daemon=True)
+        self._scheduler.configure(
+            jobstores = {'default': MemoryJobStore()},
+            executors = {'default': ThreadPoolExecutor(max_workers=1)},
+            timezone  = pytz.utc
+        )
+        self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
+        self.collector_id = collector_id
+        self.interface = interface
+        self.kpi_manager_client = KpiManagerClient()
+        self.analytics_frontend_client = AnalyticsFrontendClient()
+        self.context_client = ContextClient()
+        self.kpi_id = kpi_id
+        self.service_id = service_id
+        self.context_id = context_id
+        self.table = {}
+        self.connected = False # To track connection state
+        LOGGER.info("INT Collector initialized")
+
+    def Connect(self) -> bool:
+        LOGGER.info(f"Connecting to {self.interface}:{self.port}")
+        self.connected = True
+
+        self._scheduler.add_job(self.sniff_with_restarts_on_idle, id=self.kpi_id, args=[self.interface, self.port, self.service_id, self.context_id])
+
+        self._scheduler.start()
+        LOGGER.info(f"Successfully connected to {self.interface}:{self.port}")
+        return True
+
+    def Disconnect(self) -> bool:
+        LOGGER.info(f"Disconnecting from {self.interface}:{self.port}")
+        if not self.connected:
+            LOGGER.warning("INT Collector is not connected. Nothing to disconnect.")
+            return False
+
+        self._scheduler.remove_job(self.kpi_id)
+        self._scheduler.shutdown()
+
+        self.connected = False
+        LOGGER.info(f"Successfully disconnected from {self.interface}:{self.port}")
+        return True
+
+    def on_idle_timeout(self):
+        LOGGER.info(f"Sniffer idle for more than {self.max_idle_time} seconds.")
+        LOGGER.debug(f"last_packet_time: {self.last_packet_time}")
+
+        values = [0]
+        for sw_id in range(1, 6):
+            sw = self.table.get(sw_id)
+            self.overwrite_switch_values(sw, values)
+
+    def overwrite_switch_values(self, switch, values):
+        if not switch:
+            return
+
+        # Overwrite values using zip (stops at the shorter of the two sequences)
+        for key, new_value in zip(switch, values):
+            switch[key] = new_value
+
+        for key, value in switch.items():
+            self.send_message_to_kafka(key, value)
+
+    def process_packet(self, packet, port, service_id, context_id):
+        # Check for IP layer
+        if IP not in packet:
+            return None
+        ip_layer = packet[IP]
+        # ip_pkt = IPPacket(ip_layer[:20])
+        # ip_pkt.show()
+
+        # Check for UDP
+        if UDP not in ip_layer:
+            return None
+        udp_layer = ip_layer[UDP]
+
+        # Only the INT port
+        if udp_layer.dport != port:
+            return None
+        # udp_dgram = UDPPacket(bytes(udp_layer))
+        # udp_dgram.show()
+
+        src_ip = socket.ntohl(struct.unpack('<I', socket.inet_aton(ip_layer.src))[0])
+        dst_ip = socket.ntohl(struct.unpack('<I', socket.inet_aton(ip_layer.dst))[0])
+
+        # Dissect the INT report stack carried in the UDP payload. NOTE: the
+        # offsets assume the layouts declared in INTCollectorCommon (20-byte
+        # fixed report, followed by an 8-byte local report or a 4-byte drop report).
+        payload = bytes(udp_layer.payload)
+        fixed_report = IntFixedReport(payload[:20])
+        # fixed_report.show()
+
+        local_report = None
+        drop_report = None
+        if fixed_report.d:
+            drop_report = IntDropReport(payload[20:24])
+        elif len(payload) >= 28:
+            local_report = IntLocalReport(payload[20:28])
+
+        lat = (local_report.egress_timestamp - fixed_report.ingress_timestamp) if local_report else 0
+        if local_report:
+            assert lat > 0, "Egress timestamp must be > ingress timestamp"
+        # local_report.show()
+
+        # Create flow info
+        flow_info = FlowInfo(
+            src_ip=src_ip,
+            dst_ip=dst_ip,
+            src_port=udp_layer.sport,
+            dst_port=udp_layer.dport,
+            ip_proto=ip_layer.proto,
+            flow_sink_time=fixed_report.ingress_timestamp,
+            num_int_hop=1,
+            seq_num=fixed_report.seq_num,
+            switch_id=fixed_report.switch_id,
+            ingress_timestamp=fixed_report.ingress_timestamp,
+            ingress_port_id=fixed_report.ingress_port_id,
+            egress_port_id=fixed_report.egress_port_id,
+            queue_id=local_report.queue_id if local_report else 0,
+            queue_occupancy=local_report.queue_occupancy if local_report else 0,
+            egress_timestamp=local_report.egress_timestamp if
local_report else 0, + is_drop=1 if drop_report else 0, + drop_reason=drop_report.drop_reason if drop_report else 0, + hop_latency=lat + ) + LOGGER.debug(f"Flow info: {flow_info}") + + self.create_descriptors_and_send_to_kafka(flow_info , service_id , context_id) + + self.last_packet_time = time.time() + return flow_info + + def set_kpi_descriptor(self , kpi_uuid , service_id , device_id , endpoint_id , sample_type): + kpi_descriptor = KpiDescriptor() + kpi_descriptor.kpi_sample_type = sample_type + kpi_descriptor.service_id.service_uuid.uuid = service_id + # kpi_descriptor.device_id.device_uuid.uuid = device_id + # kpi_descriptor.endpoint_id.endpoint_uuid.uuid = endpoint_id + kpi_descriptor.kpi_id.kpi_id.uuid = kpi_uuid + + kpi_id: KpiId = self.kpi_manager_client.SetKpiDescriptor(kpi_descriptor) + + return kpi_id + + def create_descriptors_and_send_to_kafka(self, flow_info , service_id , context_id): + LOGGER.debug(f"PACKET FROM SWITCH: {flow_info.switch_id} LATENCY: {flow_info.hop_latency}") + if(self.table.get(flow_info.switch_id) == None): + # seq_num_kpi_id = str(uuid4()) + ingress_ts_kpi_id = str(uuid4()) + egress_ts_kpi_id = str(uuid4()) + hop_lat_kpi_id = str(uuid4()) + ing_port_id_kpi_id = str(uuid4()) + egr_port_id_kpi_id = str(uuid4()) + queue_occup_kpi_id = str(uuid4()) + is_drop_kpi_id = str(uuid4()) + sw_lat_kpi_id = str(uuid4()) + + # LOGGER.debug(f"seq_num_kpi_id for switch {flow_info.switch_id}: {seq_num_kpi_id}") + # LOGGER.debug(f"ingress_ts_kpi_id for switch {flow_info.switch_id}: {ingress_ts_kpi_id}") + # LOGGER.debug(f"egress_ts_kpi_id for switch {flow_info.switch_id}: {egress_ts_kpi_id}") + # LOGGER.debug(f"hop_lat_kpi_id for switch {flow_info.switch_id}: {hop_lat_kpi_id}") + # LOGGER.debug(f"ing_port_id_kpi_id for switch {flow_info.switch_id}: {ing_port_id_kpi_id}") + # LOGGER.debug(f"egr_port_id_kpi_id for switch {flow_info.switch_id}: {egr_port_id_kpi_id}") + # LOGGER.debug(f"queue_occup_kpi_id for switch {flow_info.switch_id}: {queue_occup_kpi_id}") + # LOGGER.debug(f"is_drop_kpi_id for switch {flow_info.switch_id}: {is_drop_kpi_id}") + # LOGGER.debug(f"sw_lat_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}") + + # seq_num_kpi = self.set_kpi_descriptor(seq_num_kpi_id, service_id ,'', '', KpiSampleType.KPISAMPLETYPE_INT_SEQ_NUM) + # ingress_timestamp_kpi = self.set_kpi_descriptor(ingress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_ING) + # egress_timestamp_kpi = self.set_kpi_descriptor(egress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_EGR) + # hop_latency_kpi = self.set_kpi_descriptor(hop_lat_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT) + # ingress_port_id_kpi = self.set_kpi_descriptor(ing_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_ING) + # egress_port_id_kpi = self.set_kpi_descriptor(egr_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_EGR) + # queue_occup_kpi = self.set_kpi_descriptor(queue_occup_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_QUEUE_OCCUP) + # is_drop_kpi = self.set_kpi_descriptor(is_drop_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_IS_DROP) + + # Set a dedicated KPI descriptor for every switch + sw_lat_kpi = None + sw_sample_types = [ + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW01, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW02, + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW03, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW04, + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW05, 
KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW06,
+                KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW07, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW08,
+                KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW09, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW10
+            ]
+            for i, sw_id in enumerate(range(1, 11)):
+                if flow_info.switch_id == sw_id:
+                    LOGGER.debug(f"SET KPI : sw_lat_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}")
+                    sw_lat_kpi = self.set_kpi_descriptor(sw_lat_kpi_id, service_id, '', '', sw_sample_types[i])
+
+            # Gather keys and values
+            keys = [
+                # seq_num_kpi.kpi_id.uuid,
+                # ingress_timestamp_kpi.kpi_id.uuid,
+                # egress_timestamp_kpi.kpi_id.uuid,
+                # hop_latency_kpi.kpi_id.uuid,
+                # ingress_port_id_kpi.kpi_id.uuid,
+                # egress_port_id_kpi.kpi_id.uuid,
+                # queue_occup_kpi.kpi_id.uuid,
+                # is_drop_kpi.kpi_id.uuid,
+                sw_lat_kpi.kpi_id.uuid
+            ]
+            values = [
+                # flow_info.seq_num,
+                # flow_info.ingress_timestamp,
+                # flow_info.egress_timestamp,
+                # flow_info.hop_latency,
+                # flow_info.ingress_port_id,
+                # flow_info.egress_port_id,
+                # flow_info.queue_occupancy,
+                # flow_info.is_drop,
+                flow_info.hop_latency
+            ]
+            assert len(keys) == len(values), "KPI keys and values must agree"
+            switch = {keys[i]: values[i] for i in range(len(keys))}
+
+            self.table[flow_info.switch_id] = switch
+
+            # Dispatch to Kafka
+            for key, value in switch.items():
+                self.send_message_to_kafka(key, value)
+        else:
+            values = [
+                # flow_info.seq_num,
+                # flow_info.ingress_timestamp,
+                # flow_info.egress_timestamp,
+                # flow_info.hop_latency,
+                # flow_info.ingress_port_id,
+                # flow_info.egress_port_id,
+                # flow_info.queue_occupancy,
+                # flow_info.is_drop,
+                flow_info.hop_latency
+            ]
+            switch = self.table.get(flow_info.switch_id)
+
+            # Overwrite values using zip
+            self.overwrite_switch_values(switch, values)
+
+    def send_message_to_kafka(self, kpi_id, measured_kpi_value):
+        producer = self.kafka_producer
+        kpi_value: Dict = {
+            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
+            "kpi_id": kpi_id,
+            "kpi_value": measured_kpi_value
+        }
+        producer.produce(
+            KafkaTopic.VALUE.value,
+            key=self.collector_id,
+            value=json.dumps(kpi_value),
+            callback=self.delivery_callback
+        )
+        producer.flush()
+        LOGGER.debug(f"Message with kpi_id: {kpi_id} was sent to Kafka!")
+
+    def packet_callback(self, packet, port, service_id, context_id):
+        flow_info = self.process_packet(packet, port, service_id, context_id)
+        if flow_info:
+            LOGGER.debug(f"Flow info: {flow_info}")
+
+    def sniff_with_restarts_on_idle(self, interface, port, service_id, context_id):
+        while True:
+            # Run sniff for a short period to periodically check for idle timeout
+            sniff(
+                iface=interface,
+                filter=f"udp port {port}",
+                prn=lambda pkt: self.packet_callback(pkt, port, service_id, context_id),
+                timeout=self.sniff_timeout
+            )
+
+            if not self.connected:
+                break
+
+            # Check if idle period has been exceeded
+            now = time.time()
+            if (now - self.last_packet_time) > self.max_idle_time:
+                self.on_idle_timeout()
+                self.last_packet_time = now # Reset timer after action
+
+    def delivery_callback(self, err, msg):
+        if err:
+            LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
diff --git a/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py b/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py
new file mode 100644
index 000000000..085221431
--- /dev/null
+++ b/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py
@@ -0,0 +1,103 @@
+from scapy.all import Packet, BitField
+from collections import namedtuple
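+
+# The report classes below are plain scapy Packets, so feeding them a byte
+# string drives the BitField dissection directly. A minimal parsing sketch
+# (`data` stands for an assumed raw INT report payload; the offsets follow
+# the field widths declared below -- 20-byte fixed report, 8-byte local report):
+#
+#     fixed = IntFixedReport(data[:20])
+#     local = IntLocalReport(data[20:28])
+#     print(fixed.switch_id, fixed.seq_num, local.egress_timestamp)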
+ +class IPPacket(Packet): + name = "IPPacket" + fields_desc = [ + BitField("ip_ver", 0, 4), + BitField("ip_ihl", 0, 4), + BitField("ip_dscp", 0, 6), + BitField("ip_ecn", 0, 2), + BitField("ip_len", 0, 16), + BitField("ip_id", 0, 16), + BitField("ip_flags", 0, 3), + BitField("ip_frag", 0, 13), + BitField("ip_ttl", 0, 8), + BitField("ip_proto", 0, 8), + BitField("ip_csum", 0, 16), + BitField("ip_src", 0, 32), + BitField("ip_dst", 0, 32) + ] + +class UDPPacket(Packet): + name = "UDPPacket" + fields_desc = [ + BitField("udp_port_src", 0, 16), + BitField("udp_port_dst", 0, 16), + BitField("udp_len", 0, 16), + BitField("udp_csum", 0, 16) + ] + +""" + Private source repo: https://github.com/SNVLab-WUT/int-collector/blob/main/collector/BPFCollector_v0_5.c + fabric-tna repo: https://github.com/stratum/fabric-tna/blob/main/p4src/shared/header.p4 + + Little Endian order + BitField("nproto", 0, 4), + BitField("ver", 0, 4), + BitField("rsvd1", 0, 5), + BitField("f", 0, 1), + BitField("q", 0, 1), + BitField("d", 0, 1), + BitField("hw_id", 0, 6), + BitField("rsvd2", 0, 10), + + BitField("queue_id", 0, 8), + BitField("queue_occupancy", 0, 24), + + Big Endian order + BitField("ver", 0, 4), + BitField("nproto", 0, 4), + BitField("d", 0, 1), + BitField("q", 0, 1), + BitField("f", 0, 1), + BitField("rsvd1", 0, 5), + BitField("rsvd2", 0, 10), + BitField("hw_id", 0, 6), + + BitField("queue_occupancy", 0, 24), + BitField("queue_id", 0, 8), +""" + +class IntFixedReport(Packet): + name = "IntFixedReport" + fields_desc = [ + BitField("ver", 0, 4), + BitField("nproto", 0, 4), + BitField("d", 0, 1), + BitField("q", 0, 1), + BitField("f", 0, 1), + BitField("rsvd1", 0, 5), + BitField("rsvd2", 0, 10), + BitField("hw_id", 0, 6), + BitField("seq_num", 0, 32), + BitField("ingress_timestamp", 0, 32), + BitField("switch_id", 0, 32), + BitField("ingress_port_id", 0, 16), + BitField("egress_port_id", 0, 16) + ] + +class IntLocalReport(Packet): + name = "IntLocalReport" + fields_desc = [ + BitField("queue_occupancy", 0, 24), + BitField("queue_id", 0, 8), + BitField("egress_timestamp", 0, 32) + ] + +class IntDropReport(Packet): + name = "IntDropReport" + fields_desc = [ + BitField("queue_id", 0 , 8), + BitField("drop_reason", 0, 8), + BitField("_pad", 0, 16) + ] + +# Flow information structure +FlowInfo = namedtuple('FlowInfo', [ + 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'ip_proto', + 'flow_sink_time', 'num_int_hop', 'seq_num', 'switch_id', + 'ingress_timestamp', 'ingress_port_id', 'egress_port_id', 'queue_id', + 'queue_occupancy', 'egress_timestamp', 'is_drop', 'drop_reason', + 'hop_latency' +]) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 2952bd19d..2a81fe3e7 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -16,24 +16,25 @@ import json import time import logging import threading -from typing import Any, Dict, Tuple -from datetime import datetime, timezone -from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import Consumer as KafkaConsumer -from confluent_kafka import KafkaError +from typing import Any, Dict, Tuple +from datetime import datetime, timezone +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum -from common.Settings import get_service_port_grpc -from 
common.method_wrappers.Decorator import MetricsPool -from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from common.Settings import get_service_port_grpc +from common.method_wrappers.Decorator import MetricsPool +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService -from common.tools.context_queries.Device import get_device -from common.proto.kpi_manager_pb2 import KpiId +from common.tools.context_queries.Device import get_device +from common.proto.kpi_manager_pb2 import KpiId -from kpi_manager.client.KpiManagerClient import KpiManagerClient -from context.client.ContextClient import ContextClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from context.client.ContextClient import ContextClient from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector +from telemetry.backend.collectors.intcollector.INTCollector import INTCollector -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') class TelemetryBackendService(GenericGrpcService): @@ -41,16 +42,17 @@ class TelemetryBackendService(GenericGrpcService): Class listens for request on Kafka topic, fetches requested metrics from device. Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics. """ - def __init__(self, cls_name : str = __name__) -> None: + + def __init__(self, cls_name: str = __name__) -> None: LOGGER.info('Init TelemetryBackendService') port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND) super().__init__(port, cls_name=cls_name) - self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'backend', - 'auto.offset.reset' : 'latest'}) - self.collector = None - self.context_client = ContextClient() + self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers': KafkaConfig.get_kafka_address(), + 'group.id': 'backend', + 'auto.offset.reset': 'latest'}) + self.collector = None + self.context_client = ContextClient() self.kpi_manager_client = KpiManagerClient() self.active_jobs = {} @@ -73,12 +75,13 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: - LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") + LOGGER.warning( + f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) break - try: + try: collector = json.loads( receive_msg.value().decode('utf-8') ) @@ -93,33 +96,39 @@ class TelemetryBackendService(GenericGrpcService): if collector_id not in self.active_jobs: stop_event = threading.Event() self.active_jobs[collector_id] = stop_event - threading.Thread(target = self.CollectorHandler, - args=( - collector_id, - collector['kpi_id'], - duration, - collector['interval'], - stop_event - )).start() + threading.Thread(target=self.CollectorHandler, + args=( + collector_id, + collector['kpi_id'], + duration, + collector['interval'], + collector['interface'], + collector['transport_port'], + collector['service_id'], + collector['context_id'], + 
stop_event + )).start() # Stop the Collector after the given duration if duration > 0: def stop_after_duration(completion_time, stop_event): time.sleep(completion_time) if not stop_event.is_set(): - LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}") + LOGGER.warning( + f"Execution duration ({completion_time}) completed of Collector: {collector_id}") self.TerminateCollector(collector_id) - + duration_thread = threading.Thread( target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}", - args=(duration, stop_event) - ) + args=(duration, stop_event) + ) duration_thread.start() else: LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id)) except Exception as e: - LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) + LOGGER.warning( + "Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) - def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event): + def CollectorHandler(self, collector_id, kpi_id, duration, interval, interface , port , service_id , context_id , stop_event): """ Method to handle collector request. """ @@ -133,48 +142,65 @@ class TelemetryBackendService(GenericGrpcService): LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) + elif device_type and "p4" in device_type: + LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) + subscription = [collector_id, end_points, duration, interval] + self.INTCollectorHandler(subscription, duration, collector_id, kpi_id, interface, port, service_id , context_id ,stop_event) else: LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event): - # EmulatedCollector - - self.collector = EmulatedCollector(address="127.0.0.1", port=8000) - self.collector.Connect() - if not self.collector.SubscribeState(subscription): - LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id)) - else: - while not stop_event.is_set(): - samples = list(self.collector.GetState(duration=duration, blocking=True)) - LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) - self.GenerateKpiValue(collector_id, kpi_id, samples) - time.sleep(1) + # EmulatedCollector + + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.collector.Connect() + if not self.collector.SubscribeState(subscription): + LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id)) + else: + while not stop_event.is_set(): + samples = list(self.collector.GetState(duration=duration, blocking=True)) + LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) + self.GenerateKpiValue(collector_id, kpi_id, samples) + time.sleep(1) + self.collector.Disconnect() + # self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration. 
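+
+    # The INT variant below additionally receives the fields that the frontend
+    # now copies from the INTCollector proto message; an illustrative request
+    # payload (example values only) looks like:
+    #   {"kpi_id": "<uuid>", "duration": 0, "interval": 1.0,
+    #    "interface": "ens4", "transport_port": 12345,
+    #    "service_id": "<uuid>", "context_id": "<uuid>"}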
+
+    def INTCollectorHandler(self, subscription, duration, collector_id, kpi_id, interface, port, service_id, context_id, stop_event):
+        self.collector = INTCollector(
+            collector_id=collector_id,
+            address="127.0.0.1",
+            interface=interface,
+            port=port,
+            kpi_id=kpi_id,
+            service_id=service_id,
+            context_id=context_id
+        )
+        self.collector.Connect()
+        # Block until this collector is asked to stop, then tear it down;
+        # a single is_set() check here would leave the sniffing job running.
+        stop_event.wait()
             self.collector.Disconnect()

     def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
         """
         Method to write kpi value on VALUE Kafka topic
         """
         producer = self.kafka_producer
-        kpi_value : Dict = {
+        kpi_value: Dict = {
             "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
-            "kpi_id" : kpi_id,
-            "kpi_value" : measured_kpi_value
+            "kpi_id": kpi_id,
+            "kpi_value": measured_kpi_value
         }
         producer.produce(
             KafkaTopic.VALUE.value,
-            key = collector_id,
-            value = json.dumps(kpi_value),
-            callback = self.delivery_callback
+            key=collector_id,
+            value=json.dumps(kpi_value),
+            callback=self.delivery_callback
         )
         producer.flush()

     def TerminateCollector(self, job_id):
         LOGGER.debug("Terminating collector backend...")
         try:
-            if job_id not in self.active_jobs: # not job_ids:
-                # self.logger.warning(f"Active jobs: {self.active_jobs}")
+            if job_id not in self.active_jobs:
                 self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.")
             else:
                 LOGGER.info(f"Terminating job: {job_id}")
@@ -202,18 +228,21 @@ class TelemetryBackendService(GenericGrpcService):
             LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
             return (None, None)

-        device_id = kpi_descriptor.device_id.device_uuid.uuid
+        LOGGER.warning(f"kpi_descriptor {kpi_descriptor} ")
+        LOGGER.warning(f"kpi_descriptor.device_id.device_uuid.uuid {kpi_descriptor.device_id.device_uuid.uuid} ")
+
+        device_id = kpi_descriptor.device_id.device_uuid.uuid
         endpoint_id = kpi_descriptor.endpoint_id.endpoint_uuid.uuid

-        device = get_device( context_client = self.context_client,
-                             device_uuid = device_id,
-                             include_config_rules = False,
-                             include_components = False,
-        )
+        device = get_device(context_client=self.context_client,
+                            device_uuid=device_id,
+                            include_config_rules=False,
+                            include_components=False,
+                            )
         if device:
             for endpoint in device.device_endpoints:
                 if endpoint.endpoint_id.endpoint_uuid.uuid == endpoint_id:
-                    endpoint_dict = {}
-                    kpi_sample_types = []
+                    endpoint_dict = {}
+                    kpi_sample_types = []
                     endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid
                     endpoint_dict["name"] = endpoint.name
                     endpoint_dict["type"] = endpoint.endpoint_type
@@ -222,10 +251,10 @@ class TelemetryBackendService(GenericGrpcService):
                     endpoint_dict["sample_types"] = kpi_sample_types

                     return (device.device_type, endpoint_dict)
-
+
         LOGGER.warning(f"Device ID: {device_id} - Endpoint ID: {endpoint_id} - Not Found")
         return (None, None)

     def delivery_callback(self, err, msg):
-        if err:
+        if err:
             LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
diff --git a/src/telemetry/frontend/Dockerfile b/src/telemetry/frontend/Dockerfile
index 74a0a87a0..dc90b7d16 100644
--- a/src/telemetry/frontend/Dockerfile
+++ b/src/telemetry/frontend/Dockerfile
@@ -65,6 +65,9 @@ WORKDIR /var/teraflow

 COPY src/telemetry/__init__.py telemetry/__init__.py
 COPY src/telemetry/frontend/. telemetry/frontend/
 COPY src/telemetry/database/.
telemetry/database/ +COPY src/analytics/__init__.py analytics/__init__.py +COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py +COPY src/analytics/frontend/client/. analytics/frontend/client/ # Start the service ENTRYPOINT ["python", "-m", "telemetry.frontend.service"] diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 123c10a65..aa7b0b921 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -69,7 +69,11 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): collector_to_generate : Dict = { "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, "duration": collector_obj.duration_s, - "interval": collector_obj.interval_s + "interval": collector_obj.interval_s, + "interface": collector_obj.int_collector.interface, + "transport_port": collector_obj.int_collector.transport_port, + "service_id": collector_obj.int_collector.service_id, + "context_id": collector_obj.int_collector.context_id } self.kafka_producer.produce( KafkaTopic.TELEMETRY_REQUEST.value, -- GitLab From c13aa29df51d76054fbedc1c60a5f8205ba25db2 Mon Sep 17 00:00:00 2001 From: kpoulakakis Date: Fri, 20 Jun 2025 12:24:24 +0300 Subject: [PATCH 2/6] feat: add metallb configuration for telemetry service. --- deploy/tfs.sh | 11 +++++++++++ manifests/telemetryservice.yaml | 2 +- my_deploy.sh | 5 +++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/deploy/tfs.sh b/deploy/tfs.sh index b73bbbf81..ea6923155 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -123,6 +123,11 @@ export KFK_SERVER_PORT=${KFK_SERVER_PORT:-"9092"} # If not already set, if flag is YES, Apache Kafka will be redeployed and topic will be lost. export KFK_REDEPLOY=${KFK_REDEPLOY:-""} +# ----- Telemetry Config ------------------------------------------------------ + +# Replace LOAD_BALANCER_IP +export LOAD_BALANCER_IP=${LOAD_BALANCER_IP:-""} + ######################################################################################################################## # Automated steps start here ######################################################################################################################## @@ -241,6 +246,9 @@ else fi printf "\n" +# Before deployment change the telemetry LOAD_BALANCER_IP +sed -i "s|_LOAD_BALANCER_IP_|$LOAD_BALANCER_IP|g" manifests/telemetryservice.yaml + for COMPONENT in $TFS_COMPONENTS; do echo "Processing '$COMPONENT' component..." @@ -623,6 +631,9 @@ if [[ "$TFS_COMPONENTS" == *"monitoring"* ]] && [[ "$TFS_COMPONENTS" == *"webui" printf "\n\n" fi +# Revert _LOAD_BALANCER_IP_ for the next deployment +sed -i "s|$LOAD_BALANCER_IP|_LOAD_BALANCER_IP_|g" manifests/telemetryservice.yaml + echo "Pruning Docker Images..." 
docker image prune --force printf "\n\n" diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index 2add96516..1a8794dc7 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -95,7 +95,7 @@ metadata: app: telemetryservice spec: type: LoadBalancer - loadBalancerIP: 192.168.5.250 + loadBalancerIP: externalTrafficPolicy: Local selector: app: telemetryservice diff --git a/my_deploy.sh b/my_deploy.sh index 4d3820f41..7ba0c9c38 100644 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -211,3 +211,8 @@ export KFK_SERVER_PORT="9092" # Set the flag to YES for redeploying of Apache Kafka export KFK_REDEPLOY="" + +# ----- Telemetry Config ------------------------------------------------------ + +# Replace LOAD_BALANCER_IP +export LOAD_BALANCER_IP="192.168.5.250" -- GitLab From cce3bb2290060672c2e9dfe9863222e7f9242777 Mon Sep 17 00:00:00 2001 From: kpoulakakis Date: Wed, 25 Jun 2025 16:41:14 +0300 Subject: [PATCH 3/6] feat: add metallb configuration. Uncomment metrics logs and values. --- metallb.yaml | 14 +++ .../collectors/intcollector/INTCollector.py | 86 +++++++++---------- 2 files changed, 57 insertions(+), 43 deletions(-) create mode 100644 metallb.yaml diff --git a/metallb.yaml b/metallb.yaml new file mode 100644 index 000000000..c9dbd73dc --- /dev/null +++ b/metallb.yaml @@ -0,0 +1,14 @@ +apiVersion: metallb.io/v1beta1 +kind: IPAddressPool +metadata: + name: my-ip-pool + namespace: metallb-system +spec: + addresses: + - 192.168.5.250-192.168.5.251 # <-- Change this to match your network +--- +apiVersion: metallb.io/v1beta1 +kind: L2Advertisement +metadata: + name: l2-adv + namespace: metallb-system diff --git a/src/telemetry/backend/collectors/intcollector/INTCollector.py b/src/telemetry/backend/collectors/intcollector/INTCollector.py index 678e9cdbd..e4958900a 100644 --- a/src/telemetry/backend/collectors/intcollector/INTCollector.py +++ b/src/telemetry/backend/collectors/intcollector/INTCollector.py @@ -220,7 +220,7 @@ class INTCollector(_Collector): def create_descriptors_and_send_to_kafka(self, flow_info , service_id , context_id): LOGGER.debug(f"PACKET FROM SWITCH: {flow_info.switch_id} LATENCY: {flow_info.hop_latency}") if(self.table.get(flow_info.switch_id) == None): - # seq_num_kpi_id = str(uuid4()) + seq_num_kpi_id = str(uuid4()) ingress_ts_kpi_id = str(uuid4()) egress_ts_kpi_id = str(uuid4()) hop_lat_kpi_id = str(uuid4()) @@ -230,24 +230,24 @@ class INTCollector(_Collector): is_drop_kpi_id = str(uuid4()) sw_lat_kpi_id = str(uuid4()) - # LOGGER.debug(f"seq_num_kpi_id for switch {flow_info.switch_id}: {seq_num_kpi_id}") - # LOGGER.debug(f"ingress_ts_kpi_id for switch {flow_info.switch_id}: {ingress_ts_kpi_id}") - # LOGGER.debug(f"egress_ts_kpi_id for switch {flow_info.switch_id}: {egress_ts_kpi_id}") - # LOGGER.debug(f"hop_lat_kpi_id for switch {flow_info.switch_id}: {hop_lat_kpi_id}") - # LOGGER.debug(f"ing_port_id_kpi_id for switch {flow_info.switch_id}: {ing_port_id_kpi_id}") - # LOGGER.debug(f"egr_port_id_kpi_id for switch {flow_info.switch_id}: {egr_port_id_kpi_id}") - # LOGGER.debug(f"queue_occup_kpi_id for switch {flow_info.switch_id}: {queue_occup_kpi_id}") - # LOGGER.debug(f"is_drop_kpi_id for switch {flow_info.switch_id}: {is_drop_kpi_id}") - # LOGGER.debug(f"sw_lat_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}") - - # seq_num_kpi = self.set_kpi_descriptor(seq_num_kpi_id, service_id ,'', '', KpiSampleType.KPISAMPLETYPE_INT_SEQ_NUM) - # ingress_timestamp_kpi = self.set_kpi_descriptor(ingress_ts_kpi_id, service_id, '', 
'', KpiSampleType.KPISAMPLETYPE_INT_TS_ING) - # egress_timestamp_kpi = self.set_kpi_descriptor(egress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_EGR) - # hop_latency_kpi = self.set_kpi_descriptor(hop_lat_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT) - # ingress_port_id_kpi = self.set_kpi_descriptor(ing_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_ING) - # egress_port_id_kpi = self.set_kpi_descriptor(egr_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_EGR) - # queue_occup_kpi = self.set_kpi_descriptor(queue_occup_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_QUEUE_OCCUP) - # is_drop_kpi = self.set_kpi_descriptor(is_drop_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_IS_DROP) + LOGGER.debug(f"seq_num_kpi_id for switch {flow_info.switch_id}: {seq_num_kpi_id}") + LOGGER.debug(f"ingress_ts_kpi_id for switch {flow_info.switch_id}: {ingress_ts_kpi_id}") + LOGGER.debug(f"egress_ts_kpi_id for switch {flow_info.switch_id}: {egress_ts_kpi_id}") + LOGGER.debug(f"hop_lat_kpi_id for switch {flow_info.switch_id}: {hop_lat_kpi_id}") + LOGGER.debug(f"ing_port_id_kpi_id for switch {flow_info.switch_id}: {ing_port_id_kpi_id}") + LOGGER.debug(f"egr_port_id_kpi_id for switch {flow_info.switch_id}: {egr_port_id_kpi_id}") + LOGGER.debug(f"queue_occup_kpi_id for switch {flow_info.switch_id}: {queue_occup_kpi_id}") + LOGGER.debug(f"is_drop_kpi_id for switch {flow_info.switch_id}: {is_drop_kpi_id}") + LOGGER.debug(f"sw_lat_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}") + + seq_num_kpi = self.set_kpi_descriptor(seq_num_kpi_id, service_id ,'', '', KpiSampleType.KPISAMPLETYPE_INT_SEQ_NUM) + ingress_timestamp_kpi = self.set_kpi_descriptor(ingress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_ING) + egress_timestamp_kpi = self.set_kpi_descriptor(egress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_EGR) + hop_latency_kpi = self.set_kpi_descriptor(hop_lat_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT) + ingress_port_id_kpi = self.set_kpi_descriptor(ing_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_ING) + egress_port_id_kpi = self.set_kpi_descriptor(egr_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_EGR) + queue_occup_kpi = self.set_kpi_descriptor(queue_occup_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_QUEUE_OCCUP) + is_drop_kpi = self.set_kpi_descriptor(is_drop_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_IS_DROP) # Set a dedicated KPI descriptor for every switch sw_lat_kpi = None @@ -265,25 +265,25 @@ class INTCollector(_Collector): # Gather keys and values keys = [ - # seq_num_kpi.kpi_id.uuid, - # ingress_timestamp_kpi.kpi_id.uuid, - # egress_timestamp_kpi.kpi_id.uuid, - # hop_latency_kpi.kpi_id.uuid, - # ingress_port_id_kpi.kpi_id.uuid, - # egress_port_id_kpi.kpi_id.uuid, - # queue_occup_kpi.kpi_id.uuid, - # is_drop_kpi.kpi_id.uuid, + seq_num_kpi.kpi_id.uuid, + ingress_timestamp_kpi.kpi_id.uuid, + egress_timestamp_kpi.kpi_id.uuid, + hop_latency_kpi.kpi_id.uuid, + ingress_port_id_kpi.kpi_id.uuid, + egress_port_id_kpi.kpi_id.uuid, + queue_occup_kpi.kpi_id.uuid, + is_drop_kpi.kpi_id.uuid, sw_lat_kpi.kpi_id.uuid ] values = [ - # flow_info.seq_num, - # flow_info.ingress_timestamp, - # flow_info.egress_timestamp, - # flow_info.hop_latency, - # flow_info.ingress_port_id, - # flow_info.egress_port_id, - # flow_info.queue_occupancy, - # 
flow_info.is_drop, + flow_info.seq_num, + flow_info.ingress_timestamp, + flow_info.egress_timestamp, + flow_info.hop_latency, + flow_info.ingress_port_id, + flow_info.egress_port_id, + flow_info.queue_occupancy, + flow_info.is_drop, flow_info.hop_latency ] assert len(keys) == len(values), "KPI keys and values must agree" @@ -296,14 +296,14 @@ class INTCollector(_Collector): self.send_message_to_kafka(key, value) else: values = [ - # flow_info.seq_num, - # flow_info.ingress_timestamp, - # flow_info.egress_timestamp, - # flow_info.hop_latency, - # flow_info.ingress_port_id, - # flow_info.egress_port_id, - # flow_info.queue_occupancy, - # flow_info.is_drop, + flow_info.seq_num, + flow_info.ingress_timestamp, + flow_info.egress_timestamp, + flow_info.hop_latency, + flow_info.ingress_port_id, + flow_info.egress_port_id, + flow_info.queue_occupancy, + flow_info.is_drop, flow_info.hop_latency ] switch = self.table.get(flow_info.switch_id) -- GitLab From 45dc1788c3c77539359d21eba86e57ac6839b189 Mon Sep 17 00:00:00 2001 From: kpoulakakis Date: Thu, 17 Jul 2025 14:11:25 +0300 Subject: [PATCH 4/6] Load Balancer env variable. --- manifests/telemetryservice.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index 1a8794dc7..935267250 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -95,7 +95,7 @@ metadata: app: telemetryservice spec: type: LoadBalancer - loadBalancerIP: + loadBalancerIP: _LOAD_BALANCER_IP_ externalTrafficPolicy: Local selector: app: telemetryservice -- GitLab From 0bc9841b5bc323559b86c9e79386049d9cc4e72c Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Fri, 26 Sep 2025 09:29:09 +0000 Subject: [PATCH 5/6] Manifests: - Moved MetalLB config YAML to manifests/ folder --- manifests/metallb.yaml | 30 ++++++++++++++++++++++++++++++ metallb.yaml | 14 -------------- 2 files changed, 30 insertions(+), 14 deletions(-) create mode 100644 manifests/metallb.yaml delete mode 100644 metallb.yaml diff --git a/manifests/metallb.yaml b/manifests/metallb.yaml new file mode 100644 index 000000000..162768aca --- /dev/null +++ b/manifests/metallb.yaml @@ -0,0 +1,30 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Use this YAML to change configuration of MetalLB in real time. 
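+# It can be applied with: kubectl apply -f manifests/metallb.yaml
+# The address pool below must include the LOAD_BALANCER_IP exported in
+# my_deploy.sh (192.168.5.250 in the sample configuration).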
+ +apiVersion: metallb.io/v1beta1 +kind: IPAddressPool +metadata: + name: my-ip-pool + namespace: metallb-system +spec: + addresses: + - 192.168.5.250-192.168.5.251 # <-- Change this to match your network +--- +apiVersion: metallb.io/v1beta1 +kind: L2Advertisement +metadata: + name: l2-adv + namespace: metallb-system diff --git a/metallb.yaml b/metallb.yaml deleted file mode 100644 index c9dbd73dc..000000000 --- a/metallb.yaml +++ /dev/null @@ -1,14 +0,0 @@ -apiVersion: metallb.io/v1beta1 -kind: IPAddressPool -metadata: - name: my-ip-pool - namespace: metallb-system -spec: - addresses: - - 192.168.5.250-192.168.5.251 # <-- Change this to match your network ---- -apiVersion: metallb.io/v1beta1 -kind: L2Advertisement -metadata: - name: l2-adv - namespace: metallb-system -- GitLab From 5e8e34c4e2386755cb0d8fbed3fa8bc1fe6862df Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Fri, 26 Sep 2025 09:39:56 +0000 Subject: [PATCH 6/6] Telemetry component - Backend: - Fixed file headers --- .../collectors/intcollector/INTCollector.py | 3 ++- .../collectors/intcollector/INTCollectorCommon.py | 15 +++++++++++++++ .../backend/collectors/intcollector/__init__.py | 13 +++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 src/telemetry/backend/collectors/intcollector/__init__.py diff --git a/src/telemetry/backend/collectors/intcollector/INTCollector.py b/src/telemetry/backend/collectors/intcollector/INTCollector.py index e4958900a..9d89827f4 100644 --- a/src/telemetry/backend/collectors/intcollector/INTCollector.py +++ b/src/telemetry/backend/collectors/intcollector/INTCollector.py @@ -1,4 +1,4 @@ -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + import pytz import queue import logging diff --git a/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py b/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py index 085221431..baed12fd2 100644 --- a/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py +++ b/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py @@ -1,3 +1,18 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + from scapy.all import Packet, BitField from collections import namedtuple diff --git a/src/telemetry/backend/collectors/intcollector/__init__.py b/src/telemetry/backend/collectors/intcollector/__init__.py new file mode 100644 index 000000000..7363515f0 --- /dev/null +++ b/src/telemetry/backend/collectors/intcollector/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -- GitLab
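
A usage sketch of the extended frontend API introduced by these patches (the
client class and the StartCollector RPC name are assumed from the existing
TeraFlowSDN tooling; the interface name and the UUID variables are
illustrative):

    from uuid import uuid4
    from common.proto.telemetry_frontend_pb2 import Collector
    from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient

    collector = Collector()
    collector.collector_id.collector_id.uuid = str(uuid4())
    collector.kpi_id.kpi_id.uuid = kpi_uuid              # a KPI registered beforehand
    collector.duration_s = 0                             # 0 => collect indefinitely
    collector.interval_s = 1.0
    collector.int_collector.transport_port = 12345       # UDP port exposed by the LoadBalancer service
    collector.int_collector.interface = 'ens4'           # deployment-specific sniffing interface
    collector.int_collector.service_id = service_uuid    # service under observation
    collector.int_collector.context_id = context_uuid    # its context

    TelemetryFrontendClient().StartCollector(collector)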