# NOTE(review): "Newer" / "Older" here are residue from a diff-viewer UI,
# not Python code; replaced with this comment so the module can parse.
from __future__ import print_function
import argparse
import sys
import time
import os
import grpc
import logging
import grpc, logging
from l3_distributedattackdetector.proto.l3_centralizedattackdetector_pb2 import (
)
from l3_distributedattackdetector.proto.l3_centralizedattackdetector_pb2_grpc import (
)
LOGGER = logging.getLogger(__name__)
TSTAT_DIR_NAME = "piped/"
class l3_distributedattackdetectorServiceServicerImpl():
    # NOTE(review): original source lines 22-117 were lost in extraction
    # (only stray line numbers remained). The class body resumes below;
    # recover the missing section from the upstream repository.
LOGGER.debug("Creating Servicer...")
JSON_BLANK = {
"ip_o": "", # Client IP
"port_o": "", # Client port
"ip_d": "", # Server ip
"port_d": "", # Server port
"flow_id": "", # Identifier:c_ip,c_port,s_ip,s_port,time_start
"protocol": "", # Connection protocol
"time_start": 0, # Start of connection
"time_end": 0, # Time of last packet
}
def follow(self, thefile, time_sleep):
"""
Generator function that yields new lines in a file
It reads the logfie (the opened file)
"""
# seek the end of the file
thefile.seek(0, os.SEEK_END)
trozo = ""
# start infinite loop
while True:
# read last line of file
line = thefile.readline()
# sleep if file hasn't been updated
if not line:
time.sleep(time_sleep) # FIXME
continue
if line[-1] != "\n":
trozo += line
# print ("OJO :"+line+":")
else:
if trozo != "":
line = trozo + line
trozo = ""
yield line
def load_file(self, dirname=TSTAT_DIR_NAME):
"""
- Client side -
"""
# "/home/dapi/Tstat/TOSHI/tstat/tstat_DRv4/tstat/piped/"
while True:
here = os.path.dirname(os.path.abspath(__file__))
tstat_piped = os.path.join(here, dirname)
tstat_dirs = os.listdir(tstat_piped)
if len(tstat_dirs) > 0:
tstat_dirs.sort()
new_dir = tstat_dirs[-1]
print(new_dir)
# print("dir: {0}".format(new_dir))
tstat_file = tstat_piped + new_dir + "/log_tcp_temp_complete"
print("tstat_file: {0}".format(tstat_file))
return tstat_file
else:
print("No tstat directory!")
time.sleep(1)
def process_line(self, line):
"""
- Preprocessing before a message per line
- Avoids crash when nan are found by generating a 0s array
- Returns a list of values
"""
def makeDivision(i, j):
"""
Helper function
"""
return i / j if (j and type(i) != str and type(j) != str) else 0
line = line.split(" ")
try:
n_packets_server, n_packets_client = float(line[16]), float(line[2])
except:
return [0 for i in range(9)]
n_bits_server, n_bits_client = float(line[22]), float(line[8])
seconds = float(line[30]) / 1e6 # Duration in ms
values = [
makeDivision(n_packets_server, seconds),
makeDivision(n_packets_client, seconds),
makeDivision(n_bits_server, seconds),
makeDivision(n_bits_client, seconds),
makeDivision(n_bits_server, n_packets_server),
makeDivision(n_bits_client, n_packets_client),
makeDivision(n_packets_server, n_packets_client),
makeDivision(n_bits_server, n_bits_client),
]
return values
def open_channel(self, input_information):
with grpc.insecure_channel("localhost:10001") as channel:
            # NOTE(review): original source lines 121-207 were lost in
            # extraction (only stray line numbers remained); the body of
            # open_channel() cannot be reconstructed from this chunk —
            # recover it from the upstream repository.
logging.debug("Inferencer send_input sent and received: ",
response.message)
# response = stub.get_output(Inferencer_pb2.empty(message=""))
# print("Inferencer get_output response: \n", response)
def run(self, time_sleep, max_lines):
filename = self.load_file()
write_salida = None
print(
"following: ",
filename,
" time to wait:",
time_sleep,
"lineas_tope:",
max_lines,
"write salida:",
write_salida,
)
logfile = open(filename, "r")
loglines = self.follow(logfile, time_sleep) # iterate over the generator
lin = 0
ultima_lin = 0
last_line = ""
cryptos = 0
new_connections = {} # Dict for storing NEW data
connections_db = {} # Dict for storing ALL data
print('Reading lines')
for line in loglines:
print('Received Line')
start = time.time()
line_id = line.split(" ")
conn_id = (line_id[0], line_id[1], line_id[14], line_id[15])
new_connections[conn_id] = self.process_line(line)
try:
connections_db[conn_id]["time_end"] = time.time()
except KeyError:
connections_db[conn_id] = self.JSON_BLANK.copy()
connections_db[conn_id]["time_start"] = time.time()
connections_db[conn_id]["time_end"] = time.time()
connections_db[conn_id]["ip_o"] = conn_id[0]
connections_db[conn_id]["port_o"] = conn_id[1]
connections_db[conn_id]["flow_id"] = "".join(conn_id)
connections_db[conn_id]["protocol"] = "TCP"
connections_db[conn_id]["ip_d"] = conn_id[2]
connections_db[conn_id]["port_d"] = conn_id[3]
# CRAFT DICT
inference_information = {
"n_packets_server_seconds": new_connections[conn_id][0],
"n_packets_client_seconds": new_connections[conn_id][1],
"n_bits_server_seconds": new_connections[conn_id][2],
"n_bits_client_seconds": new_connections[conn_id][3],
"n_bits_server_n_packets_server": new_connections[conn_id][4],
"n_bits_client_n_packets_client": new_connections[conn_id][5],
"n_packets_server_n_packets_client": new_connections[conn_id][6],
"n_bits_server_n_bits_client": new_connections[conn_id][7],
"ip_o": connections_db[conn_id]["ip_o"],
"port_o": connections_db[conn_id]["port_o"],
"ip_d": connections_db[conn_id]["ip_d"],
"port_d": connections_db[conn_id]["port_d"],
"flow_id": connections_db[conn_id]["flow_id"],
"protocol": connections_db[conn_id]["protocol"],
"time_start": connections_db[conn_id]["time_start"],
"time_end": connections_db[conn_id]["time_end"],
}
# SEND MSG
try:
self.open_channel(inference_information)
except:
print("Centralized Attack Mitigator is not up")
if write_salida:
print(line, end="")
sys.stdout.flush()
lin += 1
if lin >= max_lines:
break
elif lin == 1:
print("primera:", ultima_lin)
end = time.time() - start
print(end)
# Guard the entry point: importing this module must not start the
# (blocking, infinite) log follower as a side effect.
if __name__ == "__main__":
    l3_distributedattackdetectorServiceServicerImpl().run(5, 70)