Skip to content
Snippets Groups Projects
Commit ef2960a1 authored by karamchandan's avatar karamchandan
Browse files

Cleanup of the DAD service implementation

parent bc1d7a58
No related branches found
No related tags found
2 merge requests!142Release TeraFlowSDN 2.1,!141Pre-release fixes and code cleanup in L3 cybersecurity components
...@@ -66,7 +66,7 @@ BATCH_SIZE = 10 ...@@ -66,7 +66,7 @@ BATCH_SIZE = 10
class l3_distributedattackdetector: class l3_distributedattackdetector:
def __init__(self): def __init__(self):
LOGGER.info("Creating Distributed Attack Detector") LOGGER.info("Creating Distributed Attack Detector")
self.feature_ids = [] self.feature_ids = []
self.cad_features = {} self.cad_features = {}
...@@ -76,26 +76,26 @@ class l3_distributedattackdetector: ...@@ -76,26 +76,26 @@ class l3_distributedattackdetector:
self.new_connections = {} # Dictionary for storing new connections data self.new_connections = {} # Dictionary for storing new connections data
self.known_attack_ips = self.read_kwnown_attack_ips() self.known_attack_ips = self.read_kwnown_attack_ips()
signal.signal(signal.SIGINT, self.handler) signal.signal(signal.SIGINT, self.handler)
with grpc.insecure_channel(CENTRALIZED_ATTACK_DETECTOR) as channel: with grpc.insecure_channel(CENTRALIZED_ATTACK_DETECTOR) as channel:
self.cad = L3CentralizedattackdetectorStub(channel) self.cad = L3CentralizedattackdetectorStub(channel)
LOGGER.info("Connected to the Centralized Attack Detector") LOGGER.info("Connected to the Centralized Attack Detector")
LOGGER.info("Obtaining features Ids. from the Centralized Attack Detector...") LOGGER.info("Obtaining features Ids. from the Centralized Attack Detector...")
self.feature_ids = self.get_features_ids() self.feature_ids = self.get_features_ids()
LOGGER.info("Features Ids.: {:s}".format(str(self.feature_ids))) LOGGER.info("Features Ids.: {:s}".format(str(self.feature_ids)))
asyncio.run(self.process_traffic()) asyncio.run(self.process_traffic())
def read_kwnown_attack_ips(self): def read_kwnown_attack_ips(self):
known_attack_ips = [] known_attack_ips = []
# open known attack ips csv file # open known attack ips csv file
with open("known_attack_ips.csv", "r") as f: with open("known_attack_ips.csv", "r") as f:
known_attack_ips = f.read().split(",") known_attack_ips = f.read().split(",")
return known_attack_ips return known_attack_ips
def handler(self): def handler(self):
...@@ -140,14 +140,14 @@ class l3_distributedattackdetector: ...@@ -140,14 +140,14 @@ class l3_distributedattackdetector:
here = os.path.dirname(os.path.abspath(__file__)) here = os.path.dirname(os.path.abspath(__file__))
tstat_piped = os.path.join(here, dirname) tstat_piped = os.path.join(here, dirname)
tstat_dirs = os.listdir(tstat_piped) tstat_dirs = os.listdir(tstat_piped)
if len(tstat_dirs) > 0: if len(tstat_dirs) > 0:
tstat_dirs.sort() tstat_dirs.sort()
new_dir = tstat_dirs[-1] new_dir = tstat_dirs[-1]
tstat_file = tstat_piped + new_dir + "/log_tcp_temp_complete" tstat_file = tstat_piped + new_dir + "/log_tcp_temp_complete"
LOGGER.info("Following: {:s}".format(str(tstat_file))) LOGGER.info("Following: {:s}".format(str(tstat_file)))
return tstat_file return tstat_file
else: else:
LOGGER.info("No Tstat directory found. Waiting...") LOGGER.info("No Tstat directory found. Waiting...")
...@@ -177,7 +177,7 @@ class l3_distributedattackdetector: ...@@ -177,7 +177,7 @@ class l3_distributedattackdetector:
stub = ContextServiceStub(channel) stub = ContextServiceStub(channel)
context_id = ContextId() context_id = ContextId()
context_id.context_uuid.uuid = context_id_str context_id.context_uuid.uuid = context_id_str
return stub.ListServiceIds(context_id) return stub.ListServiceIds(context_id)
def get_services(self, context_id_str): def get_services(self, context_id_str):
...@@ -185,31 +185,31 @@ class l3_distributedattackdetector: ...@@ -185,31 +185,31 @@ class l3_distributedattackdetector:
stub = ContextServiceStub(channel) stub = ContextServiceStub(channel)
context_id = ContextId() context_id = ContextId()
context_id.context_uuid.uuid = context_id_str context_id.context_uuid.uuid = context_id_str
return stub.ListServices(context_id) return stub.ListServices(context_id)
def get_service_id(self, context_id): def get_service_id(self, context_id):
service_list = self.get_services(context_id) service_list = self.get_services(context_id)
service_id = None service_id = None
for s in service_list.services: for s in service_list.services:
if s.service_type == ServiceTypeEnum.SERVICETYPE_L3NM: if s.service_type == ServiceTypeEnum.SERVICETYPE_L3NM:
service_id = s.service_id service_id = s.service_id
break break
else: else:
pass pass
return service_id return service_id
def get_endpoint_id(self, context_id): def get_endpoint_id(self, context_id):
service_list = self.get_services(context_id) service_list = self.get_services(context_id)
endpoint_id = None endpoint_id = None
for s in service_list.services: for s in service_list.services:
if s.service_type == ServiceTypeEnum.SERVICETYPE_L3NM: if s.service_type == ServiceTypeEnum.SERVICETYPE_L3NM:
endpoint_id = s.service_endpoint_ids[0] endpoint_id = s.service_endpoint_ids[0]
break break
return endpoint_id return endpoint_id
def get_features_ids(self): def get_features_ids(self):
...@@ -246,7 +246,7 @@ class l3_distributedattackdetector: ...@@ -246,7 +246,7 @@ class l3_distributedattackdetector:
def check_if_connection_is_attack(self): def check_if_connection_is_attack(self):
if self.conn_id[0] in self.known_attack_ips or self.conn_id[2] in self.known_attack_ips: if self.conn_id[0] in self.known_attack_ips or self.conn_id[2] in self.known_attack_ips:
LOGGER.info("Attack detected. Origin: {0}, destination: {1}".format(self.conn_id[0], self.conn_id[2])) LOGGER.info("Attack detected. Origin IP address: {0}, destination IP address: {1}".format(self.conn_id[0], self.conn_id[2]))
def create_cad_features(self): def create_cad_features(self):
self.cad_features = { self.cad_features = {
...@@ -323,20 +323,21 @@ class l3_distributedattackdetector: ...@@ -323,20 +323,21 @@ class l3_distributedattackdetector:
while line is None: while line is None:
LOGGER.info("Waiting for new data...") LOGGER.info("Waiting for new data...")
time.sleep(1 / 100) time.sleep(1 / 100)
line = next(loglines, None) line = next(loglines, None)
if index == 0 and IGNORE_FIRST_LINE_TSTAT: if index == 0 and IGNORE_FIRST_LINE_TSTAT:
index = index + 1 index = index + 1
continue continue
if STOP: if STOP:
break break
num_lines += 1 num_lines += 1
start = time.time() start = time.time()
line_id = line.split(" ") line_id = line.split(" ")
self.conn_id = (line_id[0], line_id[1], line_id[14], line_id[15]) self.conn_id = (line_id[0], line_id[1], line_id[14], line_id[15])
self.new_connections[self.conn_id] = self.process_line(line) self.new_connections[self.conn_id] = self.process_line(line)
...@@ -358,7 +359,7 @@ class l3_distributedattackdetector: ...@@ -358,7 +359,7 @@ class l3_distributedattackdetector:
metrics_list_pb, send_data_times = await self.send_data(metrics_list_pb, send_data_times) metrics_list_pb, send_data_times = await self.send_data(metrics_list_pb, send_data_times)
index = index + 1 index += 1
process_time.append(time.time() - start) process_time.append(time.time() - start)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment