# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function from datetime import datetime import os import grpc import numpy as np import onnxruntime as rt import logging from common.proto.l3_centralizedattackdetector_pb2 import Empty from common.proto.l3_centralizedattackdetector_pb2_grpc import L3CentralizedattackdetectorServicer from common.proto.l3_attackmitigator_pb2 import L3AttackmitigatorOutput from common.proto.l3_attackmitigator_pb2_grpc import L3AttackmitigatorStub from common.proto.monitoring_pb2 import KpiDescriptor from common.proto.kpi_sample_types_pb2 import KpiSampleType from monitoring.client.MonitoringClient import MonitoringClient from common.proto.monitoring_pb2 import Kpi from common.tools.timestamp.Converters import timestamp_utcnow_to_float from common.proto.context_pb2 import Timestamp from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient LOGGER = logging.getLogger(__name__) current_dir = os.path.dirname(os.path.abspath(__file__)) MODEL_FILE = os.path.join(current_dir, "ml_model/crypto_5g_rf_spider_features.onnx") classification_threshold = os.getenv("CAD_CLASSIFICATION_THRESHOLD", 0.5) class l3_centralizedattackdetectorServiceServicerImpl(L3CentralizedattackdetectorServicer): def __init__(self): LOGGER.info("Creating Centralized Attack Detector Service") self.inference_values = [] self.model = rt.InferenceSession(MODEL_FILE) self.input_name = self.model.get_inputs()[0].name self.label_name = self.model.get_outputs()[0].name self.prob_name = self.model.get_outputs()[1].name self.monitoring_client = MonitoringClient() self.predicted_class_kpi_id = None self.class_probability_kpi_id = None self.attackmitigator_client = l3_attackmitigatorClient() def create_predicted_class_kpi(self, client: MonitoringClient, service_id): kpi_description: KpiDescriptor = KpiDescriptor() kpi_description.kpi_description = "Cryptomining Detector Predicted Class (service: {})".format(service_id) kpi_description.service_id.service_uuid.uuid = service_id.service_uuid.uuid kpi_description.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN new_kpi = client.SetKpi(kpi_description) LOGGER.info("Created Predicted Class KPI {}...".format(new_kpi.kpi_id)) return new_kpi def create_class_prob_kpi(self, client: MonitoringClient, service_id): kpi_description: KpiDescriptor = KpiDescriptor() kpi_description.kpi_description = "Cryptomining Detector Prediction (service: {})".format(service_id) kpi_description.service_id.service_uuid.uuid = service_id.service_uuid.uuid kpi_description.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN new_kpi = client.SetKpi(kpi_description) LOGGER.info("Created Class Probability KPI {}...".format(new_kpi.kpi_id)) return new_kpi def make_inference(self, request): x_data = np.array( [ [ request.c_pkts_all, request.c_ack_cnt, request.c_bytes_uniq, request.c_pkts_data, request.c_bytes_all, request.s_pkts_all, request.s_ack_cnt, request.s_bytes_uniq, request.s_pkts_data, request.s_bytes_all, ] ] ) predictions = self.model.run([self.prob_name], {self.input_name: x_data.astype(np.float32)})[0] # Gather the predicted class, the probability of that class and other relevant information required to block the attack output_message = { "confidence": None, "timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S"), "ip_o": request.ip_o, "ip_d": request.ip_d, "tag_name": None, "tag": None, "flow_id": request.flow_id, "protocol": request.protocol, "port_o": request.port_o, "port_d": request.port_d, "ml_id": "RandomForest", "service_id": request.service_id, "endpoint_id": request.endpoint_id, "time_start": request.time_start, "time_end": request.time_end, } if predictions[0][1] >= classification_threshold: output_message["confidence"] = predictions[0][1] output_message["tag_name"] = "Crypto" output_message["tag"] = 1 else: output_message["confidence"] = predictions[0][0] output_message["tag_name"] = "Normal" output_message["tag"] = 0 return L3AttackmitigatorOutput(**output_message) def SendInput(self, request, context): # Store the data sent in the request self.inference_values.append(request) # Perform inference with the data sent in the request logging.info("Performing inference...") output = self.make_inference(request) logging.info("Inference performed correctly") # Include monitored KPIs values service_id = request.service_id if self.predicted_class_kpi_id is None: self.predicted_class_kpi_id = self.create_predicted_class_kpi(self.monitoring_client, service_id) if self.class_probability_kpi_id is None: self.class_probability_kpi_id = self.create_class_prob_kpi(self.monitoring_client, service_id) # Packet Aggregation Features -> DAD -> CAD -> ML -> (2 Instantaneous Value: higher class probability, predicted class) -> Monitoring # In addition, two counters: # Counter 1: Total number of crypto attack connections # Counter 2: Rate of crypto attack connections with respect to the total number of connections # Predicted class KPI kpi_class = Kpi() kpi_class.kpi_id.kpi_id.CopyFrom(self.predicted_class_kpi_id.kpi_id) kpi_class.kpi_value.int32Val = 1 if output.tag_name == "Crypto" else 0 # Class probability KPI kpi_prob = Kpi() kpi_prob.kpi_id.kpi_id.CopyFrom(self.class_probability_kpi_id.kpi_id) kpi_prob.kpi_value.floatVal = output.confidence timestamp = Timestamp() timestamp.timestamp = timestamp_utcnow_to_float() kpi_class.timestamp.CopyFrom(timestamp) kpi_prob.timestamp.CopyFrom(kpi_class.timestamp) self.monitoring_client.IncludeKpi(kpi_class) self.monitoring_client.IncludeKpi(kpi_prob) if output.tag_name == "Crypto": logging.info("Crypto attack detected") # Notify the Attack Mitigator component about the attack logging.info( "Notifying the Attack Mitigator component about the attack in order to block the connection..." ) try: """with grpc.insecure_channel("192.168.165.78:10002") as channel: stub = L3AttackmitigatorStub(channel) logging.info("Sending the connection information to the Attack Mitigator component...") response = stub.SendOutput(output)""" logging.info("Sending the connection information to the Attack Mitigator component...") response = self.attackmitigator_client.SendOutput(output) logging.info( "Attack Mitigator notified and received response: ", response.message ) # FIX No message received return Empty(message="OK, information received and mitigator notified abou the attack") except Exception as e: logging.error("Error notifying the Attack Mitigator component about the attack: ", e) logging.error("Couldn't find l3_attackmitigator") return Empty(message="Attack Mitigator not found") else: logging.info("No attack detected") return Empty(message="Ok, information received (no attack detected)") def GetOutput(self, request, context): logging.info("Returning inference output...") k = np.multiply(self.inference_values, [2]) k = np.sum(k) return self.make_inference(k)