diff --git a/complete_deploy.sh b/complete_deploy.sh new file mode 100755 index 0000000000000000000000000000000000000000..f0e2dbf3f93807d9c91b9fa7de47d79587184bfb --- /dev/null +++ b/complete_deploy.sh @@ -0,0 +1 @@ +source my_deploy.sh; ./deploy.sh; source tfs_runtime_env_vars.sh; ofc22/run_test_01_bootstrap.sh; ofc22/run_test_02_create_service.sh \ No newline at end of file diff --git a/deploy.sh b/deploy.sh index add41fa139a0127cb26d652f5b47decfe8658ad0..e605478f54b01a14dde4088b95fb1d7c1ba1cc85 100755 --- a/deploy.sh +++ b/deploy.sh @@ -248,7 +248,7 @@ for COMPONENT in $TFS_COMPONENTS; do echo "Waiting for '$COMPONENT' component..." COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/") kubectl wait --namespace $TFS_K8S_NAMESPACE \ - --for='condition=available' --timeout=300s deployment/${COMPONENT_OBJNAME}service + --for='condition=available' --timeout=10s deployment/${COMPONENT_OBJNAME}service printf "\n" done diff --git a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py index c56f117f383af69d90189653d5e45344d58aecfd..9981dfa9be5c5df6e70f0be9c43992317ffbd6f1 100644 --- a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py +++ b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py @@ -35,12 +35,16 @@ from monitoring.client.MonitoringClient import MonitoringClient from common.proto.monitoring_pb2 import Kpi from common.tools.timestamp.Converters import timestamp_utcnow_to_float -from common.proto.context_pb2 import Timestamp +from common.proto.context_pb2 import Timestamp, ServiceId, EndPointId from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient from multiprocessing import Process, Queue +from google.protobuf.json_format import MessageToJson, Parse +import copy + + LOGGER = logging.getLogger(__name__) current_dir 
= os.path.dirname(os.path.abspath(__file__)) MODEL_FILE = os.path.join(current_dir, "ml_model/crypto_5g_rf_spider_features.onnx") @@ -106,9 +110,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto self.CRYPTO_CLASS = 1 # start monitoring process - self.monitoring_process = Process( - target=self.monitoring_process, args=(self.inference_values, self.inference_results) - ) + self.monitoring_process = Process(target=self.monitor_kpis, args=(self.inference_results,)) self.monitoring_process.start() """ @@ -116,17 +118,19 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto -input: + client: Monitoring Client object where the KPI will be tracked + service_id: service ID where the KPI will be monitored + + kpi_description: description of the KPI + + kpi_sample_type: KPI sample type of the KPI (it must be defined in the kpi_sample_types.proto file) -output: KPI identifier representing the KPI """ - def create_kpi(self, client: MonitoringClient, service_id, kpi_description, kpi_sample_type): - kpi_description: KpiDescriptor = KpiDescriptor() - kpi_description.kpi_description = kpi_description - kpi_description.service_id.service_uuid.uuid = service_id.service_uuid.uuid - kpi_description.kpi_sample_type = kpi_sample_type - new_kpi = client.SetKpi(kpi_description) + def create_kpi(self, client: MonitoringClient, service_id, kpi_name, kpi_description, kpi_sample_type): + kpidescriptor = KpiDescriptor() + kpidescriptor.kpi_description = kpi_description + kpidescriptor.service_id.service_uuid.uuid = service_id.service_uuid.uuid + kpidescriptor.kpi_sample_type = kpi_sample_type + new_kpi = client.SetKpi(kpidescriptor) - LOGGER.info("Created KPI {}...".format(kpi_sample_type)) + LOGGER.info("Created KPI {}".format(kpi_name)) return new_kpi @@ -143,189 +147,199 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto created_kpi = self.create_kpi( self.monitoring_client, service_id, + kpi, 
self.monitored_kpis[kpi]["description"], self.monitored_kpis[kpi]["kpi_sample_type"], ) self.monitored_kpis[kpi]["kpi_id"] = created_kpi.kpi_id self.monitored_kpis[kpi]["service_ids"].append(service_id.service_uuid.uuid) - def monitor_kpis(self): + def monitor_kpis(self, inference_results): while True: - # get all information from the inference_values queue - monitor_inference_values = [] - - for i in range(self.inference_values.qsize()): - monitor_inference_values.append(self.inference_values.get()) - - # get all information from the inference_results queue - monitor_inference_results = [] - - for i in range(self.inference_results.qsize()): - monitor_inference_results.append(self.inference_results.get()) - - for service_id in self.service_ids: - time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG - time_interval_start = datetime.utcnow() - time_interval_end = time_interval_start + time_interval - - # L3 security status - kpi_security_status = Kpi() - kpi_security_status.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_security_status"]["kpi_id"].kpi_id) - - # get the output.tag of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - outputs_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - outputs_last_time_interval.append(self.monitor_inference_results[i]["output"]["tag"]) - - kpi_security_status.kpi_value.intVal = ( - 0 if np.all(outputs_last_time_interval == self.NORMAL_CLASS) else 1 - ) - - # L3 ML model confidence - kpi_conf = Kpi() - kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"].kpi_id) - - # get the output.confidence of the 
ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - confidences_normal_last_time_interval = [] - confidences_crypto_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.NORMAL_CLASS: - confidences_normal_last_time_interval.append( - self.monitor_inference_results[i]["output"]["confidence"] - ) - elif self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - confidences_crypto_last_time_interval.append( - self.monitor_inference_results[i]["output"]["confidence"] - ) - - kpi_conf.kpi_value.intVal = ( - np.mean(confidences_crypto_last_time_interval) - if np.all(outputs_last_time_interval == self.CRYPTO_CLASS) - else np.mean(confidences_normal_last_time_interval) - ) - - # L3 unique attack connections - kpi_unique_attack_conns = Kpi() - kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom( - self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id - ) - - # get the number of unique attack connections (grouping by origin IP, origin port, destination IP, destination port) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - num_unique_attack_conns_last_time_interval = 0 - unique_attack_conns_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in 
self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - current_attack_conn = { - "ip_o": self.monitor_inference_results[i]["input"]["src_ip"], - "port_o": self.monitor_inference_results[i]["input"]["src_port"], - "ip_d": self.monitor_inference_results[i]["input"]["dst_ip"], - "port_d": self.monitor_inference_results[i]["input"]["dst_port"], - } - - for j in range(unique_attack_conns_last_time_interval): - if current_attack_conn == unique_attack_conns_last_time_interval[j]: - break - - num_unique_attack_conns_last_time_interval += 1 - unique_attack_conns_last_time_interval.append(current_attack_conn) - - kpi_unique_attack_conns.kpi_value.intVal = num_unique_attack_conns_last_time_interval - - # L3 unique compromised clients - kpi_unique_compromised_clients = Kpi() - kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom( - self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id - ) - - # get the number of unique compromised clients (grouping by origin IP) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - num_unique_compromised_clients_last_time_interval = 0 - unique_compromised_clients_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - if ( - self.monitor_inference_results[i]["output"]["ip_o"] - not in unique_compromised_clients_last_time_interval - ): - unique_compromised_clients_last_time_interval.append( - self.monitor_inference_results[i]["output"]["ip_o"] + try: + # get all information from the inference_results 
queue + monitor_inference_results = [] + + for i in range(inference_results.qsize()): + monitor_inference_results.append(self.inference_results.get()) + + # deserialize the inference results + for i in range(len(monitor_inference_results)): + monitor_inference_results[i]["output"]["service_id"] = Parse( + monitor_inference_results[i]["output"]["service_id"], ServiceId() + ) + monitor_inference_results[i]["output"]["endpoint_id"] = Parse( + monitor_inference_results[i]["output"]["endpoint_id"], EndPointId() + ) + + for service_id in self.service_ids: + time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG + time_interval_start = datetime.utcnow() + time_interval_end = time_interval_start + time_interval + + # L3 security status + kpi_security_status = Kpi() + kpi_security_status.kpi_id.kpi_id.CopyFrom( + self.monitored_kpis["l3_security_status"]["kpi_id"].kpi_id + ) + + # get the output.tag of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + outputs_last_time_interval = [] + + for i in range(self.monitor_inference_results): + if ( + self.monitor_inference_results[i]["timestamp"] >= time_interval_start + and self.monitor_inference_results[i]["timestamp"] <= time_interval_end + and self.monitor_inference_results[i]["service_id"] == service_id + and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] + ): + outputs_last_time_interval.append(self.monitor_inference_results[i]["output"]["tag"]) + + kpi_security_status.kpi_value.intVal = ( + 0 if np.all(outputs_last_time_interval == self.NORMAL_CLASS) else 1 + ) + + # L3 ML model confidence + kpi_conf = Kpi() + kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"].kpi_id) + + # get the output.confidence of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + confidences_normal_last_time_interval = [] + 
confidences_crypto_last_time_interval = [] + + for i in range(self.monitor_inference_results): + if ( + self.monitor_inference_results[i]["timestamp"] >= time_interval_start + and self.monitor_inference_results[i]["timestamp"] <= time_interval_end + and self.monitor_inference_results[i]["service_id"] == service_id + and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] + ): + if self.monitor_inference_results[i]["output"]["tag"] == self.NORMAL_CLASS: + confidences_normal_last_time_interval.append( + self.monitor_inference_results[i]["output"]["confidence"] ) - num_unique_compromised_clients_last_time_interval += 1 - - kpi_unique_compromised_clients.kpi_value.intVal = num_unique_compromised_clients_last_time_interval - - # L3 unique attackers - kpi_unique_attackers = Kpi() - kpi_unique_attackers.kpi_id.kpi_id.CopyFrom( - self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id - ) - - # get the number of unique attackers (grouping by destination ip) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - num_unique_attackers_last_time_interval = 0 - unique_attackers_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - if ( - self.monitor_inference_results[i]["output"]["ip_d"] - not in unique_attackers_last_time_interval - ): - unique_attackers_last_time_interval.append( - self.monitor_inference_results[i]["output"]["ip_d"] + elif self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + confidences_crypto_last_time_interval.append( + 
self.monitor_inference_results[i]["output"]["confidence"] ) - num_unique_attackers_last_time_interval += 1 - - kpi_unique_attackers.kpi_value.intVal = num_unique_attackers_last_time_interval - - timestamp = Timestamp() - timestamp.timestamp = timestamp_utcnow_to_float() - kpi_security_status.timestamp.CopyFrom(timestamp) - kpi_conf.timestamp.CopyFrom(timestamp) - kpi_unique_attack_conns.timestamp.CopyFrom(timestamp) - kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp) - kpi_unique_attackers.timestamp.CopyFrom(timestamp) - - self.monitoring_client.IncludeKpi(kpi_security_status) - self.monitoring_client.IncludeKpi(kpi_conf) - self.monitoring_client.IncludeKpi(kpi_unique_attack_conns) - self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients) - self.monitoring_client.IncludeKpi(kpi_unique_attackers) - - sleep(self.MONITORED_KPIS_TIME_INTERVAL_AGG) + kpi_conf.kpi_value.intVal = ( + np.mean(confidences_crypto_last_time_interval) + if np.all(outputs_last_time_interval == self.CRYPTO_CLASS) + else np.mean(confidences_normal_last_time_interval) + ) + + # L3 unique attack connections + kpi_unique_attack_conns = Kpi() + kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom( + self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id + ) + + # get the number of unique attack connections (grouping by origin IP, origin port, destination IP, destination port) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_attack_conns_last_time_interval = 0 + unique_attack_conns_last_time_interval = [] + + for i in range(self.monitor_inference_results): + if ( + self.monitor_inference_results[i]["timestamp"] >= time_interval_start + and self.monitor_inference_results[i]["timestamp"] <= time_interval_end + and self.monitor_inference_results[i]["service_id"] == service_id + and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] + ): + if self.monitor_inference_results[i]["output"]["tag"] == 
self.CRYPTO_CLASS: + current_attack_conn = { + "ip_o": self.monitor_inference_results[i]["input"]["src_ip"], + "port_o": self.monitor_inference_results[i]["input"]["src_port"], + "ip_d": self.monitor_inference_results[i]["input"]["dst_ip"], + "port_d": self.monitor_inference_results[i]["input"]["dst_port"], + } + + for j in range(unique_attack_conns_last_time_interval): + if current_attack_conn == unique_attack_conns_last_time_interval[j]: + break + + num_unique_attack_conns_last_time_interval += 1 + unique_attack_conns_last_time_interval.append(current_attack_conn) + + kpi_unique_attack_conns.kpi_value.intVal = num_unique_attack_conns_last_time_interval + + # L3 unique compromised clients + kpi_unique_compromised_clients = Kpi() + kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom( + self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id + ) + + # get the number of unique compromised clients (grouping by origin IP) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_compromised_clients_last_time_interval = 0 + unique_compromised_clients_last_time_interval = [] + + for i in range(self.monitor_inference_results): + if ( + self.monitor_inference_results[i]["timestamp"] >= time_interval_start + and self.monitor_inference_results[i]["timestamp"] <= time_interval_end + and self.monitor_inference_results[i]["service_id"] == service_id + and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] + ): + if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + if ( + self.monitor_inference_results[i]["output"]["ip_o"] + not in unique_compromised_clients_last_time_interval + ): + unique_compromised_clients_last_time_interval.append( + self.monitor_inference_results[i]["output"]["ip_o"] + ) + num_unique_compromised_clients_last_time_interval += 1 + + kpi_unique_compromised_clients.kpi_value.intVal = num_unique_compromised_clients_last_time_interval + + # L3 unique 
attackers + kpi_unique_attackers = Kpi() + kpi_unique_attackers.kpi_id.kpi_id.CopyFrom( + self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id + ) + + # get the number of unique attackers (grouping by destination ip) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_attackers_last_time_interval = 0 + unique_attackers_last_time_interval = [] + + for i in range(self.monitor_inference_results): + if ( + self.monitor_inference_results[i]["timestamp"] >= time_interval_start + and self.monitor_inference_results[i]["timestamp"] <= time_interval_end + and self.monitor_inference_results[i]["service_id"] == service_id + and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] + ): + if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + if ( + self.monitor_inference_results[i]["output"]["ip_d"] + not in unique_attackers_last_time_interval + ): + unique_attackers_last_time_interval.append( + self.monitor_inference_results[i]["output"]["ip_d"] + ) + num_unique_attackers_last_time_interval += 1 + + kpi_unique_attackers.kpi_value.intVal = num_unique_attackers_last_time_interval + + timestamp = Timestamp() + timestamp.timestamp = timestamp_utcnow_to_float() + + kpi_security_status.timestamp.CopyFrom(timestamp) + kpi_conf.timestamp.CopyFrom(timestamp) + kpi_unique_attack_conns.timestamp.CopyFrom(timestamp) + kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp) + kpi_unique_attackers.timestamp.CopyFrom(timestamp) + + self.monitoring_client.IncludeKpi(kpi_security_status) + self.monitoring_client.IncludeKpi(kpi_conf) + self.monitoring_client.IncludeKpi(kpi_unique_attack_conns) + self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients) + self.monitoring_client.IncludeKpi(kpi_unique_attackers) + + sleep(self.MONITORED_KPIS_TIME_INTERVAL_AGG) + except KeyboardInterrupt: + print("Exiting...") + break """ Classify connection as standard traffic or 
cryptomining attack and return results @@ -393,7 +407,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto def SendInput(self, request, context): # Store the data sent in the request - self.inference_values.put({"request": request, "timestamp": datetime.now()}) + # Protobuf messages are NOT picklable, so we need to serialize them first + # self.inference_values.put({"request": request, "timestamp": datetime.now()}) # Perform inference with the data sent in the request logging.info("Performing inference...") @@ -401,7 +416,16 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto logging.info("Inference performed correctly") # Store the results of the inference that will be later used to monitor the KPIs - self.inference_results.put({"output": cryptomining_detector_output, "timestamp": datetime.now()}) + # Protobuf messages are NOT picklable, so we need to serialize them first + cryptomining_detector_output_serialized = copy.deepcopy(cryptomining_detector_output) + cryptomining_detector_output_serialized["service_id"] = MessageToJson( + request.service_id, preserving_proto_field_name=True + ) + cryptomining_detector_output_serialized["endpoint_id"] = MessageToJson( + request.endpoint_id, preserving_proto_field_name=True + ) + + self.inference_results.put({"output": cryptomining_detector_output_serialized, "timestamp": datetime.now()}) service_id = request.service_id