Skip to content
Snippets Groups Projects
Commit 0b991b39 authored by delacal's avatar delacal
Browse files

Fixed the implementation of the monitored KPIs in the Centralized Attack Detector component

parent c6a54c84
No related branches found
No related tags found
2 merge requests!142Release TeraFlowSDN 2.1,!93Updated L3 components + scalability
source my_deploy.sh; ./deploy.sh; source tfs_runtime_env_vars.sh; ofc22/run_test_01_bootstrap.sh; ofc22/run_test_02_create_service.sh
\ No newline at end of file
...@@ -248,7 +248,7 @@ for COMPONENT in $TFS_COMPONENTS; do ...@@ -248,7 +248,7 @@ for COMPONENT in $TFS_COMPONENTS; do
echo "Waiting for '$COMPONENT' component..." echo "Waiting for '$COMPONENT' component..."
COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/") COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/")
kubectl wait --namespace $TFS_K8S_NAMESPACE \ kubectl wait --namespace $TFS_K8S_NAMESPACE \
--for='condition=available' --timeout=300s deployment/${COMPONENT_OBJNAME}service --for='condition=available' --timeout=10s deployment/${COMPONENT_OBJNAME}service
printf "\n" printf "\n"
done done
......
...@@ -35,12 +35,16 @@ from monitoring.client.MonitoringClient import MonitoringClient ...@@ -35,12 +35,16 @@ from monitoring.client.MonitoringClient import MonitoringClient
from common.proto.monitoring_pb2 import Kpi from common.proto.monitoring_pb2 import Kpi
from common.tools.timestamp.Converters import timestamp_utcnow_to_float from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from common.proto.context_pb2 import Timestamp from common.proto.context_pb2 import Timestamp, ServiceId, EndPointId
from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from google.protobuf.json_format import MessageToJson, Parse
import copy
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
current_dir = os.path.dirname(os.path.abspath(__file__)) current_dir = os.path.dirname(os.path.abspath(__file__))
MODEL_FILE = os.path.join(current_dir, "ml_model/crypto_5g_rf_spider_features.onnx") MODEL_FILE = os.path.join(current_dir, "ml_model/crypto_5g_rf_spider_features.onnx")
...@@ -106,9 +110,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -106,9 +110,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
self.CRYPTO_CLASS = 1 self.CRYPTO_CLASS = 1
# start monitoring process # start monitoring process
self.monitoring_process = Process( self.monitoring_process = Process(target=self.monitor_kpis, args=(self.inference_results,))
target=self.monitoring_process, args=(self.inference_values, self.inference_results)
)
self.monitoring_process.start() self.monitoring_process.start()
""" """
...@@ -116,17 +118,19 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -116,17 +118,19 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
-input: -input:
+ client: Monitoring Client object where the KPI will be tracked + client: Monitoring Client object where the KPI will be tracked
+ service_id: service ID where the KPI will be monitored + service_id: service ID where the KPI will be monitored
+ kpi_description: description of the KPI
+ kpi_sample_type: KPI sample type of the KPI (it must be defined in the kpi_sample_types.proto file)
-output: KPI identifier representing the KPI -output: KPI identifier representing the KPI
""" """
def create_kpi(self, client: MonitoringClient, service_id, kpi_description, kpi_sample_type): def create_kpi(self, client: MonitoringClient, service_id, kpi_name, kpi_description, kpi_sample_type):
kpi_description: KpiDescriptor = KpiDescriptor() kpidescriptor = KpiDescriptor()
kpi_description.kpi_description = kpi_description kpidescriptor.kpi_description = kpi_description
kpi_description.service_id.service_uuid.uuid = service_id.service_uuid.uuid kpidescriptor.service_id.service_uuid.uuid = service_id.service_uuid.uuid
kpi_description.kpi_sample_type = kpi_sample_type kpidescriptor.kpi_sample_type = kpi_sample_type
new_kpi = client.SetKpi(kpi_description) new_kpi = client.SetKpi(kpidescriptor)
LOGGER.info("Created KPI {}...".format(kpi_sample_type)) LOGGER.info("Created KPI {}".format(kpi_name))
return new_kpi return new_kpi
...@@ -143,189 +147,199 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -143,189 +147,199 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
created_kpi = self.create_kpi( created_kpi = self.create_kpi(
self.monitoring_client, self.monitoring_client,
service_id, service_id,
kpi,
self.monitored_kpis[kpi]["description"], self.monitored_kpis[kpi]["description"],
self.monitored_kpis[kpi]["kpi_sample_type"], self.monitored_kpis[kpi]["kpi_sample_type"],
) )
self.monitored_kpis[kpi]["kpi_id"] = created_kpi.kpi_id self.monitored_kpis[kpi]["kpi_id"] = created_kpi.kpi_id
self.monitored_kpis[kpi]["service_ids"].append(service_id.service_uuid.uuid) self.monitored_kpis[kpi]["service_ids"].append(service_id.service_uuid.uuid)
def monitor_kpis(self): def monitor_kpis(self, inference_results):
while True: while True:
# get all information from the inference_values queue try:
monitor_inference_values = [] # get all information from the inference_results queue
monitor_inference_results = []
for i in range(self.inference_values.qsize()):
monitor_inference_values.append(self.inference_values.get()) for i in range(inference_results.qsize()):
monitor_inference_results.append(self.inference_results.get())
# get all information from the inference_results queue
monitor_inference_results = [] # deserialize the inference results
for i in range(len(monitor_inference_results)):
for i in range(self.inference_results.qsize()): monitor_inference_results[i]["output"]["service_id"] = Parse(
monitor_inference_results.append(self.inference_results.get()) monitor_inference_results[i]["output"]["service_id"], ServiceId()
)
for service_id in self.service_ids: monitor_inference_results[i]["output"]["endpoint_id"] = Parse(
time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG monitor_inference_results[i]["output"]["endpoint_id"], EndPointId()
time_interval_start = datetime.utcnow() )
time_interval_end = time_interval_start + time_interval
for service_id in self.service_ids:
# L3 security status time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG
kpi_security_status = Kpi() time_interval_start = datetime.utcnow()
kpi_security_status.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_security_status"]["kpi_id"].kpi_id) time_interval_end = time_interval_start + time_interval
# get the output.tag of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable # L3 security status
outputs_last_time_interval = [] kpi_security_status = Kpi()
kpi_security_status.kpi_id.kpi_id.CopyFrom(
for i in range(self.monitor_inference_results): self.monitored_kpis["l3_security_status"]["kpi_id"].kpi_id
if ( )
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end # get the output.tag of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
and self.monitor_inference_results[i]["service_id"] == service_id outputs_last_time_interval = []
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
): for i in range(self.monitor_inference_results):
outputs_last_time_interval.append(self.monitor_inference_results[i]["output"]["tag"]) if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
kpi_security_status.kpi_value.intVal = ( and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
0 if np.all(outputs_last_time_interval == self.NORMAL_CLASS) else 1 and self.monitor_inference_results[i]["service_id"] == service_id
) and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
):
# L3 ML model confidence outputs_last_time_interval.append(self.monitor_inference_results[i]["output"]["tag"])
kpi_conf = Kpi()
kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"].kpi_id) kpi_security_status.kpi_value.intVal = (
0 if np.all(outputs_last_time_interval == self.NORMAL_CLASS) else 1
# get the output.confidence of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable )
confidences_normal_last_time_interval = []
confidences_crypto_last_time_interval = [] # L3 ML model confidence
kpi_conf = Kpi()
for i in range(self.monitor_inference_results): kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"].kpi_id)
if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start # get the output.confidence of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end confidences_normal_last_time_interval = []
and self.monitor_inference_results[i]["service_id"] == service_id confidences_crypto_last_time_interval = []
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
): for i in range(self.monitor_inference_results):
if self.monitor_inference_results[i]["output"]["tag"] == self.NORMAL_CLASS: if (
confidences_normal_last_time_interval.append( self.monitor_inference_results[i]["timestamp"] >= time_interval_start
self.monitor_inference_results[i]["output"]["confidence"] and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
) and self.monitor_inference_results[i]["service_id"] == service_id
elif self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
confidences_crypto_last_time_interval.append( ):
self.monitor_inference_results[i]["output"]["confidence"] if self.monitor_inference_results[i]["output"]["tag"] == self.NORMAL_CLASS:
) confidences_normal_last_time_interval.append(
self.monitor_inference_results[i]["output"]["confidence"]
kpi_conf.kpi_value.intVal = (
np.mean(confidences_crypto_last_time_interval)
if np.all(outputs_last_time_interval == self.CRYPTO_CLASS)
else np.mean(confidences_normal_last_time_interval)
)
# L3 unique attack connections
kpi_unique_attack_conns = Kpi()
kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom(
self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id
)
# get the number of unique attack connections (grouping by origin IP, origin port, destination IP, destination port) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
num_unique_attack_conns_last_time_interval = 0
unique_attack_conns_last_time_interval = []
for i in range(self.monitor_inference_results):
if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
and self.monitor_inference_results[i]["service_id"] == service_id
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
):
if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS:
current_attack_conn = {
"ip_o": self.monitor_inference_results[i]["input"]["src_ip"],
"port_o": self.monitor_inference_results[i]["input"]["src_port"],
"ip_d": self.monitor_inference_results[i]["input"]["dst_ip"],
"port_d": self.monitor_inference_results[i]["input"]["dst_port"],
}
for j in range(unique_attack_conns_last_time_interval):
if current_attack_conn == unique_attack_conns_last_time_interval[j]:
break
num_unique_attack_conns_last_time_interval += 1
unique_attack_conns_last_time_interval.append(current_attack_conn)
kpi_unique_attack_conns.kpi_value.intVal = num_unique_attack_conns_last_time_interval
# L3 unique compromised clients
kpi_unique_compromised_clients = Kpi()
kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom(
self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id
)
# get the number of unique compromised clients (grouping by origin IP) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
num_unique_compromised_clients_last_time_interval = 0
unique_compromised_clients_last_time_interval = []
for i in range(self.monitor_inference_results):
if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
and self.monitor_inference_results[i]["service_id"] == service_id
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
):
if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS:
if (
self.monitor_inference_results[i]["output"]["ip_o"]
not in unique_compromised_clients_last_time_interval
):
unique_compromised_clients_last_time_interval.append(
self.monitor_inference_results[i]["output"]["ip_o"]
) )
num_unique_compromised_clients_last_time_interval += 1 elif self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS:
confidences_crypto_last_time_interval.append(
kpi_unique_compromised_clients.kpi_value.intVal = num_unique_compromised_clients_last_time_interval self.monitor_inference_results[i]["output"]["confidence"]
# L3 unique attackers
kpi_unique_attackers = Kpi()
kpi_unique_attackers.kpi_id.kpi_id.CopyFrom(
self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id
)
# get the number of unique attackers (grouping by destination ip) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
num_unique_attackers_last_time_interval = 0
unique_attackers_last_time_interval = []
for i in range(self.monitor_inference_results):
if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
and self.monitor_inference_results[i]["service_id"] == service_id
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
):
if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS:
if (
self.monitor_inference_results[i]["output"]["ip_d"]
not in unique_attackers_last_time_interval
):
unique_attackers_last_time_interval.append(
self.monitor_inference_results[i]["output"]["ip_d"]
) )
num_unique_attackers_last_time_interval += 1
kpi_unique_attackers.kpi_value.intVal = num_unique_attackers_last_time_interval
timestamp = Timestamp()
timestamp.timestamp = timestamp_utcnow_to_float()
kpi_security_status.timestamp.CopyFrom(timestamp) kpi_conf.kpi_value.intVal = (
kpi_conf.timestamp.CopyFrom(timestamp) np.mean(confidences_crypto_last_time_interval)
kpi_unique_attack_conns.timestamp.CopyFrom(timestamp) if np.all(outputs_last_time_interval == self.CRYPTO_CLASS)
kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp) else np.mean(confidences_normal_last_time_interval)
kpi_unique_attackers.timestamp.CopyFrom(timestamp) )
self.monitoring_client.IncludeKpi(kpi_security_status) # L3 unique attack connections
self.monitoring_client.IncludeKpi(kpi_conf) kpi_unique_attack_conns = Kpi()
self.monitoring_client.IncludeKpi(kpi_unique_attack_conns) kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom(
self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients) self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id
self.monitoring_client.IncludeKpi(kpi_unique_attackers) )
sleep(self.MONITORED_KPIS_TIME_INTERVAL_AGG) # get the number of unique attack connections (grouping by origin IP, origin port, destination IP, destination port) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
num_unique_attack_conns_last_time_interval = 0
unique_attack_conns_last_time_interval = []
for i in range(self.monitor_inference_results):
if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
and self.monitor_inference_results[i]["service_id"] == service_id
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
):
if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS:
current_attack_conn = {
"ip_o": self.monitor_inference_results[i]["input"]["src_ip"],
"port_o": self.monitor_inference_results[i]["input"]["src_port"],
"ip_d": self.monitor_inference_results[i]["input"]["dst_ip"],
"port_d": self.monitor_inference_results[i]["input"]["dst_port"],
}
for j in range(unique_attack_conns_last_time_interval):
if current_attack_conn == unique_attack_conns_last_time_interval[j]:
break
num_unique_attack_conns_last_time_interval += 1
unique_attack_conns_last_time_interval.append(current_attack_conn)
kpi_unique_attack_conns.kpi_value.intVal = num_unique_attack_conns_last_time_interval
# L3 unique compromised clients
kpi_unique_compromised_clients = Kpi()
kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom(
self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id
)
# get the number of unique compromised clients (grouping by origin IP) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
num_unique_compromised_clients_last_time_interval = 0
unique_compromised_clients_last_time_interval = []
for i in range(self.monitor_inference_results):
if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
and self.monitor_inference_results[i]["service_id"] == service_id
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
):
if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS:
if (
self.monitor_inference_results[i]["output"]["ip_o"]
not in unique_compromised_clients_last_time_interval
):
unique_compromised_clients_last_time_interval.append(
self.monitor_inference_results[i]["output"]["ip_o"]
)
num_unique_compromised_clients_last_time_interval += 1
kpi_unique_compromised_clients.kpi_value.intVal = num_unique_compromised_clients_last_time_interval
# L3 unique attackers
kpi_unique_attackers = Kpi()
kpi_unique_attackers.kpi_id.kpi_id.CopyFrom(
self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id
)
# get the number of unique attackers (grouping by destination ip) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable
num_unique_attackers_last_time_interval = 0
unique_attackers_last_time_interval = []
for i in range(self.monitor_inference_results):
if (
self.monitor_inference_results[i]["timestamp"] >= time_interval_start
and self.monitor_inference_results[i]["timestamp"] <= time_interval_end
and self.monitor_inference_results[i]["service_id"] == service_id
and service_id in self.monitored_kpis["l3_security_status"]["service_ids"]
):
if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS:
if (
self.monitor_inference_results[i]["output"]["ip_d"]
not in unique_attackers_last_time_interval
):
unique_attackers_last_time_interval.append(
self.monitor_inference_results[i]["output"]["ip_d"]
)
num_unique_attackers_last_time_interval += 1
kpi_unique_attackers.kpi_value.intVal = num_unique_attackers_last_time_interval
timestamp = Timestamp()
timestamp.timestamp = timestamp_utcnow_to_float()
kpi_security_status.timestamp.CopyFrom(timestamp)
kpi_conf.timestamp.CopyFrom(timestamp)
kpi_unique_attack_conns.timestamp.CopyFrom(timestamp)
kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp)
kpi_unique_attackers.timestamp.CopyFrom(timestamp)
self.monitoring_client.IncludeKpi(kpi_security_status)
self.monitoring_client.IncludeKpi(kpi_conf)
self.monitoring_client.IncludeKpi(kpi_unique_attack_conns)
self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients)
self.monitoring_client.IncludeKpi(kpi_unique_attackers)
sleep(self.MONITORED_KPIS_TIME_INTERVAL_AGG)
except KeyboardInterrupt:
print("Exiting...")
break
""" """
Classify connection as standard traffic or cryptomining attack and return results Classify connection as standard traffic or cryptomining attack and return results
...@@ -393,7 +407,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -393,7 +407,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
def SendInput(self, request, context): def SendInput(self, request, context):
# Store the data sent in the request # Store the data sent in the request
self.inference_values.put({"request": request, "timestamp": datetime.now()}) # Protobuff messages are NOT pickable, so we need to serialize them first
# self.inference_values.put({"request": request, "timestamp": datetime.now()})
# Perform inference with the data sent in the request # Perform inference with the data sent in the request
logging.info("Performing inference...") logging.info("Performing inference...")
...@@ -401,7 +416,16 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ...@@ -401,7 +416,16 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto
logging.info("Inference performed correctly") logging.info("Inference performed correctly")
# Store the results of the inference that will be later used to monitor the KPIs # Store the results of the inference that will be later used to monitor the KPIs
self.inference_results.put({"output": cryptomining_detector_output, "timestamp": datetime.now()}) # Protobuff messages are NOT pickable, so we need to serialize them first
cryptomining_detector_output_serialized = copy.deepcopy(cryptomining_detector_output)
cryptomining_detector_output_serialized["service_id"] = MessageToJson(
request.service_id, preserving_proto_field_name=True
)
cryptomining_detector_output_serialized["endpoint_id"] = MessageToJson(
request.endpoint_id, preserving_proto_field_name=True
)
self.inference_results.put({"output": cryptomining_detector_output_serialized, "timestamp": datetime.now()})
service_id = request.service_id service_id = request.service_id
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment