diff --git a/manifests/l3_centralizedattackdetectorservice.yaml b/manifests/l3_centralizedattackdetectorservice.yaml index 95c6d8176ca86c98c1e26d88267c864247ae8b5b..8a3be69b672200120afb4bca3892dd0c08ec2d65 100644 --- a/manifests/l3_centralizedattackdetectorservice.yaml +++ b/manifests/l3_centralizedattackdetectorservice.yaml @@ -42,6 +42,8 @@ spec: value: "0.5" - name: MONITORED_KPIS_TIME_INTERVAL_AGG value: "60" + - name: TEST_ML_MODEL + value: "0" readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:10001"] diff --git a/proto/l3_attackmitigator.proto b/proto/l3_attackmitigator.proto index 572d96f9e586dae4a124b1b9de1368b71fb9f0b7..d8ed4baf788a793b6b1451606760256db8ebe089 100644 --- a/proto/l3_attackmitigator.proto +++ b/proto/l3_attackmitigator.proto @@ -13,15 +13,14 @@ // limitations under the License. syntax = "proto3"; +package l3_attackmitigator; import "context.proto"; +import "l3_centralizedattackdetector.proto"; service L3Attackmitigator{ - // Perform Mitigation - rpc PerformMitigation (L3AttackmitigatorOutput) returns (context.Empty) {} - // Get Mitigation + rpc PerformMitigation (L3AttackmitigatorOutput) returns (l3_centralizedattackdetector.StatusMessage) {} rpc GetMitigation (context.Empty) returns (context.Empty) {} - // Get Configured ACL Rules rpc GetConfiguredACLRules (context.Empty) returns (ACLRules) {} } diff --git a/proto/l3_centralizedattackdetector.proto b/proto/l3_centralizedattackdetector.proto index 17fc1553ee89f5734c934f37170852c3c92bddf2..de967aea0812c611d7d969b2c3b20421446e927f 100644 --- a/proto/l3_centralizedattackdetector.proto +++ b/proto/l3_centralizedattackdetector.proto @@ -13,6 +13,7 @@ // limitations under the License. syntax = "proto3"; +package l3_centralizedattackdetector; import "context.proto"; @@ -25,6 +26,10 @@ service L3Centralizedattackdetector { // Get the list of features used by the ML model in the CAD component rpc GetFeaturesIds (context.Empty) returns (AutoFeatures) {} + + // Sets the list of attack IPs in order to be used to compute the prediction accuracy of the + // ML model in the CAD component in case of testing the ML model. + rpc SetAttackIPs (AttackIPs) returns (context.Empty) {} } message Feature { @@ -66,3 +71,7 @@ message L3CentralizedattackdetectorBatchInput { message StatusMessage { string message = 1; } + +message AttackIPs { + repeated string attack_ips = 1; +} \ No newline at end of file diff --git a/src/l3_attackmitigator/README.md b/src/l3_attackmitigator/README.md index 04c937a1d35e91071e0357278c81b33335e2e37a..d82400cdcd2a396c6275fea39dc1c127ee3510dc 100644 --- a/src/l3_attackmitigator/README.md +++ b/src/l3_attackmitigator/README.md @@ -1,3 +1,8 @@ -# l3_attackmitigator -- Receives packages and process it with TSTAT -- Functions: ReportSummarizeKpi(KpiList) +# L3 Attack Mitigator + +Receives detected attacks from the Centralized Attack Detector component and performs the necessary mitigations. + +## Functions: +- PerformMitigation(L3AttackmitigatorOutput) -> StatusMessage +- GetMitigation(Empty) -> Empty +- GetConfiguredACLRules(Empty) -> ACLRules diff --git a/src/l3_attackmitigator/client/l3_attackmitigatorClient.py b/src/l3_attackmitigator/client/l3_attackmitigatorClient.py index c5d98b1c4974172e50e65db16ba4753e742eab28..bae3fd62785e02eed1cd8fd7678c1775b0193d84 100644 --- a/src/l3_attackmitigator/client/l3_attackmitigatorClient.py +++ b/src/l3_attackmitigator/client/l3_attackmitigatorClient.py @@ -15,17 +15,12 @@ import grpc, logging from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc +from common.proto.context_pb2 import Empty +from common.proto.l3_attackmitigator_pb2 import L3AttackmitigatorOutput, ACLRules +from common.proto.l3_attackmitigator_pb2_grpc import L3AttackmitigatorStub +from common.proto.l3_centralizedattackdetector_pb2 import StatusMessage from common.tools.client.RetryDecorator import retry, delay_exponential -from common.proto.l3_attackmitigator_pb2_grpc import ( - L3AttackmitigatorStub, -) -from common.proto.l3_attackmitigator_pb2 import ( - L3AttackmitigatorOutput, ACLRules -) - -from common.proto.context_pb2 import ( - Empty -) +from common.tools.grpc.Tools import grpc_message_to_json_string LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -37,7 +32,7 @@ class l3_attackmitigatorClient: if not host: host = get_service_host(ServiceNameEnum.L3_AM) if not port: port = get_service_port_grpc(ServiceNameEnum.L3_AM) self.endpoint = "{}:{}".format(host, port) - LOGGER.debug("Creating channel to {}...".format(self.endpoint)) + LOGGER.debug("Creating channel to {:s}...".format(self.endpoint)) self.channel = None self.stub = None self.connect() @@ -54,23 +49,22 @@ class l3_attackmitigatorClient: self.stub = None @RETRY_DECORATOR - def PerformMitigation(self, request: L3AttackmitigatorOutput) -> Empty: - LOGGER.debug('PerformMitigation request: {}'.format(request)) + def PerformMitigation(self, request: L3AttackmitigatorOutput) -> StatusMessage: + LOGGER.debug('PerformMitigation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.PerformMitigation(request) - LOGGER.debug('PerformMitigation result: {}'.format(response)) + LOGGER.debug('PerformMitigation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetMitigation(self, request: Empty) -> Empty: - LOGGER.debug('GetMitigation request: {}'.format(request)) + LOGGER.debug('GetMitigation request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetMitigation(request) - LOGGER.debug('GetMitigation result: {}'.format(response)) + LOGGER.debug('GetMitigation result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetConfiguredACLRules(self, request: Empty) -> ACLRules: - LOGGER.debug('GetConfiguredACLRules request: {}'.format(request)) + LOGGER.debug('GetConfiguredACLRules request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetConfiguredACLRules(request) - LOGGER.debug('GetConfiguredACLRules result: {}'.format(response)) + LOGGER.debug('GetConfiguredACLRules result: {:s}'.format(grpc_message_to_json_string(response))) return response - diff --git a/src/l3_attackmitigator/requirements.in b/src/l3_attackmitigator/requirements.in index a8aba849708799232f6b0732c3661396266da329..38d04994fb0fa1951fb465bc127eb72659dc2eaf 100644 --- a/src/l3_attackmitigator/requirements.in +++ b/src/l3_attackmitigator/requirements.in @@ -11,5 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -# no extra dependency diff --git a/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py b/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py index f3613b377a86f61ba0a76665eb3001f5d9721a2a..5a7abe0a7416e61ae73b24e5f528ebc1717d8f2e 100644 --- a/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py +++ b/src/l3_attackmitigator/service/l3_attackmitigatorServiceServicerImpl.py @@ -13,33 +13,39 @@ # limitations under the License. from __future__ import print_function + +import grpc import logging import time -from common.proto.l3_centralizedattackdetector_pb2 import Empty -from common.proto.l3_attackmitigator_pb2_grpc import L3AttackmitigatorServicer -from common.proto.l3_attackmitigator_pb2 import ACLRules -from common.proto.context_pb2 import ( - ServiceId, - ConfigActionEnum, -) - +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.acl_pb2 import AclForwardActionEnum, AclLogActionEnum, AclRuleTypeEnum -from common.proto.context_pb2 import ConfigActionEnum, Service, ServiceId, ConfigRule +from common.proto.context_pb2 import ConfigActionEnum, Empty, Service, ServiceId +from common.proto.l3_attackmitigator_pb2 import ACLRules, L3AttackmitigatorOutput +from common.proto.l3_attackmitigator_pb2_grpc import L3AttackmitigatorServicer +from common.proto.l3_centralizedattackdetector_pb2 import StatusMessage from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient from service.client.ServiceClient import ServiceClient -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) - -METRICS_POOL = MetricsPool('l3_attackmitigator', 'RPC') +METRICS_POOL = MetricsPool("l3_attackmitigator", "RPC") class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): def __init__(self): - LOGGER.info("Creating Attack Mitigator Service") + """ + Initializes the Attack Mitigator service. + + Args: + None. + + Returns: + None. + """ + + LOGGER.info("Creating Attack Mitigator service") self.last_value = -1 self.last_tag = 0 @@ -60,6 +66,23 @@ class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): src_port: str, dst_port: str, ) -> None: + """ + Configures an ACL rule to block undesired TCP traffic. + + Args: + context_uuid (str): The UUID of the context. + service_uuid (str): The UUID of the service. + device_uuid (str): The UUID of the device. + endpoint_uuid (str): The UUID of the endpoint. + src_ip (str): The source IP address. + dst_ip (str): The destination IP address. + src_port (str): The source port. + dst_port (str): The destination port. + + Returns: + None. + """ + # Create ServiceId service_id = ServiceId() service_id.context_id.context_uuid.uuid = context_uuid @@ -107,29 +130,41 @@ class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): acl_entry.action.forward_action = AclForwardActionEnum.ACLFORWARDINGACTION_DROP acl_entry.action.log_action = AclLogActionEnum.ACLLOGACTION_NOLOG - LOGGER.info("ACL Rule Set: %s", grpc_message_to_json_string(acl_rule_set)) - LOGGER.info("ACL Config Rule: %s", grpc_message_to_json_string(acl_config_rule)) + LOGGER.info(f"ACL Rule Set: {grpc_message_to_json_string(acl_rule_set)}") + LOGGER.info(f"ACL Config Rule: {grpc_message_to_json_string(acl_config_rule)}") # Add the ACLRuleSet to the list of configured ACLRuleSets self.configured_acl_config_rules.append(acl_config_rule) + + LOGGER.info(service_request) # Update the Service with the new ACL RuleSet - service_reply: ServiceId = self.service_client.UpdateService(service_request) + service_reply = self.service_client.UpdateService(service_request) - LOGGER.info("Service reply: %s", grpc_message_to_json_string(service_reply)) + LOGGER.info(f"Service reply: {grpc_message_to_json_string(service_reply)}") if service_reply != service_request.service_id: # pylint: disable=no-member raise Exception("Service update failed. Wrong ServiceId was returned") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def PerformMitigation(self, request, context): + def PerformMitigation(self, request : L3AttackmitigatorOutput, context : grpc.ServicerContext) -> StatusMessage: + """ + Performs mitigation on an attack by configuring an ACL rule to block undesired TCP traffic. + + Args: + request (L3AttackmitigatorOutput): The request message containing the attack mitigation information. + context (Empty): The context of the request. + + Returns: + StatusMessage: A response with a message indicating that the attack mitigation information + was received and processed. + """ + last_value = request.confidence last_tag = request.tag LOGGER.info( - "Attack Mitigator received attack mitigation information. Prediction confidence: %s, Predicted class: %s", - last_value, - last_tag, + f"Attack Mitigator received attack mitigation information. Prediction confidence: {last_value}, Predicted class: {last_tag}" ) ip_o = request.ip_o @@ -141,21 +176,23 @@ class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): counter = 0 service_id = request.service_id - LOGGER.info("Service Id.:\n{}".format(grpc_message_to_json_string(service_id))) - + LOGGER.info(f"Service Id.: {grpc_message_to_json_string(service_id)}") LOGGER.info("Retrieving service from Context") + while sentinel: try: service = self.context_client.GetService(service_id) sentinel = False except Exception as e: counter = counter + 1 - LOGGER.debug("Waiting 2 seconds", counter, e) + LOGGER.debug(f"Waiting 2 seconds for service to be available (attempt: {counter})") time.sleep(2) - LOGGER.info(f"Service with Service Id.: {grpc_message_to_json_string(service_id)}\n{grpc_message_to_json_string(service)}") - + LOGGER.info( + f"Service with Service Id.: {grpc_message_to_json_string(service_id)}\n{grpc_message_to_json_string(service)}" + ) LOGGER.info("Adding new rule to the service to block the attack") + self.configure_acl_rule( context_uuid=service_id.context_id.context_uuid.uuid, service_uuid=service_id.service_uuid.uuid, @@ -167,8 +204,8 @@ class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): dst_port=port_d, ) LOGGER.info("Service with new rule:\n{}".format(grpc_message_to_json_string(service))) - LOGGER.info("Updating service with the new rule") + self.service_client.UpdateService(service) service = self.context_client.GetService(service_id) @@ -178,10 +215,21 @@ class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): ) ) - return Empty(message=f"OK, received values: {last_tag} with confidence {last_value}.") + return StatusMessage(message=f"OK, received values: {last_tag} with confidence {last_value}.") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def GetConfiguredACLRules(self, request, context): + def GetConfiguredACLRules(self, request : Empty, context : grpc.ServicerContext) -> ACLRules: + """ + Returns the configured ACL rules. + + Args: + request (Empty): The request message. + context (Empty): The context of the RPC call. + + Returns: + acl_rules (ACLRules): The configured ACL rules. + """ + acl_rules = ACLRules() for acl_config_rule in self.configured_acl_config_rules: diff --git a/src/l3_attackmitigator/service/test_create_service.py b/src/l3_attackmitigator/service/test_create_service.py deleted file mode 100644 index 01cf769a271de1bbbd0329a3ce21ea476ac10cab..0000000000000000000000000000000000000000 --- a/src/l3_attackmitigator/service/test_create_service.py +++ /dev/null @@ -1,267 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -import logging -from common.proto.l3_centralizedattackdetector_pb2 import ( - Empty -) -from common.proto.l3_attackmitigator_pb2_grpc import ( - L3AttackmitigatorServicer, -) -from common.proto.context_pb2 import ( - Service, ServiceId, ServiceConfig, ServiceTypeEnum, ServiceStatusEnum, ServiceStatus, Context, ContextId, Uuid, - Timestamp, ConfigRule, ConfigRule_Custom, ConfigActionEnum, Device, DeviceId, DeviceConfig, - DeviceOperationalStatusEnum, DeviceDriverEnum, EndPoint, Link, LinkId, EndPoint, EndPointId, Topology, TopologyId -) -from common.proto.context_pb2_grpc import ( - ContextServiceStub -) -from common.proto.service_pb2_grpc import ( - ServiceServiceStub -) -from datetime import datetime -import grpc - -LOGGER = logging.getLogger(__name__) -CONTEXT_CHANNEL = "192.168.165.78:1010" -SERVICE_CHANNEL = "192.168.165.78:3030" - -class l3_attackmitigatorServiceServicerImpl(L3AttackmitigatorServicer): - - def GetNewService(self, service_id): - service = Service() - service_id_obj = self.GenerateServiceId(service_id) - service.service_id.CopyFrom(service_id_obj) - service.service_type = ServiceTypeEnum.SERVICETYPE_L3NM - service_status = ServiceStatus() - service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE - service.service_status.CopyFrom(service_status) - timestamp = Timestamp() - timestamp.timestamp = datetime.timestamp(datetime.now()) - service.timestamp.CopyFrom(timestamp) - return service - - def GetNewContext(self, service_id): - context = Context() - context_id = ContextId() - uuid = Uuid() - uuid.uuid = service_id - context_id.context_uuid.CopyFrom(uuid) - context.context_id.CopyFrom(context_id) - return context - - def GetNewDevice(self, service_id): - device = Device() - device_id = DeviceId() - uuid = Uuid() - uuid.uuid = service_id - device_id.device_uuid.CopyFrom(uuid) - device.device_type="test" - device.device_id.CopyFrom(device_id) - device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED - return device - - def GetNewLink(self, service_id): - link = Link() - link_id = LinkId() - uuid = Uuid() - uuid.uuid = service_id - link_id.link_uuid.CopyFrom(uuid) - link.link_id.CopyFrom(link_id) - return link - - def GetNewTopology(self,context_id, device_id, link_id): - topology = Topology() - topology_id = TopologyId() - topology_id.context_id.CopyFrom(context_id) - uuid = Uuid() - uuid.uuid = "test_crypto" - topology_id.topology_uuid.CopyFrom(uuid) - topology.topology_id.CopyFrom(topology_id) - topology.device_ids.extend([device_id]) - topology.link_ids.extend([link_id]) - return topology - - def GetNewEndpoint(self, topology_id, device_id, uuid_name): - endpoint = EndPoint() - endpoint_id = EndPointId() - endpoint_id.topology_id.CopyFrom(topology_id) - endpoint_id.device_id.CopyFrom(device_id) - uuid = Uuid() - uuid.uuid = uuid_name - endpoint_id.endpoint_uuid.CopyFrom(uuid) - endpoint.endpoint_id.CopyFrom(endpoint_id) - endpoint.endpoint_type = "test" - return endpoint - - - def __init__(self): - LOGGER.debug("Creating Servicer...") - self.last_value = -1 - self.last_tag = 0 - """ - context = self.GetNewContext("test_crypto") - print(context, flush=True) - print(self.SetContext(context)) - - service = self.GetNewService("test_crypto") - print("This is the new service", self.CreateService(service), flush = True) - - ip_o = "127.0.0.1" - ip_d = "127.0.0.2" - port_o = "123" - port_d = "124" - - service_id = self.GenerateServiceId("test_crypto") - - config_rule = self.GetConfigRule(ip_o, ip_d, port_o, port_d) - - service = self.GetService(service_id) - print("Service obtained from id", service, flush=True) - - config_rule = self.GetConfigRule(ip_o, ip_d, port_o, port_d) - - #service_config = service.service_config - #service_config.append(config_rule) - - service_config = ServiceConfig() - service_config.config_rules.extend([config_rule]) - service.service_config.CopyFrom(service_config) - - device = self.GetNewDevice("test_crypto") - print("New device", device, flush=True) - device_id = self.SetDevice(device) - - link = self.GetNewLink("test_crypto") - print("New link", link, flush=True) - link_id = self.SetLink(link) - - topology = self.GetNewTopology(context.context_id, device.device_id, link.link_id) - print("New topology", topology, flush=True) - topology_id = self.SetTopology(topology) - - endpoint = self.GetNewEndpoint(topology.topology_id, device.device_id, "test_crypto") - print("New endpoint", endpoint, flush=True) - link.link_endpoint_ids.extend([endpoint.endpoint_id]) - - self.SetLink(link) - - print("Service with new rule", service, flush=True) - self.UpdateService(service) - - service2 = self.GetService(service_id) - print("Service obtained from id after updating", service2, flush=True) - """ - - def GenerateRuleValue(self, ip_o, ip_d, port_o, port_d): - value = { - 'ipv4:source-address': ip_o, - 'ipv4:destination-address': ip_d, - 'transport:source-port': port_o, - 'transport:destination-port': port_d, - 'forwarding-action': 'DROP', - } - return value - - def GetConfigRule(self, ip_o, ip_d, port_o, port_d): - config_rule = ConfigRule() - config_rule_custom = ConfigRule_Custom() - config_rule.action = ConfigActionEnum.CONFIGACTION_SET - config_rule_custom.resource_key = 'test' - config_rule_custom.resource_value = str(self.GenerateRuleValue(ip_o, ip_d, port_o, port_d)) - config_rule.custom.CopyFrom(config_rule_custom) - return config_rule - - def GenerateServiceId(self, service_id): - service_id_obj = ServiceId() - context_id = ContextId() - uuid = Uuid() - uuid.uuid = service_id - context_id.context_uuid.CopyFrom(uuid) - service_id_obj.context_id.CopyFrom(context_id) - service_id_obj.service_uuid.CopyFrom(uuid) - return service_id_obj - - def SendOutput(self, request, context): - # SEND CONFIDENCE TO MITIGATION SERVER - print("Server received mitigation values...", request.confidence, flush=True) - - last_value = request.confidence - last_tag = request.tag - - ip_o = request.ip_o - ip_d = request.ip_d - port_o = request.port_o - port_d = request.port_d - - service_id = self.GenerateServiceId(request.service_id) - - config_rule = self.GetConfigRule(ip_o, ip_d, port_o, port_d) - - service = GetService(service_id) - print(service) - #service.config_rules.append(config_rule) - #UpdateService(service) - - # RETURN OK TO THE CALLER - return Empty( - message=f"OK, received values: {last_tag} with confidence {last_value}." - ) - - def SetDevice(self, device): - with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: - stub = ContextServiceStub(channel) - return stub.SetDevice(device) - - def SetLink(self, link): - with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: - stub = ContextServiceStub(channel) - return stub.SetLink(link) - - def SetTopology(self, link): - with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: - stub = ContextServiceStub(channel) - return stub.SetTopology(link) - - - def GetService(self, service_id): - with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: - stub = ContextServiceStub(channel) - return stub.GetService(service_id) - - def SetContext(self, context): - with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: - stub = ContextServiceStub(channel) - return stub.SetContext(context) - - def UpdateService(self, service): - with grpc.insecure_channel(SERVICE_CHANNEL) as channel: - stub = ServiceServiceStub(channel) - stub.UpdateService(service) - - def CreateService(self, service): - with grpc.insecure_channel(SERVICE_CHANNEL) as channel: - stub = ServiceServiceStub(channel) - stub.CreateService(service) - - def GetMitigation(self, request, context): - # GET OR PERFORM MITIGATION STRATEGY - logging.debug("") - print("Returing mitigation strategy...") - k = self.last_value * 2 - return Empty( - message=f"Mitigation with double confidence = {k}" - ) - diff --git a/src/l3_centralizedattackdetector/Config.py b/src/l3_centralizedattackdetector/Config.py index f6c7e33553820b1214e5265cf219db629bcfe006..809380b2cda1c8c556f973e570de36e3189edb99 100644 --- a/src/l3_centralizedattackdetector/Config.py +++ b/src/l3_centralizedattackdetector/Config.py @@ -18,7 +18,7 @@ import logging LOG_LEVEL = logging.WARNING # gRPC settings -GRPC_SERVICE_PORT = 10001 # TODO UPM FIXME +GRPC_SERVICE_PORT = 10001 GRPC_MAX_WORKERS = 10 GRPC_GRACE_PERIOD = 60 diff --git a/src/l3_centralizedattackdetector/README.md b/src/l3_centralizedattackdetector/README.md index bcec4052cc9aa2ea734e08a4ed6b9158609b3532..2273eef80ec4c366d549d20d9447434003257217 100644 --- a/src/l3_centralizedattackdetector/README.md +++ b/src/l3_centralizedattackdetector/README.md @@ -1,3 +1,10 @@ -# l3_centralizedattackdetector -- Receives packages and process it with TSTAT -- Functions: ReportSummarizeKpi(KpiList) +# L3 Centralized Attack Detector + +Receives snapshot statistics from Distributed Attack Detector component and performs an inference to detect attacks. +It then sends the detected attacks to the Attack Mitigator component for them to be mitigated. + +## Functions: +- AnalyzeConnectionStatistics(L3CentralizedattackdetectorMetrics) -> StatusMessage +- AnalyzeBatchConnectionStatistics(L3CentralizedattackdetectorBatchInput) -> StatusMessage +- GetFeaturesIds(Empty) -> AutoFeatures +- SetAttackIPs(AttackIPs) -> Empty diff --git a/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py b/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py index 2ef33438e77dbe4c3609bd21133fb3a9c95c8bcc..8de016a5d56ea1e1fefe23ba6e29f6865ee5e5a6 100644 --- a/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py +++ b/src/l3_centralizedattackdetector/client/l3_centralizedattackdetectorClient.py @@ -13,18 +13,17 @@ # limitations under the License. import grpc, logging -from common.tools.client.RetryDecorator import retry, delay_exponential -from common.proto.l3_centralizedattackdetector_pb2_grpc import ( - L3CentralizedattackdetectorStub, -) +from common.proto.context_pb2 import Empty +from common.proto.l3_centralizedattackdetector_pb2_grpc import L3CentralizedattackdetectorStub from common.proto.l3_centralizedattackdetector_pb2 import ( + AttackIPs, AutoFeatures, - Empty, L3CentralizedattackdetectorBatchInput, L3CentralizedattackdetectorMetrics, - ModelInput, - ModelOutput + StatusMessage ) +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -34,7 +33,7 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, class l3_centralizedattackdetectorClient: def __init__(self, address, port): self.endpoint = "{}:{}".format(address, port) - LOGGER.debug("Creating channel to {}...".format(self.endpoint)) + LOGGER.debug("Creating channel to {:s}...".format(self.endpoint)) self.channel = None self.stub = None self.connect() @@ -51,24 +50,29 @@ class l3_centralizedattackdetectorClient: self.stub = None @RETRY_DECORATOR - def AnalyzeConnectionStatistics(self, request: L3CentralizedattackdetectorMetrics) -> Empty: - LOGGER.debug('AnalyzeConnectionStatistics request: {}'.format(request)) + def AnalyzeConnectionStatistics(self, request : L3CentralizedattackdetectorMetrics) -> StatusMessage: + LOGGER.debug('AnalyzeConnectionStatistics request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.AnalyzeConnectionStatistics(request) - LOGGER.debug('AnalyzeConnectionStatistics result: {}'.format(response)) + LOGGER.debug('AnalyzeConnectionStatistics result: {:s}'.format(grpc_message_to_json_string(response))) return response - + @RETRY_DECORATOR - def AnalyzeBatchConnectionStatistics(self, request: L3CentralizedattackdetectorBatchInput) -> Empty: - LOGGER.debug('AnalyzeBatchConnectionStatistics request: {}'.format(request)) - response = self.stub.GetOutput(request) - LOGGER.debug('AnalyzeBatchConnectionStatistics result: {}'.format(response)) + def AnalyzeBatchConnectionStatistics(self, request: L3CentralizedattackdetectorBatchInput) -> StatusMessage: + LOGGER.debug('AnalyzeBatchConnectionStatistics request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.AnalyzeBatchConnectionStatistics(request) + LOGGER.debug('AnalyzeBatchConnectionStatistics result: {:s}'.format(grpc_message_to_json_string(response))) return response - + @RETRY_DECORATOR - def GetFeaturesIds(self, request: Empty) -> AutoFeatures: - LOGGER.debug('GetFeaturesIds request: {}'.format(request)) - response = self.stub.GetOutput(request) - LOGGER.debug('GetFeaturesIds result: {}'.format(response)) + def GetFeaturesIds(self, request : Empty) -> AutoFeatures: + LOGGER.debug('GetFeaturesIds request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetFeaturesIds(request) + LOGGER.debug('GetFeaturesIds result: {:s}'.format(grpc_message_to_json_string(response))) return response - + @RETRY_DECORATOR + def SetAttackIPs(self, request : AttackIPs) -> Empty: + LOGGER.debug('SetAttackIPs request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SetAttackIPs(request) + LOGGER.debug('SetAttackIPs result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py index 3bfd6fd2ff09ef471d94b6c66470ed5668704827..91793230d0626d9a8dc112c6442a7364b6beb1a1 100644 --- a/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py +++ b/src/l3_centralizedattackdetector/service/l3_centralizedattackdetectorServiceServicerImpl.py @@ -13,34 +13,34 @@ # limitations under the License. from __future__ import print_function -from datetime import datetime, timedelta import csv -import os +import grpc +import logging import numpy as np import onnxruntime as rt -import logging +import os import time import uuid +from datetime import datetime, timedelta from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method -from common.proto.context_pb2 import Timestamp, SliceId, ConnectionId +from common.proto.context_pb2 import Empty, Timestamp from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.l3_attackmitigator_pb2 import L3AttackmitigatorOutput -from common.proto.l3_centralizedattackdetector_pb2 import Empty, AutoFeatures +from common.proto.l3_centralizedattackdetector_pb2 import AttackIPs, AutoFeatures, L3CentralizedattackdetectorMetrics, StatusMessage from common.proto.l3_centralizedattackdetector_pb2_grpc import L3CentralizedattackdetectorServicer from common.proto.monitoring_pb2 import Kpi, KpiDescriptor from common.tools.timestamp.Converters import timestamp_utcnow_to_float -from monitoring.client.MonitoringClient import MonitoringClient from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient +from monitoring.client.MonitoringClient import MonitoringClient LOGGER = logging.getLogger(__name__) current_dir = os.path.dirname(os.path.abspath(__file__)) -# Constants -DEMO_MODE = False -ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"] +# Environment variables +TEST_ML_MODEL = True if int(os.getenv("TEST_ML_MODEL", 0)) == 1 else False BATCH_SIZE = int(os.getenv("BATCH_SIZE", 10)) METRICS_POOL = MetricsPool("l3_centralizedattackdetector", "RPC") @@ -61,16 +61,21 @@ class ConnectionInfo: ) def __str__(self): - return "ip_o: " + self.ip_o + "\nport_o: " + self.port_o + "\nip_d: " + self.ip_d + "\nport_d: " + self.port_d + return f"ip_o: {self.ip_o}\nport_o: {self.port_o}\nip_d: {self.ip_d}\nport_d: {self.port_d}" class l3_centralizedattackdetectorServiceServicerImpl(L3CentralizedattackdetectorServicer): + def __init__(self): + """ + Initializes the Centralized Attack Detector service. - """ - Initialize variables, prediction model and clients of components used by CAD - """ + Args: + None + + Returns: + None + """ - def __init__(self): LOGGER.info("Creating Centralized Attack Detector Service") self.inference_values = [] @@ -82,14 +87,14 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto ) self.cryptomining_detector_model = rt.InferenceSession(self.cryptomining_detector_model_path) - # Load cryptomining detector features metadata from ONNX file + # Load cryptomining attack detector features metadata from ONNX file self.cryptomining_detector_features_metadata = list( self.cryptomining_detector_model.get_modelmeta().custom_metadata_map.values() ) self.cryptomining_detector_features_metadata = [float(x) for x in self.cryptomining_detector_features_metadata] self.cryptomining_detector_features_metadata.sort() - LOGGER.info("Cryptomining Detector Features: " + str(self.cryptomining_detector_features_metadata)) + LOGGER.info(f"Cryptomining Attack Detector Features: {self.cryptomining_detector_features_metadata}") LOGGER.info(f"Batch size: {BATCH_SIZE}") self.input_name = self.cryptomining_detector_model.get_inputs()[0].name @@ -121,7 +126,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto self.monitored_kpis = { "l3_security_status": { "kpi_id": None, - "description": "L3 - Confidence of the cryptomining detector in the security status in the last time interval of the service {service_id}", + "description": "L3 - Confidence of the cryptomining attack detector in the security status in the last time interval of the service {service_id}", "kpi_sample_type": KpiSampleType.KPISAMPLETYPE_L3_SECURITY_STATUS_CRYPTO, "service_ids": [], }, @@ -170,6 +175,9 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto # AM evaluation tests self.am_notification_times = [] + + # List of attack connections IPs + self.attack_ips = [] # List of attack connections self.attack_connections = [] @@ -180,13 +188,12 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto self.false_positives = 0 self.false_negatives = 0 - self.replica_uuid = uuid.uuid4() + self.pod_id = uuid.uuid4() + LOGGER.info(f"Pod Id.: {self.pod_id}") self.first_batch_request_time = 0 self.last_batch_request_time = 0 - LOGGER.info("This replica's identifier is: " + str(self.replica_uuid)) - self.response_times_csv_file_path = "response_times.csv" col_names = ["timestamp_first_req", "timestamp_last_req", "total_time", "batch_size"] @@ -194,16 +201,6 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto writer = csv.writer(file) writer.writerow(col_names) - """ - Create a monitored KPI for a specific service and add it to the Monitoring Client - -input: - + service_id: service ID where the KPI will be monitored - + kpi_name: name of the KPI - + kpi_description: description of the KPI - + kpi_sample_type: KPI sample type of the KPI (it must be defined in the kpi_sample_types.proto file) - -output: KPI identifier representing the KPI - """ - def create_kpi( self, service_id, @@ -211,24 +208,40 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto kpi_description, kpi_sample_type, ): + """ + Creates a new KPI for a specific service and add it to the Monitoring client + + Args: + service_id (ServiceID): The ID of the service. + kpi_name (str): The name of the KPI. + kpi_description (str): The description of the KPI. + kpi_sample_type (KpiSampleType): The sample type of the KPI. + + Returns: + kpi (Kpi): The created KPI. + """ + kpidescriptor = KpiDescriptor() kpidescriptor.kpi_description = kpi_description kpidescriptor.service_id.service_uuid.uuid = service_id.service_uuid.uuid kpidescriptor.kpi_sample_type = kpi_sample_type - new_kpi = self.monitoring_client.SetKpi(kpidescriptor) + kpi = self.monitoring_client.SetKpi(kpidescriptor) LOGGER.info("Created KPI {}".format(kpi_name)) - return new_kpi - - """ - Create the monitored KPIs for a specific service, add them to the Monitoring Client and store their identifiers in the monitored_kpis dictionary - -input: - + service_id: service ID where the KPIs will be monitored - -output: None - """ + return kpi def create_kpis(self, service_id): + """ + Creates the monitored KPIs for a specific service, adds them to the Monitoring client and stores their identifiers in the monitored_kpis dictionary + + Args: + service_id (uuid): The ID of the service. + + Returns: + None + """ + LOGGER.info("Creating KPIs for service {}".format(service_id)) # all the KPIs are created for all the services from which requests are received @@ -245,6 +258,16 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto LOGGER.info("Created KPIs for service {}".format(service_id)) def monitor_kpis(self): + """ + Monitors KPIs for all the services from which requests are received + + Args: + None + + Returns: + None + """ + monitor_inference_results = self.inference_results monitor_service_ids = self.service_ids @@ -256,13 +279,22 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto for service_id in monitor_service_ids: LOGGER.debug("service_id: {}".format(service_id)) - self.monitor_compute_l3_kpi(service_id, monitor_inference_results) - + self.monitor_compute_l3_kpi() LOGGER.debug("KPIs sent to monitoring server") else: LOGGER.debug("No KPIs sent to monitoring server") def assign_timestamp(self, monitor_inference_results): + """ + Assigns a timestamp to the monitored inference results. + + Args: + monitor_inference_results (list): A list of monitored inference results. + + Returns: + None + """ + time_interval = self.MONITORED_KPIS_TIME_INTERVAL_AGG # assign the timestamp of the first inference result to the time_interval_start @@ -302,7 +334,19 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto LOGGER.debug("time_interval_start: {}".format(self.time_interval_start)) LOGGER.debug("time_interval_end: {}".format(self.time_interval_end)) - def monitor_compute_l3_kpi(self,): + def monitor_compute_l3_kpi( + self, + ): + """ + Computes the monitored KPIs for a specific service and sends them to the Monitoring server + + Args: + None + + Returns: + None + """ + # L3 security status kpi_security_status = Kpi() kpi_security_status.kpi_id.kpi_id.CopyFrom(self.monitored_kpis["l3_security_status"]["kpi_id"]) @@ -357,19 +401,36 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto LOGGER.debug("Error sending KPIs to monitoring server: {}".format(e)) def monitor_ml_model_confidence(self): - if self.l3_security_status == 0: - return self.l3_ml_model_confidence_normal + """ + Get the monitored KPI for the confidence of the ML model + + Args: + None + + Returns: + confidence (float): The monitored KPI for the confidence of the ML model + """ - return self.l3_ml_model_confidence_crypto + confidence = None - """ - Classify connection as standard traffic or cryptomining attack and return results - -input: - + request: L3CentralizedattackdetectorMetrics object with connection features information - -output: L3AttackmitigatorOutput object with information about the assigned class and prediction confidence - """ + if self.l3_security_status == 0: + confidence = self.l3_ml_model_confidence_normal + else: + confidence = self.l3_ml_model_confidence_crypto + + return confidence def perform_inference(self, request): + """ + Performs inference on the input data using the Cryptomining Attack Detector model to classify the connection as standard traffic or cryptomining attack. + + Args: + request (L3CentralizedattackdetectorMetrics): A L3CentralizedattackdetectorMetrics object with connection features information. + + Returns: + dict: A dictionary containing the predicted class, the probability of that class, and other relevant information required to block the attack. + """ + x_data = np.array([[feature.feature for feature in request.features]]) # Print input data shape @@ -443,14 +504,17 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto return output_message - """ - Classify connection as standard traffic or cryptomining attack and return results - -input: - + request: L3CentralizedattackdetectorMetrics object with connection features information - -output: L3AttackmitigatorOutput object with information about the assigned class and prediction confidence - """ + def perform_batch_inference(self, requests): + """ + Performs batch inference on the input data using the Cryptomining Attack Detector model to classify the connection as standard traffic or cryptomining attack. + + Args: + requests (list): A list of L3CentralizedattackdetectorMetrics objects with connection features information. + + Returns: + list: A list of dictionaries containing the predicted class, the probability of that class, and other relevant information required to block the attack for each request. + """ - def perform_distributed_inference(self, requests): batch_size = len(requests) # Create an empty array to hold the input data @@ -533,15 +597,25 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto return output_messages - """ - Receive features from Attack Mitigator, predict attack and communicate with Attack Mitigator - -input: - + request: L3CentralizedattackdetectorMetrics object with connection features information - -output: Empty object with a message about the execution of the function - """ - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def AnalyzeConnectionStatistics(self, request, context): + def AnalyzeConnectionStatistics( + self, request : L3CentralizedattackdetectorMetrics, context : grpc.ServicerContext + ) -> StatusMessage: + """ + Analyzes the connection statistics sent in the request, performs batch inference on the + input data using the Cryptomining Attack Detector model to classify the connection as + standard traffic or cryptomining attack, and notifies the Attack Mitigator component in + case of attack. + + Args: + request (L3CentralizedattackdetectorMetrics): A L3CentralizedattackdetectorMetrics + object with connection features information. + context (grpc.ServicerContext): The context of the request. + + Returns: + StatusMessage: An response indicating that the information was received and processed. + """ + # Perform inference with the data sent in the request if len(self.active_requests) == 0: self.first_batch_request_time = time.time() @@ -549,14 +623,14 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto self.active_requests.append(request) if len(self.active_requests) >= BATCH_SIZE: - LOGGER.debug("Performing inference... {}".format(self.replica_uuid)) + LOGGER.debug("Performing inference... {}".format(self.pod_id)) inference_time_start = time.time() - cryptomining_detector_output = self.perform_distributed_inference(self.active_requests) + cryptomining_detector_output = self.perform_batch_inference(self.active_requests) inference_time_end = time.time() LOGGER.debug("Inference performed in {} seconds".format(inference_time_end - inference_time_start)) - logging.info("Inference performed correctly") + LOGGER.info("Inference performed correctly") self.inference_results.append({"output": cryptomining_detector_output, "timestamp": datetime.now()}) LOGGER.debug("inference_results length: {}".format(len(self.inference_results))) @@ -564,7 +638,8 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto for i, req in enumerate(self.active_requests): service_id = req.connection_metadata.service_id - # Check if a request of a new service has been received and, if so, create the monitored KPIs for that service + # Check if a request of a new service has been received and, if so, create + # the monitored KPIs for that service if service_id not in self.service_ids: self.create_kpis(service_id) self.service_ids.append(service_id) @@ -576,7 +651,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto LOGGER.debug("Monitoring KPIs performed in {} seconds".format(monitor_kpis_end - monitor_kpis_start)) LOGGER.debug("cryptomining_detector_output: {}".format(cryptomining_detector_output[i])) - if DEMO_MODE: + if TEST_ML_MODEL: self.analyze_prediction_accuracy(cryptomining_detector_output[i]["confidence"]) connection_info = ConnectionInfo( @@ -613,10 +688,10 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto # Only notify Attack Mitigator when a cryptomining connection has been detected if cryptomining_detector_output[i]["tag_name"] == "Crypto": - if DEMO_MODE: + if TEST_ML_MODEL: self.attack_connections.append(connection_info) - if connection_info.ip_o in ATTACK_IPS or connection_info.ip_d in ATTACK_IPS: + if connection_info.ip_o in self.attack_ips or connection_info.ip_d in self.attack_ips: self.correct_attack_conns += 1 self.correct_predictions += 1 else: @@ -629,17 +704,17 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto LOGGER.debug("Crypto attack detected") # Notify the Attack Mitigator component about the attack - logging.info( + LOGGER.info( "Notifying the Attack Mitigator component about the attack in order to block the connection..." ) try: - logging.info("Sending the connection information to the Attack Mitigator component...") + LOGGER.info("Sending the connection information to the Attack Mitigator component...") message = L3AttackmitigatorOutput(**cryptomining_detector_output[i]) - + am_response = self.attackmitigator_client.PerformMitigation(message) LOGGER.debug("AM response: {}".format(am_response)) - + notification_time_end = time.time() self.am_notification_times.append(notification_time_end - notification_time_start) @@ -670,18 +745,18 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto f.write("Std notification time: {}\n".format(std_notification_time)) f.write("Median notification time: {}\n".format(median_notification_time)) - logging.info("Attack Mitigator notified") + LOGGER.info("Attack Mitigator notified") except Exception as e: - logging.error("Error notifying the Attack Mitigator component about the attack: ", e) - logging.error("Couldn't find l3_attackmitigator") + LOGGER.error("Error notifying the Attack Mitigator component about the attack: ", e) + LOGGER.error("Couldn't find l3_attackmitigator") - return Empty(message="Attack Mitigator not found") + return StatusMessage(message="Attack Mitigator not found") else: - logging.info("No attack detected") + LOGGER.info("No attack detected") if cryptomining_detector_output[i]["tag_name"] != "Crypto": - if connection_info.ip_o not in ATTACK_IPS and connection_info.ip_d not in ATTACK_IPS: + if connection_info.ip_o not in self.attack_ips and connection_info.ip_d not in self.attack_ips: self.correct_predictions += 1 else: LOGGER.debug("False negative: {}".format(connection_info)) @@ -705,11 +780,21 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto writer = csv.writer(file) writer.writerow(col_values) - return Empty(message="Ok, metrics processed") + return StatusMessage(message="Ok, metrics processed") - return Empty(message="Ok, information received") + return StatusMessage(message="Ok, information received") def analyze_prediction_accuracy(self, confidence): + """ + Analyzes the prediction accuracy of the Centralized Attack Detector. + + Args: + confidence (float): The confidence level of the Cryptomining Attack Detector model. + + Returns: + None + """ + LOGGER.info("Number of Attack Connections Correctly Classified: {}".format(self.correct_attack_conns)) LOGGER.info("Number of Attack Connections: {}".format(len(self.attack_connections))) @@ -726,7 +811,7 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto cryptomining_attack_detection_acc = 0 LOGGER.info("Cryptomining Attack Detection Accuracy: {}".format(cryptomining_attack_detection_acc)) - LOGGER.info("Cryptomining Detector Confidence: {}".format(confidence)) + LOGGER.info("Cryptomining Attack Detector Confidence: {}".format(confidence)) with open("prediction_accuracy.txt", "a") as f: LOGGER.debug("Exporting prediction accuracy and confidence") @@ -738,12 +823,28 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto f.write("False Positives: {}\n".format(self.false_positives)) f.write("True Negatives: {}\n".format(self.total_predictions - len(self.attack_connections))) f.write("False Negatives: {}\n".format(self.false_negatives)) - f.write("Cryptomining Detector Confidence: {}\n\n".format(confidence)) + f.write("Cryptomining Attack Detector Confidence: {}\n\n".format(confidence)) f.write("Timestamp: {}\n".format(datetime.now().strftime("%d/%m/%Y %H:%M:%S"))) f.close() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def AnalyzeBatchConnectionStatistics(self, request, context): + def AnalyzeBatchConnectionStatistics( + self, request : L3CentralizedattackdetectorBatchInput, context : grpc.ServicerContext + ) -> StatusMessage: + """ + Analyzes a batch of connection statistics sent in the request, performs batch inference on the + input data using the Cryptomining Attack Detector model to classify the connection as standard + traffic or cryptomining attack, and notifies the Attack Mitigator component in case of attack. + + Args: + request (L3CentralizedattackdetectorBatchInput): A L3CentralizedattackdetectorBatchInput + object with connection features information. + context (grpc.ServicerContext): The context of the request. + + Returns: + StatusMessage: An StatusMessage indicating that the information was received and processed. + """ + batch_time_start = time.time() for metric in request.metrics: @@ -751,25 +852,50 @@ class l3_centralizedattackdetectorServiceServicerImpl(L3Centralizedattackdetecto batch_time_end = time.time() with open("batch_time.txt", "a") as f: - f.write(str(len(request.metrics)) + "\n") - f.write(str(batch_time_end - batch_time_start) + "\n\n") + f.write(f"{len(request.metrics)}\n") + f.write(f"{batch_time_end - batch_time_start}\n\n") f.close() - logging.debug("Metrics: " + str(len(request.metrics))) - logging.debug("Batch time: " + str(batch_time_end - batch_time_start)) - - return Empty(message="OK, information received.") + LOGGER.debug(f"Batch time: {batch_time_end - batch_time_start}") + LOGGER.debug("Batch time: {}".format(batch_time_end - batch_time_start)) - """ - Send features allocated in the metadata of the onnx file to the DAD - -output: ONNX metadata as a list of integers - """ + return StatusMessage(message="OK, information received.") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def GetFeaturesIds(self, request: Empty, context): - features = AutoFeatures() + def GetFeaturesIds(self, request : Empty, context : grpc.ServicerContext) -> AutoFeatures: + """ + Returns a list of feature IDs used by the Cryptomining Attack Detector model. + + Args: + request (Empty): An empty request object. + context (grpc.ServicerContext): The context of the request. + + Returns: + features_ids (AutoFeatures): A list of feature IDs used by the Cryptomining Attack Detector model. + """ + + features_ids = AutoFeatures() for feature in self.cryptomining_detector_features_metadata: - features.auto_features.append(feature) + features_ids.auto_features.append(feature) + + return features_ids + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SetAttackIPs(self, request : AttackIPs, context : grpc.ServicerContext) -> Empty: + """ + Sets the list of attack IPs in order to be used to compute the prediction accuracy of the + Centralized Attack Detector in case of testing the ML model. + + Args: + request (AttackIPs): A list of attack IPs. + context (grpc.ServicerContext): The context of the request. + + Returns: + empty (Empty): An empty response object. + """ + + self.attack_ips = request.attack_ips + LOGGER.debug(f"Succesfully set attack IPs: {self.attack_ips}") - return features + return Empty() diff --git a/src/l3_distributedattackdetector/Config.py b/src/l3_distributedattackdetector/Config.py index e04de0b2622b621fb95f1c382ac3a152836de760..a1419ef09c9b3dcbff5aa576536fae8ffe6bc7a4 100644 --- a/src/l3_distributedattackdetector/Config.py +++ b/src/l3_distributedattackdetector/Config.py @@ -18,7 +18,7 @@ import logging LOG_LEVEL = logging.WARNING # gRPC settings -GRPC_SERVICE_PORT = 10000 # TODO UPM FIXME +GRPC_SERVICE_PORT = 10000 GRPC_MAX_WORKERS = 10 GRPC_GRACE_PERIOD = 60 diff --git a/src/l3_distributedattackdetector/README.md b/src/l3_distributedattackdetector/README.md index d8cac8b72d41c6eb6ce2b2908e6ab7402966ad62..d79563dd8936814132e96aa738216435be44950a 100644 --- a/src/l3_distributedattackdetector/README.md +++ b/src/l3_distributedattackdetector/README.md @@ -1,3 +1,3 @@ -# l3_distributedattackdetector -- Receives packages and process it with TSTAT -- Functions: ReportSummarizeKpi(KpiList) +# L3 Distributed Attack Detector + +Receives packages and processes them with TSTAT to generate traffic snapshot statistics. diff --git a/src/l3_distributedattackdetector/requirements.in b/src/l3_distributedattackdetector/requirements.in index a8aba849708799232f6b0732c3661396266da329..64e4aa198bd8b7902c0bb810e5fcd6f108faae6f 100644 --- a/src/l3_distributedattackdetector/requirements.in +++ b/src/l3_distributedattackdetector/requirements.in @@ -12,4 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -# no extra dependency +numpy==1.23.* +asyncio==3.4.3 diff --git a/src/l3_distributedattackdetector/service/__main__.py b/src/l3_distributedattackdetector/service/__main__.py index 1f558dfb6c271cf63a9e36ae06cb9993f7e49c57..a8f0ac3c4f9737091c2c1a39134b97ee7bd6de7d 100644 --- a/src/l3_distributedattackdetector/service/__main__.py +++ b/src/l3_distributedattackdetector/service/__main__.py @@ -13,207 +13,39 @@ # limitations under the License. import logging -import sys -import os -import time -import grpc -from common.proto.l3_centralizedattackdetector_pb2_grpc import ( - L3CentralizedattackdetectorStub, -) -from common.proto.l3_centralizedattackdetector_pb2 import ( - ModelInput, -) +from sys import stdout +import sys +from l3_distributedattackdetector import l3_distributedattackdetector -LOGGER = logging.getLogger(__name__) -TSTAT_DIR_NAME = "piped/" -JSON_BLANK = { - "ip_o": "", # Client IP - "port_o": "", # Client port - "ip_d": "", # Server ip - "port_d": "", # Server port - "flow_id": "", # Identifier:c_ip,c_port,s_ip,s_port,time_start - "protocol": "", # Connection protocol - "time_start": 0, # Start of connection - "time_end": 0, # Time of last packet -} +# Setup LOGGER +LOGGER = logging.getLogger("main_dad_LOGGER") +LOGGER.setLevel(logging.INFO) +logFormatter = logging.Formatter(fmt="%(levelname)-8s %(message)s") +consoleHandler = logging.StreamHandler(stdout) +consoleHandler.setFormatter(logFormatter) +LOGGER.addHandler(consoleHandler) -def follow(thefile, time_sleep): - """ - Generator function that yields new lines in a file - It reads the logfie (the opened file) - """ - # seek the end of the file - thefile.seek(0, os.SEEK_END) +PROFILING = False - trozo = "" - # start infinite loop - while True: - # read last line of file - line = thefile.readline() - # sleep if file hasn't been updated - if not line: - time.sleep(time_sleep) # FIXME - continue - - if line[-1] != "\n": - trozo += line - # print ("OJO :"+line+":") - else: - if trozo != "": - line = trozo + line - trozo = "" - yield line - -def load_file(dirname=TSTAT_DIR_NAME): - """ - - Client side - - """ - # "/home/dapi/Tstat/TOSHI/tstat/tstat_DRv4/tstat/piped/" - - while True: - here = os.path.dirname(os.path.abspath(__file__)) - tstat_piped = os.path.join(here, dirname) - tstat_dirs = os.listdir(tstat_piped) - if len(tstat_dirs) > 0: - tstat_dirs.sort() - new_dir = tstat_dirs[-1] - print(new_dir) - # print("dir: {0}".format(new_dir)) - tstat_file = tstat_piped + new_dir + "/log_tcp_temp_complete" - print("tstat_file: {0}".format(tstat_file)) - return tstat_file - else: - print("No tstat directory!") - time.sleep(1) - -def process_line(line): - """ - - Preprocessing before a message per line - - Avoids crash when nan are found by generating a 0s array - - Returns a list of values - """ - - def makeDivision(i, j): - """ - Helper function - """ - return i / j if (j and type(i) != str and type(j) != str) else 0 - - line = line.split(" ") - try: - n_packets_server, n_packets_client = float( - line[16]), float(line[2]) - except: - return [0 for i in range(9)] - n_bits_server, n_bits_client = float(line[22]), float(line[8]) - seconds = float(line[30]) / 1e6 # Duration in ms - values = [ - makeDivision(n_packets_server, seconds), - makeDivision(n_packets_client, seconds), - makeDivision(n_bits_server, seconds), - makeDivision(n_bits_client, seconds), - makeDivision(n_bits_server, n_packets_server), - makeDivision(n_bits_client, n_packets_client), - makeDivision(n_packets_server, n_packets_client), - makeDivision(n_bits_server, n_bits_client), - ] - return values - -def open_channel(input_information): - with grpc.insecure_channel("localhost:10001") as channel: - stub = L3CentralizedattackdetectorStub(channel) - response = stub.SendInput( - ModelInput(**input_information)) - LOGGER.debug("Inferencer send_input sent and received: ", - response.message) - # response = stub.get_output(Inferencer_pb2.empty(message="")) - # print("Inferencer get_output response: \n", response) - -def run(time_sleep, max_lines): - - filename = load_file() - write_salida = None - print( - "following: ", - filename, - " time to wait:", - time_sleep, - "lineas_tope:", - max_lines, - "write salida:", - write_salida, - ) - logfile = open(filename, "r") - # iterate over the generator - loglines = follow(logfile, time_sleep) - lin = 0 - ultima_lin = 0 - last_line = "" - cryptos = 0 - new_connections = {} # Dict for storing NEW data - connections_db = {} # Dict for storing ALL data - print('Reading lines') - for line in loglines: - print('Received Line') - start = time.time() - line_id = line.split(" ") - conn_id = (line_id[0], line_id[1], line_id[14], line_id[15]) - new_connections[conn_id] = process_line(line) - try: - connections_db[conn_id]["time_end"] = time.time() - except KeyError: - connections_db[conn_id] = JSON_BLANK.copy() - connections_db[conn_id]["time_start"] = time.time() - connections_db[conn_id]["time_end"] = time.time() - connections_db[conn_id]["ip_o"] = conn_id[0] - connections_db[conn_id]["port_o"] = conn_id[1] - connections_db[conn_id]["flow_id"] = "".join(conn_id) - connections_db[conn_id]["protocol"] = "TCP" - connections_db[conn_id]["ip_d"] = conn_id[2] - connections_db[conn_id]["port_d"] = conn_id[3] +def main(): + l3_distributedattackdetector() - # CRAFT DICT - inference_information = { - "n_packets_server_seconds": new_connections[conn_id][0], - "n_packets_client_seconds": new_connections[conn_id][1], - "n_bits_server_seconds": new_connections[conn_id][2], - "n_bits_client_seconds": new_connections[conn_id][3], - "n_bits_server_n_packets_server": new_connections[conn_id][4], - "n_bits_client_n_packets_client": new_connections[conn_id][5], - "n_packets_server_n_packets_client": new_connections[conn_id][6], - "n_bits_server_n_bits_client": new_connections[conn_id][7], - "ip_o": connections_db[conn_id]["ip_o"], - "port_o": connections_db[conn_id]["port_o"], - "ip_d": connections_db[conn_id]["ip_d"], - "port_d": connections_db[conn_id]["port_d"], - "flow_id": connections_db[conn_id]["flow_id"], - "protocol": connections_db[conn_id]["protocol"], - "time_start": connections_db[conn_id]["time_start"], - "time_end": connections_db[conn_id]["time_end"], - } - # SEND MSG - try: - open_channel(inference_information) - except: - LOGGER.info("Centralized Attack Mitigator is not up") +if __name__ == "__main__": + if PROFILING: + import cProfile, pstats, io - if write_salida: - print(line, end="") - sys.stdout.flush() - lin += 1 - if lin >= max_lines: - break - elif lin == 1: - print("primera:", ultima_lin) + pr = cProfile.Profile() + pr.enable() - end = time.time() - start - print(end) + main() + if PROFILING: + pr.disable() + s = io.StringIO() + sortby = "cumulative" + ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + ps.print_stats() + LOGGER.info(s.getvalue()) -def main(): - logging.basicConfig() - run(5, 70) - -if __name__ == '__main__': - sys.exit(main()) + sys.exit(0) \ No newline at end of file diff --git a/src/l3_distributedattackdetector/service/l3_distributedattackdetector.py b/src/l3_distributedattackdetector/service/l3_distributedattackdetector.py new file mode 100644 index 0000000000000000000000000000000000000000..357f44a9ab2037438252fb0ca40b1a7dc3c74c54 --- /dev/null +++ b/src/l3_distributedattackdetector/service/l3_distributedattackdetector.py @@ -0,0 +1,376 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import grpc +import logging +import numpy as np +import os +import signal +import time +from sys import stdout +from common.proto.context_pb2 import ( + Empty, + ServiceTypeEnum, + ContextId, +) +from common.proto.context_pb2_grpc import ContextServiceStub +from common.proto.l3_centralizedattackdetector_pb2 import ( + L3CentralizedattackdetectorMetrics, + L3CentralizedattackdetectorBatchInput, + ConnectionMetadata, + Feature, +) +from common.proto.l3_centralizedattackdetector_pb2_grpc import L3CentralizedattackdetectorStub + +# Setup LOGGER +LOGGER = logging.getLogger("dad_LOGGER") +LOGGER.setLevel(logging.INFO) +logFormatter = logging.Formatter(fmt="%(levelname)-8s %(message)s") +consoleHandler = logging.StreamHandler(stdout) +consoleHandler.setFormatter(logFormatter) +LOGGER.addHandler(consoleHandler) + +TSTAT_DIR_NAME = "piped/" +CENTRALIZED_ATTACK_DETECTOR = "192.168.165.78:10001" +JSON_BLANK = { + "ip_o": "", # Client IP + "port_o": "", # Client port + "ip_d": "", # Server ip + "port_d": "", # Server port + "flow_id": "", # Identifier:c_ip,c_port,s_ip,s_port,time_start + "protocol": "", # Connection protocol + "time_start": 0.0, # Start of connection + "time_end": 0.0, # Time of last packet +} + +STOP = False +IGNORE_FIRST_LINE_TSTAT = True + +CONTEXT_ID = "admin" +CONTEXT_CHANNEL = "192.168.165.78:1010" +PROFILING = False +SEND_DATA_IN_BATCHES = False +BATCH_SIZE = 10 +ATTACK_IPS = ["37.187.95.110", "91.121.140.167", "94.23.23.52", "94.23.247.226", "149.202.83.171"] + +class l3_distributedattackdetector(): + def __init__(self): + LOGGER.info("Creating Distributed Attack Detector") + + self.feature_ids = [] + + self.cad_features = {} + self.conn_id = () + + self.connections_dict = {} # Dict for storing ALL data + self.new_connections = {} # Dict for storing NEW data + + signal.signal(signal.SIGINT, self.handler) + + with grpc.insecure_channel(CENTRALIZED_ATTACK_DETECTOR) as channel: + self.cad = L3CentralizedattackdetectorStub(channel) + LOGGER.info("Connected to the centralized attack detector") + + LOGGER.info("Obtaining features...") + self.feature_ids = self.get_features_ids() + LOGGER.info("Features Ids.: {:s}".format(str(self.feature_ids))) + + asyncio.run(self.process_traffic()) + + + def handler(self): + if STOP: + exit() + + STOP = True + + LOGGER.info("Gracefully stopping...") + + def follow(self, thefile, time_sleep): + """ + Generator function that yields new lines in a file + It reads the logfie (the opened file) + """ + # seek the end of the file + # thefile.seek(0, os.SEEK_END) + + trozo = "" + + # start infinite loop + while True: + # read last line of file + line = thefile.readline() + + # sleep if file hasn't been updated + if not line: + time.sleep(time_sleep) + continue + if line[-1] != "\n": + trozo += line + else: + if trozo != "": + line = trozo + line + trozo = "" + yield line + + + def load_file(self, dirname=TSTAT_DIR_NAME): # - Client side - + while True: + here = os.path.dirname(os.path.abspath(__file__)) + tstat_piped = os.path.join(here, dirname) + tstat_dirs = os.listdir(tstat_piped) + if len(tstat_dirs) > 0: + tstat_dirs.sort() + new_dir = tstat_dirs[-1] + tstat_file = tstat_piped + new_dir + "/log_tcp_temp_complete" + LOGGER.info("Following: {:s}".format(str(tstat_file))) + return tstat_file + else: + LOGGER.info("No Tstat directory!") + time.sleep(5) + + + def process_line(self, line): + """ + - Preprocessing before a message per line + - Avoids crash when nan are found by generating a 0s array + - Returns a list of values + """ + line = line.split(" ") + + try: + values = [] + for feature_id in self.feature_ids: + feature_id = int(feature_id) + feature = feature_id - 1 + values.append(float(line[feature])) + except IndexError: + print("IndexError: {0}".format(line)) + + return values + + + def get_service_ids(self, context_id_str): + with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: + stub = ContextServiceStub(channel) + context_id = ContextId() + context_id.context_uuid.uuid = context_id_str + return stub.ListServiceIds(context_id) + + + def get_services(self, context_id_str): + with grpc.insecure_channel(CONTEXT_CHANNEL) as channel: + stub = ContextServiceStub(channel) + context_id = ContextId() + context_id.context_uuid.uuid = context_id_str + return stub.ListServices(context_id) + + + def get_service_id(self, context_id): + service_id_list = self.get_service_ids(context_id) + service_id = None + for s_id in service_id_list.service_ids: + if ( + s_id.service_uuid.uuid == "0eaa0752-c7b6-4c2e-97da-317fbfee5112" + ): # TODO: Change this identifier to the L3VPN service identifier with the real router for the demo v2 + service_id = s_id + break + + return service_id + + + def get_service_id2(self, context_id): + service_list = self.get_services(context_id) + service_id = None + for s in service_list.services: + if s.service_type == ServiceTypeEnum.SERVICETYPE_L3NM: + service_id = s.service_id + break + else: + pass + return service_id + + + def get_endpoint_id(self, context_id): + service_list = self.get_services(context_id) + endpoint_id = None + for s in service_list.services: + if s.service_type == ServiceTypeEnum.SERVICETYPE_L3NM: + endpoint_id = s.service_endpoint_ids[0] + break + return endpoint_id + + + def get_features_ids(self): + return self.cad.GetFeaturesIds(Empty()).auto_features + + + def check_types(self): + for feature in self.cad_features["features"]: + assert isinstance(feature, float) + + assert isinstance(self.cad_features["connection_metadata"]["ip_o"], str) + assert isinstance(self.cad_features["connection_metadata"]["port_o"], str) + assert isinstance(self.cad_features["connection_metadata"]["ip_d"], str) + assert isinstance(self.cad_features["connection_metadata"]["port_d"], str) + assert isinstance(self.cad_features["connection_metadata"]["flow_id"], str) + assert isinstance(self.cad_features["connection_metadata"]["protocol"], str) + assert isinstance(self.cad_features["connection_metadata"]["time_start"], float) + assert isinstance(self.cad_features["connection_metadata"]["time_end"], float) + + + def insert_connection(self): + try: + self.connections_dict[self.conn_id]["time_end"] = time.time() + except KeyError: + self.connections_dict[self.conn_id] = JSON_BLANK.copy() + self.connections_dict[self.conn_id]["time_start"] = time.time() + self.connections_dict[self.conn_id]["time_end"] = time.time() + self.connections_dict[self.conn_id]["ip_o"] = self.conn_id[0] + self.connections_dict[self.conn_id]["port_o"] = self.conn_id[1] + self.connections_dict[self.conn_id]["flow_id"] = ":".join(self.conn_id) + self.connections_dict[self.conn_id]["service_id"] = self.get_service_id2(CONTEXT_ID) + self.connections_dict[self.conn_id]["endpoint_id"] = self.get_endpoint_id(CONTEXT_ID) + self.connections_dict[self.conn_id]["protocol"] = "TCP" + self.connections_dict[self.conn_id]["ip_d"] = self.conn_id[2] + self.connections_dict[self.conn_id]["port_d"] = self.conn_id[3] + + + def check_if_connection_is_attack(self): + if self.conn_id[0] in ATTACK_IPS or self.conn_id[2] in ATTACK_IPS: + LOGGER.info("Attack detected. Origin: {0}, destination: {1}".format(self.conn_id[0], self.conn_id[2])) + + + def create_cad_features(self): + self.cad_features = { + "features": self.new_connections[self.conn_id][0:10], + "connection_metadata": { + "ip_o": self.connections_dict[self.conn_id]["ip_o"], + "port_o": self.connections_dict[self.conn_id]["port_o"], + "ip_d": self.connections_dict[self.conn_id]["ip_d"], + "port_d": self.connections_dict[self.conn_id]["port_d"], + "flow_id": self.connections_dict[self.conn_id]["flow_id"], + "service_id": self.connections_dict[self.conn_id]["service_id"], + "endpoint_id": self.connections_dict[self.conn_id]["endpoint_id"], + "protocol": self.connections_dict[self.conn_id]["protocol"], + "time_start": self.connections_dict[self.conn_id]["time_start"], + "time_end": self.connections_dict[self.conn_id]["time_end"], + }, + } + + + async def send_batch_async(self, metrics_list_pb): + loop = asyncio.get_running_loop() + + # Create metrics batch + metrics_batch = L3CentralizedattackdetectorBatchInput() + metrics_batch.metrics.extend(metrics_list_pb) + + # Send batch + future = loop.run_in_executor( + None, self.cad.AnalyzeBatchConnectionStatistics, metrics_batch + ) + + try: + await future + except Exception as e: + LOGGER.error(f"Error sending batch: {e}") + + + async def send_data(self, metrics_list_pb, send_data_times): + # Send to CAD + if SEND_DATA_IN_BATCHES: + if len(metrics_list_pb) == BATCH_SIZE: + send_data_time_start = time.time() + await self.send_batch_async(metrics_list_pb) + metrics_list_pb = [] + + send_data_time_end = time.time() + send_data_time = send_data_time_end - send_data_time_start + send_data_times = np.append(send_data_times, send_data_time) + + else: + send_data_time_start = time.time() + self.cad.AnalyzeConnectionStatistics(metrics_list_pb[-1]) + + send_data_time_end = time.time() + send_data_time = send_data_time_end - send_data_time_start + send_data_times = np.append(send_data_times, send_data_time) + + return metrics_list_pb, send_data_times + + + async def process_traffic(self): + LOGGER.info("Loading Tstat log file...") + logfile = open(self.load_file(), "r") + + LOGGER.info("Following Tstat log file...") + loglines = self.follow(logfile, 5) + + process_time = [] + num_lines = 0 + + send_data_times = np.array([]) + metrics_list_pb = [] + + LOGGER.info("Starting to process data...") + + index = 0 + while True: + line = next(loglines, None) + + while line is None: + LOGGER.info("Waiting for new data...") + time.sleep(1 / 100) + line = next(loglines, None) + if index == 0 and IGNORE_FIRST_LINE_TSTAT: + index = index + 1 + continue + if STOP: + break + + num_lines += 1 + start = time.time() + line_id = line.split(" ") + self.conn_id = (line_id[0], line_id[1], line_id[14], line_id[15]) + self.new_connections[self.conn_id] = self.process_line(line) + + self.check_if_connection_is_attack() + + self.insert_connection() + + self.create_cad_features() + + self.check_types() + + connection_metadata = ConnectionMetadata(**self.cad_features["connection_metadata"]) + metrics = L3CentralizedattackdetectorMetrics() + + for feature in self.cad_features["features"]: + feature_obj = Feature() + feature_obj.feature = feature + metrics.features.append(feature_obj) + + metrics.connection_metadata.CopyFrom(connection_metadata) + metrics_list_pb.append(metrics) + + metrics_list_pb, send_data_times = await self.send_data(metrics_list_pb, send_data_times) + + index = index + 1 + + process_time.append(time.time() - start) + + if num_lines % 10 == 0: + LOGGER.info(f"Number of lines: {num_lines} - Average processing time: {sum(process_time) / num_lines}") \ No newline at end of file diff --git a/src/l3_distributedattackdetector/service/tstat b/src/l3_distributedattackdetector/service/tstat deleted file mode 100644 index 06c7fb082e12c8392b71d0ec2f7d74827d30e4a3..0000000000000000000000000000000000000000 Binary files a/src/l3_distributedattackdetector/service/tstat and /dev/null differ diff --git a/src/tests/scenario3/l3/deploy_specs.sh b/src/tests/scenario3/l3/deploy_specs.sh index c3c9122b8594908c9d9f7d9a56daa4f8d0d8cf52..8c8264fca75d471c3bbbf0cb523c7a17bcffa5a0 100644 --- a/src/tests/scenario3/l3/deploy_specs.sh +++ b/src/tests/scenario3/l3/deploy_specs.sh @@ -20,7 +20,23 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device pathcomp service slice compute webui load_generator monitoring automation l3_attackmitigator l3_centralizedattackdetector" +#export TFS_COMPONENTS="context device pathcomp service slice compute webui load_generator" +export TFS_COMPONENTS="context device pathcomp service slice webui" + +# Uncomment to activate Monitoring +export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" + +# Uncomment to activate Automation and Policy Manager +#export TFS_COMPONENTS="${TFS_COMPONENTS} automation policy" + +# Uncomment to activate Optical CyberSecurity +#export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager" + +# Uncomment to activate L3 CyberSecurity +export TFS_COMPONENTS="${TFS_COMPONENTS} l3_attackmitigator l3_centralizedattackdetector" + +# Uncomment to activate TE +#export TFS_COMPONENTS="${TFS_COMPONENTS} te" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" @@ -29,7 +45,13 @@ export TFS_IMAGE_TAG="dev" export TFS_K8S_NAMESPACE="tfs" # Set additional manifest files to be applied after the deployment -export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml" +export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml" + +# Uncomment to monitor performance of components +export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/servicemonitors.yaml" + +# Uncomment when deploying Optical CyberSecurity +#export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml" # Set the new Grafana admin password export TFS_GRAFANA_PASSWORD="admin123+" @@ -66,7 +88,7 @@ export CRDB_DEPLOY_MODE="single" export CRDB_DROP_DATABASE_IF_EXISTS="YES" # Disable flag for re-deploying CockroachDB from scratch. -export CRDB_REDEPLOY="YES" +export CRDB_REDEPLOY="" # ----- NATS ------------------------------------------------------------------- @@ -81,7 +103,7 @@ export NATS_EXT_PORT_CLIENT="4222" export NATS_EXT_PORT_HTTP="8222" # Disable flag for re-deploying NATS from scratch. -export NATS_REDEPLOY="YES" +export NATS_REDEPLOY="" # ----- QuestDB ---------------------------------------------------------------- @@ -114,7 +136,7 @@ export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups" export QDB_DROP_TABLES_IF_EXIST="YES" # Disable flag for re-deploying QuestDB from scratch. -export QDB_REDEPLOY="YES" +export QDB_REDEPLOY="" # ----- K8s Observability ------------------------------------------------------