from __future__ import print_function

import argparse
import logging
import os
import sys
import time

import grpc

from l3_distributedattackdetector.proto.l3_centralizedattackdetector_pb2 import (
    ModelInput,
)
from l3_distributedattackdetector.proto.l3_centralizedattackdetector_pb2_grpc import (
    L3CentralizedattackdetectorStub,
)

LOGGER = logging.getLogger(__name__)
TSTAT_DIR_NAME = "piped/"


class l3_distributedattackdetectorServiceServicerImpl:
    """Follow a tstat TCP log and forward per-flow features over gRPC.

    Each new line of the newest ``log_tcp_temp_complete`` file is turned into
    eight ratio features plus connection metadata and sent to the centralized
    attack detector through ``L3CentralizedattackdetectorStub.SendInput``.
    """

    # Template for the per-connection metadata kept in ``run``.
    JSON_BLANK = {
        "ip_o": "",  # Client IP
        "port_o": "",  # Client port
        "ip_d": "",  # Server IP
        "port_d": "",  # Server port
        "flow_id": "",  # Identifier built from c_ip, c_port, s_ip, s_port
        "protocol": "",  # Connection protocol
        "time_start": 0,  # Time the connection was first seen
        "time_end": 0,  # Time the connection was last seen
    }

    # Message keys for the values returned by ``process_line``, in order.
    _FEATURE_NAMES = (
        "n_packets_server_seconds",
        "n_packets_client_seconds",
        "n_bits_server_seconds",
        "n_bits_client_seconds",
        "n_bits_server_n_packets_server",
        "n_bits_client_n_packets_client",
        "n_packets_server_n_packets_client",
        "n_bits_server_n_bits_client",
    )

    # A line needs at least this many space-separated fields for the
    # connection identifier (fields 0, 1, 14 and 15) to be readable.
    _MIN_ID_FIELDS = 16

    def __init__(self):
        LOGGER.debug("Creating Servicer...")

    def follow(self, thefile, time_sleep):
        """Yield complete lines appended to an open file, ``tail -f`` style.

        The file position is first moved to the end, so only content written
        after this call is reported. A line is yielded only once its
        terminating newline has been read; partial reads are buffered and
        prepended to the remainder when it arrives. The generator never
        finishes on its own.

        Args:
            thefile: Open text-mode file object to follow.
            time_sleep: Seconds to sleep when no new data is available.

        Yields:
            Each complete line, including its trailing newline.
        """
        thefile.seek(0, os.SEEK_END)
        pending = ""
        while True:
            chunk = thefile.readline()
            if not chunk:
                # Nothing new yet: wait before polling again.
                time.sleep(time_sleep)
                continue
            if not chunk.endswith("\n"):
                # The writer has not finished this line; keep the fragment.
                pending += chunk
                continue
            if pending:
                chunk = pending + chunk
                pending = ""
            yield chunk

    def load_file(self, dirname=TSTAT_DIR_NAME):
        """Return the path of the TCP log inside the newest tstat directory.

        Polls ``<directory of this module>/<dirname>`` once per second until
        it contains at least one entry, then picks the entry that sorts last.

        Args:
            dirname: Directory, relative to this module, where the tstat
                output directories are looked up.

        Returns:
            Path of ``log_tcp_temp_complete`` inside the selected directory.

        Raises:
            OSError: Propagated from ``os.listdir`` if the directory cannot
                be listed (for example because it does not exist).
        """
        here = os.path.dirname(os.path.abspath(__file__))
        tstat_piped = os.path.join(here, dirname)
        while True:
            tstat_dirs = sorted(os.listdir(tstat_piped))
            if tstat_dirs:
                new_dir = tstat_dirs[-1]
                print(new_dir)
                # os.path.join instead of "+" so that a dirname without a
                # trailing separator still yields a valid path.
                tstat_file = os.path.join(
                    tstat_piped, new_dir, "log_tcp_temp_complete"
                )
                print("tstat_file: {0}".format(tstat_file))
                return tstat_file
            print("No tstat directory!")
            time.sleep(1)

    def process_line(self, line):
        """Turn one space-separated log line into eight ratio features.

        Args:
            line: Raw log line; fields 2, 8, 16, 22 and 30 must be numeric.

        Returns:
            A list of eight values in the order of ``_FEATURE_NAMES``. Any
            ratio whose denominator is zero is reported as 0. If a required
            field is missing or not numeric, a list of eight zeros is
            returned instead of raising.
        """

        def safe_div(numerator, denominator):
            """Divide, returning 0 instead of failing on a zero denominator."""
            return numerator / denominator if denominator else 0

        fields = line.split(" ")
        try:
            n_packets_server = float(fields[16])
            n_packets_client = float(fields[2])
            n_bits_server = float(fields[22])
            n_bits_client = float(fields[8])
            # NOTE(review): the original comment called this field a duration
            # in ms, yet the divisor is 1e6 - confirm the unit against the
            # tstat log format before changing it.
            seconds = float(fields[30]) / 1e6
        except (IndexError, ValueError):
            # Same length as the normal result so callers can index both.
            return [0] * len(self._FEATURE_NAMES)

        return [
            safe_div(n_packets_server, seconds),
            safe_div(n_packets_client, seconds),
            safe_div(n_bits_server, seconds),
            safe_div(n_bits_client, seconds),
            safe_div(n_bits_server, n_packets_server),
            safe_div(n_bits_client, n_packets_client),
            safe_div(n_packets_server, n_packets_client),
            safe_div(n_bits_server, n_bits_client),
        ]

    def open_channel(self, input_information, target="localhost:10001"):
        """Send one ``ModelInput`` message to the centralized detector.

        A new insecure gRPC channel is opened for the call and closed again
        when it completes.

        Args:
            input_information: Mapping of ``ModelInput`` field names to
                values.
            target: Address of the gRPC server.

        Raises:
            Whatever ``ModelInput`` construction or the RPC itself raises;
            errors are not handled here.
        """
        with grpc.insecure_channel(target) as channel:
            stub = L3CentralizedattackdetectorStub(channel)
            response = stub.SendInput(ModelInput(**input_information))
            # The placeholder is required: passing an argument without one
            # makes the logging module report a formatting error.
            LOGGER.debug(
                "Inferencer send_input sent and received: %s",
                response.message,
            )

    def run(self, time_sleep, max_lines):
        """Follow the newest tstat log and send every new line's features.

        Args:
            time_sleep: Seconds to wait between polls when the log is idle.
            max_lines: Number of processed lines after which the loop stops.
                Malformed lines that are skipped do not count.
        """
        filename = self.load_file()
        write_salida = None  # Echoing raw lines to stdout is disabled.
        print(
            "following: ",
            filename,
            " time to wait:",
            time_sleep,
            "lineas_tope:",
            max_lines,
            "write salida:",
            write_salida,
        )

        processed = 0
        ultima_lin = 0
        connections_db = {}  # Metadata of every connection seen so far

        # The context manager closes the log even if the loop raises.
        with open(filename, "r") as logfile:
            print('Reading lines')
            for line in self.follow(logfile, time_sleep):
                print('Received Line')
                start = time.time()

                fields = line.split(" ")
                if len(fields) < self._MIN_ID_FIELDS:
                    LOGGER.warning(
                        "Skipping malformed tstat line with %d fields",
                        len(fields),
                    )
                    continue
                conn_id = (fields[0], fields[1], fields[14], fields[15])
                features = self.process_line(line)

                now = time.time()
                record = connections_db.get(conn_id)
                if record is None:
                    # First line for this connection: create its metadata.
                    record = self.JSON_BLANK.copy()
                    record.update(
                        {
                            "time_start": now,
                            "ip_o": conn_id[0],
                            "port_o": conn_id[1],
                            "ip_d": conn_id[2],
                            "port_d": conn_id[3],
                            "flow_id": "".join(conn_id),
                            "protocol": "TCP",
                        }
                    )
                    connections_db[conn_id] = record
                record["time_end"] = now

                # The message is the eight features plus the metadata keys.
                inference_information = dict(
                    zip(self._FEATURE_NAMES, features)
                )
                inference_information.update(record)

                try:
                    self.open_channel(inference_information)
                except Exception:
                    # Best effort: keep reading when the detector is down,
                    # but let KeyboardInterrupt/SystemExit propagate.
                    LOGGER.debug("SendInput failed", exc_info=True)
                    print("Centralized Attack Mitigator is not up")

                if write_salida:
                    print(line, end="")
                    sys.stdout.flush()

                processed += 1
                if processed >= max_lines:
                    break
                elif processed == 1:
                    print("primera:", ultima_lin)

                end = time.time() - start
                print(end)

    def setupl3_distributedattackdetector(self):
        """Configure logging and process 70 log lines, polling every 5 s."""
        logging.basicConfig()
        self.run(5, 70)
        # NOTE(review): no ``serve`` method is defined in this class; it is
        # called only when something else (e.g. a subclass) provides it.
        serve = getattr(self, "serve", None)
        if callable(serve):
            serve()