diff --git a/src/opticalattackmanager/__init__.py b/src/opticalattackmanager/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/opticalattackmanager/__init__.py +++ b/src/opticalattackmanager/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/opticalattackmanager/service/__init__.py b/src/opticalattackmanager/service/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/opticalattackmanager/service/__init__.py +++ b/src/opticalattackmanager/service/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py index 3c907ecc2898f7b800a557db80d37c74dc2a4972..c849a206bd120d4a0ffd9d7f1e82439101c9a3fd 100644 --- a/src/opticalattackmanager/service/__main__.py +++ b/src/opticalattackmanager/service/__main__.py @@ -1,3 +1,18 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + import asyncio import logging import signal @@ -28,112 +43,19 @@ from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient from grpclib.client import Channel from monitoring.client.MonitoringClient import MonitoringClient -from opticalattackmanager.Config import MONITORING_INTERVAL from opticalattackmanager.utils.EventsCollector import EventsCollector -from prometheus_client import start_http_server, Histogram, Counter +from opticalattackmanager.utils.monitor import monitor_services +from prometheus_client import Counter, Histogram, start_http_server terminate = threading.Event() LOGGER = None -# Create a metric to track time spent and requests made. 
-# TODO: adjust histogram buckets to more realistic values -LOOP_TIME = Histogram('optical_security_loop_seconds', 'Time taken by each security loop') -DROP_COUNTER = Counter('optical_security_dropped_assessments', 'Dropped assessments due to detector timeout') - def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning("Terminate signal received") terminate.set() -async def detect_attack( - host: str, - port: int, - context_id: str, - service_id: str, - kpi_id: str, - timeout: float = 10.0, -) -> None: - try: - LOGGER.info("Sending request for {}...".format(service_id)) - async with Channel(host, port) as channel: - stub = OpticalAttackDetectorServiceStub(channel) - - request: DetectionRequest = DetectionRequest() - request.service_id.context_id.context_uuid.uuid = context_id - request.service_id.service_uuid.uuid = str(service_id) - - request.kpi_id.kpi_id.uuid = kpi_id - - await stub.DetectAttack(request, timeout=timeout) - LOGGER.info("Monitoring finished for {}".format(service_id)) - except Exception as e: - LOGGER.warning("Exception while processing service_id {}".format(service_id)) - LOGGER.exception(e) - DROP_COUNTER.inc() - - -async def monitor_services(service_list: List): - - monitoring_interval = int( - get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL) - ) - - host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST") - port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC")) - - LOGGER.info("Starting execution of the async loop") - - while not terminate.is_set(): - - if len(service_list) == 0: - LOGGER.debug("No services to monitor...") - time.sleep(monitoring_interval) - continue - - LOGGER.info("Starting new monitoring cycle...") - - start_time = time.time() - - tasks = [] - for service in service_list: - aw = detect_attack( - host, - port, - service["context"], - service["service"], - service["kpi"], - # allow at most 90% of the monitoring interval to succeed - monitoring_interval * 
0.9, - ) - tasks.append(aw) - [await aw for aw in tasks] - - end_time = time.time() - - time_taken = end_time - start_time - LOOP_TIME.observe(time_taken) - LOGGER.info( - "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... " - "Waiting for {:.2f} seconds...".format( - len(service_list), - time_taken, - (time_taken / monitoring_interval) * 100, - monitoring_interval - time_taken, - ) - ) - - if time_taken / monitoring_interval > 0.9: - LOGGER.warning( - "Monitoring loop is taking {} % of the desired time " - "({} seconds)".format( - (time_taken / monitoring_interval) * 100, monitoring_interval - ) - ) - if monitoring_interval - time_taken > 0: - time.sleep(monitoring_interval - time_taken) - - def create_kpi(client: MonitoringClient, service_id): # create kpi kpi_description: KpiDescriptor = KpiDescriptor() @@ -288,7 +210,7 @@ def main(): # runs the async loop in the background loop = asyncio.get_event_loop() - loop.run_until_complete(monitor_services(service_list)) + loop.run_until_complete(monitor_services(terminate, service_list)) # asyncio.create_task(monitor_services(service_list)) # Wait for Ctrl+C or termination signal diff --git a/src/opticalattackmanager/utils/__init__.py b/src/opticalattackmanager/utils/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/opticalattackmanager/utils/__init__.py +++ b/src/opticalattackmanager/utils/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/opticalattackmanager/utils/monitor.py b/src/opticalattackmanager/utils/monitor.py
new file mode 100644
index 0000000000000000000000000000000000000000..6f790725089c6722ab44eb8880678bfdd9d803f9
--- /dev/null
+++ b/src/opticalattackmanager/utils/monitor.py
@@ -0,0 +1,151 @@
+# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import logging
+import time
+from typing import List
+
+from common.proto.asyncio.optical_attack_detector_grpc import (
+    OpticalAttackDetectorServiceStub,
+)
+from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest
+from common.Settings import get_log_level, get_setting
+from grpclib.client import Channel
+from opticalattackmanager.Config import MONITORING_INTERVAL
+from prometheus_client import Counter, Histogram
+
+# Create a metric to track time spent and requests made.
+# TODO: adjust histogram buckets to more realistic values
+LOOP_TIME = Histogram(
+    "optical_security_loop_seconds", "Time taken by each security loop"
+)
+DROP_COUNTER = Counter(
+    "optical_security_dropped_assessments",
+    "Dropped assessments due to detector timeout",
+)
+log_level = get_log_level()
+logging.basicConfig(level=log_level)
+LOGGER = logging.getLogger(__name__)
+
+
+async def detect_attack(
+    host: str,
+    port: int,
+    context_id: str,
+    service_id: str,
+    kpi_id: str,
+    timeout: float = 10.0,
+) -> None:
+    """Ask the optical attack detector to assess a single service.
+
+    Opens a gRPC channel to ``host:port``, sends one ``DetectionRequest``
+    and allows the call at most ``timeout`` seconds. Any ``Exception`` is
+    logged and counted in ``DROP_COUNTER`` instead of being propagated, so
+    a failing service does not abort the requests for the other services.
+    """
+    try:
+        LOGGER.info("Sending request for {}...".format(service_id))
+        async with Channel(host, port) as channel:
+            stub = OpticalAttackDetectorServiceStub(channel)
+
+            request: DetectionRequest = DetectionRequest()
+            request.service_id.context_id.context_uuid.uuid = context_id
+            request.service_id.service_uuid.uuid = str(service_id)
+
+            request.kpi_id.kpi_id.uuid = kpi_id
+
+            await stub.DetectAttack(request, timeout=timeout)
+        LOGGER.info("Monitoring finished for {}".format(service_id))
+    except Exception as e:
+        LOGGER.warning("Exception while processing service_id {}".format(service_id))
+        LOGGER.exception(e)
+        DROP_COUNTER.inc()
+
+
+async def monitor_services(terminate, service_list: List):
+    """Periodically request an attack assessment for every service.
+
+    Runs until ``terminate`` is set. Each cycle issues one ``detect_attack``
+    request per entry of ``service_list``, all of them concurrently, and
+    then sleeps for whatever is left of the monitoring interval.
+
+    Args:
+        terminate: object exposing ``is_set()`` (e.g. ``threading.Event``);
+            the loop stops once it returns a true value.
+        service_list: list of mappings with the keys ``"context"``,
+            ``"service"`` and ``"kpi"``; it is re-read on every cycle.
+    """
+    monitoring_interval = int(
+        get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL)
+    )
+
+    host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST")
+    port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC"))
+
+    LOGGER.info("Starting execution of the async loop")
+
+    while not terminate.is_set():
+
+        if not service_list:
+            LOGGER.debug("No services to monitor...")
+            # asyncio.sleep, not time.sleep: a blocking sleep inside a
+            # coroutine stalls the whole event loop.
+            await asyncio.sleep(monitoring_interval)
+            continue
+
+        LOGGER.info("Starting new monitoring cycle...")
+
+        # monotonic clock: the elapsed time is unaffected by clock changes
+        start_time = time.monotonic()
+
+        tasks = [
+            detect_attack(
+                host,
+                port,
+                service["context"],
+                service["service"],
+                service["kpi"],
+                # allow at most 90% of the monitoring interval to succeed
+                monitoring_interval * 0.9,
+            )
+            for service in service_list
+        ]
+        # Run the requests concurrently: awaiting the coroutines one by one
+        # would serialize them and the cycle time would grow with the number
+        # of services. detect_attack() handles its own exceptions.
+        await asyncio.gather(*tasks)
+
+        time_taken = time.monotonic() - start_time
+        LOOP_TIME.observe(time_taken)
+        remaining = monitoring_interval - time_taken
+        LOGGER.info(
+            "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... "
+            "Waiting for {:.2f} seconds...".format(
+                len(service_list),
+                time_taken,
+                (time_taken / monitoring_interval) * 100,
+                remaining,
+            )
+        )
+
+        if time_taken / monitoring_interval > 0.9:
+            LOGGER.warning(
+                "Monitoring loop is taking {} % of the desired time "
+                "({} seconds)".format(
+                    (time_taken / monitoring_interval) * 100, monitoring_interval
+                )
+            )
+        if remaining > 0:
+            await asyncio.sleep(remaining)