Skip to content
Snippets Groups Projects
Commit df0fbd0b authored by delacal's avatar delacal
Browse files

- Removed pod scalability

- Changed cad to process connections by batches
parent 42f44c1a
No related branches found
No related tags found
No related merge requests found
Source diff could not be displayed: it is too large. Options to address this: view the blob.
......@@ -71,29 +71,6 @@ spec:
strategy:
type: Recreate
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: l3-attackmitigatorservice-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: l3-attackmitigatorservice
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 120
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
......
......@@ -70,29 +70,6 @@ spec:
targetPort: 10001
strategy:
type: Recreate
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: l3-centralizedattackdetectorservice-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: l3-centralizedattackdetectorservice
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
behavior:
scaleDown:
stabilizationWindowSeconds: 120
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
......
60.0 0.4998282492160797 0.9989699125289917 0.5 0.4996565878391266 0.5 0.9993131756782532 0.0 4369.0 0.0 0.0 4369.0 3.0 0.9999995827674866 04/05/2023-15:08:07 04/05/2023-15:20:07 12.0
30.0 1.0 1.0 1.0 1.0 1.0 1.0 0.0 2339.0 0.0 0.0 2339.0 0.0 0.9978645443916321 08/05/2023-13:40:46 08/05/2023-13:42:46 2.0
......@@ -55,6 +55,7 @@ current_dir = os.path.dirname(os.path.abspath(__file__))
# Demo constants
DEMO_MODE = True
ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"]
BATCH_SIZE= 10
TIME_START = time.time()
METRICS_POOL = MetricsPool('l3_centralizedattackdetector', 'RPC')
......@@ -125,6 +126,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
self.l3_unique_attackers = 0
self.l3_non_empty_time_interval = False
self.active_requests = []
self.monitoring_client = MonitoringClient()
self.service_ids = []
......@@ -201,6 +204,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
self.precision_score = Value('f', 0)
self.recall_score = Value('f', 0)
self.replica_uuid = uuid.uuid4()
self.y_true = []
self.y_pred = []
......@@ -398,18 +403,18 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
-output: L3AttackmitigatorOutput object with information about the assigned class and prediction confidence
"""
def perform_inference(self, request):
x_data = np.array([[feature.feature for feature in request.features]])
def perform_distributed_inference(self, requests):
batch_size = len(requests)
# Print input data shape
#LOGGER.debug("x_data.shape: {}".format(x_data.shape))
# Create an empty array to hold the input data
x_data = np.empty((batch_size, len(requests[0].features)))
# Get batch size
batch_size = x_data.shape[0]
# Fill in the input data array with features from each request
for i, request in enumerate(requests):
x_data[i] = [feature.feature for feature in request.features]
# Print batch size
#LOGGER.debug("batch_size: {}".format(batch_size))
#LOGGER.debug("x_data.shape: {}".format(x_data.shape))
# Print input data shape
LOGGER.debug("x_data.shape: {}".format(x_data.shape))
inference_time_start = time.perf_counter()
......@@ -434,11 +439,11 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
std_inference_time = np.std(inference_times_np_array)
median_inference_time = np.median(inference_times_np_array)
'''LOGGER.debug("Average inference time: {}".format(avg_inference_time))
LOGGER.debug("Average inference time: {}".format(avg_inference_time))
LOGGER.debug("Max inference time: {}".format(max_inference_time))
LOGGER.debug("Min inference time: {}".format(min_inference_time))
LOGGER.debug("Standard deviation inference time: {}".format(std_inference_time))
LOGGER.debug("Median inference time: {}".format(median_inference_time))'''
LOGGER.debug("Median inference time: {}".format(median_inference_time))
with open(f"inference_times_stats_{batch_size}.txt", "w") as f:
f.write("Average inference time: {}\n".format(avg_inference_time))
......@@ -448,34 +453,36 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
f.write("Median inference time: {}\n".format(median_inference_time))
# Gather the predicted class, the probability of that class and other relevant information required to block the attack
output_message = {
"confidence": None,
"timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"ip_o": request.connection_metadata.ip_o,
"ip_d": request.connection_metadata.ip_d,
"tag_name": None,
"tag": None,
"flow_id": request.connection_metadata.flow_id,
"protocol": request.connection_metadata.protocol,
"port_o": request.connection_metadata.port_o,
"port_d": request.connection_metadata.port_d,
"ml_id": self.cryptomining_detector_file_name,
"service_id": request.connection_metadata.service_id,
"endpoint_id": request.connection_metadata.endpoint_id,
"time_start": request.connection_metadata.time_start,
"time_end": request.connection_metadata.time_end,
}
if predictions[0][1] >= self.CLASSIFICATION_THRESHOLD:
output_message["confidence"] = predictions[0][1]
output_message["tag_name"] = "Crypto"
output_message["tag"] = self.CRYPTO_CLASS
else:
output_message["confidence"] = predictions[0][0]
output_message["tag_name"] = "Normal"
output_message["tag"] = self.NORMAL_CLASS
output_messages = []
for i, request in enumerate(requests):
output_messages.append({
"confidence": None,
"timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"ip_o": request.connection_metadata.ip_o,
"ip_d": request.connection_metadata.ip_d,
"tag_name": None,
"tag": None,
"flow_id": request.connection_metadata.flow_id,
"protocol": request.connection_metadata.protocol,
"port_o": request.connection_metadata.port_o,
"port_d": request.connection_metadata.port_d,
"ml_id": self.cryptomining_detector_file_name,
"service_id": request.connection_metadata.service_id,
"endpoint_id": request.connection_metadata.endpoint_id,
"time_start": request.connection_metadata.time_start,
"time_end": request.connection_metadata.time_end,
})
if predictions[i][1] >= self.CLASSIFICATION_THRESHOLD:
output_messages[i]["confidence"] = predictions[i][1]
output_messages[i]["tag_name"] = "Crypto"
output_messages[i]["tag"] = self.CRYPTO_CLASS
else:
output_messages[i]["confidence"] = predictions[i][0]
output_messages[i]["tag_name"] = "Normal"
output_messages[i]["tag"] = self.NORMAL_CLASS
return output_message
return output_messages
"""
Receive features from Attack Mitigator, predict attack and communicate with Attack Mitigator
......@@ -486,157 +493,176 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def AnalyzeConnectionStatistics(self, request, context):
# Perform inference with the data sent in the request
logging.info("\nPerforming inference...")
self.active_requests.append(request)
inference_time_start = time.time()
cryptomining_detector_output = self.perform_inference(request)
inference_time_end = time.time()
if len(self.active_requests) == BATCH_SIZE:
csv_file_path = 'hola_mundo.csv'
LOGGER.debug("Inference performed in {} seconds".format(inference_time_end - inference_time_start))
logging.info("Inference performed correctly\n")
self.inference_results.append({"output": cryptomining_detector_output, "timestamp": datetime.now()})
#LOGGER.debug("inference_results length: {}".format(len(self.inference_results)))
service_id = request.connection_metadata.service_id
device_id = request.connection_metadata.endpoint_id.device_id
endpoint_id = request.connection_metadata.endpoint_id
# Check if a request of a new service has been received and, if so, create the monitored KPIs for that service
if service_id not in self.service_ids:
self.create_kpis(service_id, device_id, endpoint_id)
self.service_ids.append(service_id)
monitor_kpis_start = time.time()
self.monitor_kpis()
monitor_kpis_end = time.time()
'''LOGGER.debug("Monitoring KPIs performed in {} seconds".format(monitor_kpis_end - monitor_kpis_start))
LOGGER.debug("cryptomining_detector_output: {}".format(cryptomining_detector_output))'''
if DEMO_MODE:
self.confidence.value = cryptomining_detector_output["confidence"]
self.analyze_prediction_accuracy()
connection_info = ConnectionInfo(
request.connection_metadata.ip_o,
request.connection_metadata.port_o,
request.connection_metadata.ip_d,
request.connection_metadata.port_d,
)
col_values = [1, 2, 3]
with open(csv_file_path, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow(col_values)
logging.debug("Performing inference... {}".format(self.replica_uuid))
inference_time_start = time.time()
cryptomining_detector_output = self.perform_distributed_inference(self.active_requests)
inference_time_end = time.time()
LOGGER.debug("Inference performed in {} seconds".format(inference_time_end - inference_time_start))
logging.info("Inference performed correctly")
self.l3_non_empty_time_interval = True
self.inference_results.append({"output": cryptomining_detector_output, "timestamp": datetime.now()})
LOGGER.debug("inference_results length: {}".format(len(self.inference_results)))
if cryptomining_detector_output["tag_name"] == "Crypto":
self.l3_security_status = 1
for i, req in enumerate(self.active_requests):
service_id = req.connection_metadata.service_id
device_id = req.connection_metadata.endpoint_id.device_id
endpoint_id = req.connection_metadata.endpoint_id
self.l3_inferences_in_interval_counter_crypto += 1
self.l3_ml_model_confidence_crypto = (
self.l3_ml_model_confidence_crypto * (self.l3_inferences_in_interval_counter_crypto - 1)
+ cryptomining_detector_output["confidence"]
) / self.l3_inferences_in_interval_counter_crypto
# Check if a request of a new service has been received and, if so, create the monitored KPIs for that service
if service_id not in self.service_ids:
self.create_kpis(service_id, device_id, endpoint_id)
self.service_ids.append(service_id)
if connection_info not in self.l3_attacks:
self.l3_attacks.append(connection_info)
self.l3_unique_attack_conns += 1
monitor_kpis_start = time.time()
self.monitor_kpis()
monitor_kpis_end = time.time()
self.l3_unique_compromised_clients = len(set([conn.ip_o for conn in self.l3_attacks]))
self.l3_unique_attackers = len(set([conn.ip_d for conn in self.l3_attacks]))
LOGGER.debug("Monitoring KPIs performed in {} seconds".format(monitor_kpis_end - monitor_kpis_start))
LOGGER.debug("cryptomining_detector_output: {}".format(cryptomining_detector_output[i]))
else:
self.l3_inferences_in_interval_counter_normal += 1
self.l3_ml_model_confidence_normal = (
self.l3_ml_model_confidence_normal * (self.l3_inferences_in_interval_counter_normal - 1)
+ cryptomining_detector_output["confidence"]
) / self.l3_inferences_in_interval_counter_normal
# Only notify Attack Mitigator when a cryptomining connection has been detected
if cryptomining_detector_output["tag_name"] == "Crypto" and connection_info not in self.attack_connections:
self.attack_connections.append(connection_info)
# Calculate F1 score
self.y_pred.append(1)
if DEMO_MODE:
self.confidence.value = cryptomining_detector_output[i]["confidence"]
self.analyze_prediction_accuracy()
if connection_info.ip_o in ATTACK_IPS or connection_info.ip_d in ATTACK_IPS:
self.correct_attack_conns += 1
self.correct_predictions += 1
self.y_true.append(1)
else:
#LOGGER.debug("False positive: {}".format(connection_info))
self.false_positives.value = self.false_positives.value + 1
self.y_true.append(0)
connection_info = ConnectionInfo(
req.connection_metadata.ip_o,
req.connection_metadata.port_o,
req.connection_metadata.ip_d,
req.connection_metadata.port_d,
)
self.total_predictions.value = self.total_predictions.value + 1
self.l3_non_empty_time_interval = True
# if False:
notification_time_start = time.perf_counter()
if cryptomining_detector_output[i]["tag_name"] == "Crypto":
self.l3_security_status = 1
LOGGER.debug("Crypto attack detected")
self.l3_inferences_in_interval_counter_crypto += 1
self.l3_ml_model_confidence_crypto = (
self.l3_ml_model_confidence_crypto * (self.l3_inferences_in_interval_counter_crypto - 1)
+ cryptomining_detector_output[i]["confidence"]
) / self.l3_inferences_in_interval_counter_crypto
# Notify the Attack Mitigator component about the attack
logging.info(
"Notifying the Attack Mitigator component about the attack in order to block the connection..."
)
if connection_info not in self.l3_attacks:
self.l3_attacks.append(connection_info)
self.l3_unique_attack_conns += 1
try:
logging.info("Sending the connection information to the Attack Mitigator component...")
message = L3AttackmitigatorOutput(**cryptomining_detector_output)
response = self.attackmitigator_client.PerformMitigation(message)
notification_time_end = time.perf_counter()
self.am_notification_times.append(notification_time_end - notification_time_start)
#LOGGER.debug(f"am_notification_times length: {len(self.am_notification_times)}")
#LOGGER.debug(f"last am_notification_time: {self.am_notification_times[-1]}")
if len(self.am_notification_times) > 100:
am_notification_times_np_array = np.array(self.am_notification_times)
np.save("am_notification_times.npy", am_notification_times_np_array)
avg_notification_time = np.mean(am_notification_times_np_array)
max_notification_time = np.max(am_notification_times_np_array)
min_notification_time = np.min(am_notification_times_np_array)
std_notification_time = np.std(am_notification_times_np_array)
median_notification_time = np.median(am_notification_times_np_array)
'''LOGGER.debug("Average notification time: {}".format(avg_notification_time))
LOGGER.debug("Max notification time: {}".format(max_notification_time))
LOGGER.debug("Min notification time: {}".format(min_notification_time))
LOGGER.debug("Std notification time: {}".format(std_notification_time))
LOGGER.debug("Median notification time: {}".format(median_notification_time))'''
with open("am_notification_times_stats.txt", "w") as f:
f.write("Average notification time: {}\n".format(avg_notification_time))
f.write("Max notification time: {}\n".format(max_notification_time))
f.write("Min notification time: {}\n".format(min_notification_time))
f.write("Std notification time: {}\n".format(std_notification_time))
f.write("Median notification time: {}\n".format(median_notification_time))
# logging.info("Attack Mitigator notified and received response: ", response.message) # FIX No message received
logging.info("Attack Mitigator notified\n")
return Empty(message="OK, information received and mitigator notified abou the attack")
except Exception as e:
logging.error("Error notifying the Attack Mitigator component about the attack: ", e)
logging.error("Couldn't find l3_attackmitigator\n")
return Empty(message="Attack Mitigator not found")
else:
logging.info("No attack detected")
self.l3_unique_compromised_clients = len(set([conn.ip_o for conn in self.l3_attacks]))
self.l3_unique_attackers = len(set([conn.ip_d for conn in self.l3_attacks]))
if cryptomining_detector_output["tag_name"] != "Crypto":
self.y_pred.append(0)
if connection_info.ip_o not in ATTACK_IPS and connection_info.ip_d not in ATTACK_IPS:
self.correct_predictions += 1
self.y_true.append(0)
else:
#LOGGER.debug("False negative: {}".format(connection_info))
self.false_negatives.value = self.false_negatives.value + 1
self.y_true.append(1)
self.l3_inferences_in_interval_counter_normal += 1
self.l3_ml_model_confidence_normal = (
self.l3_ml_model_confidence_normal * (self.l3_inferences_in_interval_counter_normal - 1)
+ cryptomining_detector_output[i]["confidence"]
) / self.l3_inferences_in_interval_counter_normal
# Only notify Attack Mitigator when a cryptomining connection has been detected
if cryptomining_detector_output[i]["tag_name"] == "Crypto":
if DEMO_MODE:
self.attack_connections.append(connection_info)
# Calculate F1 score
self.y_pred.append(1)
if connection_info.ip_o in ATTACK_IPS or connection_info.ip_d in ATTACK_IPS:
self.correct_attack_conns += 1
self.correct_predictions += 1
self.y_true.append(1)
else:
LOGGER.debug("False positive: {}".format(connection_info))
self.false_positives.value = self.false_positives.value + 1
self.y_true.append(0)
self.total_predictions.value = self.total_predictions.value + 1
# if False:
notification_time_start = time.perf_counter()
LOGGER.debug("Crypto attack detected")
# Notify the Attack Mitigator component about the attack
logging.info(
"Notifying the Attack Mitigator component about the attack in order to block the connection..."
)
try:
logging.info("Sending the connection information to the Attack Mitigator component...")
message = L3AttackmitigatorOutput(**cryptomining_detector_output[i])
response = self.attackmitigator_client.PerformMitigation(message)
notification_time_end = time.perf_counter()
self.am_notification_times.append(notification_time_end - notification_time_start)
LOGGER.debug(f"am_notification_times length: {len(self.am_notification_times)}")
LOGGER.debug(f"last am_notification_time: {self.am_notification_times[-1]}")
if len(self.am_notification_times) > 100:
am_notification_times_np_array = np.array(self.am_notification_times)
np.save("am_notification_times.npy", am_notification_times_np_array)
avg_notification_time = np.mean(am_notification_times_np_array)
max_notification_time = np.max(am_notification_times_np_array)
min_notification_time = np.min(am_notification_times_np_array)
std_notification_time = np.std(am_notification_times_np_array)
median_notification_time = np.median(am_notification_times_np_array)
LOGGER.debug("Average notification time: {}".format(avg_notification_time))
LOGGER.debug("Max notification time: {}".format(max_notification_time))
LOGGER.debug("Min notification time: {}".format(min_notification_time))
LOGGER.debug("Std notification time: {}".format(std_notification_time))
LOGGER.debug("Median notification time: {}".format(median_notification_time))
with open("am_notification_times_stats.txt", "w") as f:
f.write("Average notification time: {}\n".format(avg_notification_time))
f.write("Max notification time: {}\n".format(max_notification_time))
f.write("Min notification time: {}\n".format(min_notification_time))
f.write("Std notification time: {}\n".format(std_notification_time))
f.write("Median notification time: {}\n".format(median_notification_time))
# logging.info("Attack Mitigator notified and received response: ", response.message) # FIX No message received
logging.info("Attack Mitigator notified")
#return Empty(message="OK, information received and mitigator notified abou the attack")
except Exception as e:
logging.error("Error notifying the Attack Mitigator component about the attack: ", e)
logging.error("Couldn't find l3_attackmitigator")
return Empty(message="Attack Mitigator not found")
else:
logging.info("No attack detected")
self.total_predictions.value = self.total_predictions.value + 1
if cryptomining_detector_output[i]["tag_name"] != "Crypto":
self.y_pred.append(0)
if connection_info.ip_o not in ATTACK_IPS and connection_info.ip_d not in ATTACK_IPS:
self.correct_predictions += 1
self.y_true.append(0)
else:
LOGGER.debug("False negative: {}".format(connection_info))
self.false_negatives.value = self.false_negatives.value + 1
self.y_true.append(1)
return Empty(message="Ok, information received (no attack detected)")
self.total_predictions.value = self.total_predictions.value + 1
# return Empty(message="Ok, information received (no attack detected)")
self.active_requests = []
return Empty(message="Ok, metrics processed")
return Empty(message="Ok, information received")
def analyze_prediction_accuracy(self):
#LOGGER.info("Number of Attack Connections Correctly Classified: {}".format(self.correct_attack_conns))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment