From 1530ad32e64868414e169014cb3d109ed9380e12 Mon Sep 17 00:00:00 2001 From: Carlos Natalino <carlos.natalino@chalmers.se> Date: Fri, 12 May 2023 11:03:52 +0200 Subject: [PATCH] Addressing requests from review on the 10th of May. --- my_deploy.sh | 8 +- src/dbscanserving/Config.py | 11 -- .../client/DbscanServingClient.py | 14 +- src/dbscanserving/service/DbscanService.py | 75 +------- .../service/DbscanServiceServicerImpl.py | 5 +- src/dbscanserving/service/__main__.py | 17 +- src/dbscanserving/tests/test_unitary.py | 1 + src/opticalattackdetector/Config.py | 14 -- .../client/OpticalAttackDetectorClient.py | 16 +- src/opticalattackdetector/requirements.in | 16 +- .../service/OpticalAttackDetectorService.py | 78 ++------ ...pticalAttackDetectorServiceServicerImpl.py | 45 +++-- src/opticalattackdetector/service/__main__.py | 26 +-- .../tests/test_unitary.py | 177 +----------------- src/opticalattackmanager/Config.py | 9 - src/opticalattackmanager/requirements.in | 16 +- src/opticalattackmanager/service/__main__.py | 43 ++--- .../utils/EventsCollector.py | 84 --------- src/opticalattackmanager/utils/monitor.py | 1 - src/opticalattackmitigator/Config.py | 11 -- .../client/OpticalAttackMitigatorClient.py | 16 +- src/opticalattackmitigator/requirements.in | 13 ++ .../service/OpticalAttackMitigatorService.py | 78 ++------ ...ticalAttackMitigatorServiceServicerImpl.py | 6 +- .../service/__main__.py | 13 +- .../tests/test_unitary.py | 2 +- src/tests/scenario3/optical/deploy_specs.sh | 89 --------- src/tests/scenario3/optical/scaphandre.yaml | 3 + 28 files changed, 181 insertions(+), 706 deletions(-) delete mode 100644 src/opticalattackmanager/utils/EventsCollector.py diff --git a/my_deploy.sh b/my_deploy.sh index 2c3ed27ca..ee3244ac9 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -20,7 +20,7 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" +export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator l3_attackmitigator l3_centralizedattackdetector" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" @@ -29,7 +29,7 @@ export TFS_IMAGE_TAG="dev" export TFS_K8S_NAMESPACE="tfs" # Set additional manifest files to be applied after the deployment -export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml manifests/cachingservice.yaml" +export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml" # Set the new Grafana admin password export TFS_GRAFANA_PASSWORD="admin123+" @@ -37,10 +37,6 @@ export TFS_GRAFANA_PASSWORD="admin123+" # Disable skip-build flag to rebuild the Docker images. export TFS_SKIP_BUILD="" -# addition for the optical cybersecurity component -# export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager" -# export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml" - # ----- CockroachDB ------------------------------------------------------------ diff --git a/src/dbscanserving/Config.py b/src/dbscanserving/Config.py index ba1fc4da1..38d04994f 100644 --- a/src/dbscanserving/Config.py +++ b/src/dbscanserving/Config.py @@ -11,14 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# gRPC settings -GRPC_SERVICE_PORT = 10008 - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/dbscanserving/client/DbscanServingClient.py b/src/dbscanserving/client/DbscanServingClient.py index 15081ab3a..d8a74948d 100644 --- a/src/dbscanserving/client/DbscanServingClient.py +++ b/src/dbscanserving/client/DbscanServingClient.py @@ -12,14 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import logging from typing import Counter +import grpc + from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import delay_exponential, retry from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse from common.proto.dbscanserving_pb2_grpc import DetectorStub +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import delay_exponential, retry LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -33,8 +35,10 @@ RETRY_DECORATOR = retry( class DbscanServingClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.DBSCANSERVING) - if not port: port = get_service_port_grpc(ServiceNameEnum.DBSCANSERVING) + if not host: + host = get_service_host(ServiceNameEnum.DBSCANSERVING) + if not port: + port = get_service_port_grpc(ServiceNameEnum.DBSCANSERVING) self.endpoint = "{:s}:{:s}".format(str(host), str(port)) LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) diff --git a/src/dbscanserving/service/DbscanService.py b/src/dbscanserving/service/DbscanService.py index 3511e4e5f..8ae152160 100644 --- a/src/dbscanserving/service/DbscanService.py +++ b/src/dbscanserving/service/DbscanService.py @@ -13,79 +13,24 @@ # limitations under the License. import logging -from concurrent import futures -import grpc -from grpc_health.v1.health import OVERALL_HEALTH, HealthServicer -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server - -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) +from common.Constants import ServiceNameEnum from common.proto.dbscanserving_pb2_grpc import add_DetectorServicer_to_server -from dbscanserving.Config import GRPC_SERVICE_PORT +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService + from dbscanserving.service.DbscanServiceServicerImpl import \ DbscanServiceServicerImpl -BIND_ADDRESS = "0.0.0.0" LOGGER = logging.getLogger(__name__) -class DbscanService: - def __init__( - self, - address=BIND_ADDRESS, - port=GRPC_SERVICE_PORT, - grace_period=DEFAULT_GRPC_GRACE_PERIOD, - max_workers=DEFAULT_GRPC_MAX_WORKERS, - ): - - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.dbscan_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = "{:s}:{:s}".format(str(self.address), str(self.port)) - LOGGER.debug( - "Starting Service (tentative endpoint: {:s}, max_workers: {:s})...".format( - str(self.endpoint), str(self.max_workers) - ) - ) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) +class DbscanService(GenericGrpcService): + def __init__(self, cls_name: str = __name__): + port = get_service_port_grpc(ServiceNameEnum.DBSCANSERVING) + super().__init__(port, cls_name=cls_name) self.dbscan_servicer = DbscanServiceServicerImpl() - add_DetectorServicer_to_server(self.dbscan_servicer, self.server) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, - experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1), - ) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = "{:s}:{:s}".format(str(self.address), str(port)) - LOGGER.info("Listening on {:s}...".format(self.endpoint)) - self.server.start() - self.health_servicer.set( - OVERALL_HEALTH, HealthCheckResponse.SERVING - ) # pylint: disable=maybe-no-member - LOGGER.debug("Service started") - - def stop(self): - LOGGER.debug( - "Stopping service (grace period {:s} seconds)...".format( - str(self.grace_period) - ) - ) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug("Service stopped") + def install_servicers(self): + add_DetectorServicer_to_server(self.dbscan_servicer, self.server) diff --git a/src/dbscanserving/service/DbscanServiceServicerImpl.py b/src/dbscanserving/service/DbscanServiceServicerImpl.py index 3eeb7aa8b..ac4f4c90b 100644 --- a/src/dbscanserving/service/DbscanServiceServicerImpl.py +++ b/src/dbscanserving/service/DbscanServiceServicerImpl.py @@ -17,13 +17,14 @@ import logging import grpc from sklearn.cluster import DBSCAN +from common.method_wrappers.Decorator import (MetricsPool, + safe_and_metered_rpc_method) from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse from common.proto.dbscanserving_pb2_grpc import DetectorServicer -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('DBSCANServing', 'RPC') +METRICS_POOL = MetricsPool("DBSCANServing", "RPC") class DbscanServiceServicerImpl(DetectorServicer): diff --git a/src/dbscanserving/service/__main__.py b/src/dbscanserving/service/__main__.py index 7a1a3be74..fc0f8346e 100644 --- a/src/dbscanserving/service/__main__.py +++ b/src/dbscanserving/service/__main__.py @@ -17,12 +17,9 @@ import signal import sys import threading +from common.Settings import get_log_level, get_metrics_port from prometheus_client import start_http_server -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) -from common.Settings import get_log_level, get_metrics_port, get_setting -from dbscanserving.Config import GRPC_SERVICE_PORT from dbscanserving.service.DbscanService import DbscanService terminate = threading.Event() @@ -41,12 +38,6 @@ def main(): logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) - service_port = get_setting( - "DBSCANSERVICE_SERVICE_PORT_GRPC", default=GRPC_SERVICE_PORT - ) - grace_period = get_setting("GRACE_PERIOD", default=DEFAULT_GRPC_GRACE_PERIOD) - max_workers = get_setting("MAX_WORKERS", default=DEFAULT_GRPC_MAX_WORKERS) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -57,13 +48,11 @@ def main(): start_http_server(metrics_port) # Starting CentralizedCybersecurity service - grpc_service = DbscanService( - port=service_port, max_workers=max_workers, grace_period=grace_period - ) + grpc_service = DbscanService() grpc_service.start() # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=0.1): + while not terminate.wait(timeout=1): pass LOGGER.info("Terminating...") diff --git a/src/dbscanserving/tests/test_unitary.py b/src/dbscanserving/tests/test_unitary.py index a0f92d13a..7349978e5 100644 --- a/src/dbscanserving/tests/test_unitary.py +++ b/src/dbscanserving/tests/test_unitary.py @@ -21,6 +21,7 @@ import pytest from common.proto.dbscanserving_pb2 import (DetectionRequest, DetectionResponse, Sample) + from dbscanserving.client.DbscanServingClient import DbscanServingClient from dbscanserving.Config import GRPC_SERVICE_PORT from dbscanserving.service.DbscanService import DbscanService diff --git a/src/opticalattackdetector/Config.py b/src/opticalattackdetector/Config.py index f5812acaf..38d04994f 100644 --- a/src/opticalattackdetector/Config.py +++ b/src/opticalattackdetector/Config.py @@ -11,17 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# gRPC settings -GRPC_SERVICE_PORT = 10006 - -# service settings -MONITORING_INTERVAL = 10 # monitoring interval in seconds - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/opticalattackdetector/client/OpticalAttackDetectorClient.py b/src/opticalattackdetector/client/OpticalAttackDetectorClient.py index 50f303b5d..3b95227ad 100644 --- a/src/opticalattackdetector/client/OpticalAttackDetectorClient.py +++ b/src/opticalattackdetector/client/OpticalAttackDetectorClient.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import logging + +import grpc from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import delay_exponential, retry -from common.tools.grpc.Tools import grpc_message_to_json from common.proto.context_pb2 import Empty from common.proto.optical_attack_detector_pb2_grpc import \ OpticalAttackDetectorServiceStub +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import delay_exponential, retry +from common.tools.grpc.Tools import grpc_message_to_json LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -34,8 +36,10 @@ RETRY_DECORATOR = retry( class OpticalAttackDetectorClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.OPTICALATTACKDETECTOR) - if not port: port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKDETECTOR) + if not host: + host = get_service_host(ServiceNameEnum.OPTICALATTACKDETECTOR) + if not port: + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKDETECTOR) self.endpoint = "{:s}:{:s}".format(str(host), str(port)) LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) self.channel = None diff --git a/src/opticalattackdetector/requirements.in b/src/opticalattackdetector/requirements.in index 7c2cf4fa6..b69d4dd78 100644 --- a/src/opticalattackdetector/requirements.in +++ b/src/opticalattackdetector/requirements.in @@ -1,2 +1,16 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + numpy -redis \ No newline at end of file +redis diff --git a/src/opticalattackdetector/service/OpticalAttackDetectorService.py b/src/opticalattackdetector/service/OpticalAttackDetectorService.py index ad91d7109..2128d79b4 100644 --- a/src/opticalattackdetector/service/OpticalAttackDetectorService.py +++ b/src/opticalattackdetector/service/OpticalAttackDetectorService.py @@ -13,82 +13,26 @@ # limitations under the License. import logging -from concurrent import futures -import grpc -from grpc_health.v1.health import OVERALL_HEALTH, HealthServicer -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server - -from common.Constants import (DEFAULT_GRPC_BIND_ADDRESS, - DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) +from common.Constants import ServiceNameEnum from common.proto.optical_attack_detector_pb2_grpc import \ add_OpticalAttackDetectorServiceServicer_to_server -from opticalattackdetector.Config import GRPC_SERVICE_PORT +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService + from opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl import \ OpticalAttackDetectorServiceServicerImpl LOGGER = logging.getLogger(__name__) -class OpticalAttackDetectorService: - def __init__( - self, - address=DEFAULT_GRPC_BIND_ADDRESS, - port=GRPC_SERVICE_PORT, - max_workers=DEFAULT_GRPC_MAX_WORKERS, - grace_period=DEFAULT_GRPC_GRACE_PERIOD, - ): - - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.attack_detector_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = "{:s}:{:s}".format(str(self.address), str(self.port)) - LOGGER.debug( - "Starting Service (tentative endpoint: {:s}, max_workers: {:s})...".format( - str(self.endpoint), str(self.max_workers) - ) - ) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) +class OpticalAttackDetectorService(GenericGrpcService): + def __init__(self, cls_name: str = __name__): + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKDETECTOR) + super().__init__(port, cls_name=cls_name) + self.opticalattackdetector_servicer = OpticalAttackDetectorServiceServicerImpl() - self.attack_detector_servicer = OpticalAttackDetectorServiceServicerImpl() + def install_servicers(self): add_OpticalAttackDetectorServiceServicer_to_server( - self.attack_detector_servicer, self.server - ) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, - experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1), - ) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = "{:s}:{:s}".format(str(self.address), str(port)) - LOGGER.info("Listening on {:s}...".format(self.endpoint)) - self.server.start() - self.health_servicer.set( - OVERALL_HEALTH, HealthCheckResponse.SERVING - ) # pylint: disable=maybe-no-member - - LOGGER.debug("Service started") - - def stop(self): - LOGGER.debug( - "Stopping service (grace period {:s} seconds)...".format( - str(self.grace_period) - ) + self.opticalattackdetector_servicer, self.server ) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug("Service stopped") diff --git a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py index 6a3ee2243..00c244c45 100644 --- a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py +++ b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py @@ -19,9 +19,9 @@ import random import grpc import numpy as np import redis -from prometheus_client import Histogram - from common.Constants import ServiceNameEnum +from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, + safe_and_metered_rpc_method) from common.proto import dbscanserving_pb2 as dbscan from common.proto import optical_attack_detector_pb2 as oad from common.proto.context_pb2 import Empty @@ -30,33 +30,48 @@ from common.proto.optical_attack_detector_pb2_grpc import \ OpticalAttackDetectorServiceServicer from common.proto.optical_attack_mitigator_pb2 import (AttackDescription, AttackResponse) -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.Settings import get_service_host, get_setting from common.tools.timestamp.Converters import timestamp_utcnow_to_float from dbscanserving.client.DbscanServingClient import DbscanServingClient from monitoring.client.MonitoringClient import MonitoringClient from opticalattackmitigator.client.OpticalAttackMitigatorClient import \ OpticalAttackMitigatorClient +from prometheus_client import Histogram LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('OpticalAttackDetector', 'RPC') +METRICS_POOL = MetricsPool("OpticalAttackDetector", "RPC") + +METRICS_POOL_DETAILS = MetricsPool( + "OpticalAttackDetector", + "execution", + labels={ + "operation": "", + "step": "", + }, +) + + +metric_labels = dict(operation="detect") +histogram_duration: Histogram = METRICS_POOL_DETAILS.get_or_create( + "details", MetricTypeEnum.HISTOGRAM_DURATION +) + DETECTION_RESPONSE_TIME = Histogram( "tfs_opticalattackdetector_inference_response_time", - "Time taken by the inference component to reply" + "Time taken by the inference component to reply", ) MONITORING_RESPONSE_TIME = Histogram( "tfs_opticalattackdetector_monitoring_response_time", - "Time taken by the monitoring component to reply" + "Time taken by the monitoring component to reply", ) MITIGATION_RESPONSE_TIME = Histogram( "tfs_opticalattackdetector_mitigation_response_time", - "Time taken by the attack mitigator to reply" + "Time taken by the attack mitigator to reply", ) CACHE_RESPONSE_TIME = Histogram( - "tfs_opticalattackdetector_cache_response_time", - "Time taken by the cache to reply" + "tfs_opticalattackdetector_cache_response_time", "Time taken by the cache to reply" ) monitoring_client: MonitoringClient = MonitoringClient() @@ -182,7 +197,8 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi detection_sample.append(random.uniform(5000.0, 6000.0)) # adding the sample to the cache and recovering the cache - with CACHE_RESPONSE_TIME.time(): + with histogram_duration.labels(step="cachefetch", **metric_labels).time(): + # with CACHE_RESPONSE_TIME.time(): r.rpush( "opm_{}".format(s_uuid.replace("-", "_")), pickle.dumps(tuple(detection_sample)), @@ -199,7 +215,8 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi detection_sample.features.append(feature) detection_request.samples.append(detection_sample) - with DETECTION_RESPONSE_TIME.time(): + with histogram_duration.labels(step="uldetection", **metric_labels).time(): + # with DETECTION_RESPONSE_TIME.time(): response: dbscan.DetectionResponse = dbscanserving_client.Detect( detection_request ) @@ -219,14 +236,16 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi else: kpi.kpi_value.int32Val = 0 - with MONITORING_RESPONSE_TIME.time(): + with histogram_duration.labels(step="includekpi", **metric_labels).time(): + # with MONITORING_RESPONSE_TIME.time(): monitoring_client.IncludeKpi(kpi) # if -1 in response.cluster_indices: # attack detected if kpi.kpi_value.int32Val == -1: attack = AttackDescription() attack.cs_id.uuid = request.service_id.service_uuid.uuid - with MITIGATION_RESPONSE_TIME.time(): + with histogram_duration.labels(step="mitigation", **metric_labels).time(): + # with MITIGATION_RESPONSE_TIME.time(): response: AttackResponse = attack_mitigator_client.NotifyAttack(attack) # if attack is detected, run the attack mitigator diff --git a/src/opticalattackdetector/service/__main__.py b/src/opticalattackdetector/service/__main__.py index d090dc1d2..27706fce4 100644 --- a/src/opticalattackdetector/service/__main__.py +++ b/src/opticalattackdetector/service/__main__.py @@ -19,15 +19,14 @@ import threading from prometheus_client import start_http_server -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS, ServiceNameEnum) +from common.Constants import ServiceNameEnum from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, - get_log_level, get_metrics_port, get_setting, + get_log_level, get_metrics_port, wait_for_environment_variables) + from opticalattackdetector.client.OpticalAttackDetectorClient import \ OpticalAttackDetectorClient -from opticalattackdetector.Config import GRPC_SERVICE_PORT from opticalattackdetector.service.OpticalAttackDetectorService import \ OpticalAttackDetectorService @@ -55,11 +54,6 @@ def main(): get_env_var_name( ServiceNameEnum.DBSCANSERVING, ENVVAR_SUFIX_SERVICE_PORT_GRPC ), - ] - ) - - wait_for_environment_variables( - [ get_env_var_name( ServiceNameEnum.OPTICALATTACKMITIGATOR, ENVVAR_SUFIX_SERVICE_HOST ), @@ -69,12 +63,6 @@ def main(): ] ) - service_port = get_setting( - "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC", default=GRPC_SERVICE_PORT - ) - max_workers = get_setting("MAX_WORKERS", default=DEFAULT_GRPC_MAX_WORKERS) - grace_period = get_setting("GRACE_PERIOD", default=DEFAULT_GRPC_GRACE_PERIOD) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -82,16 +70,14 @@ def main(): # Start metrics server metrics_port = get_metrics_port() - start_http_server(metrics_port) # TODO: make sure this is uncommented + start_http_server(metrics_port) # Starting CentralizedCybersecurity service - grpc_service = OpticalAttackDetectorService( - port=service_port, max_workers=max_workers, grace_period=grace_period - ) + grpc_service = OpticalAttackDetectorService() grpc_service.start() # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=0.1): + while not terminate.wait(timeout=1): pass LOGGER.info("Terminating...") diff --git a/src/opticalattackdetector/tests/test_unitary.py b/src/opticalattackdetector/tests/test_unitary.py index b1fc487fe..72e3a4ac1 100644 --- a/src/opticalattackdetector/tests/test_unitary.py +++ b/src/opticalattackdetector/tests/test_unitary.py @@ -17,11 +17,11 @@ import os from unittest.mock import patch import pytest - from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, DEFAULT_GRPC_MAX_WORKERS) from common.proto import dbscanserving_pb2 as dbscan from common.proto.optical_attack_detector_pb2 import DetectionRequest + from opticalattackdetector.client.OpticalAttackDetectorClient import \ OpticalAttackDetectorClient from opticalattackdetector.Config import GRPC_SERVICE_PORT @@ -53,17 +53,9 @@ def optical_attack_detector_service(): max_workers=DEFAULT_GRPC_MAX_WORKERS, grace_period=DEFAULT_GRPC_GRACE_PERIOD, ) - # mocker_context_client = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client') - # mocker_context_client.start() - - # mocker_influx_db = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client') - # mocker_influx_db.start() - _service.start() yield _service _service.stop() - # mocker_context_client.stop() - # mocker_influx_db.stop() @pytest.fixture(scope="session") @@ -106,170 +98,3 @@ def test_detect_attack( mitigator.NotifyAttack.assert_called_once() monitoring.IncludeKpi.assert_called_once() dbscanserving.assert_called_once() - - -# def test_detect_attack_with_context( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb: -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() - - -# def test_detect_attack_with_contexts( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb: -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# cid_list.context_ids.append(ContextId(**CONTEXT_ID_2)) -# context.ListContextIds.return_value = cid_list - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_any_call(cid_list.context_ids[0]) -# context.ListServices.assert_any_call(cid_list.context_ids[1]) -# influxdb.query.assert_called_once() - - -# def test_detect_attack_with_service( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" -# ) as dbscan: - -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# service_list = ServiceList() -# service_list.services.append(Service(**SERVICE_DEV1_DEV2)) -# context.ListServices.return_value = service_list - -# influxdb.query.return_value.get_points.return_value = [(1, 2), (3, 4)] - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() -# dbscan.Detect.assert_called() - - -# def test_detect_attack_no_attack( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" -# ) as dbscan, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" -# ) as mitigator: - -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# service_list = ServiceList() -# service_list.services.append(Service(**SERVICE_DEV1_DEV2)) -# context.ListServices.return_value = service_list - -# # dbscan.Detect.return_value = object() -# dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, 5] - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() -# dbscan.Detect.assert_called() -# mitigator.NotifyAttack.assert_not_called() - - -# def test_detect_attack_with_attack( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" -# ) as dbscan, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" -# ) as mitigator: - -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# service_list = ServiceList() -# service_list.services.append(Service(**SERVICE_DEV1_DEV2)) -# context.ListServices.return_value = service_list - -# # dbscan.Detect.return_value = object() -# dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, -1] - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() -# dbscan.Detect.assert_called() -# mitigator.NotifyAttack.assert_called() - - -# def test_report_summarized_kpi( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# kpi_list = KpiList() -# optical_attack_detector_client.ReportSummarizedKpi(kpi_list) - - -# def test_report_kpi(optical_attack_detector_client: OpticalAttackDetectorClient): -# kpi_list = KpiList() -# optical_attack_detector_client.ReportKpi(kpi_list) diff --git a/src/opticalattackmanager/Config.py b/src/opticalattackmanager/Config.py index 1cf42c6bc..7cbb8423d 100644 --- a/src/opticalattackmanager/Config.py +++ b/src/opticalattackmanager/Config.py @@ -12,13 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# service settings MONITORING_INTERVAL = 10 # monitoring interval in seconds - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/opticalattackmanager/requirements.in b/src/opticalattackmanager/requirements.in index 91fa143a9..6f2b2b7d3 100644 --- a/src/opticalattackmanager/requirements.in +++ b/src/opticalattackmanager/requirements.in @@ -1,5 +1,19 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + grpcio==1.49.* grpcio-health-checking==1.49.* grpcio-tools==1.49.* grpclib[protobuf] -redis \ No newline at end of file +redis diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py index 2ea79d714..7c3c8b052 100644 --- a/src/opticalattackmanager/service/__main__.py +++ b/src/opticalattackmanager/service/__main__.py @@ -18,14 +18,12 @@ import math import pickle import signal import sys -import threading import time from concurrent.futures import ProcessPoolExecutor -from multiprocessing import Manager, Process +from multiprocessing import Event, Manager, Process from typing import Dict, List import redis -from grpclib.client import Channel from prometheus_client import Counter, Gauge, Histogram, start_http_server from common.Constants import ServiceNameEnum @@ -51,12 +49,12 @@ from common.Settings import ( ) from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector from monitoring.client.MonitoringClient import MonitoringClient from opticalattackmanager.Config import MONITORING_INTERVAL -from opticalattackmanager.utils.EventsCollector import EventsCollector from opticalattackmanager.utils.monitor import delegate_services -terminate = threading.Event() +terminate = Event() LOGGER = None # SERVICE_LIST_MODE: @@ -174,14 +172,24 @@ def get_context_updates(terminate, service_list, cache): context_client: ContextClient = ContextClient() monitoring_client: MonitoringClient = MonitoringClient() - events_collector: EventsCollector = EventsCollector(context_client) + events_collector: EventsCollector = EventsCollector( + context_client, + log_events_received=True, + activate_connection_collector=False, + activate_context_collector=False, + activate_device_collector=False, + activate_link_collector=False, + activate_slice_collector=False, + activate_topology_collector=False, + activate_service_collector=True, + ) events_collector.start() LOGGER.info("Connected with components successfully... Waiting for events...") time.sleep(20) - while not terminate.wait(timeout=1): + while not terminate.wait(timeout=5): event = events_collector.get_event(block=True, timeout=1) if event is None: LOGGER.debug("No event received") @@ -245,8 +253,8 @@ def get_number_workers( async def monitor_services(terminate, service_list=None, cache=None): - host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST") - port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC")) + host = get_service_host(ServiceNameEnum.OPTICALATTACKDETECTOR) + port = get_metrics_port(ServiceNameEnum.OPTICALATTACKDETECTOR) cur_number_workers = MIN_NUMBER_WORKERS desired_monitoring_interval = 30 # defaults to 30 seconds @@ -398,7 +406,6 @@ async def monitor_services(terminate, service_list=None, cache=None): ) ) - # TODO: implement logic to only register change when the number changes # calculate new number of workers cur_number_workers = get_number_workers( cur_number_workers, duration, desired_monitoring_interval * 0.9 @@ -428,22 +435,12 @@ def main(): wait_for_environment_variables( [ + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST), get_env_var_name( ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC ), - ] - ) - - wait_for_environment_variables( - [ - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST), - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - ] - ) - - wait_for_environment_variables( - [ get_env_var_name( ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST ), @@ -546,7 +543,7 @@ def main(): # asyncio.create_task(monitor_services(service_list)) # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=0.1): + while not terminate.wait(timeout=1): pass LOGGER.info("Terminating...") diff --git a/src/opticalattackmanager/utils/EventsCollector.py b/src/opticalattackmanager/utils/EventsCollector.py deleted file mode 100644 index f4aabc4c4..000000000 --- a/src/opticalattackmanager/utils/EventsCollector.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import queue -import threading - -import grpc - -from common.proto.context_pb2 import Empty -from common.tools.grpc.Tools import grpc_message_to_json_string -from context.client.ContextClient import ContextClient - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) - - -class EventsCollector: - def __init__( - self, context_client_grpc: ContextClient, log_events_received=False - ) -> None: - self._events_queue = queue.Queue() - self._log_events_received = log_events_received - - self._service_stream = context_client_grpc.GetServiceEvents(Empty()) - - self._service_thread = threading.Thread( - target=self._collect, args=(self._service_stream,), daemon=False - ) - - def _collect(self, events_stream) -> None: - try: - for event in events_stream: - if self._log_events_received: - LOGGER.info( - "[_collect] event: {:s}".format( - grpc_message_to_json_string(event) - ) - ) - self._events_queue.put_nowait(event) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member - raise # pragma: no cover - - def start(self): - self._service_thread.start() - - def get_event(self, block: bool = True, timeout: float = 0.1): - try: - return self._events_queue.get(block=block, timeout=timeout) - except queue.Empty: # pylint: disable=catching-non-exception - return None - - def get_events(self, block: bool = True, timeout: float = 0.1, count: int = None): - events = [] - if count is None: - while True: - event = self.get_event(block=block, timeout=timeout) - if event is None: - break - events.append(event) - else: - for _ in range(count): - event = self.get_event(block=block, timeout=timeout) - if event is None: - continue - events.append(event) - return sorted(events, key=lambda e: e.event.timestamp.timestamp) - - def stop(self): - self._service_stream.cancel() - - self._service_thread.join() diff --git a/src/opticalattackmanager/utils/monitor.py b/src/opticalattackmanager/utils/monitor.py index 2a40fe16f..22dfa03fa 100644 --- a/src/opticalattackmanager/utils/monitor.py +++ b/src/opticalattackmanager/utils/monitor.py @@ -14,7 +14,6 @@ import asyncio import logging -from typing import List from grpclib.client import Channel from prometheus_client import Counter diff --git a/src/opticalattackmitigator/Config.py b/src/opticalattackmitigator/Config.py index a8c620ab8..38d04994f 100644 --- a/src/opticalattackmitigator/Config.py +++ b/src/opticalattackmitigator/Config.py @@ -11,14 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# gRPC settings -GRPC_SERVICE_PORT = 10007 - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py b/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py index 6851d3492..4d283202b 100644 --- a/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py +++ b/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import logging + +import grpc from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import delay_exponential, retry -from common.tools.grpc.Tools import grpc_message_to_json from common.proto.optical_attack_mitigator_pb2 import (AttackDescription, AttackResponse) from common.proto.optical_attack_mitigator_pb2_grpc import AttackMitigatorStub +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import delay_exponential, retry +from common.tools.grpc.Tools import grpc_message_to_json LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -34,8 +36,10 @@ RETRY_DECORATOR = retry( class OpticalAttackMitigatorClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.OPTICALATTACKMITIGATOR) - if not port: port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKMITIGATOR) + if not host: + host = get_service_host(ServiceNameEnum.OPTICALATTACKMITIGATOR) + if not port: + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKMITIGATOR) self.endpoint = "{:s}:{:s}".format(str(host), str(port)) LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) self.channel = None diff --git a/src/opticalattackmitigator/requirements.in b/src/opticalattackmitigator/requirements.in index e69de29bb..38d04994f 100644 --- a/src/opticalattackmitigator/requirements.in +++ b/src/opticalattackmitigator/requirements.in @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py b/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py index a37a3b5ca..c5a3ff4cd 100644 --- a/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py +++ b/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py @@ -13,83 +13,27 @@ # limitations under the License. import logging -from concurrent import futures -import grpc -from grpc_health.v1.health import OVERALL_HEALTH, HealthServicer -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server - -from common.Constants import (DEFAULT_GRPC_BIND_ADDRESS, - DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) +from common.Constants import ServiceNameEnum from common.proto.optical_attack_mitigator_pb2_grpc import \ add_AttackMitigatorServicer_to_server -from opticalattackmitigator.Config import GRPC_SERVICE_PORT +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService + from opticalattackmitigator.service.OpticalAttackMitigatorServiceServicerImpl import \ OpticalAttackMitigatorServiceServicerImpl -BIND_ADDRESS = "0.0.0.0" LOGGER = logging.getLogger(__name__) -class OpticalAttackMitigatorService: +class OpticalAttackMitigatorService(GenericGrpcService): def __init__( self, - address=DEFAULT_GRPC_BIND_ADDRESS, - port=GRPC_SERVICE_PORT, - max_workers=DEFAULT_GRPC_MAX_WORKERS, - grace_period=DEFAULT_GRPC_GRACE_PERIOD, + cls_name=__name__, ): + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKMITIGATOR) + super().__init__(port, cls_name=cls_name) + self.opticalattackmitigator_services = OpticalAttackMitigatorServiceServicerImpl() - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.attack_mitigator_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = "{:s}:{:s}".format(str(self.address), str(self.port)) - LOGGER.debug( - "Starting Service (tentative endpoint: {:s}, max_workers: {:s})...".format( - str(self.endpoint), str(self.max_workers) - ) - ) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - - self.attack_mitigator_servicer = OpticalAttackMitigatorServiceServicerImpl() - add_AttackMitigatorServicer_to_server( - self.attack_mitigator_servicer, self.server - ) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, - experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1), - ) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = "{:s}:{:s}".format(str(self.address), str(port)) - LOGGER.info("Listening on {:s}...".format(self.endpoint)) - self.server.start() - self.health_servicer.set( - OVERALL_HEALTH, HealthCheckResponse.SERVING - ) # pylint: disable=maybe-no-member - - LOGGER.debug("Service started") - - def stop(self): - LOGGER.debug( - "Stopping service (grace period {:s} seconds)...".format( - str(self.grace_period) - ) - ) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug("Service stopped") + def install_servicers(self): + add_AttackMitigatorServicer_to_server(self.opticalattackmitigator_services, self.server) diff --git a/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py b/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py index 31fbae28b..d6018b733 100644 --- a/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py +++ b/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py @@ -15,16 +15,16 @@ import logging import grpc - +from common.method_wrappers.Decorator import (MetricsPool, + safe_and_metered_rpc_method) from common.proto.optical_attack_mitigator_pb2 import (AttackDescription, AttackResponse) from common.proto.optical_attack_mitigator_pb2_grpc import \ AttackMitigatorServicer -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('OpticalAttackMitigator', 'RPC') +METRICS_POOL = MetricsPool("OpticalAttackMitigator", "RPC") class OpticalAttackMitigatorServiceServicerImpl(AttackMitigatorServicer): diff --git a/src/opticalattackmitigator/service/__main__.py b/src/opticalattackmitigator/service/__main__.py index 6f65c52b6..fc874f08d 100644 --- a/src/opticalattackmitigator/service/__main__.py +++ b/src/opticalattackmitigator/service/__main__.py @@ -19,8 +19,7 @@ import threading from prometheus_client import start_http_server -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS, ServiceNameEnum) +from common.Constants import ServiceNameEnum from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, get_setting, @@ -52,12 +51,6 @@ def main(): ] ) - service_port = get_setting( - "OPTICALATTACKMITIGATORSERVICE_SERVICE_PORT_GRPC", default=GRPC_SERVICE_PORT - ) - max_workers = get_setting("MAX_WORKERS", default=DEFAULT_GRPC_MAX_WORKERS) - grace_period = get_setting("GRACE_PERIOD", default=DEFAULT_GRPC_GRACE_PERIOD) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -68,9 +61,7 @@ def main(): start_http_server(metrics_port) # Starting CentralizedCybersecurity service - grpc_service = OpticalAttackMitigatorService( - port=service_port, max_workers=max_workers, grace_period=grace_period - ) + grpc_service = OpticalAttackMitigatorService() grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/opticalattackmitigator/tests/test_unitary.py b/src/opticalattackmitigator/tests/test_unitary.py index 8149ceb1b..68836c4e8 100644 --- a/src/opticalattackmitigator/tests/test_unitary.py +++ b/src/opticalattackmitigator/tests/test_unitary.py @@ -17,10 +17,10 @@ import os from unittest.mock import patch import pytest - from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, DEFAULT_GRPC_MAX_WORKERS) from common.proto.optical_attack_mitigator_pb2 import AttackDescription + from opticalattackmitigator.client.OpticalAttackMitigatorClient import \ OpticalAttackMitigatorClient from opticalattackmitigator.Config import GRPC_SERVICE_PORT diff --git a/src/tests/scenario3/optical/deploy_specs.sh b/src/tests/scenario3/optical/deploy_specs.sh index 4e9989916..878013d8b 100644 --- a/src/tests/scenario3/optical/deploy_specs.sh +++ b/src/tests/scenario3/optical/deploy_specs.sh @@ -16,27 +16,9 @@ # ----- TeraFlowSDN ------------------------------------------------------------ -# Set the URL of the internal MicroK8s Docker registry where the images will be uploaded to. -export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" - # Set the list of components, separated by spaces, you want to build images for, and deploy. export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" -# Set the tag you want to use for your images. -export TFS_IMAGE_TAG="dev" - -# Set the name of the Kubernetes namespace to deploy TFS to. -export TFS_K8S_NAMESPACE="tfs" - -# Set additional manifest files to be applied after the deployment -export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml" - -# Set the new Grafana admin password -export TFS_GRAFANA_PASSWORD="admin123+" - -# Disable skip-build flag to rebuild the Docker images. -export TFS_SKIP_BUILD="" - # addition for the optical cybersecurity component export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager" export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml" @@ -44,87 +26,16 @@ export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml # ----- CockroachDB ------------------------------------------------------------ -# Set the namespace where CockroackDB will be deployed. -export CRDB_NAMESPACE="crdb" - -# Set the external port CockroackDB Postgre SQL interface will be exposed to. -export CRDB_EXT_PORT_SQL="26257" - -# Set the external port CockroackDB HTTP Mgmt GUI interface will be exposed to. -export CRDB_EXT_PORT_HTTP="8081" - -# Set the database username to be used by Context. -export CRDB_USERNAME="tfs" - -# Set the database user's password to be used by Context. -export CRDB_PASSWORD="tfs123" - -# Set the database name to be used by Context. -export CRDB_DATABASE="tfs" - -# Set CockroachDB installation mode to 'single'. This option is convenient for development and testing. -# See ./deploy/all.sh or ./deploy/crdb.sh for additional details -export CRDB_DEPLOY_MODE="single" # Disable flag for dropping database, if it exists. export CRDB_DROP_DATABASE_IF_EXISTS="YES" -# Disable flag for re-deploying CockroachDB from scratch. -export CRDB_REDEPLOY="" - - -# ----- NATS ------------------------------------------------------------------- - -# Set the namespace where NATS will be deployed. -export NATS_NAMESPACE="nats" - -# Set the external port NATS Client interface will be exposed to. -export NATS_EXT_PORT_CLIENT="4222" - -# Set the external port NATS HTTP Mgmt GUI interface will be exposed to. -export NATS_EXT_PORT_HTTP="8222" - -# Disable flag for re-deploying NATS from scratch. -export NATS_REDEPLOY="" # ----- QuestDB ---------------------------------------------------------------- -# Set the namespace where QuestDB will be deployed. -export QDB_NAMESPACE="qdb" - -# Set the external port QuestDB Postgre SQL interface will be exposed to. -export QDB_EXT_PORT_SQL="8812" - -# Set the external port QuestDB Influx Line Protocol interface will be exposed to. -export QDB_EXT_PORT_ILP="9009" - -# Set the external port QuestDB HTTP Mgmt GUI interface will be exposed to. -export QDB_EXT_PORT_HTTP="9000" - -# Set the database username to be used for QuestDB. -export QDB_USERNAME="admin" - -# Set the database user's password to be used for QuestDB. -export QDB_PASSWORD="quest" - -# Set the table name to be used by Monitoring for KPIs. -export QDB_TABLE_MONITORING_KPIS="tfs_monitoring_kpis" - -# Set the table name to be used by Slice for plotting groups. -export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups" - # Disable flag for dropping tables if they exist. export QDB_DROP_TABLES_IF_EXIST="YES" # Disable flag for re-deploying QuestDB from scratch. export QDB_REDEPLOY="" - - -# ----- K8s Observability ------------------------------------------------------ - -# Set the external port Prometheus Mgmt HTTP GUI interface will be exposed to. -export PROM_EXT_PORT_HTTP="9090" - -# Set the external port Grafana HTTP Dashboards will be exposed to. -export GRAF_EXT_PORT_HTTP="3000" diff --git a/src/tests/scenario3/optical/scaphandre.yaml b/src/tests/scenario3/optical/scaphandre.yaml index 6ec17e423..9cf98b5ee 100644 --- a/src/tests/scenario3/optical/scaphandre.yaml +++ b/src/tests/scenario3/optical/scaphandre.yaml @@ -1,3 +1,6 @@ +# this file deploys a tool that enables the monitoring of energy consumption +# per component. +# More info: https://github.com/hubblo-org/scaphandre apiVersion: monitoring.coreos.com/v1 kind: ServiceMonitor metadata: -- GitLab