diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 917edb6fa86beafe864f8d2cd3cf5193fb4d4e67..49c2c69c99a76a8d3cec84f5d56ef36fab3cd2fa 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -97,6 +97,11 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} export KFK_NAMESPACE=${KFK_NAMESPACE:-"kafka"} +# ----- Telemetry Config ------------------------------------------------------ + +# Replace LOAD_BALANCER_IP +export LOAD_BALANCER_IP=${LOAD_BALANCER_IP:-""} + ######################################################################################################################## # Automated steps start here ######################################################################################################################## @@ -215,6 +220,9 @@ else fi printf "\n" +# Before deployment change the telemetry LOAD_BALANCER_IP +sed -i "s|_LOAD_BALANCER_IP_|$LOAD_BALANCER_IP|g" manifests/telemetryservice.yaml + for COMPONENT in $TFS_COMPONENTS; do echo "Processing '$COMPONENT' component..." @@ -597,6 +605,9 @@ if [[ "$TFS_COMPONENTS" == *"monitoring"* ]] && [[ "$TFS_COMPONENTS" == *"webui" printf "\n\n" fi +# Revert _LOAD_BALANCER_IP_ for the next deployment +sed -i "s|$LOAD_BALANCER_IP|_LOAD_BALANCER_IP_|g" manifests/telemetryservice.yaml + echo "Pruning Docker Images..." docker image prune --force printf "\n\n" diff --git a/manifests/metallb.yaml b/manifests/metallb.yaml new file mode 100644 index 0000000000000000000000000000000000000000..162768aca24df3a0f1b37362c65687e7f2da565b --- /dev/null +++ b/manifests/metallb.yaml @@ -0,0 +1,30 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Use this YAML to change configuration of MetalLB in real time. + +apiVersion: metallb.io/v1beta1 +kind: IPAddressPool +metadata: + name: my-ip-pool + namespace: metallb-system +spec: + addresses: + - 192.168.5.250-192.168.5.251 # <-- Change this to match your network +--- +apiVersion: metallb.io/v1beta1 +kind: L2Advertisement +metadata: + name: l2-adv + namespace: metallb-system diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index 6d2d3087b3b96550871a0205d9c2f90ec1d07285..935267250169a3dd13d37afb8a0f17aab50b5c76 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -94,10 +94,16 @@ metadata: labels: app: telemetryservice spec: - type: ClusterIP + type: LoadBalancer + loadBalancerIP: _LOAD_BALANCER_IP_ + externalTrafficPolicy: Local selector: app: telemetryservice ports: + - name: dnsudp + protocol: UDP + port: 12345 + targetPort: 12345 - name: grpc protocol: TCP port: 30050 diff --git a/my_deploy.sh b/my_deploy.sh index 662dc389b123daabe02bedf2f43232edde8f3bc3..f4f8d203e718af3655bd27f72edd1b1af629b895 100644 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -215,3 +215,9 @@ export PROM_EXT_PORT_HTTP="9090" # Set the external port Grafana HTTP Dashboards will be exposed to. export GRAF_EXT_PORT_HTTP="3000" + + +# ----- Telemetry Config ------------------------------------------------------ + +# Define a Load Balancer IP for Telemetry Collector components +export LOAD_BALANCER_IP="192.168.5.250" # <-- Change this to match your network diff --git a/proto/kpi_sample_types.proto b/proto/kpi_sample_types.proto index 142336950c9e49badce911020a4a2dd1db4e6221..5fcda6df95c6797b4ae03ca72ec78d114e965cbd 100644 --- a/proto/kpi_sample_types.proto +++ b/proto/kpi_sample_types.proto @@ -40,13 +40,38 @@ enum KpiSampleType { KPISAMPLETYPE_SERVICE_LATENCY_MS = 701; -// output KPIs - KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101; - KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102; - KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103; - KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201; - KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202; - KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203; - - KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701; + // output KPIs + KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101; + KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102; + KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103; + KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201; + KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202; + KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203; + + KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701; + + // INT KPIs + KPISAMPLETYPE_INT_SEQ_NUM = 2001; + KPISAMPLETYPE_INT_TS_ING = 2002; + KPISAMPLETYPE_INT_TS_EGR = 2003; + KPISAMPLETYPE_INT_HOP_LAT = 2004; + KPISAMPLETYPE_INT_PORT_ID_ING = 2005; + KPISAMPLETYPE_INT_PORT_ID_EGR = 2006; + KPISAMPLETYPE_INT_QUEUE_OCCUP = 2007; + KPISAMPLETYPE_INT_QUEUE_ID = 2008; + + KPISAMPLETYPE_INT_HOP_LAT_SW01 = 2101; + KPISAMPLETYPE_INT_HOP_LAT_SW02 = 2102; + KPISAMPLETYPE_INT_HOP_LAT_SW03 = 2103; + KPISAMPLETYPE_INT_HOP_LAT_SW04 = 2104; + KPISAMPLETYPE_INT_HOP_LAT_SW05 = 2105; + KPISAMPLETYPE_INT_HOP_LAT_SW06 = 2106; + KPISAMPLETYPE_INT_HOP_LAT_SW07 = 2107; + KPISAMPLETYPE_INT_HOP_LAT_SW08 = 2108; + KPISAMPLETYPE_INT_HOP_LAT_SW09 = 2109; + KPISAMPLETYPE_INT_HOP_LAT_SW10 = 2110; + KPISAMPLETYPE_INT_LAT_ON_TOTAL = 2120; + + KPISAMPLETYPE_INT_IS_DROP = 2201; + KPISAMPLETYPE_INT_DROP_REASON = 2202; } diff --git a/proto/telemetry_frontend.proto b/proto/telemetry_frontend.proto index d1c0420d9331e09b031062cb1a8e5c6ead442253..cb92506f0b520a1921f454ac232b27ec6bf1694c 100644 --- a/proto/telemetry_frontend.proto +++ b/proto/telemetry_frontend.proto @@ -28,13 +28,21 @@ message CollectorId { context.Uuid collector_id = 1; } -message Collector { - CollectorId collector_id = 1; // The Collector ID - kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples - float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely - float interval_s = 4; // Interval between collected samples - context.Timestamp start_time = 5; // Timestamp when Collector start execution - context.Timestamp end_time = 6; // Timestamp when Collector stop execution + message Collector { + CollectorId collector_id = 1; // The Collector ID + kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples + float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely + float interval_s = 4; // Interval between collected samples + context.Timestamp start_time = 5; // Timestamp when Collector start execution + context.Timestamp end_time = 6; // Timestamp when Collector stop execution + INTCollector int_collector = 7; // Extra optional information about INT collectors + } + +message INTCollector { + int32 transport_port = 1; // The port where the collector listens to packets + string interface = 2; // Network interface to collect data from + string service_id = 3; // Service identifier related to this collector + string context_id = 4; // Context identifier related to this collector } message CollectorFilter { diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index e649e4531e45d33703253a129873a93dc70de163..290f7b4775b4873db4c2b60e46ea4f6147ec6f84 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -51,10 +51,10 @@ class MetricWriterToPrometheus: 'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid, 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, - 'time_stamp' : kpi_value.timestamp.timestamp, - #'time_stamp' : kpi_value["time_stamp"], - 'kpi_value' : kpi_value.kpi_value_type.floatVal - #'kpi_value' : kpi_value["kpi_value"] + # 'time_stamp' : kpi_value.timestamp.timestamp, + 'time_stamp' : kpi_value["time_stamp"], + # 'kpi_value' : kpi_value.kpi_value_type.floatVal + 'kpi_value' : kpi_value["kpi_value"] } LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) return cooked_kpi @@ -62,12 +62,12 @@ class MetricWriterToPrometheus: def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): # merge both gRPC messages into single varible. cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) - tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} + tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags metric_name = cooked_kpi['kpi_sample_type'] try: if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists - PROM_METRICS[metric_name] = Gauge ( + PROM_METRICS[metric_name] = Gauge ( metric_name, cooked_kpi['kpi_description'], metric_tags, @@ -89,7 +89,6 @@ class MetricWriterToPrometheus: # Push to the Prometheus Gateway, Prometheus is preconfigured to scrap the metrics from the gateway push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry) LOGGER.debug("Metric pushed to Prometheus Gateway.") - except ValueError as e: if 'Duplicated timeseries' in str(e): LOGGER.debug("Metric {:} is already registered. Skipping.".format(metric_name)) @@ -97,4 +96,3 @@ class MetricWriterToPrometheus: else: LOGGER.error("Error while pushing metric: {}".format(e)) raise - diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile index bbb41c95db8d58c7534cad63d59309f9c5408b7c..a619be74c4a932325d11b72d16d5efdfddf00b03 100644 --- a/src/telemetry/backend/Dockerfile +++ b/src/telemetry/backend/Dockerfile @@ -16,7 +16,7 @@ FROM python:3.9-slim # Install dependencies RUN apt-get --yes --quiet --quiet update && \ - apt-get --yes --quiet --quiet install wget g++ git && \ + apt-get --yes --quiet --quiet install wget g++ git libpcap-dev && \ rm -rf /var/lib/apt/lists/* # Set Python to show logs as they occur @@ -31,6 +31,7 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ RUN python3 -m pip install --upgrade pip RUN python3 -m pip install --upgrade setuptools wheel RUN python3 -m pip install --upgrade pip-tools +RUN python3 -m pip install --upgrade scapy # Get common Python packages # Note: this step enables sharing the previous Docker build steps among all the Python components @@ -76,6 +77,9 @@ COPY src/vnt_manager/__init__.py vnt_manager/__init__.py COPY src/vnt_manager/client/. vnt_manager/client/ COPY src/telemetry/__init__.py telemetry/__init__.py COPY src/telemetry/backend/. telemetry/backend/ +COPY src/analytics/__init__.py analytics/__init__.py +COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py +COPY src/analytics/frontend/client/. analytics/frontend/client/ # Start the service ENTRYPOINT ["python", "-m", "telemetry.backend.service"] diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py index 6b0d2de086fe930e465e8bd0a83103aa9c359ce9..cf984427fba8c259d2ad7442ecaddf5425d03d4d 100644 --- a/src/telemetry/backend/collector_api/_Collector.py +++ b/src/telemetry/backend/collector_api/_Collector.py @@ -149,7 +149,7 @@ class _Collector: - True if a resource is successfully subscribed. - Exception if an error occurs during the subscription process. List[Union[bool, Exception]]: - """ + """ raise NotImplementedError() def UnsubscribeState(self, resource_key: str) \ diff --git a/src/telemetry/backend/collectors/intcollector/INTCollector.py b/src/telemetry/backend/collectors/intcollector/INTCollector.py new file mode 100644 index 0000000000000000000000000000000000000000..9d89827f4eecbbb80a092763488f87296acffb5a --- /dev/null +++ b/src/telemetry/backend/collectors/intcollector/INTCollector.py @@ -0,0 +1,357 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytz +import queue +import logging +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.executors.pool import ThreadPoolExecutor +from datetime import datetime +from telemetry.backend.collector_api._Collector import _Collector + +from scapy.all import * +import struct +import socket +import ipaddress + +from .INTCollectorCommon import IntDropReport, IntLocalReport, IntFixedReport, FlowInfo +from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor +from confluent_kafka import Producer as KafkaProducer +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from uuid import uuid4 +from typing import Dict +from datetime import datetime, timezone +import json + +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId +from context.client.ContextClient import ContextClient +from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient +from common.proto.kpi_sample_types_pb2 import KpiSampleType +import time +import logging + +LOGGER = logging.getLogger(__name__) + +class INTCollector(_Collector): + + last_packet_time = time.time() # Track last packet time + + max_idle_time = 5 # for how long we tolerate inactivity + sniff_timeout = 3 # how often we stop sniffing to check for inactivity + + """ + INTCollector is a class that simulates a network collector for testing purposes. + It provides functionalities to manage configurations, state subscriptions, and synthetic data generation. + """ + def __init__(self, collector_id: str , address: str, interface: str, port: str, kpi_id: str, service_id: str, context_id: str, **settings): + super().__init__('int_collector', address, port, **settings) + self._out_samples = queue.Queue() # Queue to hold synthetic state samples + self._scheduler = BackgroundScheduler(daemon=True) + self._scheduler.configure( + jobstores = {'default': MemoryJobStore()}, + executors = {'default': ThreadPoolExecutor(max_workers=1)}, + timezone = pytz.utc + ) + self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + self.collector_id = collector_id + self.interface = interface + self.kpi_manager_client = KpiManagerClient() + self.analytics_frontend_client = AnalyticsFrontendClient() + self.context_client = ContextClient() + self.kpi_id = kpi_id + self.service_id = service_id + self.context_id = context_id + self.table = {} + self.connected = False # To track connection state + LOGGER.info("INT Collector initialized") + + def Connect(self) -> bool: + LOGGER.info(f"Connecting to {self.interface}:{self.port}") + self.connected = True + + self._scheduler.add_job(self.sniff_with_restarts_on_idle, id=self.kpi_id ,args=[self.interface , self.port , self.service_id, self.context_id]) + + self._scheduler.start() + LOGGER.info(f"Successfully connected to {self.interface}:{self.port}") + return True + + def Disconnect(self) -> bool: + LOGGER.info(f"Disconnecting from {self.interface}:{self.port}") + if not self.connected: + LOGGER.warning("INT Collector is not connected. Nothing to disconnect.") + return False + + self._scheduler.remove_job(self.kpi_id) + self._scheduler.shutdown() + + self.connected = False + LOGGER.info(f"Successfully disconnected from {self.interface}:{self.port}") + return True + + def on_idle_timeout(self): + LOGGER.info(f"Sniffer idle for more than {self.max_idle_time} seconds.") + LOGGER.debug(f"last_packet_time {self.last_packet_time} seconds.") + + values = [0] + for sw_id in range(1, 6): + sw = self.table.get(sw_id) + self.overwrite_switch_values(sw , values) + + def overwrite_switch_values(self, switch, values): + if not switch: + return + + # Overwrite values using zip + for key, new_value in zip(switch, values): + switch[key] = new_value + + for key, value in switch.items(): + self.send_message_to_kafka(key, value) + + def process_packet(self , packet, port, service_id , context_id): + # global last_packet_time + + # Check for IP layer + if IP not in packet: + return None + ip_layer = packet[IP] + # ip_pkt = IPPacket(ip_layer[:20]) + # ip_pkt.show() + + # Check for UDP + if UDP not in ip_layer: + return None + udp_layer = ip_layer[UDP] + + # Only the INT port + if udp_layer.dport != port: + return None + # udp_dgram = UDPPacket(bytes(udp_layer)) + # udp_dgram.show() + + src_ip = socket.ntohl(struct.unpack(' 0, "Egress timestamp must be > ingress timestamp" + # local_report.show() + + # Create flow info + flow_info = FlowInfo( + src_ip=src_ip, + dst_ip=dst_ip, + src_port=udp_layer.sport, + dst_port=udp_layer.dport, + ip_proto=ip_layer.proto, + flow_sink_time=fixed_report.ingress_timestamp, + num_int_hop=1, + seq_num=fixed_report.seq_num, + switch_id=fixed_report.switch_id, + ingress_timestamp=fixed_report.ingress_timestamp, + ingress_port_id=fixed_report.ingress_port_id, + egress_port_id=fixed_report.egress_port_id, + queue_id=local_report.queue_id if local_report else 0, + queue_occupancy=local_report.queue_occupancy if local_report else 0, + egress_timestamp=local_report.egress_timestamp if local_report else 0, + is_drop=1 if drop_report else 0, + drop_reason=drop_report.drop_reason if drop_report else 0, + hop_latency=lat + ) + LOGGER.debug(f"Flow info: {flow_info}") + + self.create_descriptors_and_send_to_kafka(flow_info , service_id , context_id) + + self.last_packet_time = time.time() + return flow_info + + def set_kpi_descriptor(self , kpi_uuid , service_id , device_id , endpoint_id , sample_type): + kpi_descriptor = KpiDescriptor() + kpi_descriptor.kpi_sample_type = sample_type + kpi_descriptor.service_id.service_uuid.uuid = service_id + # kpi_descriptor.device_id.device_uuid.uuid = device_id + # kpi_descriptor.endpoint_id.endpoint_uuid.uuid = endpoint_id + kpi_descriptor.kpi_id.kpi_id.uuid = kpi_uuid + + kpi_id: KpiId = self.kpi_manager_client.SetKpiDescriptor(kpi_descriptor) + + return kpi_id + + def create_descriptors_and_send_to_kafka(self, flow_info , service_id , context_id): + LOGGER.debug(f"PACKET FROM SWITCH: {flow_info.switch_id} LATENCY: {flow_info.hop_latency}") + if(self.table.get(flow_info.switch_id) == None): + seq_num_kpi_id = str(uuid4()) + ingress_ts_kpi_id = str(uuid4()) + egress_ts_kpi_id = str(uuid4()) + hop_lat_kpi_id = str(uuid4()) + ing_port_id_kpi_id = str(uuid4()) + egr_port_id_kpi_id = str(uuid4()) + queue_occup_kpi_id = str(uuid4()) + is_drop_kpi_id = str(uuid4()) + sw_lat_kpi_id = str(uuid4()) + + LOGGER.debug(f"seq_num_kpi_id for switch {flow_info.switch_id}: {seq_num_kpi_id}") + LOGGER.debug(f"ingress_ts_kpi_id for switch {flow_info.switch_id}: {ingress_ts_kpi_id}") + LOGGER.debug(f"egress_ts_kpi_id for switch {flow_info.switch_id}: {egress_ts_kpi_id}") + LOGGER.debug(f"hop_lat_kpi_id for switch {flow_info.switch_id}: {hop_lat_kpi_id}") + LOGGER.debug(f"ing_port_id_kpi_id for switch {flow_info.switch_id}: {ing_port_id_kpi_id}") + LOGGER.debug(f"egr_port_id_kpi_id for switch {flow_info.switch_id}: {egr_port_id_kpi_id}") + LOGGER.debug(f"queue_occup_kpi_id for switch {flow_info.switch_id}: {queue_occup_kpi_id}") + LOGGER.debug(f"is_drop_kpi_id for switch {flow_info.switch_id}: {is_drop_kpi_id}") + LOGGER.debug(f"sw_lat_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}") + + seq_num_kpi = self.set_kpi_descriptor(seq_num_kpi_id, service_id ,'', '', KpiSampleType.KPISAMPLETYPE_INT_SEQ_NUM) + ingress_timestamp_kpi = self.set_kpi_descriptor(ingress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_ING) + egress_timestamp_kpi = self.set_kpi_descriptor(egress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_EGR) + hop_latency_kpi = self.set_kpi_descriptor(hop_lat_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT) + ingress_port_id_kpi = self.set_kpi_descriptor(ing_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_ING) + egress_port_id_kpi = self.set_kpi_descriptor(egr_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_EGR) + queue_occup_kpi = self.set_kpi_descriptor(queue_occup_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_QUEUE_OCCUP) + is_drop_kpi = self.set_kpi_descriptor(is_drop_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_IS_DROP) + + # Set a dedicated KPI descriptor for every switch + sw_lat_kpi = None + sw_sample_types = [ + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW01, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW02, + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW03, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW04, + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW05, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW06, + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW07, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW08, + KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW09, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW10 + ] + for i, sw_id in enumerate(range(1, 11)): + if flow_info.switch_id == sw_id: + LOGGER.debug(f"SET KPI : seq_num_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}") + sw_lat_kpi = self.set_kpi_descriptor(sw_lat_kpi_id, service_id, '', '', sw_sample_types[i]) + + # Gather keys and values + keys = [ + seq_num_kpi.kpi_id.uuid, + ingress_timestamp_kpi.kpi_id.uuid, + egress_timestamp_kpi.kpi_id.uuid, + hop_latency_kpi.kpi_id.uuid, + ingress_port_id_kpi.kpi_id.uuid, + egress_port_id_kpi.kpi_id.uuid, + queue_occup_kpi.kpi_id.uuid, + is_drop_kpi.kpi_id.uuid, + sw_lat_kpi.kpi_id.uuid + ] + values = [ + flow_info.seq_num, + flow_info.ingress_timestamp, + flow_info.egress_timestamp, + flow_info.hop_latency, + flow_info.ingress_port_id, + flow_info.egress_port_id, + flow_info.queue_occupancy, + flow_info.is_drop, + flow_info.hop_latency + ] + assert len(keys) == len(values), "KPI keys and values must agree" + switch = {keys[i]: values[i] for i in range(len(keys))} + + self.table[flow_info.switch_id] = switch + + # Dispatch to Kafka + for key, value in switch.items(): + self.send_message_to_kafka(key, value) + else: + values = [ + flow_info.seq_num, + flow_info.ingress_timestamp, + flow_info.egress_timestamp, + flow_info.hop_latency, + flow_info.ingress_port_id, + flow_info.egress_port_id, + flow_info.queue_occupancy, + flow_info.is_drop, + flow_info.hop_latency + ] + switch = self.table.get(flow_info.switch_id) + + # Overwrite values using zip + self.overwrite_switch_values(switch , values) + + def send_message_to_kafka(self , kpi_id , measured_kpi_value): + producer = self.kafka_producer + kpi_value: Dict = { + "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "kpi_id": kpi_id, + "kpi_value": measured_kpi_value + } + producer.produce( + KafkaTopic.VALUE.value, + key=self.collector_id, + value=json.dumps(kpi_value), + callback=self.delivery_callback + ) + producer.flush() + LOGGER.debug(f"Message with kpi_id: {kpi_id} was send to kafka!") + + def packet_callback(self, packet, port , service_id,context_id): + flow_info = self.process_packet(packet , port , service_id, context_id) + if flow_info: + LOGGER.debug(f"Flow info: {flow_info}") + + def sniff_with_restarts_on_idle(self, interface, port, service_id , context_id): + while True: + # Run sniff for a short period to periodically check for idle timeout + sniff( + iface=interface, + filter=f"udp port {port}", + prn=lambda pkt: self.packet_callback(pkt, port, service_id , context_id), + timeout=self.sniff_timeout + ) + + if not self.connected: + break + + # Check if idle period has been exceeded + now = time.time() + if (now - self.last_packet_time) > self.max_idle_time: + self.on_idle_timeout() + self.last_packet_time = now # Reset timer after action + + def delivery_callback(self, err, msg): + if err: + LOGGER.error('Message delivery failed: {:s}'.format(str(err))) diff --git a/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py b/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py new file mode 100644 index 0000000000000000000000000000000000000000..baed12fd29f4707f187c9e3d2377fe29b4f034ad --- /dev/null +++ b/src/telemetry/backend/collectors/intcollector/INTCollectorCommon.py @@ -0,0 +1,118 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from scapy.all import Packet, BitField +from collections import namedtuple + +class IPPacket(Packet): + name = "IPPacket" + fields_desc = [ + BitField("ip_ver", 0, 4), + BitField("ip_ihl", 0, 4), + BitField("ip_dscp", 0, 6), + BitField("ip_ecn", 0, 2), + BitField("ip_len", 0, 16), + BitField("ip_id", 0, 16), + BitField("ip_flags", 0, 3), + BitField("ip_frag", 0, 13), + BitField("ip_ttl", 0, 8), + BitField("ip_proto", 0, 8), + BitField("ip_csum", 0, 16), + BitField("ip_src", 0, 32), + BitField("ip_dst", 0, 32) + ] + +class UDPPacket(Packet): + name = "UDPPacket" + fields_desc = [ + BitField("udp_port_src", 0, 16), + BitField("udp_port_dst", 0, 16), + BitField("udp_len", 0, 16), + BitField("udp_csum", 0, 16) + ] + +""" + Private source repo: https://github.com/SNVLab-WUT/int-collector/blob/main/collector/BPFCollector_v0_5.c + fabric-tna repo: https://github.com/stratum/fabric-tna/blob/main/p4src/shared/header.p4 + + Little Endian order + BitField("nproto", 0, 4), + BitField("ver", 0, 4), + BitField("rsvd1", 0, 5), + BitField("f", 0, 1), + BitField("q", 0, 1), + BitField("d", 0, 1), + BitField("hw_id", 0, 6), + BitField("rsvd2", 0, 10), + + BitField("queue_id", 0, 8), + BitField("queue_occupancy", 0, 24), + + Big Endian order + BitField("ver", 0, 4), + BitField("nproto", 0, 4), + BitField("d", 0, 1), + BitField("q", 0, 1), + BitField("f", 0, 1), + BitField("rsvd1", 0, 5), + BitField("rsvd2", 0, 10), + BitField("hw_id", 0, 6), + + BitField("queue_occupancy", 0, 24), + BitField("queue_id", 0, 8), +""" + +class IntFixedReport(Packet): + name = "IntFixedReport" + fields_desc = [ + BitField("ver", 0, 4), + BitField("nproto", 0, 4), + BitField("d", 0, 1), + BitField("q", 0, 1), + BitField("f", 0, 1), + BitField("rsvd1", 0, 5), + BitField("rsvd2", 0, 10), + BitField("hw_id", 0, 6), + BitField("seq_num", 0, 32), + BitField("ingress_timestamp", 0, 32), + BitField("switch_id", 0, 32), + BitField("ingress_port_id", 0, 16), + BitField("egress_port_id", 0, 16) + ] + +class IntLocalReport(Packet): + name = "IntLocalReport" + fields_desc = [ + BitField("queue_occupancy", 0, 24), + BitField("queue_id", 0, 8), + BitField("egress_timestamp", 0, 32) + ] + +class IntDropReport(Packet): + name = "IntDropReport" + fields_desc = [ + BitField("queue_id", 0 , 8), + BitField("drop_reason", 0, 8), + BitField("_pad", 0, 16) + ] + +# Flow information structure +FlowInfo = namedtuple('FlowInfo', [ + 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'ip_proto', + 'flow_sink_time', 'num_int_hop', 'seq_num', 'switch_id', + 'ingress_timestamp', 'ingress_port_id', 'egress_port_id', 'queue_id', + 'queue_occupancy', 'egress_timestamp', 'is_drop', 'drop_reason', + 'hop_latency' +]) diff --git a/src/telemetry/backend/collectors/intcollector/__init__.py b/src/telemetry/backend/collectors/intcollector/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7363515f07a52d996229bcbd72932ce1423258d7 --- /dev/null +++ b/src/telemetry/backend/collectors/intcollector/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 2952bd19d39bebd0ea26e097c19a5726fa54a1b0..2a81fe3e7cfa358ac96e2849c1b4244a71979f58 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -16,24 +16,25 @@ import json import time import logging import threading -from typing import Any, Dict, Tuple -from datetime import datetime, timezone -from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import Consumer as KafkaConsumer -from confluent_kafka import KafkaError +from typing import Any, Dict, Tuple +from datetime import datetime, timezone +from confluent_kafka import Producer as KafkaProducer +from confluent_kafka import Consumer as KafkaConsumer +from confluent_kafka import KafkaError from common.Constants import ServiceNameEnum -from common.Settings import get_service_port_grpc -from common.method_wrappers.Decorator import MetricsPool -from common.tools.kafka.Variables import KafkaConfig, KafkaTopic +from common.Settings import get_service_port_grpc +from common.method_wrappers.Decorator import MetricsPool +from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService -from common.tools.context_queries.Device import get_device -from common.proto.kpi_manager_pb2 import KpiId +from common.tools.context_queries.Device import get_device +from common.proto.kpi_manager_pb2 import KpiId -from kpi_manager.client.KpiManagerClient import KpiManagerClient -from context.client.ContextClient import ContextClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from context.client.ContextClient import ContextClient from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector +from telemetry.backend.collectors.intcollector.INTCollector import INTCollector -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') class TelemetryBackendService(GenericGrpcService): @@ -41,16 +42,17 @@ class TelemetryBackendService(GenericGrpcService): Class listens for request on Kafka topic, fetches requested metrics from device. Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics. """ - def __init__(self, cls_name : str = __name__) -> None: + + def __init__(self, cls_name: str = __name__) -> None: LOGGER.info('Init TelemetryBackendService') port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND) super().__init__(port, cls_name=cls_name) - self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) - self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), - 'group.id' : 'backend', - 'auto.offset.reset' : 'latest'}) - self.collector = None - self.context_client = ContextClient() + self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()}) + self.kafka_consumer = KafkaConsumer({'bootstrap.servers': KafkaConfig.get_kafka_address(), + 'group.id': 'backend', + 'auto.offset.reset': 'latest'}) + self.collector = None + self.context_client = ContextClient() self.kpi_manager_client = KpiManagerClient() self.active_jobs = {} @@ -73,12 +75,13 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: - LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") + LOGGER.warning( + f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) break - try: + try: collector = json.loads( receive_msg.value().decode('utf-8') ) @@ -93,33 +96,39 @@ class TelemetryBackendService(GenericGrpcService): if collector_id not in self.active_jobs: stop_event = threading.Event() self.active_jobs[collector_id] = stop_event - threading.Thread(target = self.CollectorHandler, - args=( - collector_id, - collector['kpi_id'], - duration, - collector['interval'], - stop_event - )).start() + threading.Thread(target=self.CollectorHandler, + args=( + collector_id, + collector['kpi_id'], + duration, + collector['interval'], + collector['interface'], + collector['transport_port'], + collector['service_id'], + collector['context_id'], + stop_event + )).start() # Stop the Collector after the given duration if duration > 0: def stop_after_duration(completion_time, stop_event): time.sleep(completion_time) if not stop_event.is_set(): - LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}") + LOGGER.warning( + f"Execution duration ({completion_time}) completed of Collector: {collector_id}") self.TerminateCollector(collector_id) - + duration_thread = threading.Thread( target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}", - args=(duration, stop_event) - ) + args=(duration, stop_event) + ) duration_thread.start() else: LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id)) except Exception as e: - LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) + LOGGER.warning( + "Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) - def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event): + def CollectorHandler(self, collector_id, kpi_id, duration, interval, interface , port , service_id , context_id , stop_event): """ Method to handle collector request. """ @@ -133,48 +142,65 @@ class TelemetryBackendService(GenericGrpcService): LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) + elif device_type and "p4" in device_type: + LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) + subscription = [collector_id, end_points, duration, interval] + self.INTCollectorHandler(subscription, duration, collector_id, kpi_id, interface, port, service_id , context_id ,stop_event) else: LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event): - # EmulatedCollector - - self.collector = EmulatedCollector(address="127.0.0.1", port=8000) - self.collector.Connect() - if not self.collector.SubscribeState(subscription): - LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id)) - else: - while not stop_event.is_set(): - samples = list(self.collector.GetState(duration=duration, blocking=True)) - LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) - self.GenerateKpiValue(collector_id, kpi_id, samples) - time.sleep(1) + # EmulatedCollector + + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.collector.Connect() + if not self.collector.SubscribeState(subscription): + LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id)) + else: + while not stop_event.is_set(): + samples = list(self.collector.GetState(duration=duration, blocking=True)) + LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) + self.GenerateKpiValue(collector_id, kpi_id, samples) + time.sleep(1) + self.collector.Disconnect() + # self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration. + + def INTCollectorHandler(self, subscription, duration, collector_id, kpi_id, interface, port, service_id, context_id , stop_event): + self.collector = INTCollector( + collector_id=collector_id , + address="127.0.0.1", + interface=interface, + port=port, + kpi_id=kpi_id, + service_id=service_id, + context_id=context_id + ) + self.collector.Connect() + if stop_event.is_set(): self.collector.Disconnect() - # self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration. def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ Method to write kpi value on VALUE Kafka topic """ producer = self.kafka_producer - kpi_value : Dict = { + kpi_value: Dict = { "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), - "kpi_id" : kpi_id, - "kpi_value" : measured_kpi_value + "kpi_id": kpi_id, + "kpi_value": measured_kpi_value } producer.produce( KafkaTopic.VALUE.value, - key = collector_id, - value = json.dumps(kpi_value), - callback = self.delivery_callback + key=collector_id, + value=json.dumps(kpi_value), + callback=self.delivery_callback ) producer.flush() def TerminateCollector(self, job_id): LOGGER.debug("Terminating collector backend...") try: - if job_id not in self.active_jobs: # not job_ids: - # self.logger.warning(f"Active jobs: {self.active_jobs}") + if job_id not in self.active_jobs: self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.") else: LOGGER.info(f"Terminating job: {job_id}") @@ -202,18 +228,21 @@ class TelemetryBackendService(GenericGrpcService): LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...") return (None, None) - device_id = kpi_descriptor.device_id.device_uuid.uuid + LOGGER.warning(f"kpi_descriptor {kpi_descriptor} ") + LOGGER.warning(f"kpi_descriptor.device_id.device_uuid.uuid {kpi_descriptor.device_id.device_uuid.uuid} ") + + device_id = kpi_descriptor.device_id.device_uuid.uuid endpoint_id = kpi_descriptor.endpoint_id.endpoint_uuid.uuid - device = get_device( context_client = self.context_client, - device_uuid = device_id, - include_config_rules = False, - include_components = False, - ) + device = get_device(context_client=self.context_client, + device_uuid=device_id, + include_config_rules=False, + include_components=False, + ) if device: for endpoint in device.device_endpoints: if endpoint.endpoint_id.endpoint_uuid.uuid == endpoint_id: - endpoint_dict = {} - kpi_sample_types = [] + endpoint_dict = {} + kpi_sample_types = [] endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_dict["name"] = endpoint.name endpoint_dict["type"] = endpoint.endpoint_type @@ -222,10 +251,10 @@ class TelemetryBackendService(GenericGrpcService): endpoint_dict["sample_types"] = kpi_sample_types return (device.device_type, endpoint_dict) - + LOGGER.warning(f"Device ID: {device_id} - Endpoint ID: {endpoint_id} - Not Found") return (None, None) def delivery_callback(self, err, msg): - if err: + if err: LOGGER.error('Message delivery failed: {:s}'.format(str(err))) diff --git a/src/telemetry/frontend/Dockerfile b/src/telemetry/frontend/Dockerfile index 74a0a87a06563e5f4121d85f4805e9b37fc27342..dc90b7d164dbaaf8249d36628fc60eee88bb8034 100644 --- a/src/telemetry/frontend/Dockerfile +++ b/src/telemetry/frontend/Dockerfile @@ -65,6 +65,9 @@ WORKDIR /var/teraflow COPY src/telemetry/__init__.py telemetry/__init__.py COPY src/telemetry/frontend/. telemetry/frontend/ COPY src/telemetry/database/. telemetry/database/ +COPY src/analytics/__init__.py analytics/__init__.py +COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py +COPY src/analytics/frontend/client/. analytics/frontend/client/ # Start the service ENTRYPOINT ["python", "-m", "telemetry.frontend.service"] diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 123c10a6566cbc5b6394550de8e23f535adbfda4..aa7b0b921d7b3ec8048a26c1024d7717fe9031c3 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -69,7 +69,11 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): collector_to_generate : Dict = { "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, "duration": collector_obj.duration_s, - "interval": collector_obj.interval_s + "interval": collector_obj.interval_s, + "interface": collector_obj.int_collector.interface, + "transport_port": collector_obj.int_collector.transport_port, + "service_id": collector_obj.int_collector.service_id, + "context_id": collector_obj.int_collector.context_id } self.kafka_producer.produce( KafkaTopic.TELEMETRY_REQUEST.value,