Commit 4997bc17 authored by Konstantinos Poulakakis's avatar Konstantinos Poulakakis
Browse files

MetalLb configuration for interface. Add Int Collector components for getting...

MetalLB configuration for the service interface. Add INT Collector components for capturing and extracting in-band telemetry. Handle P4 packets on the Telemetry Backend.
parent fc051206
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -94,10 +94,16 @@ metadata:
  labels:
    app: telemetryservice
spec:
  type: ClusterIP
  type: LoadBalancer
  loadBalancerIP: 192.168.5.250
  externalTrafficPolicy: Local
  selector:
    app: telemetryservice
  ports:
    - name: dnsudp
      protocol: UDP
      port: 12345
      targetPort: 12345
    - name: grpc
      protocol: TCP
      port: 30050
+15 −7
Original line number Diff line number Diff line
@@ -35,6 +35,14 @@ message Collector {
   float              interval_s    = 4; // Interval between collected samples
   context.Timestamp  start_time    = 5; // Timestamp when Collector start execution
   context.Timestamp  end_time      = 6; // Timestamp when Collector stop execution
   INTCollector       int_collector = 7;
 }

message INTCollector {
    int32               transport_port  = 1; // The port where the collector listens to packets
    string              interface       = 2; // Network interface to collect data from
    string              service_id      = 3; // Service identifier related to this collector
    string              context_id      = 4; // Context identifier related to this collector
}

