diff --git a/deploy_component.sh b/deploy_component.sh index a4cf6184c83ef026562abe8e084430bba3ead9c8..c9ceaf0a6e1f89cd39985975daaa7d963400dd91 100755 --- a/deploy_component.sh +++ b/deploy_component.sh @@ -56,16 +56,16 @@ for COMPONENT in $TFS_COMPONENTS; do echo "Processing '$COMPONENT' component..." IMAGE_NAME="$COMPONENT:$TFS_IMAGE_TAG" IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGE/$IMAGE_NAME" | sed 's,//,/,g' | sed 's,http:/,,g') - + echo " Building Docker image..." BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}.log" - + if [ "$COMPONENT" == "automation" ] || [ "$COMPONENT" == "policy" ]; then docker build -t "$IMAGE_NAME" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG" - elif [ "$COMPONENT" == "pathcomp" ]; then + elif [ "$COMPONENT" == "pathcomp" ]; then BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-frontend.log" docker build -t "$COMPONENT-frontend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/frontend/Dockerfile . >> "$BUILD_LOG" - + BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}-backend.log" docker build -t "$COMPONENT-backend:$TFS_IMAGE_TAG" -f ./src/"$COMPONENT"/backend/Dockerfile . >> "$BUILD_LOG" # next command is redundant, but helpful to keep cache updated between rebuilds @@ -73,44 +73,44 @@ for COMPONENT in $TFS_COMPONENTS; do else docker build -t "$IMAGE_NAME" -f ./src/"$COMPONENT"/Dockerfile . > "$BUILD_LOG" fi - + if [ -n "$TFS_REGISTRY_IMAGE" ]; then echo " Pushing Docker image to '$TFS_REGISTRY_IMAGE'..." - + if [ "$COMPONENT" == "pathcomp" ]; then TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-frontend.log" docker tag "$COMPONENT-frontend:$TFS_IMAGE_TAG" "$IMAGE_URL-frontend" > "$TAG_LOG" - + TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}-backend.log" docker tag "$COMPONENT-backend:$TFS_IMAGE_TAG" "$IMAGE_URL-backend" > "$TAG_LOG" - + PUSH_LOG="$TMP_LOGS_FOLDER/push_${COMPONENT}-frontend.log" docker push "$IMAGE_URL-frontend" > "$PUSH_LOG" - + PUSH_LOG="$TMP_LOGS_FOLDER/push_${COMPONENT}-backend.log" docker push "$IMAGE_URL-backend" > "$PUSH_LOG" else TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}.log" docker tag "$IMAGE_NAME" "$IMAGE_URL" > "$TAG_LOG" - + PUSH_LOG="$TMP_LOGS_FOLDER/push_${COMPONENT}.log" docker push "$IMAGE_URL" > "$PUSH_LOG" fi fi - + echo " Adapting '$COMPONENT' manifest file..." MANIFEST="$TMP_MANIFESTS_FOLDER/${COMPONENT}service.yaml" cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST" - + if [ -n "$TFS_REGISTRY_IMAGE" ]; then # Registry is set if [ "$COMPONENT" == "pathcomp" ]; then VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f3) sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_URL-frontend#g" "$MANIFEST" - + VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-backend:" "$MANIFEST" | cut -d ":" -f3) sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-backend:${VERSION}#image: $IMAGE_URL-backend#g" "$MANIFEST" - + sed -E -i "s#imagePullPolicy: .*#imagePullPolicy: Always#g" "$MANIFEST" else VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f3) @@ -122,10 +122,10 @@ for COMPONENT in $TFS_COMPONENTS; do if [ "$COMPONENT" == "pathcomp" ]; then VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-frontend:" "$MANIFEST" | cut -d ":" -f3) sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-frontend:${VERSION}#image: $IMAGE_NAME-frontend#g" "$MANIFEST" - + VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-backend:" "$MANIFEST" | cut -d ":" -f3) sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-backend:${VERSION}#image: $IMAGE_NAME-backend#g" "$MANIFEST" - + sed -E -i "s#imagePullPolicy: .*#imagePullPolicy: Never#g" "$MANIFEST" else VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f3) @@ -133,9 +133,9 @@ for COMPONENT in $TFS_COMPONENTS; do sed -E -i "s#imagePullPolicy: .*#imagePullPolicy: Never#g" "$MANIFEST" fi fi - + # TODO: harmonize names of the monitoring component - + echo " Deploying '$COMPONENT' component to Kubernetes..." DEPLOY_LOG="$TMP_LOGS_FOLDER/deploy_${COMPONENT}.log" kubectl --namespace $TFS_K8S_NAMESPACE delete -f "$MANIFEST" > "$DEPLOY_LOG" @@ -143,33 +143,33 @@ for COMPONENT in $TFS_COMPONENTS; do COMPONENT_OBJNAME=$(echo "${COMPONENT}" | sed "s/\_/-/") kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=0 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG" kubectl --namespace $TFS_K8S_NAMESPACE scale deployment --replicas=1 ${COMPONENT_OBJNAME}service >> "$DEPLOY_LOG" - + echo " Collecting env-vars for '$COMPONENT' component..." - + SERVICE_DATA=$(kubectl get service ${COMPONENT}service --namespace $TFS_K8S_NAMESPACE -o json) if [ -z "${SERVICE_DATA}" ]; then continue; fi - + # Env vars for service's host address SERVICE_HOST=$(echo ${SERVICE_DATA} | jq -r '.spec.clusterIP') if [ -z "${SERVICE_HOST}" ]; then continue; fi # TODO: remove previous value from file ENVVAR_HOST=$(echo "${COMPONENT}service_SERVICE_HOST" | tr '[:lower:]' '[:upper:]') echo "export ${ENVVAR_HOST}=${SERVICE_HOST}" >> $ENV_VARS_SCRIPT - + # Env vars for service's 'grpc' port (if any) SERVICE_PORT_GRPC=$(echo ${SERVICE_DATA} | jq -r '.spec.ports[] | select(.name=="grpc") | .port') if [ -n "${SERVICE_PORT_GRPC}" ]; then ENVVAR_PORT_GRPC=$(echo "${COMPONENT}service_SERVICE_PORT_GRPC" | tr '[:lower:]' '[:upper:]') echo "export ${ENVVAR_PORT_GRPC}=${SERVICE_PORT_GRPC}" >> $ENV_VARS_SCRIPT fi - + # Env vars for service's 'http' port (if any) SERVICE_PORT_HTTP=$(echo ${SERVICE_DATA} | jq -r '.spec.ports[] | select(.name=="http") | .port') if [ -n "${SERVICE_PORT_HTTP}" ]; then ENVVAR_PORT_HTTP=$(echo "${COMPONENT}service_SERVICE_PORT_HTTP" | tr '[:lower:]' '[:upper:]') echo "export ${ENVVAR_PORT_HTTP}=${SERVICE_PORT_HTTP}" >> $ENV_VARS_SCRIPT fi - + printf "\n" done @@ -177,7 +177,7 @@ done for COMPONENT in $TFS_COMPONENTS; do echo "Waiting for '$COMPONENT' component..." kubectl wait --namespace $TFS_K8S_NAMESPACE \ - --for='condition=available' --timeout=300s deployment/${COMPONENT}service + --for='condition=available' --timeout=3s deployment/${COMPONENT}service printf "\n" done diff --git a/deploy_l3_component.sh b/deploy_l3_component.sh new file mode 100755 index 0000000000000000000000000000000000000000..a590bacbcdb99ce4d1b7d399da4d9ecb94166870 --- /dev/null +++ b/deploy_l3_component.sh @@ -0,0 +1,45 @@ +component=$1 + +source "my_deploy.sh" + +echo "Deploying $component..." + +# check if component == "CAD" +if [ $component == "CAD" ]; then + # find kubernetes pod that contains "centralizedattackdetectorservice" + pod=$(kubectl --namespace $TFS_K8S_NAMESPACE get pods | grep l3-centralizedattackdetectorservice | awk '{print $1}') + + # delete pod + kubectl --namespace $TFS_K8S_NAMESPACE delete pod $pod --force --grace-period=0 + + # # wait for pod to be deleted + # while [ $(kubectl --namespace $TFS_K8S_NAMESPACE get pods | grep l3-centralizedattackdetectorservice | wc -l) -gt 0 ]; do + # sleep 1 + # done + + # deploy l3_centralizedattackdetector component + ./deploy_component.sh "l3_centralizedattackdetector" +fi + +# check if component == "AM" +if [ $component == "AM" ]; then + # find kubernetes pod that contains "l3-attackmitigatorservice" + pod=$(kubectl --namespace $TFS_K8S_NAMESPACE get pods | grep l3-attackmitigatorservice | awk '{print $1}') + + # delete pod + kubectl --namespace $TFS_K8S_NAMESPACE delete pod $pod --force --grace-period=0 + + # # wait for pod to be deleted + # while [ $(kubectl --namespace $TFS_K8S_NAMESPACE get pods | grep l3-attackmitigatorservice | wc -l) -gt 0 ]; do + # sleep 1 + # done + + # deploy l3_attackmitigator component + ./deploy_component.sh "l3_attackmitigator" +fi + +echo "Component $component deployed" + +echo "Restarting DAD..." +sshpass -p "ubuntu" ssh -o StrictHostKeyChecking=no -n -f ubuntu@192.168.165.73 "sh -c 'nohup /home/ubuntu/TeraflowDockerDistributed/restart.sh > /dev/null 2>&1 &'" +echo "DAD restarted" \ No newline at end of file diff --git a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py index 305fd979b37c06689c9774a41c125eb620649f95..02346f7dd69404484ce673053a23af2c12764630 100644 --- a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py +++ b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py @@ -14,6 +14,7 @@ from __future__ import print_function from datetime import datetime +from datetime import timedelta import os import grpc @@ -35,15 +36,19 @@ from monitoring.client.MonitoringClient import MonitoringClient from common.proto.monitoring_pb2 import Kpi from common.tools.timestamp.Converters import timestamp_utcnow_to_float -from common.proto.context_pb2 import Timestamp, ServiceId, EndPointId +from common.proto.context_pb2 import Timestamp, ServiceId, EndPointId, SliceId, DeviceId from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient +# from context.client.ContextClient import ContextClient + from multiprocessing import Process, Queue from google.protobuf.json_format import MessageToJson, Parse import copy +import uuid + LOGGER = logging.getLogger(__name__) current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -67,51 +72,60 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto self.prob_name = self.model.get_outputs()[1].name self.monitoring_client = MonitoringClient() self.service_ids = [] + self.monitored_service_ids = Queue() self.monitored_kpis = { "l3_security_status": { "kpi_id": None, "description": "L3 - Confidence of the cryptomining detector in the security status in the last time interval of the service {service_id}", - "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_UNKNOWN, # TODO: change this to KPI_L3_SECURITY_STATUS and add it to kpi_sample_types.proto + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO, "service_ids": [], }, "l3_ml_model_confidence": { "kpi_id": None, "description": "L3 - Security status of the service in a time interval of the service {service_id} (“0†if no attack has been detected on the service and “1†if a cryptomining attack has been detected)", - "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_UNKNOWN, # TODO: change this to KPI_L3_ML_CONFIDENCE and add it to kpi_sample_types.proto + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_ML_CONFIDENCE, "service_ids": [], }, "l3_unique_attack_conns": { "kpi_id": None, "description": "L3 - Number of attack connections detected in a time interval of the service {service_id} (attacks of the same connection [origin IP, origin port, destination IP and destination port] are only considered once)", - "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_UNKNOWN, # TODO: change this to KPI_UNIQUE_ATTACK_CONNS and add it to kpi_sample_types.proto + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS, "service_ids": [], }, "l3_unique_compromised_clients": { "kpi_id": None, "description": "L3 - Number of unique compromised clients of the service in a time interval of the service {service_id} (attacks from the same origin IP are only considered once)", - "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_UNKNOWN, # TODO: change this to KPI_UNIQUE_COMPROMISED_CLIENTS and add it to kpi_sample_types.proto + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS, "service_ids": [], }, "l3_unique_attackers": { "kpi_id": None, "description": "L3 - number of unique attackers of the service in a time interval of the service {service_id} (attacks from the same destination IP are only considered once)", - "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_UNKNOWN, # TODO: change this to KPI_UNIQUE_ATTACKERS and add it to kpi_sample_types.proto + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_ATTACKERS, "service_ids": [], }, } self.attackmitigator_client = l3_attackmitigatorClient() + # self.context_client = ContextClient() + # self.context_id = "admin" # Environment variables self.CLASSIFICATION_THRESHOLD = os.getenv("CAD_CLASSIFICATION_THRESHOLD", 0.5) - self.MONITORED_KPIS_TIME_INTERVAL_AGG = os.getenv("MONITORED_KPIS_TIME_INTERVAL_AGG", 5) + self.MONITORED_KPIS_TIME_INTERVAL_AGG = os.getenv("MONITORED_KPIS_TIME_INTERVAL_AGG", 30) # Constants self.NORMAL_CLASS = 0 self.CRYPTO_CLASS = 1 # start monitoring process - self.monitoring_process = Process(target=self.monitor_kpis, args=(self.inference_results,)) - self.monitoring_process.start() + self.monitoring_process = Process( + target=self.monitor_kpis, + args=( + self.monitored_service_ids, + self.inference_results, + ), + ) + # self.monitoring_process.start() """ Create a monitored KPI for a specific service and add it to the Monitoring Client @@ -124,15 +138,23 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto """ def create_kpi( - self, client: MonitoringClient, service_id, device_id, endpoint_id, kpi_name, kpi_description, kpi_sample_type + self, + service_id, + device_id, + endpoint_id, + # slice_id, + kpi_name, + kpi_description, + kpi_sample_type, ): kpidescriptor = KpiDescriptor() kpidescriptor.kpi_description = kpi_description kpidescriptor.service_id.service_uuid.uuid = service_id.service_uuid.uuid kpidescriptor.device_id.device_uuid.uuid = device_id.device_uuid.uuid kpidescriptor.endpoint_id.endpoint_uuid.uuid = endpoint_id.endpoint_uuid.uuid + # kpidescriptor.slice_id.slice_uuid.uuid = slice_id.slice_uuid.uuid kpidescriptor.kpi_sample_type = kpi_sample_type - new_kpi = client.SetKpi(kpidescriptor) + new_kpi = self.monitoring_client.SetKpi(kpidescriptor) LOGGER.info("Created KPI {}".format(kpi_name)) @@ -146,206 +168,314 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto """ def create_kpis(self, service_id, device_id, endpoint_id): + LOGGER.info("Creating KPIs for service {}".format(service_id)) + # for now, all the KPIs are created for all the services from which requests are received for kpi in self.monitored_kpis: + # slice_ids_list = self.context_client.ListSliceIds(self.context_id)[0] + # # generate random slice_id + # slice_id = SliceId() + # slice_id.slice_uuid.uuid = str(uuid.uuid4()) + + # generate random device_id + device_id = DeviceId() + device_id.device_uuid.uuid = str(uuid.uuid4()) + created_kpi = self.create_kpi( - self.monitoring_client, service_id, device_id, endpoint_id, + # slice_id, kpi, - self.monitored_kpis[kpi]["description"], + self.monitored_kpis[kpi]["description"].format(service_id=service_id.service_uuid.uuid), self.monitored_kpis[kpi]["kpi_sample_type"], ) self.monitored_kpis[kpi]["kpi_id"] = created_kpi.kpi_id self.monitored_kpis[kpi]["service_ids"].append(service_id.service_uuid.uuid) - def monitor_kpis(self, inference_results): + self.monitoring_process.start() + + def monitor_kpis(self, service_ids, inference_results): + self.monitoring_client_test = MonitoringClient() + + monitor_inference_results = [] + monitor_service_ids = [] + + # sleep(10) + time_interval_start = None + while True: - try: - # get all information from the inference_results queue - monitor_inference_results = [] + # get all information from the inference_results queue + # deserialize the inference results + # for i in range(len(monitor_inference_results)): + # monitor_inference_results[i]["output"]["service_id"] = Parse( + # monitor_inference_results[i]["output"]["service_id"], ServiceId() + # ) + # monitor_inference_results[i]["output"]["endpoint_id"] = Parse( + # monitor_inference_results[i]["output"]["endpoint_id"], EndPointId() + # ) + + LOGGER.debug("Sleeping for %s seconds", self.MONITORED_KPIS_TIME_INTERVAL_AGG) + sleep(self.MONITORED_KPIS_TIME_INTERVAL_AGG) + + for i in range(service_ids.qsize()): + new_service_id = service_ids.get() + service_id = Parse(new_service_id, ServiceId()) + monitor_service_ids.append(service_id) + + for i in range(inference_results.qsize()): + new_inference_result = inference_results.get() + new_inference_result["output"]["service_id"] = Parse( + new_inference_result["output"]["service_id"], ServiceId() + ) + new_inference_result["output"]["endpoint_id"] = Parse( + new_inference_result["output"]["endpoint_id"], EndPointId() + ) + + monitor_inference_results.append(new_inference_result) + + LOGGER.debug("monitor_inference_results: {}".format(len(monitor_inference_results))) + LOGGER.debug("monitor_service_ids: {}".format(len(monitor_service_ids))) + + while len(monitor_inference_results) == 0: + LOGGER.debug("monitor_inference_results is empty, waiting for new inference results") for i in range(inference_results.qsize()): - monitor_inference_results.append(self.inference_results.get()) - - # deserialize the inference results - for i in range(len(monitor_inference_results)): - monitor_inference_results[i]["output"]["service_id"] = Parse( - monitor_inference_results[i]["output"]["service_id"], ServiceId() + new_inference_result = inference_results.get() + new_inference_result["output"]["service_id"] = Parse( + new_inference_result["output"]["service_id"], ServiceId() ) - monitor_inference_results[i]["output"]["endpoint_id"] = Parse( - monitor_inference_results[i]["output"]["endpoint_id"], EndPointId() + new_inference_result["output"]["endpoint_id"] = Parse( + new_inference_result["output"]["endpoint_id"], EndPointId() ) - for service_id in self.service_ids: - time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG - time_interval_start = datetime.utcnow() - time_interval_end = time_interval_start + time_interval + monitor_inference_results.append(new_inference_result) - # L3 security status - kpi_security_status = Kpi() - kpi_security_status.kpi_id.kpi_id.CopyFrom( - self.monitored_kpis["l3_security_status"]["kpi_id"].kpi_id - ) + sleep(1) - # get the output.tag of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - outputs_last_time_interval = [] + for service_id in monitor_service_ids: + LOGGER.debug("service_id: {}".format(service_id)) - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - outputs_last_time_interval.append(self.monitor_inference_results[i]["output"]["tag"]) + time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG + # time_interval_start = datetime.utcnow() - kpi_security_status.kpi_value.intVal = ( - 0 if np.all(outputs_last_time_interval == self.NORMAL_CLASS) else 1 - ) + # assign the timestamp of the first inference result to the time_interval_start + if time_interval_start is None: + time_interval_start = monitor_inference_results[0]["timestamp"] + else: + time_interval_start = time_interval_start + timedelta(seconds=time_interval) + + # add time_interval to the current time to get the time interval end + time_interval_end = time_interval_start + timedelta(seconds=time_interval) + + # delete the inference results that are previous to the time interval start + deleted_items = [] + + for i in range(len(monitor_inference_results)): + if monitor_inference_results[i]["timestamp"] < time_interval_start: + deleted_items.append(i) + + LOGGER.debug("deleted_items: {}".format(deleted_items)) + + for i in range(len(deleted_items)): + monitor_inference_results.pop(deleted_items[i] - i) + + if len(monitor_inference_results) == 0: + break + + LOGGER.debug("time_interval_start: {}".format(time_interval_start)) + LOGGER.debug("time_interval_end: {}".format(time_interval_end)) - # L3 ML model confidence - kpi_conf = Kpi() - kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"].kpi_id) - - # get the output.confidence of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - confidences_normal_last_time_interval = [] - confidences_crypto_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.NORMAL_CLASS: - confidences_normal_last_time_interval.append( - self.monitor_inference_results[i]["output"]["confidence"] + # L3 security status + kpi_security_status = Kpi() + kpi_security_status.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_security_status"]["kpi_id"]) + + # get the output.tag of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + outputs_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= time_interval_start + and monitor_inference_results[i]["timestamp"] < time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid in self.monitored_kpis["l3_security_status"]["service_ids"] + ): + outputs_last_time_interval.append(monitor_inference_results[i]["output"]["tag"]) + + kpi_security_status.kpi_value.int32Val = ( + 0 if np.all(outputs_last_time_interval == self.NORMAL_CLASS) else 1 + ) + + # L3 ML model confidence + kpi_conf = Kpi() + kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"]) + + # get the output.confidence of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + confidences_normal_last_time_interval = [] + confidences_crypto_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + LOGGER.debug("monitor_inference_results[i]: {}".format(monitor_inference_results[i])) + + if ( + monitor_inference_results[i]["timestamp"] >= time_interval_start + and monitor_inference_results[i]["timestamp"] < time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid + in self.monitored_kpis["l3_ml_model_confidence"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.NORMAL_CLASS: + confidences_normal_last_time_interval.append( + monitor_inference_results[i]["output"]["confidence"] + ) + elif monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + confidences_crypto_last_time_interval.append( + monitor_inference_results[i]["output"]["confidence"] + ) + else: + LOGGER.debug("Unknown tag: {}".format(monitor_inference_results[i]["output"]["tag"])) + + LOGGER.debug("confidences_normal_last_time_interval: {}".format(confidences_normal_last_time_interval)) + LOGGER.debug("confidences_crypto_last_time_interval: {}".format(confidences_crypto_last_time_interval)) + + kpi_conf.kpi_value.floatVal = ( + np.mean(confidences_crypto_last_time_interval) + if np.all(outputs_last_time_interval == self.CRYPTO_CLASS) + else np.mean(confidences_normal_last_time_interval) + ) + + # L3 unique attack connections + kpi_unique_attack_conns = Kpi() + kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"]) + + # get the number of unique attack connections (grouping by origin IP, origin port, destination IP, destination port) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_attack_conns_last_time_interval = 0 + unique_attack_conns_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= time_interval_start + and monitor_inference_results[i]["timestamp"] < time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid + in self.monitored_kpis["l3_unique_attack_conns"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + current_attack_conn = { + "ip_o": monitor_inference_results[i]["output"]["ip_o"], + "port_o": monitor_inference_results[i]["output"]["port_o"], + "ip_d": monitor_inference_results[i]["output"]["ip_d"], + "port_d": monitor_inference_results[i]["output"]["port_d"], + } + + for j in range(len(unique_attack_conns_last_time_interval)): + if current_attack_conn == unique_attack_conns_last_time_interval[j]: + break + + num_unique_attack_conns_last_time_interval += 1 + unique_attack_conns_last_time_interval.append(current_attack_conn) + + kpi_unique_attack_conns.kpi_value.int32Val = num_unique_attack_conns_last_time_interval + + # L3 unique compromised clients + kpi_unique_compromised_clients = Kpi() + kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom( + self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"] + ) + + # get the number of unique compromised clients (grouping by origin IP) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_compromised_clients_last_time_interval = 0 + unique_compromised_clients_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= time_interval_start + and monitor_inference_results[i]["timestamp"] < time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid + in self.monitored_kpis["l3_unique_attack_conns"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + if ( + monitor_inference_results[i]["output"]["ip_o"] + not in unique_compromised_clients_last_time_interval + ): + unique_compromised_clients_last_time_interval.append( + monitor_inference_results[i]["output"]["ip_o"] ) - elif self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - confidences_crypto_last_time_interval.append( - self.monitor_inference_results[i]["output"]["confidence"] + num_unique_compromised_clients_last_time_interval += 1 + + kpi_unique_compromised_clients.kpi_value.int32Val = num_unique_compromised_clients_last_time_interval + + # L3 unique attackers + kpi_unique_attackers = Kpi() + kpi_unique_attackers.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"]) + + # get the number of unique attackers (grouping by destination ip) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_attackers_last_time_interval = 0 + unique_attackers_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= time_interval_start + and monitor_inference_results[i]["timestamp"] < time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid + in self.monitored_kpis["l3_unique_attack_conns"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + if ( + monitor_inference_results[i]["output"]["ip_d"] + not in unique_attackers_last_time_interval + ): + unique_attackers_last_time_interval.append( + monitor_inference_results[i]["output"]["ip_d"] ) + num_unique_attackers_last_time_interval += 1 - kpi_conf.kpi_value.intVal = ( - np.mean(confidences_crypto_last_time_interval) - if np.all(outputs_last_time_interval == self.CRYPTO_CLASS) - else np.mean(confidences_normal_last_time_interval) - ) + kpi_unique_attackers.kpi_value.int32Val = num_unique_attackers_last_time_interval - # L3 unique attack connections - kpi_unique_attack_conns = Kpi() - kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom( - self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id - ) + timestamp = Timestamp() + timestamp.timestamp = timestamp_utcnow_to_float() - # get the number of unique attack connections (grouping by origin IP, origin port, destination IP, destination port) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - num_unique_attack_conns_last_time_interval = 0 - unique_attack_conns_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - current_attack_conn = { - "ip_o": self.monitor_inference_results[i]["input"]["src_ip"], - "port_o": self.monitor_inference_results[i]["input"]["src_port"], - "ip_d": self.monitor_inference_results[i]["input"]["dst_ip"], - "port_d": self.monitor_inference_results[i]["input"]["dst_port"], - } - - for j in range(unique_attack_conns_last_time_interval): - if current_attack_conn == unique_attack_conns_last_time_interval[j]: - break - - num_unique_attack_conns_last_time_interval += 1 - unique_attack_conns_last_time_interval.append(current_attack_conn) - - kpi_unique_attack_conns.kpi_value.intVal = num_unique_attack_conns_last_time_interval - - # L3 unique compromised clients - kpi_unique_compromised_clients = Kpi() - kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom( - self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id - ) + kpi_security_status.timestamp.CopyFrom(timestamp) + kpi_conf.timestamp.CopyFrom(timestamp) + kpi_unique_attack_conns.timestamp.CopyFrom(timestamp) + kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp) + kpi_unique_attackers.timestamp.CopyFrom(timestamp) - # get the number of unique compromised clients (grouping by origin IP) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - num_unique_compromised_clients_last_time_interval = 0 - unique_compromised_clients_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - if ( - self.monitor_inference_results[i]["output"]["ip_o"] - not in unique_compromised_clients_last_time_interval - ): - unique_compromised_clients_last_time_interval.append( - self.monitor_inference_results[i]["output"]["ip_o"] - ) - num_unique_compromised_clients_last_time_interval += 1 - - kpi_unique_compromised_clients.kpi_value.intVal = num_unique_compromised_clients_last_time_interval - - # L3 unique attackers - kpi_unique_attackers = Kpi() - kpi_unique_attackers.kpi_id.kpi_id.CopyFrom( - self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"].kpi_id - ) + LOGGER.debug("Sending KPIs to monitoring server") + + LOGGER.debug("kpi_security_status: {}".format(kpi_security_status)) + LOGGER.debug("kpi_conf: {}".format(kpi_conf)) + LOGGER.debug("kpi_unique_attack_conns: {}".format(kpi_unique_attack_conns)) + LOGGER.debug("kpi_unique_compromised_clients: {}".format(kpi_unique_compromised_clients)) + LOGGER.debug("kpi_unique_attackers: {}".format(kpi_unique_attackers)) + + _create_kpi_request = KpiDescriptor() + _create_kpi_request.kpi_description = "KPI Description Test" + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN + _create_kpi_request.device_id.device_uuid.uuid = "DEVUPM" # pylint: disable=maybe-no-member + _create_kpi_request.service_id.service_uuid.uuid = "SERVUPM" # pylint: disable=maybe-no-member + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = "ENDUPM" # pylint: disable=maybe-no-member + + new_kpi = self.monitoring_client_test.SetKpi(_create_kpi_request) + LOGGER.debug("New KPI: {}".format(new_kpi)) + + _include_kpi_request = Kpi() + _include_kpi_request.kpi_id.kpi_id.uuid = new_kpi.kpi_id.uuid + _include_kpi_request.timestamp.timestamp = timestamp_utcnow_to_float() + _include_kpi_request.kpi_value.floatVal = 500 + + self.monitoring_client_test.IncludeKpi(_include_kpi_request) + + # self.monitoring_client.IncludeKpi(kpi_security_status) + # self.monitoring_client.IncludeKpi(kpi_conf) + # self.monitoring_client.IncludeKpi(kpi_unique_attack_conns) + # self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients) + # self.monitoring_client.IncludeKpi(kpi_unique_attackers) - # get the number of unique attackers (grouping by destination ip) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable - num_unique_attackers_last_time_interval = 0 - unique_attackers_last_time_interval = [] - - for i in range(self.monitor_inference_results): - if ( - self.monitor_inference_results[i]["timestamp"] >= time_interval_start - and self.monitor_inference_results[i]["timestamp"] <= time_interval_end - and self.monitor_inference_results[i]["service_id"] == service_id - and service_id in self.monitored_kpis["l3_security_status"]["service_ids"] - ): - if self.monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: - if ( - self.monitor_inference_results[i]["output"]["ip_d"] - not in unique_attackers_last_time_interval - ): - unique_attackers_last_time_interval.append( - self.monitor_inference_results[i]["output"]["ip_d"] - ) - num_unique_attackers_last_time_interval += 1 - - kpi_unique_attackers.kpi_value.intVal = num_unique_attackers_last_time_interval - - timestamp = Timestamp() - timestamp.timestamp = timestamp_utcnow_to_float() - - kpi_security_status.timestamp.CopyFrom(timestamp) - kpi_conf.timestamp.CopyFrom(timestamp) - kpi_unique_attack_conns.timestamp.CopyFrom(timestamp) - kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp) - kpi_unique_attackers.timestamp.CopyFrom(timestamp) - - self.monitoring_client.IncludeKpi(kpi_security_status) - self.monitoring_client.IncludeKpi(kpi_conf) - self.monitoring_client.IncludeKpi(kpi_unique_attack_conns) - self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients) - self.monitoring_client.IncludeKpi(kpi_unique_attackers) - - sleep(self.MONITORED_KPIS_TIME_INTERVAL_AGG) - except KeyboardInterrupt: - print("Exiting...") - break + LOGGER.debug("KPIs sent to monitoring server") """ Classify connection as standard traffic or cryptomining attack and return results @@ -441,6 +571,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto if service_id not in self.service_ids: self.create_kpis(service_id, device_id, endpoint_id) self.service_ids.append(service_id) + self.monitored_service_ids.put(MessageToJson(service_id, preserving_proto_field_name=True)) # Only notify Attack Mitigator when a cryptomining connection has been detected if cryptomining_detector_output["tag_name"] == "Crypto": @@ -455,9 +586,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto logging.info("Sending the connection information to the Attack Mitigator component...") message = L3AttackmitigatorOutput(**cryptomining_detector_output) response = self.attackmitigator_client.SendOutput(message) - logging.info( - "Attack Mitigator notified and received response: ", response.message - ) # FIX No message received + # logging.info("Attack Mitigator notified and received response: ", response.message) # FIX No message received + logging.info("Attack Mitigator notified") return Empty(message="OK, information received and mitigator notified abou the attack") except Exception as e: