diff --git a/my_deploy.sh b/my_deploy.sh index 2153da8da78082a122fbd62cc85b4d93033da8da..f2cd3cc1188539dad57bbed13ce939d15ebbea09 100644 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -7,7 +7,7 @@ export TFS_REGISTRY_IMAGE="http://localhost:32000/tfs/" # interdomain slice pathcomp dlt # dbscanserving opticalattackmitigator opticalcentralizedattackdetector # l3_attackmitigator l3_centralizedattackdetector l3_distributedattackdetector -export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmitigator opticalattackmanager opticalattackdetector" +export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmitigator" # opticalattackmanager opticalattackdetector # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" diff --git a/proto/generate_code_python.sh b/proto/generate_code_python.sh index f28dbe4fde13c56f20a454049ab220a21f63a663..caf73fb5c09d8ce2f3d57585eeaad9334ac83f6a 100755 --- a/proto/generate_code_python.sh +++ b/proto/generate_code_python.sh @@ -19,6 +19,9 @@ cd $(dirname $0) mkdir -p src/python rm -rf src/python/*.py +mkdir -p src/python/asyncio +rm -rf src/python/asyncio/*.py + tee src/python/__init__.py << EOF > /dev/null # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # @@ -35,6 +38,22 @@ tee src/python/__init__.py << EOF > /dev/null # limitations under the License. EOF +tee src/python/asyncio/__init__.py << EOF > /dev/null +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +EOF + # Generate Python code python3 -m grpc_tools.protoc -I=./ --python_out=src/python/ --grpc_python_out=src/python/ *.proto diff --git a/proto/optical_attack_detector.proto b/proto/optical_attack_detector.proto index 4c5fcaa7676c4374df73630ffae1ab693bb589d4..6d363f99872a1f5d54d90fbaa28f43d281ae08dc 100644 --- a/proto/optical_attack_detector.proto +++ b/proto/optical_attack_detector.proto @@ -27,6 +27,6 @@ service OpticalAttackDetectorService { } message DetectionRequest { - KpiId kpi_id = 1; - context.ServiceId service_id = 7; + context.ServiceId service_id = 1; + monitoring.KpiId kpi_id = 2; } diff --git a/src/dbscanserving/client/DbscanServingClient.py b/src/dbscanserving/client/DbscanServingClient.py index 5069cab0b02bf8e27f21f32368db507da6af5908..7794115c57f8d1e8d25453c5846de2890d4e588d 100644 --- a/src/dbscanserving/client/DbscanServingClient.py +++ b/src/dbscanserving/client/DbscanServingClient.py @@ -13,7 +13,7 @@ # limitations under the License. 
import logging -from email.policy import default +from collections import Counter import grpc from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse @@ -63,10 +63,11 @@ class DbscanServingClient: request.num_samples, request.num_features ) ) - response = self.stub.Detect(request) + response: DetectionResponse = self.stub.Detect(request) LOGGER.debug( - "Detect result with {} cluster indices".format( - len(response.cluster_indices) + "Detect result with {} cluster indices [{}]".format( + len(response.cluster_indices), + Counter(response.cluster_indices) ) ) return response diff --git a/src/dbscanserving/service/DbscanServiceServicerImpl.py b/src/dbscanserving/service/DbscanServiceServicerImpl.py index d9035fc8b5c06279b52beb562a5c053ba0b05106..d13c4b8e6e3e847a45c842a4e18794675beb06f2 100644 --- a/src/dbscanserving/service/DbscanServiceServicerImpl.py +++ b/src/dbscanserving/service/DbscanServiceServicerImpl.py @@ -13,7 +13,6 @@ # limitations under the License. import logging -import os import grpc from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 53430780e843526cdad2ddbfb030f75287d93154..00f8b5aa234451e6f610a59d2c319cab8888c88e 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -19,6 +19,7 @@ class ManagementDB(): self.client = sl.connect(database, check_same_thread=False) self.create_monitoring_table() self.create_subscription_table() + self.create_alarm_table() def create_monitoring_table(self): self.client.execute(""" @@ -45,9 +46,24 @@ class ManagementDB(): ); """) + def create_alarm_table(self): + self.client.execute(""" + CREATE TABLE IF NOT EXISTS alarm( + alarm_id INTEGER PRIMARY KEY AUTOINCREMENT, + alarm_description TEXT, + alarm_name TEXT, + kpi_id INTEGER, + kpi_min_value REAL, + kpi_max_value REAL, + in_range INTEGER, + include_min_value 
INTEGER, + include_max_value INTEGER + ); + """) + def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id): c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ?",(device_id,kpi_sample_type,endpoint_id)) + c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id)) data=c.fetchone() if data is None: c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id)) @@ -68,14 +84,26 @@ class ManagementDB(): print("already exists") return data[0] - def delete_KPI(self,device_id,kpi_sample_type): + def insert_alarm(self,alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value): + c = self.client.cursor() + c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + data=c.fetchone() + if data is None: + c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value)) + self.client.commit() + return c.lastrowid + else: + print("already exists") + return data[0] + + def delete_KPI(self,kpi_id): c = self.client.cursor() - c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? 
AND kpi_sample_type is ?",(device_id,kpi_sample_type)) + c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) data=c.fetchone() if data is None: return False else: - c.execute("DELETE FROM kpi WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type)) + c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) self.client.commit() return True @@ -90,14 +118,14 @@ class ManagementDB(): self.client.commit() return True - def delete_kpid_id(self,kpi_id): + def delete_alarm(self,alarm_id): c = self.client.cursor() - c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,)) + c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) data=c.fetchone() if data is None: return False else: - c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,)) + c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,)) self.client.commit() return True @@ -108,17 +136,19 @@ class ManagementDB(): def get_subscription(self,subs_id): data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,)) return data.fetchone() + + def get_alarm(self,alarm_id): + data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,)) + return data.fetchone() def get_KPIS(self): data = self.client.execute("SELECT * FROM kpi") - #print("\n") - #for row in data: - # print(row) return data.fetchall() def get_subscriptions(self): data = self.client.execute("SELECT * FROM subscription") - #print("\n") - #for row in data: - # print(row) + return data.fetchall() + + def get_alarms(self): + data = self.client.execute("SELECT * FROM alarm") return data.fetchall() \ No newline at end of file diff --git a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py index fd2ac85c38a9c9988f50f4a64579d9f83e85aabc..c19b5f0304d41f994738988b63fcf0bc757b8903 100644 --- a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py +++ 
b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py @@ -16,13 +16,14 @@ import logging import random import grpc -from common.proto.context_pb2 import Empty, ServiceId -from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse, Sample +from common.proto.context_pb2 import Empty +from common.proto import dbscanserving_pb2 as dbscan from common.proto.monitoring_pb2 import Kpi from common.proto.optical_attack_detector_pb2_grpc import ( OpticalAttackDetectorServiceServicer, ) from common.proto.optical_attack_mitigator_pb2 import AttackDescription, AttackResponse +from common.proto import optical_attack_detector_pb2 as oad from common.rpc_method_wrapper.Decorator import ( create_metrics, safe_and_metered_rpc_method, @@ -56,35 +57,30 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi @safe_and_metered_rpc_method(METRICS, LOGGER) def DetectAttack( - self, service_id: ServiceId, context: grpc.ServicerContext + self, request: oad.DetectionRequest, context: grpc.ServicerContext ) -> Empty: - LOGGER.debug( - "Received request for {}/{}...".format( - service_id.context_id.context_uuid.uuid, service_id.service_uuid.uuid - ) - ) # run attack detection for every service - request: DetectionRequest = DetectionRequest() - request.num_samples = 310 - request.num_features = 100 - request.eps = 100.5 - request.min_samples = 5 + detection_request: dbscan.DetectionRequest = dbscan.DetectionRequest() + detection_request.num_samples = 310 + detection_request.num_features = 100 + detection_request.eps = 100.5 + detection_request.min_samples = 5 for _ in range(200): - grpc_sample = Sample() + grpc_sample = dbscan.Sample() for __ in range(100): grpc_sample.features.append(random.uniform(0.0, 10.0)) - request.samples.append(grpc_sample) + detection_request.samples.append(grpc_sample) for _ in range(100): - grpc_sample = Sample() + grpc_sample = dbscan.Sample() for __ in range(100): 
grpc_sample.features.append(random.uniform(50.0, 60.0)) - request.samples.append(grpc_sample) + detection_request.samples.append(grpc_sample) for _ in range(10): - grpc_sample = Sample() + grpc_sample = dbscan.Sample() for __ in range(100): grpc_sample.features.append(random.uniform(5000.0, 6000.0)) - request.samples.append(grpc_sample) - response: DetectionResponse = dbscanserving_client.Detect(request) + detection_request.samples.append(grpc_sample) + response: dbscan.DetectionResponse = dbscanserving_client.Detect(detection_request) # including KPI # TODO: set kpi_id and kpi_value according to the service @@ -96,7 +92,7 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi if -1 in response.cluster_indices: # attack detected attack = AttackDescription() - attack.cs_id.uuid = service_id.service_uuid.uuid + attack.cs_id.uuid = request.service_id.service_uuid.uuid response: AttackResponse = attack_mitigator_client.NotifyAttack(attack) # if attack is detected, run the attack mitigator diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py index e07b857915329eef6a5ec5b2236df9d9abf4cb02..e6269130ea308ec9862ed91e9e05223bd633b0d4 100644 --- a/src/opticalattackmanager/service/__main__.py +++ b/src/opticalattackmanager/service/__main__.py @@ -8,23 +8,13 @@ import time from multiprocessing import Manager, Process from typing import List -from grpclib.client import Channel - from common.Constants import ServiceNameEnum -from common.proto.context_pb2 import ( - ContextIdList, - Empty, - EventTypeEnum, - ServiceIdList, -) +from common.proto.asyncio.optical_attack_detector_grpc import OpticalAttackDetectorServiceStub +from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest +from common.proto.context_pb2 import ContextIdList, Empty, EventTypeEnum, ServiceIdList +from common.tools.grpc.Tools import grpc_message_to_json_string from common.proto.kpi_sample_types_pb2 import 
KpiSampleType from common.proto.monitoring_pb2 import KpiDescriptor -from common.proto.asyncio.optical_attack_detector_grpc import ( - OpticalAttackDetectorServiceStub, -) -from common.proto.asyncio.context_pb2 import ( - ServiceId, -) from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, @@ -35,6 +25,7 @@ from common.Settings import ( wait_for_environment_variables, ) from context.client.ContextClient import ContextClient +from grpclib.client import Channel from monitoring.client.MonitoringClient import MonitoringClient from opticalattackmanager.Config import MONITORING_INTERVAL from opticalattackmanager.utils.EventsCollector import EventsCollector @@ -43,44 +34,41 @@ from prometheus_client import start_http_server terminate = threading.Event() LOGGER = None -# For more channel options, please see: -# https://grpc.io/grpc/core/group__grpc__arg__keys.html -# CHANNEL_OPTIONS = [ -# ("grpc.lb_policy_name", "pick_first"), -# ("grpc.enable_retries", True), -# ("grpc.keepalive_timeout_ms", 10000), -# ] -# TODO: configure retries - def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning("Terminate signal received") terminate.set() -async def detect_attack(host: str, port: int, context_id: str, service_id: str) -> None: +async def detect_attack( + host: str, + port: int, + context_id: str, + service_id: str, + kpi_id: str, + timeout: float = 10.0, +) -> None: try: LOGGER.info("Sending request for {}...".format(service_id)) async with Channel(host, port) as channel: - # async with grpc.aio.insecure_channel( - # target=endpoint, options=CHANNEL_OPTIONS - # ) as channel: stub = OpticalAttackDetectorServiceStub(channel) - service = ServiceId() - service.context_id.context_uuid.uuid = context_id - service.service_uuid.uuid = str(service_id) - # Timeout in seconds. - # Please refer gRPC Python documents for more detail. 
- # https://grpc.io/grpc/python/grpc.html - await stub.DetectAttack(service, timeout=10) + request: DetectionRequest = DetectionRequest() + request.service_id.context_id.context_uuid.uuid = context_id + request.service_id.service_uuid.uuid = str(service_id) + + request.kpi_id.kpi_id.uuid = kpi_id + + await stub.DetectAttack(request, timeout=timeout) LOGGER.info("Monitoring finished for {}".format(service_id)) except Exception as e: LOGGER.warning("Exception while processing service_id {}".format(service_id)) LOGGER.exception(e) + # TODO: create prometheus metric to be increased here that counts the number + # of dropped detections -async def monitor_services(service_list: List[ServiceId]): +async def monitor_services(service_list: List): monitoring_interval = int( get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL) @@ -104,7 +92,15 @@ async def monitor_services(service_list: List[ServiceId]): tasks = [] for service in service_list: - aw = detect_attack(host, port, service["context"], service["service"]) + aw = detect_attack( + host, + port, + service["context"], + service["service"], + service["kpi"], + # allow at most 90% of the monitoring interval to succeed + monitoring_interval * 0.9, + ) tasks.append(aw) [await aw for aw in tasks] @@ -139,11 +135,11 @@ def create_kpi(client: MonitoringClient, service_id): kpi_description.service_id.service_uuid.uuid = service_id kpi_description.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN new_kpi = client.SetKpi(kpi_description) - LOGGER.info("Created KPI {}...".format(new_kpi.kpi_id)) + LOGGER.info("Created KPI {}: ".format(grpc_message_to_json_string(new_kpi))) return new_kpi -def get_context_updates(service_list: List[ServiceId]): +def get_context_updates(service_list: List): # to make sure we are thread safe... 
LOGGER.info("Connecting with context and monitoring components...") context_client: ContextClient = ContextClient() @@ -161,9 +157,9 @@ def get_context_updates(service_list: List[ServiceId]): if event is None: LOGGER.info("No event received") continue # no event received - LOGGER.info("Event received: {}".format(event)) + LOGGER.info("Event received: {}".format(grpc_message_to_json_string(event))) if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: - LOGGER.info("Service created: {}".format(event.service_id)) + LOGGER.info("Service created: {}".format(grpc_message_to_json_string(event.service_id))) kpi_id = create_kpi(monitoring_client, event.service_id.service_uuid.uuid) service_list.append( { @@ -174,7 +170,7 @@ def get_context_updates(service_list: List[ServiceId]): ) elif event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE: - LOGGER.info("Service removed: {}".format(event.service_id)) + LOGGER.info("Service removed: {}".format(grpc_message_to_json_string(event.service_id))) # find service and remove it from the list of currently monitored for service in service_list: if ( @@ -195,11 +191,11 @@ def main(): logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) + logging.getLogger("hpack").setLevel(logging.CRITICAL) + wait_for_environment_variables( [ - get_env_var_name( - ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST - ), + get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST), get_env_var_name( ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC ), @@ -208,12 +204,8 @@ def main(): wait_for_environment_variables( [ - get_env_var_name( - ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST - ), - get_env_var_name( - ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC - ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ] ) @@ -245,9 +237,13 @@ def main(): # creating a thread-safe list to be shared 
among threads service_list = Manager().list() kpi_id = create_kpi(monitoring_client, "1213") - service_list.append({"context": "admin", "service": "1213", "kpi": kpi_id.kpi_id.uuid}) + service_list.append( + {"context": "admin", "service": "1213", "kpi": kpi_id.kpi_id.uuid} + ) kpi_id = create_kpi(monitoring_client, "1456") - service_list.append({"context": "admin", "service": "1456", "kpi": kpi_id.kpi_id.uuid}) + service_list.append( + {"context": "admin", "service": "1456", "kpi": kpi_id.kpi_id.uuid} + ) context_ids: ContextIdList = context_client.ListContextIds(Empty()) @@ -255,6 +251,8 @@ def main(): for context_id in context_ids.context_ids: context_services: ServiceIdList = context_client.ListServiceIds(context_id) for service in context_services.service_ids: + # in case of a service restart, monitoring component will not duplicate KPIs + # but rather return the existing KPI if that's the case kpi_id = create_kpi(monitoring_client, service.service_uuid.uuid) service_list.append( {