Skip to content
Snippets Groups Projects
l3_distributedattackdetectorServiceServicerImpl.py 7.56 KiB
Newer Older
  • Learn to ignore specific revisions
  • ldemarcosm's avatar
    ldemarcosm committed
    from __future__ import print_function
    import argparse
    import sys
    import time
    import os
    import grpc
    import logging
    import grpc, logging
    from l3_distributedattackdetector.proto.l3_centralizedattackdetector_pb2 import (
    
    ldemarcosm's avatar
    ldemarcosm committed
        ModelInput,
    
    ldemarcosm's avatar
    ldemarcosm committed
    )
    from l3_distributedattackdetector.proto.l3_centralizedattackdetector_pb2_grpc import (
    
    ldemarcosm's avatar
    ldemarcosm committed
        L3CentralizedattackdetectorStub,
    
    ldemarcosm's avatar
    ldemarcosm committed
    )
    
    LOGGER = logging.getLogger(__name__)
    TSTAT_DIR_NAME = "piped/"
    
    
    ldemarcosm's avatar
    ldemarcosm committed
    class l3_distributedattackdetectorServiceServicerImpl():
    
    ldemarcosm's avatar
    ldemarcosm committed
    
    
    ldemarcosm's avatar
    ldemarcosm committed
        def __init__(self):
    
    ldemarcosm's avatar
    ldemarcosm committed
            LOGGER.debug("Creating Servicer...")
    
        JSON_BLANK = {
            "ip_o": "",  # Client IP
            "port_o": "",  # Client port
            "ip_d": "",  # Server ip
            "port_d": "",  # Server port
            "flow_id": "",  # Identifier:c_ip,c_port,s_ip,s_port,time_start
            "protocol": "",  # Connection protocol
            "time_start": 0,  # Start of connection
            "time_end": 0,  # Time of last packet
        }
    
        def follow(self, thefile, time_sleep):
            """
            Generator function that yields new lines in a file
            It reads the logfie (the opened file)
            """
            # seek the end of the file
            thefile.seek(0, os.SEEK_END)
    
            trozo = ""
            # start infinite loop
            while True:
                # read last line of file
                line = thefile.readline()
                # sleep if file hasn't been updated
                if not line:
                    time.sleep(time_sleep)  # FIXME
                    continue
    
                if line[-1] != "\n":
                    trozo += line
                    # print ("OJO :"+line+":")
                else:
                    if trozo != "":
                        line = trozo + line
                        trozo = ""
                    yield line
    
        def load_file(self, dirname=TSTAT_DIR_NAME):
            """
            - Client side -
            """
            # "/home/dapi/Tstat/TOSHI/tstat/tstat_DRv4/tstat/piped/"
            
            while True:
                here = os.path.dirname(os.path.abspath(__file__))
                tstat_piped = os.path.join(here, dirname)
                tstat_dirs = os.listdir(tstat_piped)
                if len(tstat_dirs) > 0:
                    tstat_dirs.sort()
                    new_dir = tstat_dirs[-1]
                    print(new_dir)
                    # print("dir: {0}".format(new_dir))
                    tstat_file = tstat_piped + new_dir + "/log_tcp_temp_complete"
                    print("tstat_file: {0}".format(tstat_file))
                    return tstat_file
                else:
                    print("No tstat directory!")
                    time.sleep(1)
    
        def process_line(self, line):
            """
            - Preprocessing before a message per line
            - Avoids crash when nan are found by generating a 0s array
            - Returns a list of values
            """
    
            def makeDivision(i, j):
                """
                Helper function
                """
                return i / j if (j and type(i) != str and type(j) != str) else 0
    
            line = line.split(" ")
            try:
                n_packets_server, n_packets_client = float(line[16]), float(line[2])
            except:
                return [0 for i in range(9)]
            n_bits_server, n_bits_client = float(line[22]), float(line[8])
            seconds = float(line[30]) / 1e6  # Duration in ms
            values = [
                makeDivision(n_packets_server, seconds),
                makeDivision(n_packets_client, seconds),
                makeDivision(n_bits_server, seconds),
                makeDivision(n_bits_client, seconds),
                makeDivision(n_bits_server, n_packets_server),
                makeDivision(n_bits_client, n_packets_client),
                makeDivision(n_packets_server, n_packets_client),
                makeDivision(n_bits_server, n_bits_client),
            ]
            return values
    
        def open_channel(self, input_information):
            with grpc.insecure_channel("localhost:10001") as channel:
    
    ldemarcosm's avatar
    ldemarcosm committed
                stub = L3CentralizedattackdetectorStub(channel)
    
                response = stub.SendInput(
    
    ldemarcosm's avatar
    ldemarcosm committed
                    ModelInput(**input_information))
    
    ldemarcosm's avatar
    ldemarcosm committed
                logging.debug("Inferencer send_input sent and received: ",
                            response.message)
                # response = stub.get_output(Inferencer_pb2.empty(message=""))
                # print("Inferencer get_output response: \n", response)
    
    
        def run(self, time_sleep, max_lines):
    
            filename = self.load_file()
            write_salida = None
            print(
                "following: ",
                filename,
                " time to wait:",
                time_sleep,
                "lineas_tope:",
                max_lines,
                "write salida:",
                write_salida,
            )
            logfile = open(filename, "r")
            loglines = self.follow(logfile, time_sleep)  # iterate over the generator
            lin = 0
            ultima_lin = 0
            last_line = ""
            cryptos = 0
            new_connections = {}  # Dict for storing NEW data
            connections_db = {}  # Dict for storing ALL data
            print('Reading lines')
            for line in loglines:
                print('Received Line')
                start = time.time()
                line_id = line.split(" ")
                conn_id = (line_id[0], line_id[1], line_id[14], line_id[15])
                new_connections[conn_id] = self.process_line(line)
                try:
                    connections_db[conn_id]["time_end"] = time.time()
                except KeyError:
                    connections_db[conn_id] = self.JSON_BLANK.copy()
                    connections_db[conn_id]["time_start"] = time.time()
                    connections_db[conn_id]["time_end"] = time.time()
                    connections_db[conn_id]["ip_o"] = conn_id[0]
                    connections_db[conn_id]["port_o"] = conn_id[1]
                    connections_db[conn_id]["flow_id"] = "".join(conn_id)
                    connections_db[conn_id]["protocol"] = "TCP"
                    connections_db[conn_id]["ip_d"] = conn_id[2]
                    connections_db[conn_id]["port_d"] = conn_id[3]
    
                # CRAFT DICT
                inference_information = {
                    "n_packets_server_seconds": new_connections[conn_id][0],
                    "n_packets_client_seconds": new_connections[conn_id][1],
                    "n_bits_server_seconds": new_connections[conn_id][2],
                    "n_bits_client_seconds": new_connections[conn_id][3],
                    "n_bits_server_n_packets_server": new_connections[conn_id][4],
                    "n_bits_client_n_packets_client": new_connections[conn_id][5],
                    "n_packets_server_n_packets_client": new_connections[conn_id][6],
                    "n_bits_server_n_bits_client": new_connections[conn_id][7],
                    "ip_o": connections_db[conn_id]["ip_o"],
                    "port_o": connections_db[conn_id]["port_o"],
                    "ip_d": connections_db[conn_id]["ip_d"],
                    "port_d": connections_db[conn_id]["port_d"],
                    "flow_id": connections_db[conn_id]["flow_id"],
                    "protocol": connections_db[conn_id]["protocol"],
                    "time_start": connections_db[conn_id]["time_start"],
                    "time_end": connections_db[conn_id]["time_end"],
                }
    
                # SEND MSG
                try:
                    self.open_channel(inference_information)
                except:
                    print("Centralized Attack Mitigator is not up")
    
                if write_salida:
                    print(line, end="")
                    sys.stdout.flush()
                lin += 1
                if lin >= max_lines:
                    break
                elif lin == 1:
                    print("primera:", ultima_lin)
    
                end = time.time() - start
                print(end)
    
    
    
    ldemarcosm's avatar
    ldemarcosm committed
        def setupl3_distributedattackdetector(self):
    
    ldemarcosm's avatar
    ldemarcosm committed
            logging.basicConfig()
    
    ldemarcosm's avatar
    ldemarcosm committed
            l3_distributedattackdetectorServiceServicerImpl().run(5, 70)