Skip to content
Snippets Groups Projects
Commit 1fb500f1 authored by delacal's avatar delacal
Browse files

Enhanced performance

parent 2e4805fe
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
...@@ -37,15 +37,15 @@ spec: ...@@ -37,15 +37,15 @@ spec:
- name: LOG_LEVEL - name: LOG_LEVEL
value: "DEBUG" value: "DEBUG"
- name: BATCH_SIZE - name: BATCH_SIZE
value: "10" value: "50"
- name: SAMPLING_RATE - name: SAMPLING_RATE
value: "0.1" value: "0.8"
- name: SAMPLING_SNAPSHOTS - name: SAMPLING_SNAPSHOTS
value: "100" value: "100"
- name: SAMPLING_MODE - name: SAMPLING_MODE
value: "1" value: "3"
- name: TIME_TO_STABILIZE - name: TIME_TO_STABILIZE
value: "2" value: "35"
readinessProbe: readinessProbe:
exec: exec:
command: ["/bin/grpc_health_probe", "-addr=:10001"] command: ["/bin/grpc_health_probe", "-addr=:10001"]
......
TIME_CONS F1_SCORE_MACRO F1_SCORE_WEIGHTED BALANCED_ACCURACY PRECISION_SCORE RECALL_SCORE OVERALL_ACCURACY CRYPTO_ACCURACY TOTAL_PREDICTIONS TOTAL_POSITIVES F_POSITIVES T_NEGATIVES F_NEGATIVES CONFIDENCE TIMESTAMP_START TIMESTAMP_FINISH TIME_TO_STABILIZE F1_SCORE_MACRO F1_SCORE_WEIGHTED BALANCED_ACCURACY PRECISION_SCORE RECALL_SCORE OVERALL_ACCURACY CRYPTO_ACCURACY TOTAL_PREDICTIONS TOTAL_POSITIVES F_POSITIVES T_NEGATIVES F_NEGATIVES CONFIDENCE TIMESTAMP_START TIMESTAMP_FINISH SAMPLING_SNAPSHOTS SAMPLING_RATE SAMPLING_MODE TIME_TO_STABILIZE
30 0.9235888719558716 0.9993987083435059 0.8676470518112183 0.9997201561927795 0.8676470518112183 0.9994412660598755 1.0 16108.0 25.0 0.0 16083.0 9.0 1.0 07/06/2023-15:21:04 07/06/2023-15:56:04 35 0.9934104084968567 0.9996851086616516 0.9871465563774109 0.9998416900634766 0.9871465563774109 0.9996870756149292 1.0 31959.0 379.0 0.0 31580.0 10.0 1.0 26/06/2023-08:59:23 26/06/2023-09:34:23 100 0.1 Random 35
60 0.9235877394676208 0.9993938207626343 0.8676470518112183 0.9997178912162781 0.8676470518112183 0.9994366765022278 1.0 15978.0 25.0 0.0 15953.0 9.0 1.0 07/06/2023-16:12:50 07/06/2023-16:47:50 35
1800 0.9492922425270081 0.9993883967399597 0.9081632494926453 0.9997081160545349 0.9081632494926453 0.9994177222251892 1.0 15458.0 40.0 0.0 15418.0 9.0 1.0 07/06/2023-17:50:32 07/06/2023-18:25:32 35
300 0.9414160251617432 0.9993985891342163 0.895348846912384 0.999715268611908 0.895348846912384 0.9994317293167114 1.0 15838.0 34.0 0.0 15804.0 9.0 1.0 07/06/2023-23:06:56 07/06/2023-23:41:56 35
10000 0.9593176245689392 0.9994117021560669 0.925000011920929 0.9997164011001587 0.925000011920929 0.9994345903396606 1.0 15918.0 51.0 0.0 15867.0 9.0 1.0 07/06/2023-23:54:16 08/06/2023-00:29:16 35
60 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 12/06/2023-14:34:35 12/06/2023-14:36:35 2
...@@ -36,7 +36,7 @@ RUN python3 -m pip install --upgrade pip-tools ...@@ -36,7 +36,7 @@ RUN python3 -m pip install --upgrade pip-tools
# Note: this step enables sharing the previous Docker build steps among all the Python components # Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in COPY common_requirements.in common_requirements.in
#COPY scalability_accuracy.csv scalability_accuracy.csv COPY scalability_accuracy.csv scalability_accuracy.csv
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt RUN python3 -m pip install -r common_requirements.txt
......
...@@ -55,8 +55,9 @@ current_dir = os.path.dirname(os.path.abspath(__file__)) ...@@ -55,8 +55,9 @@ current_dir = os.path.dirname(os.path.abspath(__file__))
# Demo constants # Demo constants
DEMO_MODE = True DEMO_MODE = True
MONITORING_MODE = False
ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"] ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"]
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 10)) BATCH_SIZE = int(os.getenv("BATCH_SIZE", 50))
TIME_START = time.time() TIME_START = time.time()
METRICS_POOL = MetricsPool('l3_centralizedattackdetector', 'RPC') METRICS_POOL = MetricsPool('l3_centralizedattackdetector', 'RPC')
...@@ -526,35 +527,36 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -526,35 +527,36 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
inference_time_start = time.time() inference_time_start = time.time()
cryptomining_detector_output = self.perform_distributed_inference(self.active_requests) cryptomining_detector_output = self.perform_distributed_inference(self.active_requests)
LOGGER.debug("cryptomining_detector_output length: {}".format(len(cryptomining_detector_output)))
inference_time_end = time.time() inference_time_end = time.time()
LOGGER.debug("Inference performed in {} seconds".format(inference_time_end - inference_time_start)) LOGGER.debug("Inference performed in {} seconds".format(inference_time_end - inference_time_start))
#logging.info("Inference performed correctly") #logging.info("Inference performed correctly")
self.inference_results.append({"output": cryptomining_detector_output, "timestamp": datetime.now()}) if MONITORING_MODE:
#LOGGER.debug("inference_results length: {}".format(len(self.inference_results))) self.inference_results.append({"output": cryptomining_detector_output, "timestamp": datetime.now()})
LOGGER.debug("inference_results length: {}".format(len(self.inference_results)))
time_start_active_requests = time.time()
for i, req in enumerate(self.active_requests): for i, req in enumerate(self.active_requests):
service_id = req.connection_metadata.service_id service_id = req.connection_metadata.service_id
device_id = req.connection_metadata.endpoint_id.device_id device_id = req.connection_metadata.endpoint_id.device_id
endpoint_id = req.connection_metadata.endpoint_id endpoint_id = req.connection_metadata.endpoint_id
create_kpis_start = time.time()
# Check if a request of a new service has been received and, if so, create the monitored KPIs for that service # Check if a request of a new service has been received and, if so, create the monitored KPIs for that service
if service_id not in self.service_ids: if service_id not in self.service_ids:
self.create_kpis(service_id, device_id, endpoint_id) self.create_kpis(service_id, device_id, endpoint_id)
self.service_ids.append(service_id) self.service_ids.append(service_id)
LOGGER.debug("Time to create KPIs: {} seconds".format(time.time() - create_kpis_start))
monitor_kpis_start = time.time() if MONITORING_MODE:
self.monitor_kpis() monitor_kpis_start = time.time()
monitor_kpis_end = time.time() self.monitor_kpis()
monitor_kpis_end = time.time()
LOGGER.debug("Monitoring KPIs performed in {} seconds".format(monitor_kpis_end - monitor_kpis_start)) LOGGER.debug("Monitoring KPIs performed in {} seconds".format(monitor_kpis_end - monitor_kpis_start))
#LOGGER.debug("cryptomining_detector_output: {}".format(cryptomining_detector_output[i])) #LOGGER.debug("cryptomining_detector_output: {}".format(cryptomining_detector_output[i]))
if DEMO_MODE:
self.confidence.value = cryptomining_detector_output[i]["confidence"]
self.analyze_prediction_accuracy()
connection_info = ConnectionInfo( connection_info = ConnectionInfo(
req.connection_metadata.ip_o, req.connection_metadata.ip_o,
req.connection_metadata.port_o, req.connection_metadata.port_o,
...@@ -564,6 +566,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -564,6 +566,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
self.l3_non_empty_time_interval = True self.l3_non_empty_time_interval = True
crypto_detector_time_start = time.time()
if cryptomining_detector_output[i]["tag_name"] == "Crypto": if cryptomining_detector_output[i]["tag_name"] == "Crypto":
self.l3_security_status = 1 self.l3_security_status = 1
...@@ -586,7 +589,9 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -586,7 +589,9 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
self.l3_ml_model_confidence_normal * (self.l3_inferences_in_interval_counter_normal - 1) self.l3_ml_model_confidence_normal * (self.l3_inferences_in_interval_counter_normal - 1)
+ cryptomining_detector_output[i]["confidence"] + cryptomining_detector_output[i]["confidence"]
) / self.l3_inferences_in_interval_counter_normal ) / self.l3_inferences_in_interval_counter_normal
LOGGER.debug("Time to analyze crypto detector output: {} seconds".format(time.time() - crypto_detector_time_start))
attack_mitigator_time_start = time.time()
# Only notify Attack Mitigator when a cryptomining connection has been detected # Only notify Attack Mitigator when a cryptomining connection has been detected
if cryptomining_detector_output[i]["tag_name"] == "Crypto": if cryptomining_detector_output[i]["tag_name"] == "Crypto":
if DEMO_MODE: if DEMO_MODE:
...@@ -621,6 +626,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -621,6 +626,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
message = L3AttackmitigatorOutput(**cryptomining_detector_output[i]) message = L3AttackmitigatorOutput(**cryptomining_detector_output[i])
response = self.attackmitigator_client.PerformMitigation(message) response = self.attackmitigator_client.PerformMitigation(message)
notification_time_end = time.time() notification_time_end = time.time()
LOGGER.debug("Notification performed in {} seconds".format(notification_time_end - notification_time_start))
self.am_notification_times.append(notification_time_end - notification_time_start) self.am_notification_times.append(notification_time_end - notification_time_start)
...@@ -662,7 +668,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -662,7 +668,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
return Empty(message="Attack Mitigator not found") return Empty(message="Attack Mitigator not found")
else: else:
logging.info("No attack detected") logging.info("No attack detected")
time_start_no_attack = time.time()
if cryptomining_detector_output[i]["tag_name"] != "Crypto": if cryptomining_detector_output[i]["tag_name"] != "Crypto":
self.y_pred.append(0) self.y_pred.append(0)
if connection_info.ip_o not in ATTACK_IPS and connection_info.ip_d not in ATTACK_IPS: if connection_info.ip_o not in ATTACK_IPS and connection_info.ip_d not in ATTACK_IPS:
...@@ -674,17 +680,32 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -674,17 +680,32 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
self.y_true.append(1) self.y_true.append(1)
self.total_predictions.value = self.total_predictions.value + 1 self.total_predictions.value = self.total_predictions.value + 1
time_end_no_attack = time.time()
LOGGER.debug("Time to process no attack: {}".format(time_end_no_attack - time_start_no_attack))
# return Empty(message="Ok, information received (no attack detected)") # return Empty(message="Ok, information received (no attack detected)")
LOGGER.debug("Time to analyze attack mitigator output: {} seconds".format(time.time() - attack_mitigator_time_start))
analyze_prediction_accuracy_start = time.time()
if DEMO_MODE:
self.confidence.value = cryptomining_detector_output[i]["confidence"]
self.analyze_prediction_accuracy()
LOGGER.debug("Time to analyze prediction accuracy: {} seconds".format(time.time() - analyze_prediction_accuracy_start))
time_end_active_requests = time.time()
LOGGER.debug("Time to process active requests: {}".format(time_end_active_requests - time_start_active_requests))
# Remove requests from "self.active_requests" which aren't in "process_request" # Remove requests from "self.active_requests" which aren't in "process_request"
time_start_remove_requests = time.time()
self.active_requests = [req for req in self.active_requests if req not in process_request] self.active_requests = [req for req in self.active_requests if req not in process_request]
LOGGER.debug("Number of active requests: {}".format(len(self.active_requests)))
time_end_remove_requests = time.time()
LOGGER.debug("Time to remove requests: {}".format(time_end_remove_requests - time_start_remove_requests))
analyze_connections_end = time.time() analyze_connections_end = time.time()
LOGGER.info("Time to analyze connections: {}".format(analyze_connections_end - analyze_connections_start)) LOGGER.info("Time to analyze connections: {}".format(analyze_connections_end - analyze_connections_start))
return Empty(message="Ok, metrics processed") return Empty(message="Ok, metrics processed")
analyze_connections_end = time.time() analyze_connections_end = time.time()
LOGGER.info("Time to analyze connections: {}".format(analyze_connections_end - analyze_connections_start)) LOGGER.info("Time to analyze connection request: {}".format(analyze_connections_end - analyze_connections_start))
return Empty(message="Ok, information received") return Empty(message="Ok, information received")
...@@ -692,6 +713,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -692,6 +713,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
#LOGGER.info("Number of Attack Connections Correctly Classified: {}".format(self.correct_attack_conns)) #LOGGER.info("Number of Attack Connections Correctly Classified: {}".format(self.correct_attack_conns))
#LOGGER.info("Number of Attack Connections: {}".format(len(self.attack_connections))) #LOGGER.info("Number of Attack Connections: {}".format(len(self.attack_connections)))
time_start_calc_acc = time.time()
if self.total_predictions.value > 0: if self.total_predictions.value > 0:
self.overall_detection_acc.value = self.correct_predictions / self.total_predictions.value self.overall_detection_acc.value = self.correct_predictions / self.total_predictions.value
else: else:
...@@ -711,12 +734,20 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -711,12 +734,20 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
self.attack_connections_len.value = len(self.attack_connections) self.attack_connections_len.value = len(self.attack_connections)
LOGGER.debug("Time to calculate accuracy: {}".format(time.time() - time_start_calc_acc))
time_start_f1_score = time.time()
self.f1_score_macro.value = metrics.f1_score(self.y_true, self.y_pred, average="macro") self.f1_score_macro.value = metrics.f1_score(self.y_true, self.y_pred, average="macro")
self.f1_score_weighted.value = metrics.f1_score(self.y_true, self.y_pred, average="weighted") self.f1_score_weighted.value = metrics.f1_score(self.y_true, self.y_pred, average="weighted")
self.balanced_accuracy.value = metrics.balanced_accuracy_score(self.y_true, self.y_pred) self.balanced_accuracy.value = metrics.balanced_accuracy_score(self.y_true, self.y_pred)
self.precision_score.value = metrics.precision_score(self.y_true, self.y_pred, average="macro") self.precision_score.value = metrics.precision_score(self.y_true, self.y_pred, average="macro")
self.recall_score.value = metrics.recall_score(self.y_true, self.y_pred, average="macro") self.recall_score.value = metrics.recall_score(self.y_true, self.y_pred, average="macro")
LOGGER.info("Time to calculate F1 score: {}".format(time.time() - time_start_f1_score))
time_start_generate_report = time.time()
with open("prediction_accuracy.txt", "a") as f: with open("prediction_accuracy.txt", "a") as f:
#LOGGER.debug("Exporting prediction accuracy and confidence") #LOGGER.debug("Exporting prediction accuracy and confidence")
...@@ -729,7 +760,9 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -729,7 +760,9 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
f.write("False Negatives: {}\n".format(self.false_negatives.value)) f.write("False Negatives: {}\n".format(self.false_negatives.value))
f.write("Cryptomining Detector Confidence: {}\n\n".format(self.confidence.value)) f.write("Cryptomining Detector Confidence: {}\n\n".format(self.confidence.value))
f.write("Timestamp: {}\n".format(datetime.now().strftime("%d/%m/%Y %H:%M:%S"))) f.write("Timestamp: {}\n".format(datetime.now().strftime("%d/%m/%Y %H:%M:%S")))
f.close() f.close()
LOGGER.debug("Time to generate report: {}".format(time.time() - time_start_generate_report))
def generate_accuracy_scalability_csv(self): def generate_accuracy_scalability_csv(self):
LOGGER.debug("Starting async prediction accuracy analysis exp 3") LOGGER.debug("Starting async prediction accuracy analysis exp 3")
...@@ -740,6 +773,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -740,6 +773,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
# Wait for the system to stabilize # Wait for the system to stabilize
time.sleep(self.time_to_stabilize * 60) time.sleep(self.time_to_stabilize * 60)
#self.analyze_prediction_accuracy()
np.save("y_true_snapshots_{}_rate_{}_mode_{}_time_to_stabilize_{}_exp3.npy".format(self.sampling_snapshots, self.sampling_rate, self.sampling_mode, self.time_to_stabilize), self.y_true) np.save("y_true_snapshots_{}_rate_{}_mode_{}_time_to_stabilize_{}_exp3.npy".format(self.sampling_snapshots, self.sampling_rate, self.sampling_mode, self.time_to_stabilize), self.y_true)
np.save("y_pred_snapshots_{}_rate_{}_mode_{}_time_to_stabilize_{}_exp3.npy".format(self.sampling_snapshots, self.sampling_rate, self.sampling_mode, self.time_to_stabilize), self.y_pred) np.save("y_pred_snapshots_{}_rate_{}_mode_{}_time_to_stabilize_{}_exp3.npy".format(self.sampling_snapshots, self.sampling_rate, self.sampling_mode, self.time_to_stabilize), self.y_pred)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment