# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import time
from typing import List

from grpclib.client import Channel
from prometheus_client import Counter, Histogram

from common.proto.asyncio.optical_attack_detector_grpc import (
    OpticalAttackDetectorServiceStub,
)
from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest
from common.Settings import get_log_level, get_setting
from opticalattackmanager.Config import MONITORING_INTERVAL

# Create a metric to track time spent and requests made.
# TODO: adjust histogram buckets to more realistic values
LOOP_TIME = Histogram(
    "optical_security_loop_seconds", "Time taken by each security loop"
)
DROP_COUNTER = Counter(
    "optical_security_dropped_assessments",
    "Dropped assessments due to detector timeout",
)

log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)


async def detect_attack(
    host: str,
    port: int,
    context_id: str,
    service_id: str,
    kpi_id: str,
    timeout: float = 10.0,
) -> None:
    """Send one ``DetectAttack`` request for a service to the detector.

    A fresh channel to ``host:port`` is opened for the request and closed
    when the request completes or fails.

    Args:
        host: Address of the optical attack detector service.
        port: gRPC port of the optical attack detector service.
        context_id: UUID written into the request's context identifier.
        service_id: Service identifier; converted with ``str()`` before use.
        kpi_id: UUID written into the request's KPI identifier.
        timeout: Value forwarded as the ``timeout`` of the RPC call.

    Any ``Exception`` raised while sending the request is logged and counted
    in ``DROP_COUNTER``; it is not propagated to the caller.
    """
    try:
        LOGGER.info("Sending request for {}...".format(service_id))
        async with Channel(host, port) as channel:
            stub = OpticalAttackDetectorServiceStub(channel)

            request: DetectionRequest = DetectionRequest()
            request.service_id.context_id.context_uuid.uuid = context_id
            request.service_id.service_uuid.uuid = str(service_id)
            request.kpi_id.kpi_id.uuid = kpi_id

            await stub.DetectAttack(request, timeout=timeout)
        LOGGER.info("Monitoring finished for {}".format(service_id))
    except Exception as e:
        # Best effort: a failed assessment must not abort the monitoring
        # cycle, so the failure is only logged and counted.
        LOGGER.warning("Exception while processing service_id {}".format(service_id))
        LOGGER.exception(e)
        DROP_COUNTER.inc()


async def monitor_services(terminate, service_list: List):
    """Periodically request an attack assessment for every listed service.

    Each cycle sends one ``detect_attack`` request per entry of
    ``service_list``, all of them concurrently, records the cycle duration in
    ``LOOP_TIME`` and then sleeps for the remainder of the monitoring
    interval. The loop ends once ``terminate.is_set()`` returns true; the flag
    is checked once per cycle.

    Args:
        terminate: Object exposing ``is_set()``; a true value stops the loop.
        service_list: Sequence of mappings providing the ``"context"``,
            ``"service"`` and ``"kpi"`` keys. It is re-read at the start of
            every cycle, so changes made by the owner are picked up on the
            next cycle.
    """
    monitoring_interval = int(
        get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL)
    )
    host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST")
    port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC"))
    # allow at most 90% of the monitoring interval to succeed
    request_timeout = monitoring_interval * 0.9

    LOGGER.info("Starting execution of the async loop")

    while not terminate.is_set():
        # Work on a snapshot so that the number of requests sent and the
        # number reported in the log below always agree.
        services = list(service_list)

        if not services:
            LOGGER.debug("No services to monitor...")
            # Non-blocking sleep: time.sleep() would stall the event loop.
            await asyncio.sleep(monitoring_interval)
            continue

        LOGGER.info("Starting new monitoring cycle...")

        # Monotonic clock: unaffected by wall-clock adjustments.
        start_time = time.monotonic()

        # Run all requests concurrently. Awaiting the coroutines one by one
        # would serialize them, letting a cycle take up to
        # len(services) * request_timeout seconds.
        await asyncio.gather(
            *(
                detect_attack(
                    host,
                    port,
                    service["context"],
                    service["service"],
                    service["kpi"],
                    request_timeout,
                )
                for service in services
            )
        )

        time_taken = time.monotonic() - start_time
        LOOP_TIME.observe(time_taken)

        usage = time_taken / monitoring_interval
        # Never report (or sleep for) a negative duration when the cycle
        # overran the monitoring interval.
        remaining = max(monitoring_interval - time_taken, 0.0)

        LOGGER.info(
            "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... "
            "Waiting for {:.2f} seconds...".format(
                len(services),
                time_taken,
                usage * 100,
                remaining,
            )
        )

        if usage > 0.9:
            LOGGER.warning(
                "Monitoring loop is taking {} % of the desired time "
                "({} seconds)".format(usage * 100, monitoring_interval)
            )

        if remaining > 0:
            await asyncio.sleep(remaining)