Skip to content
Snippets Groups Projects
Commit fa1babd6 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

feat: new in-band network telemetry collector plugin

parent 5f3f7d28
No related branches found
No related tags found
1 merge request!366feat: new in-band network telemetry collector plugin
......@@ -94,10 +94,16 @@ metadata:
labels:
app: telemetryservice
spec:
type: ClusterIP
type: LoadBalancer
loadBalancerIP: 192.168.5.250
externalTrafficPolicy: Local
selector:
app: telemetryservice
ports:
- name: dnsudp
protocol: UDP
port: 12345
targetPort: 12345
- name: grpc
protocol: TCP
port: 30050
......
......@@ -40,13 +40,38 @@ enum KpiSampleType {
KPISAMPLETYPE_SERVICE_LATENCY_MS = 701;
// output KPIs
KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101;
KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102;
KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103;
KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201;
KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202;
KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203;
KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701;
// output KPIs
KPISAMPLETYPE_PACKETS_TRANSMITTED_AGG_OUTPUT = 1101;
KPISAMPLETYPE_PACKETS_RECEIVED_AGG_OUTPUT = 1102;
KPISAMPLETYPE_PACKETS_DROPPED_AGG_OUTPUT = 1103;
KPISAMPLETYPE_BYTES_TRANSMITTED_AGG_OUTPUT = 1201;
KPISAMPLETYPE_BYTES_RECEIVED_AGG_OUTPUT = 1202;
KPISAMPLETYPE_BYTES_DROPPED_AGG_OUTPUT = 1203;
KPISAMPLETYPE_SERVICE_LATENCY_MS_AGG_OUTPUT = 1701;
// INT KPIs
KPISAMPLETYPE_INT_SEQ_NUM = 2001;
KPISAMPLETYPE_INT_TS_ING = 2002;
KPISAMPLETYPE_INT_TS_EGR = 2003;
KPISAMPLETYPE_INT_HOP_LAT = 2004;
KPISAMPLETYPE_INT_PORT_ID_ING = 2005;
KPISAMPLETYPE_INT_PORT_ID_EGR = 2006;
KPISAMPLETYPE_INT_QUEUE_OCCUP = 2007;
KPISAMPLETYPE_INT_QUEUE_ID = 2008;
KPISAMPLETYPE_INT_HOP_LAT_SW01 = 2101;
KPISAMPLETYPE_INT_HOP_LAT_SW02 = 2102;
KPISAMPLETYPE_INT_HOP_LAT_SW03 = 2103;
KPISAMPLETYPE_INT_HOP_LAT_SW04 = 2104;
KPISAMPLETYPE_INT_HOP_LAT_SW05 = 2105;
KPISAMPLETYPE_INT_HOP_LAT_SW06 = 2106;
KPISAMPLETYPE_INT_HOP_LAT_SW07 = 2107;
KPISAMPLETYPE_INT_HOP_LAT_SW08 = 2108;
KPISAMPLETYPE_INT_HOP_LAT_SW09 = 2109;
KPISAMPLETYPE_INT_HOP_LAT_SW10 = 2110;
KPISAMPLETYPE_INT_LAT_ON_TOTAL = 2120;
KPISAMPLETYPE_INT_IS_DROP = 2201;
KPISAMPLETYPE_INT_DROP_REASON = 2202;
}
......@@ -28,13 +28,21 @@ message CollectorId {
context.Uuid collector_id = 1;
}
message Collector {
CollectorId collector_id = 1; // The Collector ID
kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples
float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely
float interval_s = 4; // Interval between collected samples
context.Timestamp start_time = 5; // Timestamp when Collector start execution
context.Timestamp end_time = 6; // Timestamp when Collector stop execution
message Collector {
CollectorId collector_id = 1; // The Collector ID
kpi_manager.KpiId kpi_id = 2; // The KPI Id to be associated to the collected samples
float duration_s = 3; // Terminate data collection after duration[seconds]; duration==0 means indefinitely
float interval_s = 4; // Interval between collected samples
context.Timestamp start_time = 5; // Timestamp when Collector start execution
context.Timestamp end_time = 6; // Timestamp when Collector stop execution
INTCollector int_collector = 7; // Extra optional information about INT collectors
}
message INTCollector {
int32 transport_port = 1; // The port where the collector listens to packets
string interface = 2; // Network interface to collect data from
string service_id = 3; // Service identifier related to this collector
string context_id = 4; // Context identifier related to this collector
}
message CollectorFilter {
......
......@@ -51,10 +51,10 @@ class MetricWriterToPrometheus:
'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid,
'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
'link_id' : kpi_descriptor.link_id.link_uuid.uuid,
'time_stamp' : kpi_value.timestamp.timestamp,
#'time_stamp' : kpi_value["time_stamp"],
'kpi_value' : kpi_value.kpi_value_type.floatVal
#'kpi_value' : kpi_value["kpi_value"]
# 'time_stamp' : kpi_value.timestamp.timestamp,
'time_stamp' : kpi_value["time_stamp"],
# 'kpi_value' : kpi_value.kpi_value_type.floatVal
'kpi_value' : kpi_value["kpi_value"]
}
LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
return cooked_kpi
......@@ -62,12 +62,12 @@ class MetricWriterToPrometheus:
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
# merge both gRPC messages into single varible.
cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags
metric_name = cooked_kpi['kpi_sample_type']
try:
if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists
PROM_METRICS[metric_name] = Gauge (
PROM_METRICS[metric_name] = Gauge (
metric_name,
cooked_kpi['kpi_description'],
metric_tags,
......@@ -89,7 +89,6 @@ class MetricWriterToPrometheus:
# Push to the Prometheus Gateway, Prometheus is preconfigured to scrap the metrics from the gateway
push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry)
LOGGER.debug("Metric pushed to Prometheus Gateway.")
except ValueError as e:
if 'Duplicated timeseries' in str(e):
LOGGER.debug("Metric {:} is already registered. Skipping.".format(metric_name))
......@@ -97,4 +96,3 @@ class MetricWriterToPrometheus:
else:
LOGGER.error("Error while pushing metric: {}".format(e))
raise
......@@ -16,7 +16,7 @@ FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
apt-get --yes --quiet --quiet install wget g++ git libpcap-dev && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
......@@ -31,6 +31,7 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
RUN python3 -m pip install --upgrade scapy
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
......@@ -76,6 +77,9 @@ COPY src/vnt_manager/__init__.py vnt_manager/__init__.py
COPY src/vnt_manager/client/. vnt_manager/client/
COPY src/telemetry/__init__.py telemetry/__init__.py
COPY src/telemetry/backend/. telemetry/backend/
COPY src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py
COPY src/analytics/frontend/client/. analytics/frontend/client/
# Start the service
ENTRYPOINT ["python", "-m", "telemetry.backend.service"]
......@@ -149,7 +149,7 @@ class _Collector:
- True if a resource is successfully subscribed.
- Exception if an error occurs during the subscription process.
List[Union[bool, Exception]]:
"""
"""
raise NotImplementedError()
def UnsubscribeState(self, resource_key: str) \
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytz
import queue
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime
from telemetry.backend.collector_api._Collector import _Collector
from scapy.all import *
import struct
import socket
import ipaddress
from .INTCollectorCommon import IntDropReport, IntLocalReport, IntFixedReport, FlowInfo
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor
from confluent_kafka import Producer as KafkaProducer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from uuid import uuid4
from typing import Dict
from datetime import datetime, timezone
import json
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
from context.client.ContextClient import ContextClient
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from common.proto.kpi_sample_types_pb2 import KpiSampleType
import time
import logging
LOGGER = logging.getLogger(__name__)
class INTCollector(_Collector):
last_packet_time = time.time() # Track last packet time
max_idle_time = 5 # for how long we tolerate inactivity
sniff_timeout = 3 # how often we stop sniffing to check for inactivity
"""
INTCollector is a class that simulates a network collector for testing purposes.
It provides functionalities to manage configurations, state subscriptions, and synthetic data generation.
"""
def __init__(self, collector_id: str , address: str, interface: str, port: str, kpi_id: str, service_id: str, context_id: str, **settings):
super().__init__('int_collector', address, port, **settings)
self._out_samples = queue.Queue() # Queue to hold synthetic state samples
self._scheduler = BackgroundScheduler(daemon=True)
self._scheduler.configure(
jobstores = {'default': MemoryJobStore()},
executors = {'default': ThreadPoolExecutor(max_workers=1)},
timezone = pytz.utc
)
self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
self.collector_id = collector_id
self.interface = interface
self.kpi_manager_client = KpiManagerClient()
self.analytics_frontend_client = AnalyticsFrontendClient()
self.context_client = ContextClient()
self.kpi_id = kpi_id
self.service_id = service_id
self.context_id = context_id
self.table = {}
self.connected = False # To track connection state
LOGGER.info("INT Collector initialized")
def Connect(self) -> bool:
LOGGER.info(f"Connecting to {self.interface}:{self.port}")
self.connected = True
self._scheduler.add_job(self.sniff_with_restarts_on_idle, id=self.kpi_id ,args=[self.interface , self.port , self.service_id, self.context_id])
self._scheduler.start()
LOGGER.info(f"Successfully connected to {self.interface}:{self.port}")
return True
def Disconnect(self) -> bool:
LOGGER.info(f"Disconnecting from {self.interface}:{self.port}")
if not self.connected:
LOGGER.warning("INT Collector is not connected. Nothing to disconnect.")
return False
self._scheduler.remove_job(self.kpi_id)
self._scheduler.shutdown()
self.connected = False
LOGGER.info(f"Successfully disconnected from {self.interface}:{self.port}")
return True
def on_idle_timeout(self):
LOGGER.info(f"Sniffer idle for more than {self.max_idle_time} seconds.")
LOGGER.debug(f"last_packet_time {self.last_packet_time} seconds.")
values = [0]
for sw_id in range(1, 6):
sw = self.table.get(sw_id)
self.overwrite_switch_values(sw , values)
def overwrite_switch_values(self, switch, values):
if not switch:
return
# Overwrite values using zip
for key, new_value in zip(switch, values):
switch[key] = new_value
for key, value in switch.items():
self.send_message_to_kafka(key, value)
def process_packet(self , packet, port, service_id , context_id):
# global last_packet_time
# Check for IP layer
if IP not in packet:
return None
ip_layer = packet[IP]
# ip_pkt = IPPacket(ip_layer[:20])
# ip_pkt.show()
# Check for UDP
if UDP not in ip_layer:
return None
udp_layer = ip_layer[UDP]
# Only the INT port
if udp_layer.dport != port:
return None
# udp_dgram = UDPPacket(bytes(udp_layer))
# udp_dgram.show()
src_ip = socket.ntohl(struct.unpack('<I', socket.inet_aton(ip_layer.src))[0])
src_ip_str = str(ipaddress.IPv4Address(src_ip))
LOGGER.debug("ip src: {}".format(src_ip_str))
dst_ip = socket.ntohl(struct.unpack('<I', socket.inet_aton(ip_layer.dst))[0])
dst_ip_str = str(ipaddress.IPv4Address(dst_ip))
LOGGER.debug("ip dst: {}".format(dst_ip_str))
LOGGER.debug("ip-proto: {}".format(ip_layer.proto))
LOGGER.debug("port src: {}".format(udp_layer.sport))
LOGGER.debug("port dst: {}".format(udp_layer.dport))
# Get the INT report data (after UDP header)
int_data = bytes(udp_layer.payload)
# Parse fixed report (first 20 bytes)
offset = 20
fixed_report = IntFixedReport(int_data[:offset])
# fixed_report.show()
drop_report = None
local_report = None
lat = 0
if fixed_report.d == 1:
drop_report = IntDropReport(int_data[offset:offset + 4])
offset += 4
# drop_report.show()
elif fixed_report.f == 1 or fixed_report.q == 1:
local_report = IntLocalReport(int_data[offset:offset + 8])
offset += 8
lat = local_report.egress_timestamp - fixed_report.ingress_timestamp
assert lat > 0, "Egress timestamp must be > ingress timestamp"
# local_report.show()
# Create flow info
flow_info = FlowInfo(
src_ip=src_ip,
dst_ip=dst_ip,
src_port=udp_layer.sport,
dst_port=udp_layer.dport,
ip_proto=ip_layer.proto,
flow_sink_time=fixed_report.ingress_timestamp,
num_int_hop=1,
seq_num=fixed_report.seq_num,
switch_id=fixed_report.switch_id,
ingress_timestamp=fixed_report.ingress_timestamp,
ingress_port_id=fixed_report.ingress_port_id,
egress_port_id=fixed_report.egress_port_id,
queue_id=local_report.queue_id if local_report else 0,
queue_occupancy=local_report.queue_occupancy if local_report else 0,
egress_timestamp=local_report.egress_timestamp if local_report else 0,
is_drop=1 if drop_report else 0,
drop_reason=drop_report.drop_reason if drop_report else 0,
hop_latency=lat
)
LOGGER.debug(f"Flow info: {flow_info}")
self.create_descriptors_and_send_to_kafka(flow_info , service_id , context_id)
self.last_packet_time = time.time()
return flow_info
def set_kpi_descriptor(self , kpi_uuid , service_id , device_id , endpoint_id , sample_type):
kpi_descriptor = KpiDescriptor()
kpi_descriptor.kpi_sample_type = sample_type
kpi_descriptor.service_id.service_uuid.uuid = service_id
# kpi_descriptor.device_id.device_uuid.uuid = device_id
# kpi_descriptor.endpoint_id.endpoint_uuid.uuid = endpoint_id
kpi_descriptor.kpi_id.kpi_id.uuid = kpi_uuid
kpi_id: KpiId = self.kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
return kpi_id
def create_descriptors_and_send_to_kafka(self, flow_info , service_id , context_id):
LOGGER.debug(f"PACKET FROM SWITCH: {flow_info.switch_id} LATENCY: {flow_info.hop_latency}")
if(self.table.get(flow_info.switch_id) == None):
# seq_num_kpi_id = str(uuid4())
ingress_ts_kpi_id = str(uuid4())
egress_ts_kpi_id = str(uuid4())
hop_lat_kpi_id = str(uuid4())
ing_port_id_kpi_id = str(uuid4())
egr_port_id_kpi_id = str(uuid4())
queue_occup_kpi_id = str(uuid4())
is_drop_kpi_id = str(uuid4())
sw_lat_kpi_id = str(uuid4())
# LOGGER.debug(f"seq_num_kpi_id for switch {flow_info.switch_id}: {seq_num_kpi_id}")
# LOGGER.debug(f"ingress_ts_kpi_id for switch {flow_info.switch_id}: {ingress_ts_kpi_id}")
# LOGGER.debug(f"egress_ts_kpi_id for switch {flow_info.switch_id}: {egress_ts_kpi_id}")
# LOGGER.debug(f"hop_lat_kpi_id for switch {flow_info.switch_id}: {hop_lat_kpi_id}")
# LOGGER.debug(f"ing_port_id_kpi_id for switch {flow_info.switch_id}: {ing_port_id_kpi_id}")
# LOGGER.debug(f"egr_port_id_kpi_id for switch {flow_info.switch_id}: {egr_port_id_kpi_id}")
# LOGGER.debug(f"queue_occup_kpi_id for switch {flow_info.switch_id}: {queue_occup_kpi_id}")
# LOGGER.debug(f"is_drop_kpi_id for switch {flow_info.switch_id}: {is_drop_kpi_id}")
# LOGGER.debug(f"sw_lat_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}")
# seq_num_kpi = self.set_kpi_descriptor(seq_num_kpi_id, service_id ,'', '', KpiSampleType.KPISAMPLETYPE_INT_SEQ_NUM)
# ingress_timestamp_kpi = self.set_kpi_descriptor(ingress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_ING)
# egress_timestamp_kpi = self.set_kpi_descriptor(egress_ts_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_TS_EGR)
# hop_latency_kpi = self.set_kpi_descriptor(hop_lat_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT)
# ingress_port_id_kpi = self.set_kpi_descriptor(ing_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_ING)
# egress_port_id_kpi = self.set_kpi_descriptor(egr_port_id_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_PORT_ID_EGR)
# queue_occup_kpi = self.set_kpi_descriptor(queue_occup_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_QUEUE_OCCUP)
# is_drop_kpi = self.set_kpi_descriptor(is_drop_kpi_id, service_id, '', '', KpiSampleType.KPISAMPLETYPE_INT_IS_DROP)
# Set a dedicated KPI descriptor for every switch
sw_lat_kpi = None
sw_sample_types = [
KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW01, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW02,
KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW03, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW04,
KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW05, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW06,
KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW07, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW08,
KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW09, KpiSampleType.KPISAMPLETYPE_INT_HOP_LAT_SW10
]
for i, sw_id in enumerate(range(1, 11)):
if flow_info.switch_id == sw_id:
LOGGER.debug(f"SET KPI : seq_num_kpi_id for switch {flow_info.switch_id}: {sw_lat_kpi_id}")
sw_lat_kpi = self.set_kpi_descriptor(sw_lat_kpi_id, service_id, '', '', sw_sample_types[i])
# Gather keys and values
keys = [
# seq_num_kpi.kpi_id.uuid,
# ingress_timestamp_kpi.kpi_id.uuid,
# egress_timestamp_kpi.kpi_id.uuid,
# hop_latency_kpi.kpi_id.uuid,
# ingress_port_id_kpi.kpi_id.uuid,
# egress_port_id_kpi.kpi_id.uuid,
# queue_occup_kpi.kpi_id.uuid,
# is_drop_kpi.kpi_id.uuid,
sw_lat_kpi.kpi_id.uuid
]
values = [
# flow_info.seq_num,
# flow_info.ingress_timestamp,
# flow_info.egress_timestamp,
# flow_info.hop_latency,
# flow_info.ingress_port_id,
# flow_info.egress_port_id,
# flow_info.queue_occupancy,
# flow_info.is_drop,
flow_info.hop_latency
]
assert len(keys) == len(values), "KPI keys and values must agree"
switch = {keys[i]: values[i] for i in range(len(keys))}
self.table[flow_info.switch_id] = switch
# Dispatch to Kafka
for key, value in switch.items():
self.send_message_to_kafka(key, value)
else:
values = [
# flow_info.seq_num,
# flow_info.ingress_timestamp,
# flow_info.egress_timestamp,
# flow_info.hop_latency,
# flow_info.ingress_port_id,
# flow_info.egress_port_id,
# flow_info.queue_occupancy,
# flow_info.is_drop,
flow_info.hop_latency
]
switch = self.table.get(flow_info.switch_id)
# Overwrite values using zip
self.overwrite_switch_values(switch , values)
def send_message_to_kafka(self , kpi_id , measured_kpi_value):
producer = self.kafka_producer
kpi_value: Dict = {
"time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"kpi_id": kpi_id,
"kpi_value": measured_kpi_value
}
producer.produce(
KafkaTopic.VALUE.value,
key=self.collector_id,
value=json.dumps(kpi_value),
callback=self.delivery_callback
)
producer.flush()
LOGGER.debug(f"Message with kpi_id: {kpi_id} was send to kafka!")
def packet_callback(self, packet, port , service_id,context_id):
flow_info = self.process_packet(packet , port , service_id, context_id)
if flow_info:
LOGGER.debug(f"Flow info: {flow_info}")
def sniff_with_restarts_on_idle(self, interface, port, service_id , context_id):
while True:
# Run sniff for a short period to periodically check for idle timeout
sniff(
iface=interface,
filter=f"udp port {port}",
prn=lambda pkt: self.packet_callback(pkt, port, service_id , context_id),
timeout=self.sniff_timeout
)
if not self.connected:
break
# Check if idle period has been exceeded
now = time.time()
if (now - self.last_packet_time) > self.max_idle_time:
self.on_idle_timeout()
self.last_packet_time = now # Reset timer after action
def delivery_callback(self, err, msg):
if err:
LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
from scapy.all import Packet, BitField
from collections import namedtuple
class IPPacket(Packet):
name = "IPPacket"
fields_desc = [
BitField("ip_ver", 0, 4),
BitField("ip_ihl", 0, 4),
BitField("ip_dscp", 0, 6),
BitField("ip_ecn", 0, 2),
BitField("ip_len", 0, 16),
BitField("ip_id", 0, 16),
BitField("ip_flags", 0, 3),
BitField("ip_frag", 0, 13),
BitField("ip_ttl", 0, 8),
BitField("ip_proto", 0, 8),
BitField("ip_csum", 0, 16),
BitField("ip_src", 0, 32),
BitField("ip_dst", 0, 32)
]
class UDPPacket(Packet):
name = "UDPPacket"
fields_desc = [
BitField("udp_port_src", 0, 16),
BitField("udp_port_dst", 0, 16),
BitField("udp_len", 0, 16),
BitField("udp_csum", 0, 16)
]
"""
Private source repo: https://github.com/SNVLab-WUT/int-collector/blob/main/collector/BPFCollector_v0_5.c
fabric-tna repo: https://github.com/stratum/fabric-tna/blob/main/p4src/shared/header.p4
Little Endian order
BitField("nproto", 0, 4),
BitField("ver", 0, 4),
BitField("rsvd1", 0, 5),
BitField("f", 0, 1),
BitField("q", 0, 1),
BitField("d", 0, 1),
BitField("hw_id", 0, 6),
BitField("rsvd2", 0, 10),
BitField("queue_id", 0, 8),
BitField("queue_occupancy", 0, 24),
Big Endian order
BitField("ver", 0, 4),
BitField("nproto", 0, 4),
BitField("d", 0, 1),
BitField("q", 0, 1),
BitField("f", 0, 1),
BitField("rsvd1", 0, 5),
BitField("rsvd2", 0, 10),
BitField("hw_id", 0, 6),
BitField("queue_occupancy", 0, 24),
BitField("queue_id", 0, 8),
"""
class IntFixedReport(Packet):
name = "IntFixedReport"
fields_desc = [
BitField("ver", 0, 4),
BitField("nproto", 0, 4),
BitField("d", 0, 1),
BitField("q", 0, 1),
BitField("f", 0, 1),
BitField("rsvd1", 0, 5),
BitField("rsvd2", 0, 10),
BitField("hw_id", 0, 6),
BitField("seq_num", 0, 32),
BitField("ingress_timestamp", 0, 32),
BitField("switch_id", 0, 32),
BitField("ingress_port_id", 0, 16),
BitField("egress_port_id", 0, 16)
]
class IntLocalReport(Packet):
name = "IntLocalReport"
fields_desc = [
BitField("queue_occupancy", 0, 24),
BitField("queue_id", 0, 8),
BitField("egress_timestamp", 0, 32)
]
class IntDropReport(Packet):
name = "IntDropReport"
fields_desc = [
BitField("queue_id", 0 , 8),
BitField("drop_reason", 0, 8),
BitField("_pad", 0, 16)
]
# Flow information structure
FlowInfo = namedtuple('FlowInfo', [
'src_ip', 'dst_ip', 'src_port', 'dst_port', 'ip_proto',
'flow_sink_time', 'num_int_hop', 'seq_num', 'switch_id',
'ingress_timestamp', 'ingress_port_id', 'egress_port_id', 'queue_id',
'queue_occupancy', 'egress_timestamp', 'is_drop', 'drop_reason',
'hop_latency'
])
......@@ -16,24 +16,25 @@ import json
import time
import logging
import threading
from typing import Any, Dict, Tuple
from datetime import datetime, timezone
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from typing import Any, Dict, Tuple
from datetime import datetime, timezone
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import KafkaError
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.method_wrappers.Decorator import MetricsPool
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.Settings import get_service_port_grpc
from common.method_wrappers.Decorator import MetricsPool
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.tools.context_queries.Device import get_device
from common.proto.kpi_manager_pb2 import KpiId
from common.tools.context_queries.Device import get_device
from common.proto.kpi_manager_pb2 import KpiId
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from context.client.ContextClient import ContextClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from context.client.ContextClient import ContextClient
from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector
from telemetry.backend.collectors.intcollector.INTCollector import INTCollector
LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
class TelemetryBackendService(GenericGrpcService):
......@@ -41,16 +42,17 @@ class TelemetryBackendService(GenericGrpcService):
Class listens for request on Kafka topic, fetches requested metrics from device.
Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics.
"""
def __init__(self, cls_name : str = __name__) -> None:
def __init__(self, cls_name: str = __name__) -> None:
LOGGER.info('Init TelemetryBackendService')
port = get_service_port_grpc(ServiceNameEnum.TELEMETRYBACKEND)
super().__init__(port, cls_name=cls_name)
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'backend',
'auto.offset.reset' : 'latest'})
self.collector = None
self.context_client = ContextClient()
self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers': KafkaConfig.get_kafka_address(),
'group.id': 'backend',
'auto.offset.reset': 'latest'})
self.collector = None
self.context_client = ContextClient()
self.kpi_manager_client = KpiManagerClient()
self.active_jobs = {}
......@@ -73,12 +75,13 @@ class TelemetryBackendService(GenericGrpcService):
if receive_msg.error().code() == KafkaError._PARTITION_EOF:
continue
elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.")
LOGGER.warning(
f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.")
continue
else:
LOGGER.error("Consumer error: {}".format(receive_msg.error()))
break
try:
try:
collector = json.loads(
receive_msg.value().decode('utf-8')
)
......@@ -93,33 +96,39 @@ class TelemetryBackendService(GenericGrpcService):
if collector_id not in self.active_jobs:
stop_event = threading.Event()
self.active_jobs[collector_id] = stop_event
threading.Thread(target = self.CollectorHandler,
args=(
collector_id,
collector['kpi_id'],
duration,
collector['interval'],
stop_event
)).start()
threading.Thread(target=self.CollectorHandler,
args=(
collector_id,
collector['kpi_id'],
duration,
collector['interval'],
collector['interface'],
collector['transport_port'],
collector['service_id'],
collector['context_id'],
stop_event
)).start()
# Stop the Collector after the given duration
if duration > 0:
def stop_after_duration(completion_time, stop_event):
time.sleep(completion_time)
if not stop_event.is_set():
LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}")
LOGGER.warning(
f"Execution duration ({completion_time}) completed of Collector: {collector_id}")
self.TerminateCollector(collector_id)
duration_thread = threading.Thread(
target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}",
args=(duration, stop_event)
)
args=(duration, stop_event)
)
duration_thread.start()
else:
LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id))
except Exception as e:
LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
LOGGER.warning(
"Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event):
def CollectorHandler(self, collector_id, kpi_id, duration, interval, interface , port , service_id , context_id , stop_event):
"""
Method to handle collector request.
"""
......@@ -133,48 +142,65 @@ class TelemetryBackendService(GenericGrpcService):
LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points))
subscription = [collector_id, end_points, duration, interval]
self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event)
elif device_type and "p4" in device_type:
LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points))
subscription = [collector_id, end_points, duration, interval]
self.INTCollectorHandler(subscription, duration, collector_id, kpi_id, interface, port, service_id , context_id ,stop_event)
else:
LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type))
def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event):
# EmulatedCollector
self.collector = EmulatedCollector(address="127.0.0.1", port=8000)
self.collector.Connect()
if not self.collector.SubscribeState(subscription):
LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id))
else:
while not stop_event.is_set():
samples = list(self.collector.GetState(duration=duration, blocking=True))
LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples))
self.GenerateKpiValue(collector_id, kpi_id, samples)
time.sleep(1)
# EmulatedCollector
self.collector = EmulatedCollector(address="127.0.0.1", port=8000)
self.collector.Connect()
if not self.collector.SubscribeState(subscription):
LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id))
else:
while not stop_event.is_set():
samples = list(self.collector.GetState(duration=duration, blocking=True))
LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples))
self.GenerateKpiValue(collector_id, kpi_id, samples)
time.sleep(1)
self.collector.Disconnect()
# self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration.
def INTCollectorHandler(self, subscription, duration, collector_id, kpi_id, interface, port, service_id, context_id , stop_event):
self.collector = INTCollector(
collector_id=collector_id ,
address="127.0.0.1",
interface=interface,
port=port,
kpi_id=kpi_id,
service_id=service_id,
context_id=context_id
)
self.collector.Connect()
if stop_event.is_set():
self.collector.Disconnect()
# self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration.
def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
"""
Method to write kpi value on VALUE Kafka topic
"""
producer = self.kafka_producer
kpi_value : Dict = {
kpi_value: Dict = {
"time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"kpi_id" : kpi_id,
"kpi_value" : measured_kpi_value
"kpi_id": kpi_id,
"kpi_value": measured_kpi_value
}
producer.produce(
KafkaTopic.VALUE.value,
key = collector_id,
value = json.dumps(kpi_value),
callback = self.delivery_callback
key=collector_id,
value=json.dumps(kpi_value),
callback=self.delivery_callback
)
producer.flush()
def TerminateCollector(self, job_id):
LOGGER.debug("Terminating collector backend...")
try:
if job_id not in self.active_jobs: # not job_ids:
# self.logger.warning(f"Active jobs: {self.active_jobs}")
if job_id not in self.active_jobs:
self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.")
else:
LOGGER.info(f"Terminating job: {job_id}")
......@@ -202,18 +228,21 @@ class TelemetryBackendService(GenericGrpcService):
LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...")
return (None, None)
device_id = kpi_descriptor.device_id.device_uuid.uuid
LOGGER.warning(f"kpi_descriptor {kpi_descriptor} ")
LOGGER.warning(f"kpi_descriptor.device_id.device_uuid.uuid {kpi_descriptor.device_id.device_uuid.uuid} ")
device_id = kpi_descriptor.device_id.device_uuid.uuid
endpoint_id = kpi_descriptor.endpoint_id.endpoint_uuid.uuid
device = get_device( context_client = self.context_client,
device_uuid = device_id,
include_config_rules = False,
include_components = False,
)
device = get_device(context_client=self.context_client,
device_uuid=device_id,
include_config_rules=False,
include_components=False,
)
if device:
for endpoint in device.device_endpoints:
if endpoint.endpoint_id.endpoint_uuid.uuid == endpoint_id:
endpoint_dict = {}
kpi_sample_types = []
endpoint_dict = {}
kpi_sample_types = []
endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_dict["name"] = endpoint.name
endpoint_dict["type"] = endpoint.endpoint_type
......@@ -222,10 +251,10 @@ class TelemetryBackendService(GenericGrpcService):
endpoint_dict["sample_types"] = kpi_sample_types
return (device.device_type, endpoint_dict)
LOGGER.warning(f"Device ID: {device_id} - Endpoint ID: {endpoint_id} - Not Found")
return (None, None)
def delivery_callback(self, err, msg):
if err:
if err:
LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
......@@ -65,6 +65,9 @@ WORKDIR /var/teraflow
COPY src/telemetry/__init__.py telemetry/__init__.py
COPY src/telemetry/frontend/. telemetry/frontend/
COPY src/telemetry/database/. telemetry/database/
COPY src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py
COPY src/analytics/frontend/client/. analytics/frontend/client/
# Start the service
ENTRYPOINT ["python", "-m", "telemetry.frontend.service"]
......@@ -69,7 +69,11 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
collector_to_generate : Dict = {
"kpi_id" : collector_obj.kpi_id.kpi_id.uuid,
"duration": collector_obj.duration_s,
"interval": collector_obj.interval_s
"interval": collector_obj.interval_s,
"interface": collector_obj.int_collector.interface,
"transport_port": collector_obj.int_collector.transport_port,
"service_id": collector_obj.int_collector.service_id,
"context_id": collector_obj.int_collector.context_id
}
self.kafka_producer.produce(
KafkaTopic.TELEMETRY_REQUEST.value,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment