diff --git a/expose_ingress_grpc.sh b/expose_ingress_grpc.sh index 945641c1f3b6ae981685a2a257260b14ceb927b2..2bc0fd64b60cafdfad92b3d8d031cd28d7d6a873 100755 --- a/expose_ingress_grpc.sh +++ b/expose_ingress_grpc.sh @@ -21,7 +21,7 @@ export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} # If not already set, set the list of components you want to build images for, and deploy. -export TFS_COMPONENTS=${TFS_COMPONENTS:-"context device automation policy service compute monitoring dbscanserving opticalattackmitigator opticalcentralizedattackdetector webui"} +export TFS_COMPONENTS=${TFS_COMPONENTS:-"context device automation policy service compute monitoring dbscanserving opticalattackmitigator opticalcentralizedattackdetector l3_attackmitigator l3_centralizedattackdetector webui"} ######################################################################################################################## # Automated steps start here diff --git a/manifests/l3_attackmitigatorservice.yaml b/manifests/l3_attackmitigatorservice.yaml index dec1bc4d936a9db5758679691f0fb130a5d67324..ee97d2c92abb5abcad80f8ddf04800ef13144522 100644 --- a/manifests/l3_attackmitigatorservice.yaml +++ b/manifests/l3_attackmitigatorservice.yaml @@ -32,6 +32,7 @@ spec: imagePullPolicy: Always ports: - containerPort: 10002 + - containerPort: 9192 env: - name: LOG_LEVEL value: "DEBUG" @@ -53,11 +54,69 @@ apiVersion: v1 kind: Service metadata: name: l3-attackmitigatorservice + labels: + app: l3-attackmitigatorservice spec: type: ClusterIP selector: app: l3-attackmitigatorservice ports: + - name: metrics + protocol: TCP + port: 9192 + targetPort: 9192 - name: grpc port: 10002 targetPort: 10002 + +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: l3-attackmitigatorservice-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: l3-attackmitigatorservice + minReplicas: 1 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: 
Utilization + averageUtilization: 80 + behavior: + scaleDown: + stabilizationWindowSeconds: 120 + +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: tfs-l3-attackmitigatorservice-metric + labels: + app: l3-attackmitigatorservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: l3-attackmitigatorservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running diff --git a/manifests/l3_centralizedattackdetectorservice.yaml b/manifests/l3_centralizedattackdetectorservice.yaml index 0ef23ba512ac5397203baa4195ceebe17cf6c743..594e21f4dbb1b10e1d859053c33785e2e59e4b46 100644 --- a/manifests/l3_centralizedattackdetectorservice.yaml +++ b/manifests/l3_centralizedattackdetectorservice.yaml @@ -32,6 +32,7 @@ spec: imagePullPolicy: Always ports: - containerPort: 10001 + - containerPort: 9192 env: - name: LOG_LEVEL value: "DEBUG" @@ -53,11 +54,68 @@ apiVersion: v1 kind: Service metadata: name: l3-centralizedattackdetectorservice + labels: + app: l3-centralizedattackdetectorservice spec: type: ClusterIP selector: app: l3-centralizedattackdetectorservice ports: + - name: metrics + protocol: TCP + port: 9192 + targetPort: 9192 - name: grpc port: 10001 targetPort: 10001 + +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: l3-centralizedattackdetectorservice-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: l3-centralizedattackdetectorservice + 
minReplicas: 1 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 80 + behavior: + scaleDown: + stabilizationWindowSeconds: 120 +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: tfs-l3-centralizedattackdetectorservice-metric + labels: + app: l3-centralizedattackdetectorservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: l3-centralizedattackdetectorservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running diff --git a/my_deploy.sh b/my_deploy.sh index d6f3513e9b2090905b7814c4563644ecda7bd2c6..ee3244ac99d5a2e8d5dba5a6ccd1609b0012c06b 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -20,7 +20,7 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" +export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator l3_attackmitigator l3_centralizedattackdetector" # Set the tag you want to use for your images. 
export TFS_IMAGE_TAG="dev" diff --git a/proto/l3_attackmitigator.proto b/proto/l3_attackmitigator.proto index 5ce9428b6f47318cf4bdd205c48645295ff26d71..572d96f9e586dae4a124b1b9de1368b71fb9f0b7 100644 --- a/proto/l3_attackmitigator.proto +++ b/proto/l3_attackmitigator.proto @@ -17,23 +17,33 @@ syntax = "proto3"; import "context.proto"; service L3Attackmitigator{ - // Sends a greeting - rpc SendOutput (L3AttackmitigatorOutput) returns (context.Empty) {} - // Sends another greeting + // Perform Mitigation + rpc PerformMitigation (L3AttackmitigatorOutput) returns (context.Empty) {} + // Get Mitigation rpc GetMitigation (context.Empty) returns (context.Empty) {} + // Get Configured ACL Rules + rpc GetConfiguredACLRules (context.Empty) returns (ACLRules) {} } message L3AttackmitigatorOutput { float confidence = 1; string timestamp = 2; - string ip_o = 3; - string tag_name = 4; - int32 tag = 5; - string flow_id = 6; - string protocol = 7; - string port_d = 8; - string ml_id = 9; - float time_start = 10; - float time_end = 11; + string ip_o = 3; + string ip_d = 4; + string tag_name = 5; + int32 tag = 6; + string flow_id = 7; + string protocol = 8; + string port_o = 9; + string port_d = 10; + string ml_id = 11; + context.ServiceId service_id = 12; + context.EndPointId endpoint_id = 13; + float time_start = 14; + float time_end = 15; +} + +message ACLRules { + repeated context.ConfigRule acl_rules = 1; } diff --git a/proto/l3_centralizedattackdetector.proto b/proto/l3_centralizedattackdetector.proto index 2aeb8826e8662b6495f10a333145a9f6abe594b9..ed99435aa7db6584b381079cb1e3d589fb9998b5 100644 --- a/proto/l3_centralizedattackdetector.proto +++ b/proto/l3_centralizedattackdetector.proto @@ -14,65 +14,55 @@ syntax = "proto3"; +import "context.proto"; + service L3Centralizedattackdetector { - // Sends a greeting - rpc SendInput (L3CentralizedattackdetectorMetrics) returns (Empty) {} - // Sends another greeting - rpc GetOutput (Empty) returns 
(L3CentralizedattackdetectorModelOutput) {} + // Analyze single input to the ML model in the CAD component + rpc AnalyzeConnectionStatistics (L3CentralizedattackdetectorMetrics) returns (Empty) {} + + // Analyze a batch of inputs to the ML model in the CAD component + rpc AnalyzeBatchConnectionStatistics (L3CentralizedattackdetectorBatchInput) returns (Empty) {} + + // Get the list of features used by the ML model in the CAD component + rpc GetFeaturesIds (Empty) returns (AutoFeatures) {} +} + +message Feature { + float feature = 1; } message L3CentralizedattackdetectorMetrics { - /* - Model input sent to the Inferencer by the client - There are currently 9 values and - */ + // Input sent by the DAD component to the ML model integrated in the CAD component. - // Machine learning - float n_packets_server_seconds = 1; - float n_packets_client_seconds = 2; - float n_bits_server_seconds = 3; - float n_bits_client_seconds = 4; - float n_bits_server_n_packets_server = 5; - float n_bits_client_n_packets_client = 6; - float n_packets_server_n_packets_client = 7; - float n_bits_server_n_bits_client = 8; + // Machine learning model features + repeated Feature features = 1; + ConnectionMetadata connection_metadata = 2; - // Conection identifier - string ip_o = 9; - string port_o = 10; - string ip_d = 11; - string port_d = 12; - string flow_id = 13; - string protocol = 14; - float time_start = 15; - float time_end = 16; } -message Empty { - string message = 1; +message ConnectionMetadata { + string ip_o = 1; + string port_o = 2; + string ip_d = 3; + string port_d = 4; + string flow_id = 5; + context.ServiceId service_id = 6; + context.EndPointId endpoint_id = 7; + string protocol = 8; + float time_start = 9; + float time_end = 10; } -message L3CentralizedattackdetectorModelOutput { - float confidence = 1; - string timestamp = 2; - string ip_o = 3; - string tag_name = 4; - int32 tag = 5; - string flow_id = 6; - string protocol = 7; - string port_d = 8; - string ml_id = 9; - 
float time_start = 10; - float time_end = 11; +// Collection of values representing ML features +message AutoFeatures { + repeated float auto_features = 1; } -// Collections or streams? -/* -message InputCollection { - repeated model_input = 1; +// Collection (batch) of model inputs that will be sent to the model +message L3CentralizedattackdetectorBatchInput { + repeated L3CentralizedattackdetectorMetrics metrics = 1; } -message OutputCollection { - repeated model_output = 1; +message Empty { + string message = 1; } -*/ diff --git a/scripts/show_logs_l3-attack-mitigator.sh b/scripts/show_logs_l3-attack-mitigator.sh new file mode 100755 index 0000000000000000000000000000000000000000..fc54a2a36e14b1a40941a5536ef575e7d4ce7038 --- /dev/null +++ b/scripts/show_logs_l3-attack-mitigator.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Define your deployment settings here +######################################################################################################################## + +# If not already set, set the name of the Kubernetes namespace to deploy to. 
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/l3-attackmitigatorservice -c server diff --git a/scripts/show_logs_l3-centralized-attack-detector.sh b/scripts/show_logs_l3-centralized-attack-detector.sh new file mode 100755 index 0000000000000000000000000000000000000000..002755f3a97d18d1242b05aeeba1364c980bf4a1 --- /dev/null +++ b/scripts/show_logs_l3-centralized-attack-detector.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Define your deployment settings here +######################################################################################################################## + +# If not already set, set the name of the Kubernetes namespace to deploy to. 
+export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/l3-centralizedattackdetectorservice -c server diff --git a/src/common/Constants.py b/src/common/Constants.py index a7bf198a7204677ed3669fc28a2c3528a5936425..a8e66f7d42cadc7f5cae4d1f42c2a3a0fad599b2 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -47,6 +47,8 @@ class ServiceNameEnum(Enum): COMPUTE = 'compute' CYBERSECURITY = 'cybersecurity' INTERDOMAIN = 'interdomain' + L3_AM = 'l3-attackmitigator' + L3_CAD = 'l3-centralizedattackdetector' PATHCOMP = 'pathcomp' WEBUI = 'webui' @@ -66,6 +68,8 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.DLT .value : 8080, ServiceNameEnum.COMPUTE .value : 9090, ServiceNameEnum.CYBERSECURITY.value : 10000, + ServiceNameEnum.L3_CAD .value : 10001, + ServiceNameEnum.L3_AM .value : 10002, ServiceNameEnum.INTERDOMAIN .value : 10010, ServiceNameEnum.PATHCOMP .value : 10020, diff --git a/src/l3_attackmitigator/Config.py b/src/l3_attackmitigator/Config.py index fafeb8d11d1ec5e5dd2da16c699161ea03f4995d..48b6dbab894e2081066bb12d883e826df34e81ca 100644 --- a/src/l3_attackmitigator/Config.py +++ b/src/l3_attackmitigator/Config.py @@ -18,7 +18,7 @@ import logging LOG_LEVEL = logging.WARNING # gRPC settings -GRPC_SERVICE_PORT = 10002 # TODO UPM FIXME +GRPC_SERVICE_PORT = 10002 GRPC_MAX_WORKERS = 10 GRPC_GRACE_PERIOD = 60 diff --git a/src/l3_attackmitigator/Dockerfile b/src/l3_attackmitigator/Dockerfile index da9ed75ad4f627b5f5f54d21c2f1a8544a600e03..99b7e7a9435ca4172e4ec38f8f8d13c20ebdce57 100644 --- a/src/l3_attackmitigator/Dockerfile +++ b/src/l3_attackmitigator/Dockerfile @@ -63,6 +63,9 @@ RUN python3 -m pip install -r requirements.txt # 
Add component files into working directory WORKDIR /var/teraflow COPY src/l3_attackmitigator/. l3_attackmitigator +COPY src/monitoring/. monitoring +COPY src/context/. context/ +COPY src/service/. service/ # Start the service ENTRYPOINT ["python", "-m", "l3_attackmitigator.service"] diff --git a/src/l3_attackmitigator/client/l3_attackmitigatorClient.py b/src/l3_attackmitigator/client/l3_attackmitigatorClient.py index fad553cc25ce655ed6ba6435b6511cf440774950..c5d98b1c4974172e50e65db16ba4753e742eab28 100644 --- a/src/l3_attackmitigator/client/l3_attackmitigatorClient.py +++ b/src/l3_attackmitigator/client/l3_attackmitigatorClient.py @@ -13,13 +13,18 @@ # limitations under the License. import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import retry, delay_exponential from common.proto.l3_attackmitigator_pb2_grpc import ( L3AttackmitigatorStub, ) from common.proto.l3_attackmitigator_pb2 import ( - Output, - EmptyMitigator + L3AttackmitigatorOutput, ACLRules +) + +from common.proto.context_pb2 import ( + Empty ) LOGGER = logging.getLogger(__name__) @@ -28,8 +33,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class l3_attackmitigatorClient: - def __init__(self, address, port): - self.endpoint = "{}:{}".format(address, port) + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.L3_AM) + if not port: port = get_service_port_grpc(ServiceNameEnum.L3_AM) + self.endpoint = "{}:{}".format(host, port) LOGGER.debug("Creating channel to {}...".format(self.endpoint)) self.channel = None self.stub = None @@ -47,16 +54,23 @@ class l3_attackmitigatorClient: self.stub = None @RETRY_DECORATOR - def SendOutput(self, request: Output) -> EmptyMitigator: - LOGGER.debug('SendOutput 
request: {}'.format(request)) - response = self.stub.SendOutput(request) - LOGGER.debug('SendOutput result: {}'.format(response)) + def PerformMitigation(self, request: L3AttackmitigatorOutput) -> Empty: + LOGGER.debug('PerformMitigation request: {}'.format(request)) + response = self.stub.PerformMitigation(request) + LOGGER.debug('PerformMitigation result: {}'.format(response)) return response - + @RETRY_DECORATOR - def GetMitigation(self, request: EmptyMitigator) -> EmptyMitigator: + def GetMitigation(self, request: Empty) -> Empty: LOGGER.debug('GetMitigation request: {}'.format(request)) response = self.stub.GetMitigation(request) LOGGER.debug('GetMitigation result: {}'.format(response)) return response + + @RETRY_DECORATOR + def GetConfiguredACLRules(self, request: Empty) -> ACLRules: + LOGGER.debug('GetConfiguredACLRules request: {}'.format(request)) + response = self.stub.GetConfiguredACLRules(request) + LOGGER.debug('GetConfiguredACLRules result: {}'.format(response)) + return response diff --git a/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py b/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py index b697a83ff43c28a9051fb6465db3803ca1c068d8..34cfcd5d081a431f165461564eeb5a3390a3bda5 100644 --- a/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py +++ b/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py @@ -14,41 +14,184 @@ from __future__ import print_function import logging -from common.proto.l3_attackmitigator_pb2 import ( - EmptyMitigator -) -from common.proto.l3_attackmitigator_pb2_grpc import ( - L3AttackmitigatorServicer, +import time + +from common.proto.l3_centralizedattackdetector_pb2 import Empty +from common.proto.l3_attackmitigator_pb2_grpc import L3AttackmitigatorServicer +from common.proto.l3_attackmitigator_pb2 import ACLRules +from common.proto.context_pb2 import ( + ServiceId, + ConfigActionEnum, ) +from common.proto.acl_pb2 import AclForwardActionEnum, 
AclLogActionEnum, AclRuleTypeEnum +from common.proto.context_pb2 import ConfigActionEnum, Service, ServiceId, ConfigRule +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from service.client.ServiceClient import ServiceClient + +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method + LOGGER = logging.getLogger(__name__) -class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): +METRICS_POOL = MetricsPool('l3_attackmitigator', 'RPC') + +class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): def __init__(self): - LOGGER.debug("Creating Servicer...") + LOGGER.info("Creating Attack Mitigator Service") + self.last_value = -1 self.last_tag = 0 - - def SendOutput(self, request, context): - # SEND CONFIDENCE TO MITIGATION SERVER - logging.debug("") - print("Server received mitigation values...", request.confidence) + self.sequence_id = 0 + + self.context_client = ContextClient() + self.service_client = ServiceClient() + self.configured_acl_config_rules = [] + + def configure_acl_rule( + self, + context_uuid: str, + service_uuid: str, + device_uuid: str, + endpoint_uuid: str, + src_ip: str, + dst_ip: str, + src_port: str, + dst_port: str, + ) -> None: + # Create ServiceId + service_id = ServiceId() + service_id.context_id.context_uuid.uuid = context_uuid + service_id.service_uuid.uuid = service_uuid + + # Get service from Context + # context_client = ContextClient() + + try: + _service: Service = self.context_client.GetService(service_id) + except: + raise Exception("Service({:s}) not found".format(grpc_message_to_json_string(service_id))) + + # _service is read-only; copy it to have an updatable service message + service_request = Service() + service_request.CopyFrom(_service) + + # Add ACL ConfigRule into the service service_request + acl_config_rule = service_request.service_config.config_rules.add() + acl_config_rule.action = 
ConfigActionEnum.CONFIGACTION_SET + + # Set EndpointId associated to the ACLRuleSet + acl_endpoint_id = acl_config_rule.acl.endpoint_id + acl_endpoint_id.device_id.device_uuid.uuid = device_uuid + acl_endpoint_id.endpoint_uuid.uuid = endpoint_uuid + + # Set RuleSet for this ACL ConfigRule + acl_rule_set = acl_config_rule.acl.rule_set + # TODO: update the following parameters; for instance, add them as parameters of the method configure_acl_rule + # acl_rule_set.name = "DROP-HTTPS" + acl_rule_set.name = "DROP-TCP" + acl_rule_set.type = AclRuleTypeEnum.ACLRULETYPE_IPV4 + # acl_rule_set.description = "DROP undesired HTTPS traffic" + acl_rule_set.description = "DROP undesired TCP traffic" + + # Add ACLEntry to the ACLRuleSet + acl_entry = acl_rule_set.entries.add() + acl_entry.sequence_id = self.sequence_id + acl_entry.description = "DROP-{src_ip}:{src_port}-{dst_ip}:{dst_port}".format( + src_ip=src_ip, src_port=src_port, dst_ip=dst_ip, dst_port=dst_port + ) + acl_entry.match.protocol = ( + 6 # TCP according to https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml + ) + acl_entry.match.src_address = "{}/32".format(src_ip) + acl_entry.match.dst_address = "{}/32".format(dst_ip) + acl_entry.match.src_port = int(src_port) + acl_entry.match.dst_port = int(dst_port) + # TODO: update the following parameters; for instance, add them as parameters of the method configure_acl_rule + acl_entry.action.forward_action = AclForwardActionEnum.ACLFORWARDINGACTION_DROP + acl_entry.action.log_action = AclLogActionEnum.ACLLOGACTION_NOLOG + + LOGGER.info("ACL Rule Set: %s", acl_rule_set) + LOGGER.info("ACL Config Rule: %s", acl_config_rule) + + # Add the ACLRuleSet to the list of configured ACLRuleSets + self.configured_acl_config_rules.append(acl_config_rule) + + # Update the Service with the new ACL RuleSet + # service_client = ServiceClient() + service_reply: ServiceId = self.service_client.UpdateService(service_request) + + # TODO: Log the service_reply details + + 
if service_reply != service_request.service_id: # pylint: disable=no-member + raise Exception("Service update failed. Wrong ServiceId was returned") + + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def PerformMitigation(self, request, context): last_value = request.confidence last_tag = request.tag - # RETURN OK TO THE CALLER - return EmptyMitigator( - message=f"OK, received values: {last_tag} with confidence {last_value}." + + LOGGER.info( + "Attack Mitigator received attack mitigation information. Prediction confidence: %s, Predicted class: %s", + last_value, + last_tag, ) - def GetMitigation(self, request, context): - # GET OR PERFORM MITIGATION STRATEGY - logging.debug("") - print("Returing mitigation strategy...") - k = self.last_value * 2 - return EmptyMitigator( - message=f"Mitigation with double confidence = {k}" + ip_o = request.ip_o + ip_d = request.ip_d + port_o = request.port_o + port_d = request.port_d + + sentinel = True + counter = 0 + service_id = request.service_id + + LOGGER.info("Service Id.:\n{}".format(service_id)) + + LOGGER.info("Retrieving service from Context") + while sentinel: + try: + service = self.context_client.GetService(service_id) + sentinel = False + except Exception as e: + counter = counter + 1 + LOGGER.debug("Waiting 2 seconds", counter, e) + time.sleep(2) + + LOGGER.info(f"Service with Service Id.: {service_id}\n{service}") + + LOGGER.info("Adding new rule to the service to block the attack") + self.configure_acl_rule( + context_uuid=service_id.context_id.context_uuid.uuid, + service_uuid=service_id.service_uuid.uuid, + device_uuid=request.endpoint_id.device_id.device_uuid.uuid, + endpoint_uuid=request.endpoint_id.endpoint_uuid.uuid, + src_ip=ip_o, + dst_ip=ip_d, + src_port=port_o, + dst_port=port_d, ) + LOGGER.info("Service with new rule:\n{}".format(service)) + + LOGGER.info("Updating service with the new rule") + self.service_client.UpdateService(service) + + LOGGER.info( + "Service obtained from Context after 
updating with the new rule:\n{}".format( + self.context_client.GetService(service_id) + ) + ) + + return Empty(message=f"OK, received values: {last_tag} with confidence {last_value}.") + + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetConfiguredACLRules(self, request, context): + acl_rules = ACLRules() + for acl_config_rule in self.configured_acl_config_rules: + acl_rules.acl_rules.append(acl_config_rule) - + return acl_rules diff --git a/src/l3_attackmitigator/service/test_create_service.py b/src/l3_attackmitigator/service/test_create_service.py new file mode 100644 index 0000000000000000000000000000000000000000..01cf769a271de1bbbd0329a3ce21ea476ac10cab --- /dev/null +++ b/src/l3_attackmitigator/service/test_create_service.py @@ -0,0 +1,267 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function +import logging +from common.proto.l3_centralizedattackdetector_pb2 import ( + Empty +) +from common.proto.l3_attackmitigator_pb2_grpc import ( + L3AttackmitigatorServicer, +) +from common.proto.context_pb2 import ( + Service, ServiceId, ServiceConfig, ServiceTypeEnum, ServiceStatusEnum, ServiceStatus, Context, ContextId, Uuid, + Timestamp, ConfigRule, ConfigRule_Custom, ConfigActionEnum, Device, DeviceId, DeviceConfig, + DeviceOperationalStatusEnum, DeviceDriverEnum, EndPoint, Link, LinkId, EndPoint, EndPointId, Topology, TopologyId +) +from common.proto.context_pb2_grpc import ( + ContextServiceStub +) +from common.proto.service_pb2_grpc import ( + ServiceServiceStub +) +from datetime import datetime +import grpc + +LOGGER = logging.getLogger(__name__) +CONTEXT_CHANNEL = "192.168.165.78:1010" +SERVICE_CHANNEL = "192.168.165.78:3030" + +class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): + + def GetNewService(self, service_id): + service = Service() + service_id_obj = self.GenerateServiceId(service_id) + service.service_id.CopyFrom(service_id_obj) + service.service_type = ServiceTypeEnum.SERVICETYPE_L3NM + service_status = ServiceStatus() + service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE + service.service_status.CopyFrom(service_status) + timestamp = Timestamp() + timestamp.timestamp = datetime.timestamp(datetime.now()) + service.timestamp.CopyFrom(timestamp) + return service + + def GetNewContext(self, service_id): + context = Context() + context_id = ContextId() + uuid = Uuid() + uuid.uuid = service_id + context_id.context_uuid.CopyFrom(uuid) + context.context_id.CopyFrom(context_id) + return context + + def GetNewDevice(self, service_id): + device = Device() + device_id = DeviceId() + uuid = Uuid() + uuid.uuid = service_id + device_id.device_uuid.CopyFrom(uuid) + device.device_type="test" + device.device_id.CopyFrom(device_id) + device.device_operational_status = 
DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + return device + + def GetNewLink(self, service_id): + link = Link() + link_id = LinkId() + uuid = Uuid() + uuid.uuid = service_id + link_id.link_uuid.CopyFrom(uuid) + link.link_id.CopyFrom(link_id) + return link + + def GetNewTopology(self,context_id, device_id, link_id): + topology = Topology() + topology_id = TopologyId() + topology_id.context_id.CopyFrom(context_id) + uuid = Uuid() + uuid.uuid = "test_crypto" + topology_id.topology_uuid.CopyFrom(uuid) + topology.topology_id.CopyFrom(topology_id) + topology.device_ids.extend([device_id]) + topology.link_ids.extend([link_id]) + return topology + + def GetNewEndpoint(self, topology_id, device_id, uuid_name): + endpoint = EndPoint() + endpoint_id = EndPointId() + endpoint_id.topology_id.CopyFrom(topology_id) + endpoint_id.device_id.CopyFrom(device_id) + uuid = Uuid() + uuid.uuid = uuid_name + endpoint_id.endpoint_uuid.CopyFrom(uuid) + endpoint.endpoint_id.CopyFrom(endpoint_id) + endpoint.endpoint_type = "test" + return endpoint + + + def __init__(self): + LOGGER.debug("Creating Servicer...") + self.last_value = -1 + self.last_tag = 0 + """ + context = self.GetNewContext("test_crypto") + print(context, flush=True) + print(self.SetContext(context)) + + service = self.GetNewService("test_crypto") + print("This is the new service", self.CreateService(service), flush = True) + + ip_o = "127.0.0.1" + ip_d = "127.0.0.2" + port_o = "123" + port_d = "124" + + service_id = self.GenerateServiceId("test_crypto") + + config_rule = self.GetConfigRule(ip_o, ip_d, port_o, port_d) + + service = self.GetService(service_id) + print("Service obtained from id", service, flush=True) + + config_rule = self.GetConfigRule(ip_o, ip_d, port_o, port_d) + + #service_config = service.service_config + #service_config.append(config_rule) + + service_config = ServiceConfig() + service_config.config_rules.extend([config_rule]) + service.service_config.CopyFrom(service_config) + + device = 
self.GetNewDevice("test_crypto") + print("New device", device, flush=True) + device_id = self.SetDevice(device) + + link = self.GetNewLink("test_crypto") + print("New link", link, flush=True) + link_id = self.SetLink(link) + + topology = self.GetNewTopology(context.context_id, device.device_id, link.link_id) + print("New topology", topology, flush=True) + topology_id = self.SetTopology(topology) + + endpoint = self.GetNewEndpoint(topology.topology_id, device.device_id, "test_crypto") + print("New endpoint", endpoint, flush=True) + link.link_endpoint_ids.extend([endpoint.endpoint_id]) + + self.SetLink(link) + + print("Service with new rule", service, flush=True) + self.UpdateService(service) + + service2 = self.GetService(service_id) + print("Service obtained from id after updating", service2, flush=True) + """ + + def GenerateRuleValue(self, ip_o, ip_d, port_o, port_d): + value = { + 'ipv4:source-address': ip_o, + 'ipv4:destination-address': ip_d, + 'transport:source-port': port_o, + 'transport:destination-port': port_d, + 'forwarding-action': 'DROP', + } + return value + + def GetConfigRule(self, ip_o, ip_d, port_o, port_d): + config_rule = ConfigRule() + config_rule_custom = ConfigRule_Custom() + config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule_custom.resource_key = 'test' + config_rule_custom.resource_value = str(self.GenerateRuleValue(ip_o, ip_d, port_o, port_d)) + config_rule.custom.CopyFrom(config_rule_custom) + return config_rule + + def GenerateServiceId(self, service_id): + service_id_obj = ServiceId() + context_id = ContextId() + uuid = Uuid() + uuid.uuid = service_id + context_id.context_uuid.CopyFrom(uuid) + service_id_obj.context_id.CopyFrom(context_id) + service_id_obj.service_uuid.CopyFrom(uuid) + return service_id_obj + + def SendOutput(self, request, context): + # SEND CONFIDENCE TO MITIGATION SERVER + print("Server received mitigation values...", request.confidence, flush=True) + + last_value = request.confidence + last_tag = 
request.tag + + ip_o = request.ip_o + ip_d = request.ip_d + port_o = request.port_o + port_d = request.port_d + + service_id = self.GenerateServiceId(request.service_id) + + config_rule = self.GetConfigRule(ip_o, ip_d, port_o, port_d) + + service = GetService(service_id) + print(service) + #service.config_rules.append(config_rule) + #UpdateService(service) + + # RETURN OK TO THE CALLER + return Empty( + message=f"OK, received values: {last_tag} with confidence {last_value}." + ) + + def SetDevice(self, device): + with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: + stub = ContextServiceStub(channel) + return stub.SetDevice(device) + + def SetLink(self, link): + with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: + stub = ContextServiceStub(channel) + return stub.SetLink(link) + + def SetTopology(self, link): + with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: + stub = ContextServiceStub(channel) + return stub.SetTopology(link) + + + def GetService(self, service_id): + with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: + stub = ContextServiceStub(channel) + return stub.GetService(service_id) + + def SetContext(self, context): + with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: + stub = ContextServiceStub(channel) + return stub.SetContext(context) + + def UpdateService(self, service): + with grpc.insecure_channel(SERVICE_CHANNEL) as channel: + stub = ServiceServiceStub(channel) + stub.UpdateService(service) + + def CreateService(self, service): + with grpc.insecure_channel(SERVICE_CHANNEL) as channel: + stub = ServiceServiceStub(channel) + stub.CreateService(service) + + def GetMitigation(self, request, context): + # GET OR PERFORM MITIGATION STRATEGY + logging.debug("") + print("Returing mitigation strategy...") + k = self.last_value * 2 + return Empty( + message=f"Mitigation with double confidence = {k}" + ) + diff --git a/src/l3_centralizedattackdetector/.gitlab-ci.yml b/src/l3_centralizedattackdetector/.gitlab-ci.yml index 
791b917524fa3ad2fa91e0bc4f928fcb60851341..057545eb1b65aac26610c6460a47592b4e7604c2 100644 --- a/src/l3_centralizedattackdetector/.gitlab-ci.yml +++ b/src/l3_centralizedattackdetector/.gitlab-ci.yml @@ -52,7 +52,7 @@ unit test l3_centralizedattackdetector: - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi script: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - - docker run --name $IMAGE_NAME -d -p 10001:10001 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker run --name $IMAGE_NAME -d -p 10001:10001 --env CAD_CLASSIFICATION_THRESHOLD=0.5 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 5 - docker ps -a - docker logs $IMAGE_NAME diff --git a/src/l3_centralizedattackdetector/Dockerfile b/src/l3_centralizedattackdetector/Dockerfile index e5b9aa33b78b2593dc1df6d48edaf8c41b65a02b..377ecd21b0ad553512797ce7a31f1ce520462a13 100644 --- a/src/l3_centralizedattackdetector/Dockerfile +++ b/src/l3_centralizedattackdetector/Dockerfile @@ -44,6 +44,10 @@ WORKDIR /var/teraflow/common COPY src/common/. ./ RUN rm -rf proto +RUN mkdir -p /var/teraflow/l3_attackmitigator +WORKDIR /var/teraflow/l3_attackmitigator +COPY src/l3_attackmitigator/. ./ + # Create proto sub-folder, copy .proto files, and generate Python code RUN mkdir -p /var/teraflow/common/proto WORKDIR /var/teraflow/common/proto @@ -63,6 +67,7 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow COPY src/l3_centralizedattackdetector/. l3_centralizedattackdetector +COPY src/monitoring/. 
monitoring # Start the service ENTRYPOINT ["python", "-m", "l3_centralizedattackdetector.service"] diff --git a/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py b/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py index f84bd84217c30e25c6e8ccfbb9b9e48c073a5ee5..2ef33438e77dbe4c3609bd21133fb3a9c95c8bcc 100644 --- a/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py +++ b/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py @@ -18,7 +18,10 @@ from common.proto.l3_centralizedattackdetector_pb2_grpc import ( L3CentralizedattackdetectorStub, ) from common.proto.l3_centralizedattackdetector_pb2 import ( + AutoFeatures, Empty, + L3CentralizedattackdetectorBatchInput, + L3CentralizedattackdetectorMetrics, ModelInput, ModelOutput ) @@ -48,17 +51,24 @@ class l3_centralizedattackdetectorClient: self.stub = None @RETRY_DECORATOR - def SendInput(self, request: ModelInput) -> Empty: - LOGGER.debug('SendInput request: {}'.format(request)) - response = self.stub.SendInput(request) - LOGGER.debug('SendInput result: {}'.format(response)) + def AnalyzeConnectionStatistics(self, request: L3CentralizedattackdetectorMetrics) -> Empty: + LOGGER.debug('AnalyzeConnectionStatistics request: {}'.format(request)) + response = self.stub.AnalyzeConnectionStatistics(request) + LOGGER.debug('AnalyzeConnectionStatistics result: {}'.format(response)) return response @RETRY_DECORATOR - def GetOutput(self, request: Empty) -> ModelOutput: - LOGGER.debug('GetOutput request: {}'.format(request)) + def AnalyzeBatchConnectionStatistics(self, request: L3CentralizedattackdetectorBatchInput) -> Empty: + LOGGER.debug('AnalyzeBatchConnectionStatistics request: {}'.format(request)) response = self.stub.GetOutput(request) - LOGGER.debug('GetOutput result: {}'.format(response)) + LOGGER.debug('AnalyzeBatchConnectionStatistics result: {}'.format(response)) + return response + + @RETRY_DECORATOR + def 
GetFeaturesIds(self, request: Empty) -> AutoFeatures: + LOGGER.debug('GetFeaturesIds request: {}'.format(request)) + response = self.stub.GetOutput(request) + LOGGER.debug('GetFeaturesIds result: {}'.format(response)) return response diff --git a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorService.py b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorService.py index bf58247d3c6467b6f9b3ca7d0dcbc9c5239195c4..857f0e448725399caa82b3fdd331ff9ceff623a8 100644 --- a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorService.py +++ b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorService.py @@ -85,7 +85,6 @@ class l3_centralizedattackdetectorService: ) # pylint: disable=maybe-no-member LOGGER.debug("Service started") - #self.l3_centralizedattackdetector_servicer.setup_l3_centralizedattackdetector() def stop(self): LOGGER.debug( diff --git a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py index dfe67813f875cbbc3173460efed73b212ba0ee0f..8f59c81150345f15faaf69824d1036b87f5bd80d 100644 --- a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py +++ b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py @@ -14,118 +14,763 @@ from __future__ import print_function from datetime import datetime +from datetime import timedelta + import os -import grpc import numpy as np import onnxruntime as rt import logging -from common.proto.l3_centralizedattackdetector_pb2 import ( - Empty, -) -from common.proto.l3_centralizedattackdetector_pb2_grpc import ( - L3CentralizedattackdetectorServicer, -) - -from common.proto.l3_attackmitigator_pb2 import ( - L3AttackmitigatorOutput, -) -from common.proto.l3_attackmitigator_pb2_grpc import ( - L3AttackmitigatorStub, -) +import time + +from 
common.proto.l3_centralizedattackdetector_pb2 import Empty, AutoFeatures +from common.proto.l3_centralizedattackdetector_pb2_grpc import L3CentralizedattackdetectorServicer + +from common.proto.l3_attackmitigator_pb2 import L3AttackmitigatorOutput + +from common.proto.monitoring_pb2 import KpiDescriptor +from common.proto.kpi_sample_types_pb2 import KpiSampleType + +from monitoring.client.MonitoringClient import MonitoringClient +from common.proto.monitoring_pb2 import Kpi + +from common.tools.timestamp.Converters import timestamp_utcnow_to_float +from common.proto.context_pb2 import Timestamp, SliceId, ConnectionId + +from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient + +import uuid + +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method + LOGGER = logging.getLogger(__name__) -here = os.path.dirname(os.path.abspath(__file__)) -MODEL_FILE = os.path.join(here, "ml_model/teraflow_rf.onnx") +current_dir = os.path.dirname(os.path.abspath(__file__)) + +# Demo constants +DEMO_MODE = False +ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"] + +BATCH_SIZE= 10 + +METRICS_POOL = MetricsPool('l3_centralizedattackdetector', 'RPC') + + +class ConnectionInfo: + def __init__(self, ip_o, port_o, ip_d, port_d): + self.ip_o = ip_o + self.port_o = port_o + self.ip_d = ip_d + self.port_d = port_d + + def __eq__(self, other): + return ( + self.ip_o == other.ip_o + and self.port_o == other.port_o + and self.ip_d == other.ip_d + and self.port_d == other.port_d + ) + + def __str__(self): + return "ip_o: " + self.ip_o + "\nport_o: " + self.port_o + "\nip_d: " + self.ip_d + "\nport_d: " + self.port_d + class l3_centralizedattackdetectorServiceServicerImpl(L3CentralizedattackdetectorServicer): + """ + Initialize variables, prediction model and clients of components used by CAD + """ + def __init__(self): - LOGGER.debug("Creating Servicer...") + LOGGER.info("Creating 
Centralized Attack Detector Service") + self.inference_values = [] - self.model = rt.InferenceSession(MODEL_FILE) - self.input_name = self.model.get_inputs()[0].name - self.label_name = self.model.get_outputs()[0].name - self.prob_name = self.model.get_outputs()[1].name + self.inference_results = [] + self.cryptomining_detector_path = os.path.join(current_dir, "ml_model/cryptomining_detector/") + self.cryptomining_detector_file_name = os.listdir(self.cryptomining_detector_path)[0] + self.cryptomining_detector_model_path = os.path.join( + self.cryptomining_detector_path, self.cryptomining_detector_file_name + ) + self.cryptomining_detector_model = rt.InferenceSession(self.cryptomining_detector_model_path) + + # Load cryptomining detector features metadata from ONNX file + self.cryptomining_detector_features_metadata = list( + self.cryptomining_detector_model.get_modelmeta().custom_metadata_map.values() + ) + self.cryptomining_detector_features_metadata = [float(x) for x in self.cryptomining_detector_features_metadata] + self.cryptomining_detector_features_metadata.sort() + LOGGER.info("Cryptomining Detector Features: " + str(self.cryptomining_detector_features_metadata)) + + LOGGER.info("Batch size: " + str(BATCH_SIZE)) + + self.input_name = self.cryptomining_detector_model.get_inputs()[0].name + self.label_name = self.cryptomining_detector_model.get_outputs()[0].name + self.prob_name = self.cryptomining_detector_model.get_outputs()[1].name + + # Kpi values + self.l3_security_status = 0 # unnecessary + self.l3_ml_model_confidence = 0 + self.l3_inferences_in_interval_counter = 0 + + self.l3_ml_model_confidence_normal = 0 + self.l3_inferences_in_interval_counter_normal = 0 + + self.l3_ml_model_confidence_crypto = 0 + self.l3_inferences_in_interval_counter_crypto = 0 + + self.l3_attacks = [] + self.l3_unique_attack_conns = 0 + self.l3_unique_compromised_clients = 0 + self.l3_unique_attackers = 0 + + self.l3_non_empty_time_interval = False + self.active_requests = [] + 
+ self.monitoring_client = MonitoringClient() + self.service_ids = [] + self.monitored_kpis = { + "l3_security_status": { + "kpi_id": None, + "description": "L3 - Confidence of the cryptomining detector in the security status in the last time interval of the service {service_id}", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO, + "service_ids": [], + }, + "l3_ml_model_confidence": { + "kpi_id": None, + "description": "L3 - Security status of the service in a time interval of the service {service_id} (“0” if no attack has been detected on the service and “1” if a cryptomining attack has been detected)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_ML_CONFIDENCE, + "service_ids": [], + }, + "l3_unique_attack_conns": { + "kpi_id": None, + "description": "L3 - Number of attack connections detected in a time interval of the service {service_id} (attacks of the same connection [origin IP, origin port, destination IP and destination port] are only considered once)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS, + "service_ids": [], + }, + "l3_unique_compromised_clients": { + "kpi_id": None, + "description": "L3 - Number of unique compromised clients of the service in a time interval of the service {service_id} (attacks from the same origin IP are only considered once)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS, + "service_ids": [], + }, + "l3_unique_attackers": { + "kpi_id": None, + "description": "L3 - number of unique attackers of the service in a time interval of the service {service_id} (attacks from the same destination IP are only considered once)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_ATTACKERS, + "service_ids": [], + }, + } + self.attackmitigator_client = l3_attackmitigatorClient() + + # Environment variables + self.CLASSIFICATION_THRESHOLD = os.getenv("CAD_CLASSIFICATION_THRESHOLD", 0.5) + self.MONITORED_KPIS_TIME_INTERVAL_AGG = 
os.getenv("MONITORED_KPIS_TIME_INTERVAL_AGG", 60) + + # Constants + self.NORMAL_CLASS = 0 + self.CRYPTO_CLASS = 1 + + self.kpi_test = None + self.time_interval_start = None + self.time_interval_end = None + + # CAD evaluation tests + self.cad_inference_times = [] + self.cad_num_inference_measurements = 100 + + # AM evaluation tests + self.am_notification_times = [] + + # List of attack connections + self.attack_connections = [] + + self.correct_attack_conns = 0 + self.correct_predictions = 0 + self.total_predictions = 0 + self.false_positives = 0 + self.false_negatives = 0 + + """ + Create a monitored KPI for a specific service and add it to the Monitoring Client + -input: + + service_id: service ID where the KPI will be monitored + + kpi_name: name of the KPI + + kpi_description: description of the KPI + + kpi_sample_type: KPI sample type of the KPI (it must be defined in the kpi_sample_types.proto file) + -output: KPI identifier representing the KPI + """ + + def create_kpi( + self, + service_id, + kpi_name, + kpi_description, + kpi_sample_type, + ): + kpidescriptor = KpiDescriptor() + kpidescriptor.kpi_description = kpi_description + kpidescriptor.service_id.service_uuid.uuid = service_id.service_uuid.uuid + kpidescriptor.kpi_sample_type = kpi_sample_type + new_kpi = self.monitoring_client.SetKpi(kpidescriptor) + + LOGGER.info("Created KPI {}".format(kpi_name)) + + return new_kpi + + """ + Create the monitored KPIs for a specific service, add them to the Monitoring Client and store their identifiers in the monitored_kpis dictionary + -input: + + service_id: service ID where the KPIs will be monitored + -output: None + """ + + def create_kpis(self, service_id, device_id, endpoint_id): + LOGGER.info("Creating KPIs for service {}".format(service_id)) + + # for now, all the KPIs are created for all the services from which requests are received + for kpi in self.monitored_kpis: + # generate random slice_id + slice_id = SliceId() + slice_id.slice_uuid.uuid = 
str(uuid.uuid4()) + + # generate random connection_id + connection_id = ConnectionId() + connection_id.connection_uuid.uuid = str(uuid.uuid4()) + + created_kpi = self.create_kpi( + service_id, + kpi, + self.monitored_kpis[kpi]["description"].format(service_id=service_id.service_uuid.uuid), + self.monitored_kpis[kpi]["kpi_sample_type"], + ) + self.monitored_kpis[kpi]["kpi_id"] = created_kpi.kpi_id + self.monitored_kpis[kpi]["service_ids"].append(service_id.service_uuid.uuid) + + LOGGER.info("Created KPIs for service {}".format(service_id)) + + def monitor_kpis(self): + monitor_inference_results = self.inference_results + monitor_service_ids = self.service_ids + + self.assign_timestamp(monitor_inference_results) + + non_empty_time_interval = self.l3_non_empty_time_interval + + if non_empty_time_interval: + for service_id in monitor_service_ids: + LOGGER.debug("service_id: {}".format(service_id)) + + self.monitor_compute_l3_kpi(service_id, monitor_inference_results) + + # Demo mode inference results are erased + """if DEMO_MODE: + # Delete fist half of the inference results + LOGGER.debug("inference_results len: {}".format(len(self.inference_results))) + self.inference_results = self.inference_results[len(self.inference_results)//2:] + LOGGER.debug("inference_results len after erase: {}".format(len(self.inference_results)))""" + # end = time.time() + # LOGGER.debug("Time to process inference results with erase: {}".format(end - start)) + LOGGER.debug("KPIs sent to monitoring server") + else: + LOGGER.debug("No KPIs sent to monitoring server") + + def assign_timestamp(self, monitor_inference_results): + time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG + + # assign the timestamp of the first inference result to the time_interval_start + if self.time_interval_start is None: + self.time_interval_start = monitor_inference_results[0]["timestamp"] + LOGGER.debug("self.time_interval_start: {}".format(self.time_interval_start)) + + # add time_interval to the current time 
to get the time interval end + LOGGER.debug("time_interval: {}".format(time_interval)) + LOGGER.debug(timedelta(seconds=time_interval)) + self.time_interval_end = self.time_interval_start + timedelta(seconds=time_interval) + + current_time = datetime.utcnow() + + LOGGER.debug("current_time: {}".format(current_time)) + + if current_time >= self.time_interval_end: + self.time_interval_start = self.time_interval_end + self.time_interval_end = self.time_interval_start + timedelta(seconds=time_interval) + self.l3_security_status = 0 # unnecessary + self.l3_ml_model_confidence = 0 + self.l3_inferences_in_interval_counter = 0 + + self.l3_ml_model_confidence_normal = 0 + self.l3_inferences_in_interval_counter_normal = 0 + + self.l3_ml_model_confidence_crypto = 0 + self.l3_inferences_in_interval_counter_crypto = 0 + + self.l3_attacks = [] + self.l3_unique_attack_conns = 0 + self.l3_unique_compromised_clients = 0 + self.l3_unique_attackers = 0 + + self.l3_non_empty_time_interval = False + + LOGGER.debug("time_interval_start: {}".format(self.time_interval_start)) + LOGGER.debug("time_interval_end: {}".format(self.time_interval_end)) + + def monitor_compute_l3_kpi(self, service_id, monitor_inference_results): + # L3 security status + kpi_security_status = Kpi() + kpi_security_status.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_security_status"]["kpi_id"]) + kpi_security_status.kpi_value.int32Val = self.l3_security_status + + # L3 ML model confidence + kpi_conf = Kpi() + kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"]) + kpi_conf.kpi_value.floatVal = self.monitor_ml_model_confidence() + + # L3 unique attack connections + kpi_unique_attack_conns = Kpi() + kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"]) + kpi_unique_attack_conns.kpi_value.int32Val = self.l3_unique_attack_conns + + # L3 unique compromised clients + kpi_unique_compromised_clients = Kpi() + 
kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom( + self.monitored_kpis["l3_unique_compromised_clients"]["kpi_id"] + ) + kpi_unique_compromised_clients.kpi_value.int32Val = self.l3_unique_compromised_clients + + # L3 unique attackers + kpi_unique_attackers = Kpi() + kpi_unique_attackers.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_unique_attackers"]["kpi_id"]) + kpi_unique_attackers.kpi_value.int32Val = self.l3_unique_attackers + + timestamp = Timestamp() + timestamp.timestamp = timestamp_utcnow_to_float() + + kpi_security_status.timestamp.CopyFrom(timestamp) + kpi_conf.timestamp.CopyFrom(timestamp) + kpi_unique_attack_conns.timestamp.CopyFrom(timestamp) + kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp) + kpi_unique_attackers.timestamp.CopyFrom(timestamp) + + LOGGER.debug("Sending KPIs to monitoring server") + + LOGGER.debug("kpi_security_status: {}".format(kpi_security_status)) + LOGGER.debug("kpi_conf: {}".format(kpi_conf)) + LOGGER.debug("kpi_unique_attack_conns: {}".format(kpi_unique_attack_conns)) + LOGGER.debug("kpi_unique_compromised_clients: {}".format(kpi_unique_compromised_clients)) + LOGGER.debug("kpi_unique_attackers: {}".format(kpi_unique_attackers)) + + try: + self.monitoring_client.IncludeKpi(kpi_security_status) + self.monitoring_client.IncludeKpi(kpi_conf) + self.monitoring_client.IncludeKpi(kpi_unique_attack_conns) + self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients) + self.monitoring_client.IncludeKpi(kpi_unique_attackers) + except Exception as e: + LOGGER.debug("Error sending KPIs to monitoring server: {}".format(e)) + + def monitor_ml_model_confidence(self): + if self.l3_security_status == 0: + return self.l3_ml_model_confidence_normal - def make_inference(self, request): - # ML MODEL - x_data = np.array([ - [ - request.n_packets_server_seconds, - request.n_packets_client_seconds, - request.n_bits_server_seconds, - request.n_bits_client_seconds, - request.n_bits_server_n_packets_server, - 
request.n_bits_client_n_packets_client, - request.n_packets_server_n_packets_client, - request.n_bits_server_n_bits_client, - ] - ]) - - predictions = self.model.run( - [self.prob_name], {self.input_name: x_data.astype(np.float32)})[0] - # Output format + return self.l3_ml_model_confidence_crypto + + """ + Classify connection as standard traffic or cryptomining attack and return results + -input: + + request: L3CentralizedattackdetectorMetrics object with connection features information + -output: L3AttackmitigatorOutput object with information about the assigned class and prediction confidence + """ + + def perform_inference(self, request): + x_data = np.array([[feature.feature for feature in request.features]]) + + # Print input data shape + LOGGER.debug("x_data.shape: {}".format(x_data.shape)) + + # Get batch size + batch_size = x_data.shape[0] + + # Print batch size + LOGGER.debug("batch_size: {}".format(batch_size)) + LOGGER.debug("x_data.shape: {}".format(x_data.shape)) + + inference_time_start = time.perf_counter() + + # Perform inference + predictions = self.cryptomining_detector_model.run( + [self.prob_name], {self.input_name: x_data.astype(np.float32)} + )[0] + + inference_time_end = time.perf_counter() + + # Measure inference time + inference_time = inference_time_end - inference_time_start + self.cad_inference_times.append(inference_time) + + if len(self.cad_inference_times) > self.cad_num_inference_measurements: + inference_times_np_array = np.array(self.cad_inference_times) + np.save(f"inference_times_{batch_size}.npy", inference_times_np_array) + + avg_inference_time = np.mean(inference_times_np_array) + max_inference_time = np.max(inference_times_np_array) + min_inference_time = np.min(inference_times_np_array) + std_inference_time = np.std(inference_times_np_array) + median_inference_time = np.median(inference_times_np_array) + + LOGGER.debug("Average inference time: {}".format(avg_inference_time)) + LOGGER.debug("Max inference time: 
{}".format(max_inference_time)) + LOGGER.debug("Min inference time: {}".format(min_inference_time)) + LOGGER.debug("Standard deviation inference time: {}".format(std_inference_time)) + LOGGER.debug("Median inference time: {}".format(median_inference_time)) + + with open(f"inference_times_stats_{batch_size}.txt", "w") as f: + f.write("Average inference time: {}\n".format(avg_inference_time)) + f.write("Max inference time: {}\n".format(max_inference_time)) + f.write("Min inference time: {}\n".format(min_inference_time)) + f.write("Standard deviation inference time: {}\n".format(std_inference_time)) + f.write("Median inference time: {}\n".format(median_inference_time)) + + # Gather the predicted class, the probability of that class and other relevant information required to block the attack output_message = { "confidence": None, "timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S"), - "ip_o": request.ip_o, + "ip_o": request.connection_metadata.ip_o, + "ip_d": request.connection_metadata.ip_d, "tag_name": None, "tag": None, - "flow_id": request.flow_id, - "protocol": request.protocol, - "port_d": request.port_d, - "ml_id": "RandomForest", - "time_start": request.time_start, - "time_end": request.time_end, + "flow_id": request.connection_metadata.flow_id, + "protocol": request.connection_metadata.protocol, + "port_o": request.connection_metadata.port_o, + "port_d": request.connection_metadata.port_d, + "ml_id": self.cryptomining_detector_file_name, + "service_id": request.connection_metadata.service_id, + "endpoint_id": request.connection_metadata.endpoint_id, + "time_start": request.connection_metadata.time_start, + "time_end": request.connection_metadata.time_end, } - if predictions[0][1] >= 0.5: + + if predictions[0][1] >= self.CLASSIFICATION_THRESHOLD: output_message["confidence"] = predictions[0][1] output_message["tag_name"] = "Crypto" - output_message["tag"] = 1 + output_message["tag"] = self.CRYPTO_CLASS else: output_message["confidence"] = predictions[0][0] 
output_message["tag_name"] = "Normal" - output_message["tag"] = 0 + output_message["tag"] = self.NORMAL_CLASS - return L3AttackmitigatorOutput(**output_message) + return output_message + + """ + Classify connection as standard traffic or cryptomining attack and return results + -input: + + request: L3CentralizedattackdetectorMetrics object with connection features information + -output: L3AttackmitigatorOutput object with information about the assigned class and prediction confidence + """ - def SendInput(self, request, context): - # PERFORM INFERENCE WITH SENT INPUTS - logging.debug("") - print("Inferencing ...") + def perform_distributed_inference(self, requests): + batch_size = len(requests) - # STORE VALUES - self.inference_values.append(request) + # Create an empty array to hold the input data + x_data = np.empty((batch_size, len(requests[0].features))) - # MAKE INFERENCE - output = self.make_inference(request) + # Fill in the input data array with features from each request + for i, request in enumerate(requests): + x_data[i] = [feature.feature for feature in request.features] - # SEND INFO TO MITIGATION SERVER - try: - with grpc.insecure_channel("localhost:10002") as channel: - stub = L3AttackmitigatorStub(channel) - print("Sending to mitigator...") - response = stub.SendOutput(output) - print("Sent output to mitigator and received: ", response.message) - - # RETURN "OK" TO THE CALLER - return Empty( - message="OK, information received and mitigator notified" + # Print input data shape + LOGGER.debug("x_data.shape: {}".format(x_data.shape)) + + inference_time_start = time.perf_counter() + + # Perform inference + predictions = self.cryptomining_detector_model.run( + [self.prob_name], {self.input_name: x_data.astype(np.float32)} + )[0] + + inference_time_end = time.perf_counter() + + # Measure inference time + inference_time = inference_time_end - inference_time_start + self.cad_inference_times.append(inference_time) + + if len(self.cad_inference_times) > 
self.cad_num_inference_measurements: + inference_times_np_array = np.array(self.cad_inference_times) + np.save(f"inference_times_{batch_size}.npy", inference_times_np_array) + + avg_inference_time = np.mean(inference_times_np_array) + max_inference_time = np.max(inference_times_np_array) + min_inference_time = np.min(inference_times_np_array) + std_inference_time = np.std(inference_times_np_array) + median_inference_time = np.median(inference_times_np_array) + + LOGGER.debug("Average inference time: {}".format(avg_inference_time)) + LOGGER.debug("Max inference time: {}".format(max_inference_time)) + LOGGER.debug("Min inference time: {}".format(min_inference_time)) + LOGGER.debug("Standard deviation inference time: {}".format(std_inference_time)) + LOGGER.debug("Median inference time: {}".format(median_inference_time)) + + with open(f"inference_times_stats_{batch_size}.txt", "w") as f: + f.write("Average inference time: {}\n".format(avg_inference_time)) + f.write("Max inference time: {}\n".format(max_inference_time)) + f.write("Min inference time: {}\n".format(min_inference_time)) + f.write("Standard deviation inference time: {}\n".format(std_inference_time)) + f.write("Median inference time: {}\n".format(median_inference_time)) + + # Gather the predicted class, the probability of that class and other relevant information required to block the attack + output_messages = [] + for i, request in enumerate(requests): + output_messages.append({ + "confidence": None, + "timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S"), + "ip_o": request.connection_metadata.ip_o, + "ip_d": request.connection_metadata.ip_d, + "tag_name": None, + "tag": None, + "flow_id": request.connection_metadata.flow_id, + "protocol": request.connection_metadata.protocol, + "port_o": request.connection_metadata.port_o, + "port_d": request.connection_metadata.port_d, + "ml_id": self.cryptomining_detector_file_name, + "service_id": request.connection_metadata.service_id, + "endpoint_id": 
request.connection_metadata.endpoint_id, + "time_start": request.connection_metadata.time_start, + "time_end": request.connection_metadata.time_end, + }) + + if predictions[i][1] >= self.CLASSIFICATION_THRESHOLD: + output_messages[i]["confidence"] = predictions[i][1] + output_messages[i]["tag_name"] = "Crypto" + output_messages[i]["tag"] = self.CRYPTO_CLASS + else: + output_messages[i]["confidence"] = predictions[i][0] + output_messages[i]["tag_name"] = "Normal" + output_messages[i]["tag"] = self.NORMAL_CLASS + + return output_messages + + """ + Receive features from Attack Mitigator, predict attack and communicate with Attack Mitigator + -input: + + request: L3CentralizedattackdetectorMetrics object with connection features information + -output: Empty object with a message about the execution of the function + """ + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def AnalyzeConnectionStatistics(self, request, context): + # Perform inference with the data sent in the request + self.active_requests.append(request) + + if len(self.active_requests) == BATCH_SIZE: + logging.info("Performing inference...") + + inference_time_start = time.time() + cryptomining_detector_output = self.perform_distributed_inference(self.active_requests) + inference_time_end = time.time() + + LOGGER.debug("Inference performed in {} seconds".format(inference_time_end - inference_time_start)) + logging.info("Inference performed correctly") + + self.inference_results.append({"output": cryptomining_detector_output, "timestamp": datetime.now()}) + LOGGER.debug("inference_results length: {}".format(len(self.inference_results))) + + for i, req in enumerate(self.active_requests): + service_id = req.connection_metadata.service_id + device_id = req.connection_metadata.endpoint_id.device_id + endpoint_id = req.connection_metadata.endpoint_id + + # Check if a request of a new service has been received and, if so, create the monitored KPIs for that service + if service_id not in self.service_ids: + 
self.create_kpis(service_id, device_id, endpoint_id) + self.service_ids.append(service_id) + + monitor_kpis_start = time.time() + self.monitor_kpis() + monitor_kpis_end = time.time() + + LOGGER.debug("Monitoring KPIs performed in {} seconds".format(monitor_kpis_end - monitor_kpis_start)) + LOGGER.debug("cryptomining_detector_output: {}".format(cryptomining_detector_output[i])) + + if DEMO_MODE: + self.analyze_prediction_accuracy(cryptomining_detector_output[i]["confidence"]) + + connection_info = ConnectionInfo( + req.connection_metadata.ip_o, + req.connection_metadata.port_o, + req.connection_metadata.ip_d, + req.connection_metadata.port_d, ) - except: - print('Couldnt find l3_attackmitigator') - return Empty( - message="Mitigator Not found" - ) - def GetOutput(self, request, context): - logging.debug("") - print("Returing inference output...") - k = np.multiply(self.inference_values, [2]) - k = np.sum(k) - return self.make_inference(k) + self.l3_non_empty_time_interval = True + if cryptomining_detector_output[i]["tag_name"] == "Crypto": + self.l3_security_status = 1 + self.l3_inferences_in_interval_counter_crypto += 1 + self.l3_ml_model_confidence_crypto = ( + self.l3_ml_model_confidence_crypto * (self.l3_inferences_in_interval_counter_crypto - 1) + + cryptomining_detector_output[i]["confidence"] + ) / self.l3_inferences_in_interval_counter_crypto - + if connection_info not in self.l3_attacks: + self.l3_attacks.append(connection_info) + self.l3_unique_attack_conns += 1 + + self.l3_unique_compromised_clients = len(set([conn.ip_o for conn in self.l3_attacks])) + self.l3_unique_attackers = len(set([conn.ip_d for conn in self.l3_attacks])) + + else: + self.l3_inferences_in_interval_counter_normal += 1 + self.l3_ml_model_confidence_normal = ( + self.l3_ml_model_confidence_normal * (self.l3_inferences_in_interval_counter_normal - 1) + + cryptomining_detector_output[i]["confidence"] + ) / self.l3_inferences_in_interval_counter_normal + + # Only notify Attack Mitigator 
when a cryptomining connection has been detected + if cryptomining_detector_output[i]["tag_name"] == "Crypto": + if DEMO_MODE: + self.attack_connections.append(connection_info) + + if connection_info.ip_o in ATTACK_IPS or connection_info.ip_d in ATTACK_IPS: + self.correct_attack_conns += 1 + self.correct_predictions += 1 + else: + LOGGER.debug("False positive: {}".format(connection_info)) + self.false_positives += 1 + + self.total_predictions += 1 + + # if False: + notification_time_start = time.perf_counter() + + LOGGER.debug("Crypto attack detected") + + # Notify the Attack Mitigator component about the attack + logging.info( + "Notifying the Attack Mitigator component about the attack in order to block the connection..." + ) + + try: + logging.info("Sending the connection information to the Attack Mitigator component...") + message = L3AttackmitigatorOutput(**cryptomining_detector_output[i]) + response = self.attackmitigator_client.PerformMitigation(message) + notification_time_end = time.perf_counter() + + self.am_notification_times.append(notification_time_end - notification_time_start) + + LOGGER.debug(f"am_notification_times length: {len(self.am_notification_times)}") + LOGGER.debug(f"last am_notification_time: {self.am_notification_times[-1]}") + + if len(self.am_notification_times) > 100: + am_notification_times_np_array = np.array(self.am_notification_times) + np.save("am_notification_times.npy", am_notification_times_np_array) + + avg_notification_time = np.mean(am_notification_times_np_array) + max_notification_time = np.max(am_notification_times_np_array) + min_notification_time = np.min(am_notification_times_np_array) + std_notification_time = np.std(am_notification_times_np_array) + median_notification_time = np.median(am_notification_times_np_array) + + LOGGER.debug("Average notification time: {}".format(avg_notification_time)) + LOGGER.debug("Max notification time: {}".format(max_notification_time)) + LOGGER.debug("Min notification time: 
{}".format(min_notification_time)) + LOGGER.debug("Std notification time: {}".format(std_notification_time)) + LOGGER.debug("Median notification time: {}".format(median_notification_time)) + + with open("am_notification_times_stats.txt", "w") as f: + f.write("Average notification time: {}\n".format(avg_notification_time)) + f.write("Max notification time: {}\n".format(max_notification_time)) + f.write("Min notification time: {}\n".format(min_notification_time)) + f.write("Std notification time: {}\n".format(std_notification_time)) + f.write("Median notification time: {}\n".format(median_notification_time)) + + # logging.info("Attack Mitigator notified and received response: ", response.message) # FIX No message received + logging.info("Attack Mitigator notified") + + #return Empty(message="OK, information received and mitigator notified abou the attack") + + except Exception as e: + logging.error("Error notifying the Attack Mitigator component about the attack: ", e) + logging.error("Couldn't find l3_attackmitigator") + + return Empty(message="Attack Mitigator not found") + else: + logging.info("No attack detected") + + if cryptomining_detector_output[i]["tag_name"] != "Crypto": + if connection_info.ip_o not in ATTACK_IPS and connection_info.ip_d not in ATTACK_IPS: + self.correct_predictions += 1 + else: + LOGGER.debug("False negative: {}".format(connection_info)) + self.false_negatives += 1 + + self.total_predictions += 1 + + # return Empty(message="Ok, information received (no attack detected)") + + self.active_requests = [] + return Empty(message="Ok, metrics processed") + + return Empty(message="Ok, information received") + + def analyze_prediction_accuracy(self, confidence): + LOGGER.info("Number of Attack Connections Correctly Classified: {}".format(self.correct_attack_conns)) + LOGGER.info("Number of Attack Connections: {}".format(len(self.attack_connections))) + + if self.total_predictions > 0: + overall_detection_acc = self.correct_predictions / 
self.total_predictions + else: + overall_detection_acc = 0 + + LOGGER.info("Overall Detection Accuracy: {}\n".format(overall_detection_acc)) + + if len(self.attack_connections) > 0: + cryptomining_attack_detection_acc = self.correct_attack_conns / len(self.attack_connections) + else: + cryptomining_attack_detection_acc = 0 + + LOGGER.info("Cryptomining Attack Detection Accuracy: {}".format(cryptomining_attack_detection_acc)) + LOGGER.info("Cryptomining Detector Confidence: {}".format(confidence)) + + with open("prediction_accuracy.txt", "a") as f: + LOGGER.debug("Exporting prediction accuracy and confidence") + + f.write("Overall Detection Accuracy: {}\n".format(overall_detection_acc)) + f.write("Cryptomining Attack Detection Accuracy: {}\n".format(cryptomining_attack_detection_acc)) + f.write("Total Predictions: {}\n".format(self.total_predictions)) + f.write("Total Positives: {}\n".format(len(self.attack_connections))) + f.write("False Positives: {}\n".format(self.false_positives)) + f.write("True Negatives: {}\n".format(self.total_predictions - len(self.attack_connections))) + f.write("False Negatives: {}\n".format(self.false_negatives)) + f.write("Cryptomining Detector Confidence: {}\n\n".format(confidence)) + f.write("Timestamp: {}\n".format(datetime.now().strftime("%d/%m/%Y %H:%M:%S"))) + f.close() + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def AnalyzeBatchConnectionStatistics(self, request, context): + batch_time_start = time.time() + + for metric in request.metrics: + self.AnalyzeConnectionStatistics(metric, context) + batch_time_end = time.time() + + with open("batch_time.txt", "a") as f: + f.write(str(len(request.metrics)) + "\n") + f.write(str(batch_time_end - batch_time_start) + "\n\n") + f.close() + + logging.debug("Metrics: " + str(len(request.metrics))) + logging.debug("Batch time: " + str(batch_time_end - batch_time_start)) + + return Empty(message="OK, information received.") + + """ + Send features allocated in the metadata of the 
onnx file to the DAD + -output: ONNX metadata as a list of integers + """ + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetFeaturesIds(self, request: Empty, context): + features = AutoFeatures() + + for feature in self.cryptomining_detector_features_metadata: + features.auto_features.append(feature) + + return features diff --git a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl_old.py b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl_old.py new file mode 100644 index 0000000000000000000000000000000000000000..1fdc955557f189d2f5aded162052743b3e762036 --- /dev/null +++ b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl_old.py @@ -0,0 +1,791 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function +from datetime import datetime +from datetime import timedelta + +import os +import numpy as np +import onnxruntime as rt +import logging +import time + +from common.proto.l3_centralizedattackdetector_pb2 import Empty, AutoFeatures +from common.proto.l3_centralizedattackdetector_pb2_grpc import L3CentralizedattackdetectorServicer + +from common.proto.l3_attackmitigator_pb2 import L3AttackmitigatorOutput + +from common.proto.monitoring_pb2 import KpiDescriptor +from common.proto.kpi_sample_types_pb2 import KpiSampleType + +from monitoring.client.MonitoringClient import MonitoringClient +from common.proto.monitoring_pb2 import Kpi + +from common.tools.timestamp.Converters import timestamp_utcnow_to_float +from common.proto.context_pb2 import Timestamp, SliceId, ConnectionId + +from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient + +import uuid + + +LOGGER = logging.getLogger(__name__) +# ML directory (ml_model/cryptomining_detector/cryptomining_detector.onnx) +current_dir = os.path.dirname(os.path.abspath(__file__)) + +# Demo constants +DEMO_MODE = True +ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"] + + +class ConnectionInfo: + def __init__(self, ip_o, port_o, ip_d, port_d): + self.ip_o = ip_o + self.port_o = port_o + self.ip_d = ip_d + self.port_d = port_d + + def __eq__(self, other): + return ( + self.ip_o == other.ip_o + and self.port_o == other.port_o + and self.ip_d == other.ip_d + and self.port_d == other.port_d + ) + + def __str__(self): + return "ip_o: " + self.ip_o + "\nport_o: " + self.port_o + "\nip_d: " + self.ip_d + "\nport_d: " + self.port_d + + +class l3_centralizedattackdetectorServiceServicerImpl(L3CentralizedattackdetectorServicer): + + """ + Initialize variables, prediction model and clients of components used by CAD + """ + + def __init__(self): + LOGGER.info("Creating Centralized Attack Detector Service") + + 
self.inference_values = [] + self.inference_results = [] + self.cryptomining_detector_path = os.path.join(current_dir, "ml_model/cryptomining_detector/") + self.cryptomining_detector_file_name = os.listdir(self.cryptomining_detector_path)[0] + self.cryptomining_detector_model_path = os.path.join( + self.cryptomining_detector_path, self.cryptomining_detector_file_name + ) + self.cryptomining_detector_model = rt.InferenceSession(self.cryptomining_detector_model_path) + + # Load cryptomining detector features metadata from ONNX file + self.cryptomining_detector_features_metadata = list( + self.cryptomining_detector_model.get_modelmeta().custom_metadata_map.values() + ) + self.cryptomining_detector_features_metadata = [float(x) for x in self.cryptomining_detector_features_metadata] + self.cryptomining_detector_features_metadata.sort() + LOGGER.info("Cryptomining Detector Features: " + str(self.cryptomining_detector_features_metadata)) + + self.input_name = self.cryptomining_detector_model.get_inputs()[0].name + self.label_name = self.cryptomining_detector_model.get_outputs()[0].name + self.prob_name = self.cryptomining_detector_model.get_outputs()[1].name + + self.monitoring_client = MonitoringClient() + self.service_ids = [] + self.monitored_kpis = { + "l3_security_status": { + "kpi_id": None, + "description": "L3 - Confidence of the cryptomining detector in the security status in the last time interval of the service {service_id}", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO, + "service_ids": [], + }, + "l3_ml_model_confidence": { + "kpi_id": None, + "description": "L3 - Security status of the service in a time interval of the service {service_id} (“0” if no attack has been detected on the service and “1” if a cryptomining attack has been detected)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_ML_CONFIDENCE, + "service_ids": [], + }, + "l3_unique_attack_conns": { + "kpi_id": None, + "description": "L3 - Number of attack connections 
detected in a time interval of the service {service_id} (attacks of the same connection [origin IP, origin port, destination IP and destination port] are only considered once)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_ATTACK_CONNS, + "service_ids": [], + }, + "l3_unique_compromised_clients": { + "kpi_id": None, + "description": "L3 - Number of unique compromised clients of the service in a time interval of the service {service_id} (attacks from the same origin IP are only considered once)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_COMPROMISED_CLIENTS, + "service_ids": [], + }, + "l3_unique_attackers": { + "kpi_id": None, + "description": "L3 - number of unique attackers of the service in a time interval of the service {service_id} (attacks from the same destination IP are only considered once)", + "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_UNIQUE_ATTACKERS, + "service_ids": [], + }, + } + self.attackmitigator_client = l3_attackmitigatorClient() + + # Environment variables + self.CLASSIFICATION_THRESHOLD = os.getenv("CAD_CLASSIFICATION_THRESHOLD", 0.5) + self.MONITORED_KPIS_TIME_INTERVAL_AGG = os.getenv("MONITORED_KPIS_TIME_INTERVAL_AGG", 60) + + # Constants + self.NORMAL_CLASS = 0 + self.CRYPTO_CLASS = 1 + + self.kpi_test = None + self.time_interval_start = None + self.time_interval_end = None + + # CAD evaluation tests + self.cad_inference_times = [] + self.cad_num_inference_measurements = 100 + + # AM evaluation tests + self.am_notification_times = [] + + # List of attack connections + self.attack_connections = [] + + self.correct_attack_conns = 0 + self.correct_predictions = 0 + self.total_predictions = 0 + self.false_positives = 0 + self.false_negatives = 0 + + """ + Create a monitored KPI for a specific service and add it to the Monitoring Client + -input: + + service_id: service ID where the KPI will be monitored + + kpi_name: name of the KPI + + kpi_description: description of the KPI + + kpi_sample_type: KPI 
sample type of the KPI (it must be defined in the kpi_sample_types.proto file) + -output: KPI identifier representing the KPI + """ + + def create_kpi( + self, + service_id, + kpi_name, + kpi_description, + kpi_sample_type, + ): + kpidescriptor = KpiDescriptor() + kpidescriptor.kpi_description = kpi_description + kpidescriptor.service_id.service_uuid.uuid = service_id.service_uuid.uuid + kpidescriptor.kpi_sample_type = kpi_sample_type + new_kpi = self.monitoring_client.SetKpi(kpidescriptor) + + LOGGER.info("Created KPI {}".format(kpi_name)) + + return new_kpi + + """ + Create the monitored KPIs for a specific service, add them to the Monitoring Client and store their identifiers in the monitored_kpis dictionary + -input: + + service_id: service ID where the KPIs will be monitored + -output: None + """ + + def create_kpis(self, service_id, device_id, endpoint_id): + LOGGER.info("Creating KPIs for service {}".format(service_id)) + + # for now, all the KPIs are created for all the services from which requests are received + for kpi in self.monitored_kpis: + # generate random slice_id + slice_id = SliceId() + slice_id.slice_uuid.uuid = str(uuid.uuid4()) + + # generate random connection_id + connection_id = ConnectionId() + connection_id.connection_uuid.uuid = str(uuid.uuid4()) + + created_kpi = self.create_kpi( + service_id, + kpi, + self.monitored_kpis[kpi]["description"].format(service_id=service_id.service_uuid.uuid), + self.monitored_kpis[kpi]["kpi_sample_type"], + ) + self.monitored_kpis[kpi]["kpi_id"] = created_kpi.kpi_id + self.monitored_kpis[kpi]["service_ids"].append(service_id.service_uuid.uuid) + + LOGGER.info("Created KPIs for service {}".format(service_id)) + + def monitor_kpis(self): + monitor_inference_results = self.inference_results + monitor_service_ids = self.service_ids + + LOGGER.debug("monitor_inference_results: {}".format(len(monitor_inference_results))) + LOGGER.debug("monitor_service_ids: {}".format(len(monitor_service_ids))) + + 
self.assign_timestamp(monitor_inference_results) + + self.delete_older_inference_results(monitor_inference_results) + + non_empty_time_interval = self.check_inference_time_interval(monitor_inference_results) + + if non_empty_time_interval: + # start = time.time() + for service_id in monitor_service_ids: + LOGGER.debug("service_id: {}".format(service_id)) + + self.monitor_compute_l3_kpi(service_id, monitor_inference_results) + + # Demo mode inference results are erased + """if DEMO_MODE: + # Delete fist half of the inference results + LOGGER.debug("inference_results len: {}".format(len(self.inference_results))) + self.inference_results = self.inference_results[len(self.inference_results)//2:] + LOGGER.debug("inference_results len after erase: {}".format(len(self.inference_results)))""" + # end = time.time() + # LOGGER.debug("Time to process inference results with erase: {}".format(end - start)) + LOGGER.debug("KPIs sent to monitoring server") + else: + LOGGER.debug("No KPIs sent to monitoring server") + + def assign_timestamp(self, monitor_inference_results): + time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG + + # assign the timestamp of the first inference result to the time_interval_start + if self.time_interval_start is None: + self.time_interval_start = monitor_inference_results[0]["timestamp"] + LOGGER.debug("self.time_interval_start: {}".format(self.time_interval_start)) + + # add time_interval to the current time to get the time interval end + LOGGER.debug("time_interval: {}".format(time_interval)) + LOGGER.debug(timedelta(seconds=time_interval)) + self.time_interval_end = self.time_interval_start + timedelta(seconds=time_interval) + + current_time = datetime.utcnow() + + LOGGER.debug("current_time: {}".format(current_time)) + + if current_time >= self.time_interval_end: + self.time_interval_start = self.time_interval_end + self.time_interval_end = self.time_interval_start + timedelta(seconds=time_interval) + + LOGGER.debug("time_interval_start: 
{}".format(self.time_interval_start)) + LOGGER.debug("time_interval_end: {}".format(self.time_interval_end)) + + def delete_older_inference_results(self, monitor_inference_results): + # delete all inference results that are older than the time_interval_start + delete_inference_results = [] + + for i in range(len(monitor_inference_results)): + inference_result_timestamp = monitor_inference_results[i]["timestamp"] + + if inference_result_timestamp < self.time_interval_start: + delete_inference_results.append(monitor_inference_results[i]) + + if len(delete_inference_results) > 0: + monitor_inference_results = [ + inference_result + for inference_result in monitor_inference_results + if inference_result not in delete_inference_results + ] + if DEMO_MODE: + LOGGER.debug("inference_results len: {}".format(len(self.inference_results))) + self.inference_results = monitor_inference_results + LOGGER.debug("inference_results len after erase: {}".format(len(self.inference_results))) + LOGGER.debug(f"Cleaned inference results. {len(delete_inference_results)} inference results deleted") + + def check_inference_time_interval(self, monitor_inference_results): + # check if there is at least one inference result in monitor_inference_results in the current time_interval + num_inference_results_in_time_interval = 0 + for i in range(len(monitor_inference_results)): + inference_result_timestamp = monitor_inference_results[i]["timestamp"] + + if ( + inference_result_timestamp >= self.time_interval_start + and inference_result_timestamp < self.time_interval_end + ): + num_inference_results_in_time_interval += 1 + + if num_inference_results_in_time_interval > 0: + non_empty_time_interval = True + LOGGER.debug( + f"Current time interval is not empty (there are {num_inference_results_in_time_interval} inference results" + ) + else: + non_empty_time_interval = False + LOGGER.debug("Current time interval is empty. 
No KPIs will be reported.") + + return non_empty_time_interval + + def monitor_compute_l3_kpi(self, service_id, monitor_inference_results): + # L3 security status + kpi_security_status = Kpi() + kpi_security_status.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_security_status"]["kpi_id"]) + kpi_security_status.kpi_value.int32Val = self.monitor_security_status(service_id, monitor_inference_results) + + # L3 ML model confidence + kpi_conf = Kpi() + kpi_conf.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_ml_model_confidence"]["kpi_id"]) + kpi_conf.kpi_value.floatVal = self.monitor_ml_model_confidence( + service_id, monitor_inference_results, kpi_security_status + ) + + # L3 unique attack connections + kpi_unique_attack_conns = Kpi() + kpi_unique_attack_conns.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_unique_attack_conns"]["kpi_id"]) + kpi_unique_attack_conns.kpi_value.int32Val = self.monitor_unique_attack_conns( + service_id, monitor_inference_results + ) + + # L3 unique compromised clients + kpi_unique_compromised_clients = Kpi() + kpi_unique_compromised_clients.kpi_id.kpi_id.CopyFrom( + self.monitored_kpis["l3_unique_compromised_clients"]["kpi_id"] + ) + kpi_unique_compromised_clients.kpi_value.int32Val = self.monitor_unique_compromised_clients( + service_id, monitor_inference_results + ) + + # L3 unique attackers + kpi_unique_attackers = Kpi() + kpi_unique_attackers.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_unique_attackers"]["kpi_id"]) + kpi_unique_attackers.kpi_value.int32Val = self.monitor_unique_attackers(service_id, monitor_inference_results) + + timestamp = Timestamp() + timestamp.timestamp = timestamp_utcnow_to_float() + + kpi_security_status.timestamp.CopyFrom(timestamp) + kpi_conf.timestamp.CopyFrom(timestamp) + kpi_unique_attack_conns.timestamp.CopyFrom(timestamp) + kpi_unique_compromised_clients.timestamp.CopyFrom(timestamp) + kpi_unique_attackers.timestamp.CopyFrom(timestamp) + + LOGGER.debug("Sending KPIs to monitoring server") + + 
LOGGER.debug("kpi_security_status: {}".format(kpi_security_status)) + LOGGER.debug("kpi_conf: {}".format(kpi_conf)) + LOGGER.debug("kpi_unique_attack_conns: {}".format(kpi_unique_attack_conns)) + LOGGER.debug("kpi_unique_compromised_clients: {}".format(kpi_unique_compromised_clients)) + LOGGER.debug("kpi_unique_attackers: {}".format(kpi_unique_attackers)) + + try: + self.monitoring_client.IncludeKpi(kpi_security_status) + self.monitoring_client.IncludeKpi(kpi_conf) + self.monitoring_client.IncludeKpi(kpi_unique_attack_conns) + self.monitoring_client.IncludeKpi(kpi_unique_compromised_clients) + self.monitoring_client.IncludeKpi(kpi_unique_attackers) + except Exception as e: + LOGGER.debug("Error sending KPIs to monitoring server: {}".format(e)) + + def monitor_security_status(self, service_id, monitor_inference_results): + # get the output.tag of the ML model of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + outputs_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= self.time_interval_start + and monitor_inference_results[i]["timestamp"] < self.time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid in self.monitored_kpis["l3_security_status"]["service_ids"] + ): + outputs_last_time_interval.append(monitor_inference_results[i]["output"]["tag"]) + + LOGGER.debug("outputs_last_time_interval: {}".format(outputs_last_time_interval)) + + # check if all outputs are 0 + all_outputs_zero = True + for output in outputs_last_time_interval: + if output != self.NORMAL_CLASS: + all_outputs_zero = False + break + + if all_outputs_zero: + return 0 + return 1 + + def monitor_ml_model_confidence(self, service_id, monitor_inference_results, kpi_security_status): + # get the output.confidence of the ML model of the last aggregation time interval as indicated by the 
self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + confidences_normal_last_time_interval = [] + confidences_crypto_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + LOGGER.debug("monitor_inference_results[i]: {}".format(monitor_inference_results[i])) + + if ( + monitor_inference_results[i]["timestamp"] >= self.time_interval_start + and monitor_inference_results[i]["timestamp"] < self.time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid in self.monitored_kpis["l3_ml_model_confidence"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.NORMAL_CLASS: + confidences_normal_last_time_interval.append(monitor_inference_results[i]["output"]["confidence"]) + elif monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + confidences_crypto_last_time_interval.append(monitor_inference_results[i]["output"]["confidence"]) + else: + LOGGER.debug("Unknown tag: {}".format(monitor_inference_results[i]["output"]["tag"])) + + LOGGER.debug("confidences_normal_last_time_interval: {}".format(confidences_normal_last_time_interval)) + LOGGER.debug("confidences_crypto_last_time_interval: {}".format(confidences_crypto_last_time_interval)) + + if kpi_security_status.kpi_value.int32Val == 0: + return np.mean(confidences_normal_last_time_interval) + + return np.mean(confidences_crypto_last_time_interval) + + def monitor_unique_attack_conns(self, service_id, monitor_inference_results): + # get the number of unique attack connections (grouping by origin IP, origin port, destination IP, destination port) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_attack_conns_last_time_interval = 0 + unique_attack_conns_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= self.time_interval_start + and 
monitor_inference_results[i]["timestamp"] < self.time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid in self.monitored_kpis["l3_unique_attack_conns"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + current_attack_conn = { + "ip_o": monitor_inference_results[i]["output"]["ip_o"], + "port_o": monitor_inference_results[i]["output"]["port_o"], + "ip_d": monitor_inference_results[i]["output"]["ip_d"], + "port_d": monitor_inference_results[i]["output"]["port_d"], + } + + is_unique_attack_conn = True + + for j in range(len(unique_attack_conns_last_time_interval)): + if current_attack_conn == unique_attack_conns_last_time_interval[j]: + is_unique_attack_conn = False + + if is_unique_attack_conn: + num_unique_attack_conns_last_time_interval += 1 + unique_attack_conns_last_time_interval.append(current_attack_conn) + + return num_unique_attack_conns_last_time_interval + + def monitor_unique_compromised_clients(self, service_id, monitor_inference_results): + # get the number of unique compromised clients (grouping by origin IP) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_compromised_clients_last_time_interval = 0 + unique_compromised_clients_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= self.time_interval_start + and monitor_inference_results[i]["timestamp"] < self.time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid in self.monitored_kpis["l3_unique_compromised_clients"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + if ( + monitor_inference_results[i]["output"]["ip_o"] + not in unique_compromised_clients_last_time_interval + ): + unique_compromised_clients_last_time_interval.append( 
+ monitor_inference_results[i]["output"]["ip_o"] + ) + num_unique_compromised_clients_last_time_interval += 1 + + return num_unique_compromised_clients_last_time_interval + + def monitor_unique_attackers(self, service_id, monitor_inference_results): + # get the number of unique attackers (grouping by destination ip) of the last aggregation time interval as indicated by the self.MONITORED_KPIS_TIME_INTERVAL_AGG variable + num_unique_attackers_last_time_interval = 0 + unique_attackers_last_time_interval = [] + + for i in range(len(monitor_inference_results)): + if ( + monitor_inference_results[i]["timestamp"] >= self.time_interval_start + and monitor_inference_results[i]["timestamp"] < self.time_interval_end + and monitor_inference_results[i]["output"]["service_id"] == service_id + and service_id.service_uuid.uuid in self.monitored_kpis["l3_unique_attackers"]["service_ids"] + ): + if monitor_inference_results[i]["output"]["tag"] == self.CRYPTO_CLASS: + if monitor_inference_results[i]["output"]["ip_d"] not in unique_attackers_last_time_interval: + unique_attackers_last_time_interval.append(monitor_inference_results[i]["output"]["ip_d"]) + num_unique_attackers_last_time_interval += 1 + + return num_unique_attackers_last_time_interval + + """ + Classify connection as standard traffic or cryptomining attack and return results + -input: + + request: L3CentralizedattackdetectorMetrics object with connection features information + -output: L3AttackmitigatorOutput object with information about the assigned class and prediction confidence + """ + + def perform_inference(self, request): + x_data = np.array([[feature.feature for feature in request.features]]) + + # Print input data shape + LOGGER.debug("x_data.shape: {}".format(x_data.shape)) + + # Get batch size + batch_size = x_data.shape[0] + + # Print batch size + LOGGER.debug("batch_size: {}".format(batch_size)) + LOGGER.debug("x_data.shape: {}".format(x_data.shape)) + + inference_time_start = time.perf_counter() + + # 
Perform inference + predictions = self.cryptomining_detector_model.run( + [self.prob_name], {self.input_name: x_data.astype(np.float32)} + )[0] + + inference_time_end = time.perf_counter() + + # Measure inference time + inference_time = inference_time_end - inference_time_start + self.cad_inference_times.append(inference_time) + + if len(self.cad_inference_times) > self.cad_num_inference_measurements: + inference_times_np_array = np.array(self.cad_inference_times) + np.save(f"inference_times_{batch_size}.npy", inference_times_np_array) + + avg_inference_time = np.mean(inference_times_np_array) + max_inference_time = np.max(inference_times_np_array) + min_inference_time = np.min(inference_times_np_array) + std_inference_time = np.std(inference_times_np_array) + median_inference_time = np.median(inference_times_np_array) + + LOGGER.debug("Average inference time: {}".format(avg_inference_time)) + LOGGER.debug("Max inference time: {}".format(max_inference_time)) + LOGGER.debug("Min inference time: {}".format(min_inference_time)) + LOGGER.debug("Standard deviation inference time: {}".format(std_inference_time)) + LOGGER.debug("Median inference time: {}".format(median_inference_time)) + + with open(f"inference_times_stats_{batch_size}.txt", "w") as f: + f.write("Average inference time: {}\n".format(avg_inference_time)) + f.write("Max inference time: {}\n".format(max_inference_time)) + f.write("Min inference time: {}\n".format(min_inference_time)) + f.write("Standard deviation inference time: {}\n".format(std_inference_time)) + f.write("Median inference time: {}\n".format(median_inference_time)) + + # Gather the predicted class, the probability of that class and other relevant information required to block the attack + output_message = { + "confidence": None, + "timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S"), + "ip_o": request.connection_metadata.ip_o, + "ip_d": request.connection_metadata.ip_d, + "tag_name": None, + "tag": None, + "flow_id": 
request.connection_metadata.flow_id, + "protocol": request.connection_metadata.protocol, + "port_o": request.connection_metadata.port_o, + "port_d": request.connection_metadata.port_d, + "ml_id": self.cryptomining_detector_file_name, + "service_id": request.connection_metadata.service_id, + "endpoint_id": request.connection_metadata.endpoint_id, + "time_start": request.connection_metadata.time_start, + "time_end": request.connection_metadata.time_end, + } + + if predictions[0][1] >= self.CLASSIFICATION_THRESHOLD: + output_message["confidence"] = predictions[0][1] + output_message["tag_name"] = "Crypto" + output_message["tag"] = self.CRYPTO_CLASS + else: + output_message["confidence"] = predictions[0][0] + output_message["tag_name"] = "Normal" + output_message["tag"] = self.NORMAL_CLASS + + return output_message + + """ + Receive features from Attack Mitigator, predict attack and communicate with Attack Mitigator + -input: + + request: L3CentralizedattackdetectorMetrics object with connection features information + -output: Empty object with a message about the execution of the function + """ + + def AnalyzeConnectionStatistics(self, request, context): + # Perform inference with the data sent in the request + logging.info("Performing inference...") + start = time.time() + cryptomining_detector_output = self.perform_inference(request) + end = time.time() + LOGGER.debug("Inference performed in {} seconds".format(end - start)) + logging.info("Inference performed correctly") + + self.inference_results.append({"output": cryptomining_detector_output, "timestamp": datetime.now()}) + LOGGER.debug("inference_results length: {}".format(len(self.inference_results))) + + service_id = request.connection_metadata.service_id + device_id = request.connection_metadata.endpoint_id.device_id + endpoint_id = request.connection_metadata.endpoint_id + + # Check if a request of a new service has been received and, if so, create the monitored KPIs for that service + if service_id not in 
self.service_ids: + self.create_kpis(service_id, device_id, endpoint_id) + self.service_ids.append(service_id) + + start = time.time() + self.monitor_kpis() + end = time.time() + + LOGGER.debug("Monitoring KPIs performed in {} seconds".format(end - start)) + LOGGER.debug("cryptomining_detector_output: {}".format(cryptomining_detector_output)) + + if DEMO_MODE: + self.analyze_prediction_accuracy(cryptomining_detector_output["confidence"]) + + connection_info = ConnectionInfo( + request.connection_metadata.ip_o, + request.connection_metadata.port_o, + request.connection_metadata.ip_d, + request.connection_metadata.port_d, + ) + + if cryptomining_detector_output["tag_name"] == "Crypto": + LOGGER.debug("Crypto found") + LOGGER.debug(connection_info) + + # Only notify Attack Mitigator when a cryptomining connection has been detected + if cryptomining_detector_output["tag_name"] == "Crypto" and connection_info not in self.attack_connections: + self.attack_connections.append(connection_info) + + if connection_info.ip_o in ATTACK_IPS or connection_info.ip_d in ATTACK_IPS: + self.correct_attack_conns += 1 + self.correct_predictions += 1 + else: + LOGGER.debug("False positive: {}".format(connection_info)) + self.false_positives += 1 + + self.total_predictions += 1 + + # if False: + notification_time_start = time.perf_counter() + + LOGGER.debug("Crypto attack detected") + + # Notify the Attack Mitigator component about the attack + logging.info( + "Notifying the Attack Mitigator component about the attack in order to block the connection..." 
+ ) + + try: + logging.info("Sending the connection information to the Attack Mitigator component...") + message = L3AttackmitigatorOutput(**cryptomining_detector_output) + response = self.attackmitigator_client.PerformMitigation(message) + notification_time_end = time.perf_counter() + + self.am_notification_times.append(notification_time_end - notification_time_start) + + LOGGER.debug(f"am_notification_times length: {len(self.am_notification_times)}") + LOGGER.debug(f"last am_notification_time: {self.am_notification_times[-1]}") + + if len(self.am_notification_times) > 100: + am_notification_times_np_array = np.array(self.am_notification_times) + np.save("am_notification_times.npy", am_notification_times_np_array) + + avg_notification_time = np.mean(am_notification_times_np_array) + max_notification_time = np.max(am_notification_times_np_array) + min_notification_time = np.min(am_notification_times_np_array) + std_notification_time = np.std(am_notification_times_np_array) + median_notification_time = np.median(am_notification_times_np_array) + + LOGGER.debug("Average notification time: {}".format(avg_notification_time)) + LOGGER.debug("Max notification time: {}".format(max_notification_time)) + LOGGER.debug("Min notification time: {}".format(min_notification_time)) + LOGGER.debug("Std notification time: {}".format(std_notification_time)) + LOGGER.debug("Median notification time: {}".format(median_notification_time)) + + with open("am_notification_times_stats.txt", "w") as f: + f.write("Average notification time: {}\n".format(avg_notification_time)) + f.write("Max notification time: {}\n".format(max_notification_time)) + f.write("Min notification time: {}\n".format(min_notification_time)) + f.write("Std notification time: {}\n".format(std_notification_time)) + f.write("Median notification time: {}\n".format(median_notification_time)) + + # logging.info("Attack Mitigator notified and received response: ", response.message) # FIX No message received + 
logging.info("Attack Mitigator notified") + + return Empty(message="OK, information received and mitigator notified abou the attack") + except Exception as e: + logging.error("Error notifying the Attack Mitigator component about the attack: ", e) + logging.error("Couldn't find l3_attackmitigator") + + return Empty(message="Attack Mitigator not found") + else: + logging.info("No attack detected") + + if cryptomining_detector_output["tag_name"] != "Crypto": + if connection_info.ip_o not in ATTACK_IPS and connection_info.ip_d not in ATTACK_IPS: + self.correct_predictions += 1 + else: + LOGGER.debug("False negative: {}".format(connection_info)) + self.false_negatives += 1 + + self.total_predictions += 1 + + return Empty(message="Ok, information received (no attack detected)") + + def analyze_prediction_accuracy(self, confidence): + LOGGER.info("Number of Attack Connections Correctly Classified: {}".format(self.correct_attack_conns)) + LOGGER.info("Number of Attack Connections: {}".format(len(self.attack_connections))) + + if self.total_predictions > 0: + overall_detection_acc = self.correct_predictions / self.total_predictions + else: + overall_detection_acc = 0 + + LOGGER.info("Overall Detection Accuracy: {}\n".format(overall_detection_acc)) + + if len(self.attack_connections) > 0: + cryptomining_attack_detection_acc = self.correct_attack_conns / len(self.attack_connections) + else: + cryptomining_attack_detection_acc = 0 + + LOGGER.info("Cryptomining Attack Detection Accuracy: {}".format(cryptomining_attack_detection_acc)) + LOGGER.info("Cryptomining Detector Confidence: {}".format(confidence)) + + with open("prediction_accuracy.txt", "a") as f: + LOGGER.debug("Exporting prediction accuracy and confidence") + + f.write("Overall Detection Accuracy: {}\n".format(overall_detection_acc)) + f.write("Cryptomining Attack Detection Accuracy: {}\n".format(cryptomining_attack_detection_acc)) + f.write("Total Predictions: {}\n".format(self.total_predictions)) + f.write("Total 
Positives: {}\n".format(len(self.attack_connections))) + f.write("False Positives: {}\n".format(self.false_positives)) + f.write("True Negatives: {}\n".format(self.total_predictions - len(self.attack_connections))) + f.write("False Negatives: {}\n".format(self.false_negatives)) + f.write("Cryptomining Detector Confidence: {}\n\n".format(confidence)) + f.write("Timestamp: {}\n".format(datetime.now().strftime("%d/%m/%Y %H:%M:%S"))) + f.close() + + def AnalyzeBatchConnectionStatistics(self, request, context): + start = time.time() + + for metric in request.metrics: + self.AnalyzeConnectionStatistics(metric, context) + end = time.time() + + with open("batch_time.txt", "a") as f: + f.write(str(len(request.metrics)) + "\n") + f.write(str(end - start) + "\n\n") + f.close() + + logging.debug("Metrics: " + str(len(request.metrics))) + logging.debug("Batch time: " + str(end - start)) + + return Empty(message="OK, information received.") + + """ + Send features allocated in the metadata of the onnx file to the DAD + -output: ONNX metadata as a list of integers + """ + + def GetFeaturesIds(self, request: Empty, context): + features = AutoFeatures() + + for feature in self.cryptomining_detector_features_metadata: + features.auto_features.append(feature) + + return features diff --git a/src/l3_centralizedattackdetector/service/ml_model/cryptomining_detector/crypto_5g_rf_spider_features.onnx b/src/l3_centralizedattackdetector/service/ml_model/cryptomining_detector/crypto_5g_rf_spider_features.onnx new file mode 100644 index 0000000000000000000000000000000000000000..731724b29f3a1c22d50de8adbf291193a352ab33 Binary files /dev/null and b/src/l3_centralizedattackdetector/service/ml_model/cryptomining_detector/crypto_5g_rf_spider_features.onnx differ diff --git a/src/l3_centralizedattackdetector/service/ml_model/teraflow_rf.onnx b/src/l3_centralizedattackdetector/service/ml_model/teraflow_rf.onnx deleted file mode 100644 index 
e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/src/tests/scenario3/l3/README.md b/src/tests/scenario3/l3/README.md new file mode 100644 index 0000000000000000000000000000000000000000..f66d8e351033d2762a77269243b6d3bb2a1d7022 --- /dev/null +++ b/src/tests/scenario3/l3/README.md @@ -0,0 +1,3 @@ +# Scripts to automatically run the "Attack Detection & Mitigation at the L3 Layer" workflow (Scenario 3). +"launch_l3_attack_detection_and_mitigation.sh" launches the TeraFlow OS components, which include the CentralizedAttackDetector and AttackMitigator components necessary to perform this workflow. +"launch_l3_attack_detection_and_mitigation_complete.sh" also launches the DistributedAttackDetector, which monitors the network data plane and passively collects traffic packets and aggregates them in network flows, which are then provided to the CentralizedAttackDetector to detect attacks that may be occurring in the network. diff --git a/src/tests/scenario3/l3/complete_deploy.sh b/src/tests/scenario3/l3/complete_deploy.sh new file mode 100755 index 0000000000000000000000000000000000000000..5e8a2772c61ac2e96e5cf675468d27be2b940fe6 --- /dev/null +++ b/src/tests/scenario3/l3/complete_deploy.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +./src/tests/ofc22/run_test_03_delete_service.sh +./src/tests/ofc22/run_test_04_cleanup.sh +source src/tests/ofc22/deploy_specs.sh +source my_deploy.sh +./deploy/all.sh +source tfs_runtime_env_vars.sh +ofc22/run_test_01_bootstrap.sh +ofc22/run_test_02_create_service.sh diff --git a/src/tests/scenario3/l3/copy_protos_to_dad.sh b/src/tests/scenario3/l3/copy_protos_to_dad.sh new file mode 100755 index 0000000000000000000000000000000000000000..6735d9cf95d2243e6f87b5508c2e3f7b9756c474 --- /dev/null +++ b/src/tests/scenario3/l3/copy_protos_to_dad.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Set the variables for the remote host and destination directory +REMOTE_HOST="192.168.165.73" +DEST_DIR="/home/ubuntu/TeraflowDockerDistributed/l3_distributedattackdetector/proto" + +# Copy the files to the remote host +sshpass -p "ubuntu" scp /home/ubuntu/tfs-ctrl-new/proto/src/python/l3_centralizedattackdetector_pb2.py "$REMOTE_HOST:$DEST_DIR" +sshpass -p "ubuntu" scp /home/ubuntu/tfs-ctrl-new/proto/src/python/l3_centralizedattackdetector_pb2_grpc.py "$REMOTE_HOST:$DEST_DIR" + +sshpass -p "ubuntu" scp /home/ubuntu/tfs-ctrl-new/proto/src/python/l3_attackmitigator_pb2.py "$REMOTE_HOST:$DEST_DIR" +sshpass -p "ubuntu" scp /home/ubuntu/tfs-ctrl-new/proto/src/python/l3_attackmitigator_pb2_grpc.py "$REMOTE_HOST:$DEST_DIR" diff --git a/src/tests/scenario3/l3/deploy_l3_component.sh b/src/tests/scenario3/l3/deploy_l3_component.sh new file mode 100755 index 0000000000000000000000000000000000000000..8e468c9067c93a06c6716ac618f5c9fdba860d34 --- /dev/null +++ b/src/tests/scenario3/l3/deploy_l3_component.sh @@ -0,0 +1,60 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +component=$1 + +source "my_deploy.sh" + +echo "Deploying $component..." 
# Redeploy the requested L3 component. "$component" holds the script's first
# argument: "CAD" selects l3_centralizedattackdetector, "AM" selects
# l3_attackmitigator.

#######################################
# Force-delete the running pod(s) of a component, then redeploy it.
# Globals:
#   TFS_K8S_NAMESPACE (read) - Kubernetes namespace that is queried
# Arguments:
#   $1 - substring identifying the component's pods in `kubectl get pods`
#   $2 - component name handed to ./deploy_component.sh
# Outputs:
#   kubectl / deploy_component.sh output; a warning on stderr when no pod matches
#######################################
redeploy_l3_component() {
  local pod_pattern=$1
  local component_name=$2
  local -a pods=()

  # Collect every matching pod name into an array so that several replicas
  # are passed to kubectl as separate, safely quoted arguments.
  mapfile -t pods < <(
    kubectl --namespace "$TFS_K8S_NAMESPACE" get pods \
      | awk -v pat="$pod_pattern" 'index($0, pat) { print $1 }'
  )

  if (( ${#pods[@]} > 0 )); then
    # The script does not wait for the pod(s) to disappear before redeploying.
    kubectl --namespace "$TFS_K8S_NAMESPACE" delete pod "${pods[@]}" --force --grace-period=0
  else
    # Calling `kubectl delete pod` without a name only yields a usage error.
    printf 'No pod matching "%s" found; skipping pod deletion\n' "$pod_pattern" >&2
  fi

  ./deploy_component.sh "$component_name"
}

# Quoting the selector keeps a missing/empty argument from breaking the test
# (the unquoted `[ $component == ... ]` form fails with "unary operator expected").
case "$component" in
  CAD)
    redeploy_l3_component "l3-centralizedattackdetectorservice" "l3_centralizedattackdetector"
    ;;
  AM)
    redeploy_l3_component "l3-attackmitigatorservice" "l3_attackmitigator"
    ;;
  *)
    # Non-fatal on purpose: the remainder of the script still runs, as before.
    printf 'Warning: unknown component "%s" (expected "CAD" or "AM"); nothing redeployed\n' "$component" >&2
    ;;
esac

echo "Component $component deployed"

echo "Restarting DAD..."
+sshpass -p "ubuntu" ssh -o StrictHostKeyChecking=no -n -f ubuntu@192.168.165.73 "sh -c 'nohup /home/ubuntu/TeraflowDockerDistributed/restart.sh > /dev/null 2>&1 &'" +echo "DAD restarted" diff --git a/src/tests/scenario3/l3/get_ml_model_info.sh b/src/tests/scenario3/l3/get_ml_model_info.sh new file mode 100755 index 0000000000000000000000000000000000000000..19fb1177a23c13e4cdffd2e8c75df3aad3502c04 --- /dev/null +++ b/src/tests/scenario3/l3/get_ml_model_info.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pod=$(kubectl get pods -n "tfs" -l app=l3-centralizedattackdetectorservice | sed -n '2p' | cut -d " " -f1) +while true; do + kubectl -n "tfs" cp $pod:prediction_accuracy.txt ./prediction_accuracy.txt + clear + cat prediction_accuracy.txt | tail -n 10 + sleep 1 +done diff --git a/src/tests/scenario3/l3/launch_l3_attack_detection_and_mitigation.sh b/src/tests/scenario3/l3/launch_l3_attack_detection_and_mitigation.sh new file mode 100644 index 0000000000000000000000000000000000000000..a22d98bad6c203c825d3343c44e3d31674a41ec0 --- /dev/null +++ b/src/tests/scenario3/l3/launch_l3_attack_detection_and_mitigation.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cd /home/ubuntu/tfs-ctrl +source my_deploy.sh +./deploy.sh +./show_deploy.sh + +source tfs_runtime_env_vars.sh + +ofc22/run_test_01_bootstrap.sh +ofc22/run_test_02_create_service.sh diff --git a/src/tests/scenario3/l3/launch_l3_attack_detection_and_mitigation_complete.sh b/src/tests/scenario3/l3/launch_l3_attack_detection_and_mitigation_complete.sh new file mode 100644 index 0000000000000000000000000000000000000000..05b20077eb951102ab11fc90aaab53463c41f94f --- /dev/null +++ b/src/tests/scenario3/l3/launch_l3_attack_detection_and_mitigation_complete.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +cd /home/ubuntu/tfs-ctrl +source my_deploy.sh +./deploy.sh +./show_deploy.sh + +source tfs_runtime_env_vars.sh + +ofc22/run_test_01_bootstrap.sh +ofc22/run_test_02_create_service.sh + +sshpass -p "ubuntu" ssh -o StrictHostKeyChecking=no -n -f ubuntu@192.168.165.73 "sh -c 'nohup /home/ubuntu/TeraflowDockerDistributed/restart.sh > /dev/null 2>&1 &'" diff --git a/src/tests/scenario3/l3/launch_webui.sh b/src/tests/scenario3/l3/launch_webui.sh new file mode 100755 index 0000000000000000000000000000000000000000..bf1867eb108331647f3ad343b0ab23d098617aff --- /dev/null +++ b/src/tests/scenario3/l3/launch_webui.sh @@ -0,0 +1,16 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +ssh -L 12345:localhost:80 ubuntu@192.168.165.78 diff --git a/src/webui/grafana_db_l3_mon_kpis_psql.json b/src/webui/grafana_db_l3_mon_kpis_psql.json new file mode 100644 index 0000000000000000000000000000000000000000..87578ba88225c6f9a4d11a99e9b5fb9e4955619b --- /dev/null +++ b/src/webui/grafana_db_l3_mon_kpis_psql.json @@ -0,0 +1,491 @@ +{"overwrite": true, "folderId": 0, "dashboard": + { + "id": null, + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "iteration": 1675103296430, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "postgres", + "uid": "questdb-mon-kpi" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "always", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byRegexp", + "options": ".*PACKETS_.*" + }, + "properties": [ + { + "id": "custom.axisPlacement", + "value": "left" + }, + { + "id": "unit", + "value": "pps" + }, + { + "id": "custom.axisLabel", + "value": "Packets / sec" + }, 
+ { + "id": "custom.axisSoftMin", + "value": 0 + } + ] + }, + { + "matcher": { + "id": "byRegexp", + "options": ".*BYTES_.*" + }, + "properties": [ + { + "id": "custom.axisPlacement", + "value": "right" + }, + { + "id": "unit", + "value": "Bps" + }, + { + "id": "custom.axisLabel", + "value": "Bytes / sec" + }, + { + "id": "custom.axisSoftMin", + "value": 0 + } + ] + } + ] + }, + "gridPos": { + "h": 19, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": [ + "first", + "min", + "mean", + "max", + "lastNotNull" + ], + "displayMode": "table", + "placement": "right" + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "postgres", + "uid": "questdb-mon-kpi" + }, + "format": "time_series", + "group": [], + "hide": false, + "metricColumn": "kpi_value", + "rawQuery": true, + "rawSql": "SELECT\r\n $__time(timestamp), kpi_value AS metric, device_name, endpoint_name, kpi_sample_type\r\nFROM\r\n tfs_monitoring_kpis\r\nWHERE\r\n $__timeFilter(timestamp) AND device_name IN (${device_name}) AND endpoint_name IN (${endpoint_name}) AND kpi_sample_type IN (${kpi_sample_type}) AND NOT (kpi_sample_type like '%L3%' OR kpi_sample_type like '%ML_CONFIDENCE%') \r\nGROUP BY\r\n device_name, endpoint_name, kpi_sample_type\r\nORDER BY\r\n timestamp", + "refId": "A", + "select": [ + [ + { + "params": [ + "kpi_value" + ], + "type": "column" + } + ] + ], + "table": "tfs_monitoring_kpis", + "timeColumn": "timestamp", + "where": [ + { + "name": "", + "params": [ + "device_id", + "IN", + "$device_id" + ], + "type": "expression" + } + ] + } + ], + "title": "L3 Monitoring Packets/Bytes Received/Sent", + "transformations": [ + { + "id": "renameByRegex", + "options": { + "regex": "metric {device_name=\\\"([^\\\"]+)\\\", endpoint_name=\\\"([^\\\"]+)\\\", kpi_sample_type=\\\"([^\\\"]+)\\\"}", + "renamePattern": "$3 ($1 : $2)" + } + } + ], + "type": "timeseries" + }, + { + "datasource": { + "type": "postgres", + 
"uid": "questdb-mon-kpi" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "always", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byRegexp", + "options": ".*PACKETS_.*" + }, + "properties": [ + { + "id": "custom.axisPlacement", + "value": "left" + }, + { + "id": "unit", + "value": "pps" + }, + { + "id": "custom.axisLabel", + "value": "Packets / sec" + }, + { + "id": "custom.axisSoftMin", + "value": 0 + } + ] + }, + { + "matcher": { + "id": "byRegexp", + "options": ".*BYTES_.*" + }, + "properties": [ + { + "id": "custom.axisPlacement", + "value": "right" + }, + { + "id": "unit", + "value": "Bps" + }, + { + "id": "custom.axisLabel", + "value": "Bytes / sec" + }, + { + "id": "custom.axisSoftMin", + "value": 0 + } + ] + } + ] + }, + "gridPos": { + "h": 19, + "w": 24, + "x": 0, + "y": 50 + }, + "id": 3, + "options": { + "legend": { + "calcs": [ + "first", + "min", + "mean", + "max", + "lastNotNull" + ], + "displayMode": "table", + "placement": "right" + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "postgres", + "uid": "questdb-mon-kpi" + }, + "format": "time_series", + "group": [], + "hide": false, + "metricColumn": "kpi_value", + "rawQuery": true, + "rawSql": "SELECT\r\n $__time(timestamp), kpi_value AS metric, device_name, 
endpoint_name, kpi_sample_type\r\nFROM\r\n monitoring\r\nWHERE\r\n $__timeFilter(timestamp) AND device_name IN (${device_name}) AND endpoint_name IN (${endpoint_name}) AND kpi_sample_type IN (${kpi_sample_type}) AND (kpi_sample_type like '%L3%' OR kpi_sample_type like '%ML_CONFIDENCE%') \r\nGROUP BY\r\n device_name, endpoint_name, kpi_sample_type\r\nORDER BY\r\n timestamp", + "refId": "A", + "select": [ + [ + { + "params": [ + "kpi_value" + ], + "type": "column" + } + ] + ], + "table": "monitoring", + "timeColumn": "timestamp", + "where": [ + { + "name": "", + "params": [ + "device_name", + "IN", + "$device_name" + ], + "type": "expression" + } + ] + } + ], + "title": "L3 Cybersecurity KPIs", + "transformations": [ + { + "id": "renameByRegex", + "options": { + "regex": "metric {device_name=\"(.?)\", endpoint_name=\"(.?)\", kpi_sample_type=\"(.*)\"}", + "renamePattern": "$3" + } + } + ], + "type": "timeseries" + } + ], + "refresh": "5s", + "schemaVersion": 36, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "current": { + "selected": true, + "text": [ + "All" + ], + "value": [ + "$__all" + ] + }, + "datasource": { + "type": "postgres", + "uid": "questdb-mon-kpi" + }, + "definition": "SELECT DISTINCT device_name FROM tfs_monitoring_kpis;", + "hide": 0, + "includeAll": true, + "label": "Device", + "multi": true, + "name": "device_name", + "options": [], + "query": "SELECT DISTINCT device_name FROM tfs_monitoring_kpis;", + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + }, + { + "current": { + "selected": false, + "text": "All", + "value": "$__all" + }, + "datasource": { + "type": "postgres", + "uid": "questdb-mon-kpi" + }, + "definition": "SELECT DISTINCT endpoint_name FROM tfs_monitoring_kpis WHERE device_name IN (${device_name})", + "hide": 0, + "includeAll": true, + "label": "EndPoint", + "multi": true, + "name": "endpoint_name", + "options": [], + "query": "SELECT DISTINCT endpoint_name FROM 
tfs_monitoring_kpis WHERE device_name IN (${device_name})", + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + }, + { + "current": { + "selected": true, + "text": [ + "PACKETS_RECEIVED", + "PACKETS_TRANSMITTED" + ], + "value": [ + "PACKETS_RECEIVED", + "PACKETS_TRANSMITTED" + ] + }, + "datasource": { + "type": "postgres", + "uid": "questdb-mon-kpi" + }, + "definition": "SELECT DISTINCT kpi_sample_type FROM tfs_monitoring_kpis;", + "hide": 0, + "includeAll": true, + "label": "Kpi Sample Type", + "multi": true, + "name": "kpi_sample_type", + "options": [], + "query": "SELECT DISTINCT kpi_sample_type FROM tfs_monitoring_kpis;", + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + } + ] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "utc", + "title": "L3 Monitoring CyberSecurity", + "uid": "tfs-l3-monit-cs", + "version": 1, + "weekStart": "" + } +}