message CollectorFilter {
+5 −1
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ FROM python:3.9-slim

# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
    apt-get --yes --quiet --quiet install wget g++ git && \
    apt-get --yes --quiet --quiet install wget g++ git libpcap-dev && \
    rm -rf /var/lib/apt/lists/*

# Set Python to show logs as they occur
@@ -31,6 +31,7 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
RUN python3 -m pip install --upgrade scapy

# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
@@ -64,6 +65,9 @@ RUN python3 -m pip install -r requirements.txt
WORKDIR /var/teraflow
COPY src/telemetry/__init__.py telemetry/__init__.py
COPY src/telemetry/backend/. telemetry/backend/
COPY src/analytics/__init__.py analytics/__init__.py
COPY src/analytics/frontend/__init__.py analytics/frontend/__init__.py
COPY src/analytics/frontend/client/. analytics/frontend/client/

# Start the service
ENTRYPOINT ["python", "-m", "telemetry.backend.service"]
+238 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytz
import queue
import logging
from anytree import Node, Resolver
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union, Optional
from telemetry.backend.collector_api._Collector import _Collector
# from .SyntheticMetricsGenerator import SyntheticMetricsGenerator

from scapy.all import *
import struct
import socket

from .INTCollectorCommon import IntDropReport, IntLocalReport, IntFixedReport, FlowInfo
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor
from confluent_kafka import Producer as KafkaProducer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from uuid import uuid4
from typing import Any, Dict, Tuple
from datetime import datetime, timezone
import json

from kpi_manager.client.KpiManagerClient import KpiManagerClient
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId
from context.client.ContextClient import ContextClient
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from common.proto.context_pb2 import Service, ServiceId

import logging

class INTCollector(_Collector):
    """
    Collector for In-band Network Telemetry (INT) reports.

    Sniffs UDP packets on a configured network interface, parses the INT
    report headers with scapy (see INTCollectorCommon), registers KPI
    descriptors through the KPI Manager, and publishes measured values to
    Kafka.
    """
    def __init__(self, address: str, interface: str, port: str, kpi_id: str, service_id: str, context_id: str, **settings):
        """
        :param address:    Address forwarded to the base _Collector.
        :param interface:  Network interface to sniff INT packets on.
        :param port:       UDP port to listen on (used in the BPF sniff filter).
        :param kpi_id:     Identifier also used as the scheduler job id.
        :param service_id: Service identifier attached to generated KPI descriptors.
        :param context_id: Context identifier related to this collector.
        :param settings:   Extra settings forwarded to the base _Collector.
        """
        super().__init__('int_collector', address, port, **settings)
        self._out_samples = queue.Queue()   # Queue to hold collected state samples
        # Single-worker background scheduler: runs the blocking sniff() loop
        # without blocking the caller of Connect().
        self._scheduler = BackgroundScheduler(daemon=True)
        self._scheduler.configure(
            jobstores = {'default': MemoryJobStore()},
            executors = {'default': ThreadPoolExecutor(max_workers=1)},
            timezone  = pytz.utc
        )
        self.kafka_producer = KafkaProducer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
        self.interface = interface
        self.kpi_manager_client = KpiManagerClient()
        self.analytics_frontend_client = AnalyticsFrontendClient()
        self.context_client = ContextClient()
        self.kpi_id = kpi_id
        self.service_id = service_id
        self.context_id = context_id
        # Cache of {switch_id: {kpi_uuid: last_value}} so KPI descriptors are
        # created only once per switch.
        self.table = {}
        self.logger = logging.getLogger(__name__)
        self.connected = False          # To track connection state
        self.logger.info("INTCollector initialized")

    def Connect(self) -> bool:
        """Start the background sniffing job and mark the collector connected.

        Always returns True; the sniff loop itself runs asynchronously.
        """
        self.logger.info(f"Connecting to {self.interface}:{self.port}")
        self.connected = True

        # Job id is the KPI id so Disconnect() can remove exactly this job.
        self._scheduler.add_job(
            self.start_listening, id=self.kpi_id,
            args=[self.interface, self.port, self.service_id, self.context_id]
        )

        self._scheduler.start()
        self.logger.info(f"Successfully connected to {self.interface}:{self.port}")
        return True

    def Disconnect(self) -> bool:
        """Stop the sniffing job and shut the scheduler down.

        Returns False (with a warning) if the collector was never connected.
        """
        self.logger.info(f"Disconnecting from {self.interface}:{self.port}")
        if not self.connected:
            self.logger.warning("Collector is not connected. Nothing to disconnect.")
            return False

        self._scheduler.remove_job(self.kpi_id)
        self._scheduler.shutdown()

        self.connected = False
        self.logger.info(f"Successfully disconnected from {self.interface}:{self.port}")
        return True

    def _require_connection(self):
        """Raise RuntimeError unless Connect() has been called successfully."""
        if not self.connected:
            raise RuntimeError("Collector is not connected. Please connect before performing operations.")

    def process_packet(self, packet, service_id, context_id):
        """Parse one sniffed packet as an INT report and forward its KPIs.

        Returns a FlowInfo namedtuple for IP/UDP packets carrying an INT
        report, or None for anything else. As a side effect, publishes the
        extracted KPI values to Kafka.
        """
        # Only IP/UDP packets can carry INT reports; ignore everything else.
        if IP not in packet:
            return None

        ip_layer = packet[IP]

        if UDP not in ip_layer:
            return None

        udp_layer = ip_layer[UDP]

        # The INT report data starts right after the UDP header.
        int_data = bytes(udp_layer.payload)

        # Parse the fixed report from the first 50 bytes of the payload.
        # NOTE(review): IntFixedReport's declared fields cover 36 bytes
        # (20-byte header + 16-byte message); the extra bytes end up as the
        # scapy payload. Confirm the 50-byte slice against the P4 pipeline.
        fixed_report = IntFixedReport(int_data[:50])
        remaining_data = int_data[50:]

        drop_report = None
        local_rep = None

        # d/q/f flags select which optional report follows the fixed header.
        if fixed_report.d == 1:
            drop_report = IntDropReport(remaining_data[:4])
            remaining_data = remaining_data[4:]
        elif fixed_report.f == 1 or fixed_report.q == 1:
            local_rep = IntLocalReport(remaining_data[:8])
            remaining_data = remaining_data[8:]

        # Aggregate everything into a flat flow record. IPs are stored as
        # host-order 32-bit integers.
        flow_info = FlowInfo(
            src_ip=socket.ntohl(struct.unpack('!I', socket.inet_aton(ip_layer.src))[0]),
            dst_ip=socket.ntohl(struct.unpack('!I', socket.inet_aton(ip_layer.dst))[0]),
            src_port=udp_layer.sport,
            dst_port=udp_layer.dport,
            ip_proto=ip_layer.proto,
            flow_sink_time=fixed_report.ingress_tstamp,
            num_INT_hop=1,
            seq_num=fixed_report.seq_no,
            switchID=fixed_report.switch_id,
            ingr_times=[fixed_report.ingress_tstamp],
            in_port_ids=[fixed_report.ingress_port_id],
            e_port_ids=[fixed_report.egress_port_id],
            queue_ids=[local_rep.queue_id] if local_rep else [],
            queue_occups=[local_rep.queue_occupancy] if local_rep else [],
            # BUGFIX: IntLocalReport declares 'egress_tstamp'; the previous
            # 'local_rep.egress_times' raised AttributeError on local reports.
            egr_times=[local_rep.egress_tstamp] if local_rep else [],
            is_drop=1 if drop_report else 0,
            drop_reason=drop_report.drop_reason if drop_report else 0,
            message=fixed_report.message
        )

        self.create_descriptors_and_send_them_to_kafka(flow_info, service_id, context_id)

        return flow_info

    # NOTE(review): this runs at class-definition (import) time and configures
    # the root logger globally; it should live in the service entry point.
    # Kept in place to preserve existing logging behavior.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    def set_kpi_descriptor(self, kpi_uuid, service_id, device_id, endpoint_id):
        """Register one KPI descriptor with the KPI Manager and return its KpiId.

        device_id/endpoint_id are accepted for interface stability but are
        currently not written into the descriptor (see commented lines).
        """
        kpi_descriptor = KpiDescriptor()
        kpi_descriptor.kpi_sample_type = 701
        kpi_descriptor.service_id.service_uuid.uuid = service_id
        # kpi_descriptor.device_id.device_uuid.uuid = device_id
        # kpi_descriptor.endpoint_id.endpoint_uuid.uuid = endpoint_id
        kpi_descriptor.kpi_id.kpi_id.uuid = kpi_uuid

        kpi_id: KpiId = self.kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
        logging.info('kpi_id({:s})'.format(str(kpi_id)))

        return kpi_id

    def create_descriptors_and_send_them_to_kafka(self, flow_info, service_id, context_id):
        """Ensure KPI descriptors exist for this switch, then push values to Kafka.

        On first sight of a switch_id, creates the KPI descriptors and caches
        their uuids in self.table; afterwards reuses the cached uuids and only
        refreshes the values.
        """
        if self.table.get(flow_info.switchID) is None:

            flow_sink_time_kpi_id = self.set_kpi_descriptor(str(uuid4()), service_id, '', '')
            seq_num_kpi_id        = self.set_kpi_descriptor(str(uuid4()), service_id, '', '')
            ingr_times_kpi_id     = self.set_kpi_descriptor(str(uuid4()), service_id, '', '')
            in_port_ids_kpi_id    = self.set_kpi_descriptor(str(uuid4()), service_id, '', '')
            e_port_ids_kpi_id     = self.set_kpi_descriptor(str(uuid4()), service_id, '', '')
            # NOTE(review): egr_times_kpi_id is registered but never added to
            # keys/values below, so egress times are not published — confirm
            # whether this is intentional.
            egr_times_kpi_id      = self.set_kpi_descriptor(str(uuid4()), service_id, '', '')

            keys = [flow_sink_time_kpi_id.kpi_id.uuid, seq_num_kpi_id.kpi_id.uuid, ingr_times_kpi_id.kpi_id.uuid, in_port_ids_kpi_id.kpi_id.uuid, e_port_ids_kpi_id.kpi_id.uuid]
            values = [flow_info.flow_sink_time, flow_info.seq_num, flow_info.ingr_times, flow_info.in_port_ids, flow_info.e_port_ids]
            my_dictionary = dict(zip(keys, values))

            self.table[flow_info.switchID] = my_dictionary

            for key, value in my_dictionary.items():
                self.send_message_to_kafka(key, value)
                print(f"Message with kpi_id: {key} was send to kafka!")

        else:
            values = [flow_info.flow_sink_time, flow_info.seq_num, flow_info.ingr_times, flow_info.in_port_ids, flow_info.e_port_ids]
            my_dictionary = self.table.get(flow_info.switchID)

            # Overwrite cached values in insertion order (matches creation order above).
            for key, new_value in zip(my_dictionary, values):
                my_dictionary[key] = new_value

            for key, value in my_dictionary.items():
                self.send_message_to_kafka(key, value)
                print(f"Message with kpi_id: {key} was send to kafka!")

    def send_message_to_kafka(self, kpi_id, measured_kpi_value):
        """Serialize one KPI value as JSON and produce it to the Kafka VALUE topic."""
        producer = self.kafka_producer
        kpi_value: Dict = {
            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "kpi_id": kpi_id,
            "kpi_value": measured_kpi_value
        }
        producer.produce(
            KafkaTopic.VALUE.value,
            key="collector_id",
            value=json.dumps(kpi_value),
            callback=self.delivery_callback
        )
        # Flush per message: simple and safe, at the cost of throughput.
        producer.flush()

    def packet_callback(self, packet, service_id, context_id):
        """sniff() callback: parse one packet and log the extracted flow info."""
        flow_info = self.process_packet(packet, service_id, context_id)
        if flow_info:
            logging.info(f"Flow info: {flow_info}")

    def start_listening(self, interface, port, service_id, context_id):
        """Blocking sniff loop over `interface` filtering on the given UDP port."""
        logging.info(f"Listening on interface {interface}, port {port}")
        sniff(iface=interface, filter=f"udp port {port}", prn=lambda pkt: self.packet_callback(pkt, service_id, context_id))

    def delivery_callback(self, err, msg):
        """Kafka delivery report callback; logs failures, silent on success."""
        if err:
            logging.error('Message delivery failed: {:s}'.format(str(err)))
 No newline at end of file
+46 −0
Original line number Diff line number Diff line
from scapy.all import Packet, BitField, ByteField, ShortField, IntField , StrFixedLenField
from collections import namedtuple

class IntFixedReport(Packet):
    # Fixed INT telemetry report header. Declared fields total 36 bytes:
    # a 20-byte binary header followed by a 16-byte fixed-length message.
    # NOTE(review): the parser in INTCollector slices 50 bytes for this
    # header — confirm the intended report size against the P4 pipeline.
    name = "IntFixedReport"
    fields_desc = [
        BitField("ver", 0, 4),            # report format version
        BitField("nproto", 0, 4),         # next protocol indicator
        BitField("d", 0, 1),              # flag: a drop report follows
        BitField("q", 0, 1),              # flag: a queue (local) report follows
        BitField("f", 0, 1),              # flag: a flow (local) report follows
        BitField("reserved", 0, 5),
        ShortField("hw_id", 0),
        IntField("seq_no", 0),            # report sequence number
        IntField("ingress_tstamp", 0),    # ingress timestamp (units set by the switch)
        IntField("switch_id", 0),         # id of the reporting switch
        ShortField("ingress_port_id", 0),
        ShortField("egress_port_id", 0),
        StrFixedLenField("message", b"DefaultMsg", 16),  # fixed 16-byte free-form message
    ]

class IntLocalReport(Packet):
    # Optional 8-byte local (flow/queue) report, present when the fixed
    # report's f or q flag is set.
    name = "IntLocalReport"
    fields_desc = [
        BitField("queue_id", 0, 8),
        BitField("queue_occupancy", 0, 24),
        IntField("egress_tstamp", 0),    # egress timestamp (units set by the switch)
    ]

class IntDropReport(Packet):
    # Optional 4-byte drop report, present when the fixed report's d flag is set.
    name = "IntDropReport"
    fields_desc = [
        ByteField("queue_id", 0),
        ByteField("drop_reason", 0),     # switch-defined drop reason code
        ShortField("_pad", 0),  # Padding to match structure size
    ]



# Immutable per-flow record aggregating the data extracted from one INT report:
# 5-tuple identity, sink/sequence metadata, and per-hop lists (timestamps,
# ports, queues), plus drop information and the fixed report's message.
FlowInfo = namedtuple(
    'FlowInfo',
    'src_ip dst_ip src_port dst_port ip_proto '
    'flow_sink_time num_INT_hop seq_num switchID '
    'ingr_times in_port_ids e_port_ids queue_ids '
    'queue_occups egr_times is_drop drop_reason message'
)
Loading