diff --git a/src/l3_distributedattackdetector/service/known_attack_ips.csv b/src/l3_distributedattackdetector/service/known_attack_ips.csv new file mode 100644 index 0000000000000000000000000000000000000000..254dad83da9342581186335ca63c6ea1e8b2a219 --- /dev/null +++ b/src/l3_distributedattackdetector/service/known_attack_ips.csv @@ -0,0 +1 @@ +37.187.95.110,91.121.140.167,94.23.23.52,94.23.247.226,149.202.83.171 \ No newline at end of file diff --git a/src/l3_distributedattackdetector/service/l3_distributedattackdetector.py b/src/l3_distributedattackdetector/service/l3_distributedattackdetector.py index 357f44a9ab2037438252fb0ca40b1a7dc3c74c54..32c7c190c4296926b3fbdc14a75908554fee81ea 100644 --- a/src/l3_distributedattackdetector/service/l3_distributedattackdetector.py +++ b/src/l3_distributedattackdetector/service/l3_distributedattackdetector.py @@ -13,28 +13,26 @@ # limitations under the License. import asyncio -import grpc import logging -import numpy as np import os import signal import time from sys import stdout -from common.proto.context_pb2 import ( - Empty, - ServiceTypeEnum, - ContextId, -) + +import grpc +import numpy as np + +from common.proto.context_pb2 import ContextId, Empty, ServiceTypeEnum from common.proto.context_pb2_grpc import ContextServiceStub from common.proto.l3_centralizedattackdetector_pb2 import ( - L3CentralizedattackdetectorMetrics, - L3CentralizedattackdetectorBatchInput, ConnectionMetadata, Feature, + L3CentralizedattackdetectorBatchInput, + L3CentralizedattackdetectorMetrics, ) from common.proto.l3_centralizedattackdetector_pb2_grpc import L3CentralizedattackdetectorStub -# Setup LOGGER +# Setup LOGGER LOGGER = logging.getLogger("dad_LOGGER") LOGGER.setLevel(logging.INFO) logFormatter = logging.Formatter(fmt="%(levelname)-8s %(message)s") @@ -42,8 +40,12 @@ consoleHandler = logging.StreamHandler(stdout) consoleHandler.setFormatter(logFormatter) LOGGER.addHandler(consoleHandler) +# Define constants TSTAT_DIR_NAME = "piped/" -CENTRALIZED_ATTACK_DETECTOR = "192.168.165.78:10001" +CONTROLLER_IP = "192.168.165.78" # Change this to the IP of the controller +CONTEXT_ID = "admin" # Change this to the context ID to be used +CONTEXT_CHANNEL = f"{CONTROLLER_IP}:1010" # Change this to the IP of the controller +CENTRALIZED_ATTACK_DETECTOR = f"{CONTROLLER_IP}:10001" JSON_BLANK = { "ip_o": "", # Client IP "port_o": "", # Client port @@ -54,31 +56,29 @@ JSON_BLANK = { "time_start": 0.0, # Start of connection "time_end": 0.0, # Time of last packet } - STOP = False IGNORE_FIRST_LINE_TSTAT = True - -CONTEXT_ID = "admin" -CONTEXT_CHANNEL = "192.168.165.78:1010" PROFILING = False SEND_DATA_IN_BATCHES = False BATCH_SIZE = 10 -ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"] -class l3_distributedattackdetector(): + +class l3_distributedattackdetector: def __init__(self): LOGGER.info("Creating Distributed Attack Detector") self.feature_ids = [] - + self.cad_features = {} self.conn_id = () - - self.connections_dict = {} # Dict for storing ALL data - self.new_connections = {} # Dict for storing NEW data + + self.connections_dict = {} # Dict for storing ALL data + self.new_connections = {} # Dict for storing NEW data + + self.known_attack_ips = self.read_kwnown_attack_ips() signal.signal(signal.SIGINT, self.handler) - + with grpc.insecure_channel(CENTRALIZED_ATTACK_DETECTOR) as channel: self.cad = L3CentralizedattackdetectorStub(channel) LOGGER.info("Connected to the centralized attack detector") @@ -86,10 +86,18 @@ class l3_distributedattackdetector(): LOGGER.info("Obtaining features...") self.feature_ids = self.get_features_ids() LOGGER.info("Features Ids.: {:s}".format(str(self.feature_ids))) - + asyncio.run(self.process_traffic()) - + + def read_kwnown_attack_ips(self): + known_attack_ips = [] + # open known attack ips csv file + with open("known_attack_ips.csv", "r") as f: + known_attack_ips = f.read().splitlines() + + return known_attack_ips + def handler(self): if STOP: exit() @@ -97,34 +105,35 @@ class l3_distributedattackdetector(): STOP = True LOGGER.info("Gracefully stopping...") - - def follow(self, thefile, time_sleep): + + def follow(self, file, time_sleep): """ Generator function that yields new lines in a file - It reads the logfie (the opened file) """ + # seek the end of the file - # thefile.seek(0, os.SEEK_END) + # file.seek(0, os.SEEK_END) - trozo = "" + chunk = "" - # start infinite loop + # start an infinite loop while True: # read last line of file - line = thefile.readline() + line = file.readline() # sleep if file hasn't been updated if not line: time.sleep(time_sleep) continue + if line[-1] != "\n": - trozo += line + chunk += line else: - if trozo != "": - line = trozo + line - trozo = "" - yield line + if chunk != "": + line = chunk + line + chunk = "" + yield line def load_file(self, dirname=TSTAT_DIR_NAME): # - Client side - while True: @@ -141,7 +150,6 @@ class l3_distributedattackdetector(): LOGGER.info("No Tstat directory!") time.sleep(5) - def process_line(self, line): """ - Preprocessing before a message per line @@ -161,7 +169,6 @@ class l3_distributedattackdetector(): return values - def get_service_ids(self, context_id_str): with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: stub = ContextServiceStub(channel) @@ -169,7 +176,6 @@ class l3_distributedattackdetector(): context_id.context_uuid.uuid = context_id_str return stub.ListServiceIds(context_id) - def get_services(self, context_id_str): with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: stub = ContextServiceStub(channel) @@ -177,7 +183,6 @@ class l3_distributedattackdetector(): context_id.context_uuid.uuid = context_id_str return stub.ListServices(context_id) - def get_service_id(self, context_id): service_id_list = self.get_service_ids(context_id) service_id = None @@ -190,7 +195,6 @@ class l3_distributedattackdetector(): return service_id - def get_service_id2(self, context_id): service_list = self.get_services(context_id) service_id = None @@ -202,7 +206,6 @@ class l3_distributedattackdetector(): pass return service_id - def get_endpoint_id(self, context_id): service_list = self.get_services(context_id) endpoint_id = None @@ -212,11 +215,9 @@ class l3_distributedattackdetector(): break return endpoint_id - def get_features_ids(self): return self.cad.GetFeaturesIds(Empty()).auto_features - def check_types(self): for feature in self.cad_features["features"]: assert isinstance(feature, float) @@ -230,7 +231,6 @@ class l3_distributedattackdetector(): assert isinstance(self.cad_features["connection_metadata"]["time_start"], float) assert isinstance(self.cad_features["connection_metadata"]["time_end"], float) - def insert_connection(self): try: self.connections_dict[self.conn_id]["time_end"] = time.time() @@ -247,12 +247,10 @@ class l3_distributedattackdetector(): self.connections_dict[self.conn_id]["ip_d"] = self.conn_id[2] self.connections_dict[self.conn_id]["port_d"] = self.conn_id[3] - def check_if_connection_is_attack(self): - if self.conn_id[0] in ATTACK_IPS or self.conn_id[2] in ATTACK_IPS: + if self.conn_id[0] in self.known_attack_ips or self.conn_id[2] in self.known_attack_ips: LOGGER.info("Attack detected. Origin: {0}, destination: {1}".format(self.conn_id[0], self.conn_id[2])) - def create_cad_features(self): self.cad_features = { "features": self.new_connections[self.conn_id][0:10], @@ -270,7 +268,6 @@ class l3_distributedattackdetector(): }, } - async def send_batch_async(self, metrics_list_pb): loop = asyncio.get_running_loop() @@ -279,16 +276,13 @@ class l3_distributedattackdetector(): metrics_batch.metrics.extend(metrics_list_pb) # Send batch - future = loop.run_in_executor( - None, self.cad.AnalyzeBatchConnectionStatistics, metrics_batch - ) + future = loop.run_in_executor(None, self.cad.AnalyzeBatchConnectionStatistics, metrics_batch) try: await future except Exception as e: LOGGER.error(f"Error sending batch: {e}") - async def send_data(self, metrics_list_pb, send_data_times): # Send to CAD if SEND_DATA_IN_BATCHES: @@ -311,7 +305,6 @@ class l3_distributedattackdetector(): return metrics_list_pb, send_data_times - async def process_traffic(self): LOGGER.info("Loading Tstat log file...") logfile = open(self.load_file(), "r") @@ -326,7 +319,7 @@ class l3_distributedattackdetector(): metrics_list_pb = [] LOGGER.info("Starting to process data...") - + index = 0 while True: line = next(loglines, None) @@ -352,9 +345,9 @@ class l3_distributedattackdetector(): self.insert_connection() self.create_cad_features() - + self.check_types() - + connection_metadata = ConnectionMetadata(**self.cad_features["connection_metadata"]) metrics = L3CentralizedattackdetectorMetrics() @@ -369,8 +362,8 @@ class l3_distributedattackdetector(): metrics_list_pb, send_data_times = await self.send_data(metrics_list_pb, send_data_times) index = index + 1 - + process_time.append(time.time() - start) - + if num_lines % 10 == 0: - LOGGER.info(f"Number of lines: {num_lines} - Average processing time: {sum(process_time) / num_lines}") \ No newline at end of file + LOGGER.info(f"Number of lines: {num_lines} - Average processing time: {sum(process_time) / num_lines}")