diff --git a/my_deploy.sh b/my_deploy.sh index 2c3ed27cadf909ac2811b4e8f6539f591d613e46..ee3244ac99d5a2e8d5dba5a6ccd1609b0012c06b 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -20,7 +20,7 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" +export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator l3_attackmitigator l3_centralizedattackdetector" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" @@ -29,7 +29,7 @@ export TFS_IMAGE_TAG="dev" export TFS_K8S_NAMESPACE="tfs" # Set additional manifest files to be applied after the deployment -export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml manifests/cachingservice.yaml" +export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml" # Set the new Grafana admin password export TFS_GRAFANA_PASSWORD="admin123+" @@ -37,10 +37,6 @@ export TFS_GRAFANA_PASSWORD="admin123+" # Disable skip-build flag to rebuild the Docker images. export TFS_SKIP_BUILD="" -# addition for the optical cybersecurity component -# export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager" -# export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml" - # ----- CockroachDB ------------------------------------------------------------ diff --git a/src/dbscanserving/Config.py b/src/dbscanserving/Config.py index ba1fc4da1a2dbd73cab815e19675885d1543f63e..38d04994fb0fa1951fb465bc127eb72659dc2eaf 100644 --- a/src/dbscanserving/Config.py +++ b/src/dbscanserving/Config.py @@ -11,14 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# gRPC settings -GRPC_SERVICE_PORT = 10008 - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/dbscanserving/client/DbscanServingClient.py b/src/dbscanserving/client/DbscanServingClient.py index 15081ab3a0fce2c23838e4a00d0e0e56f91f474b..d8a74948d8c68b1ffabb1fc12c7d04a4817b90e0 100644 --- a/src/dbscanserving/client/DbscanServingClient.py +++ b/src/dbscanserving/client/DbscanServingClient.py @@ -12,14 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import logging from typing import Counter +import grpc + from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import delay_exponential, retry from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse from common.proto.dbscanserving_pb2_grpc import DetectorStub +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import delay_exponential, retry LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -33,8 +35,10 @@ RETRY_DECORATOR = retry( class DbscanServingClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.DBSCANSERVING) - if not port: port = get_service_port_grpc(ServiceNameEnum.DBSCANSERVING) + if not host: + host = get_service_host(ServiceNameEnum.DBSCANSERVING) + if not port: + port = get_service_port_grpc(ServiceNameEnum.DBSCANSERVING) self.endpoint = "{:s}:{:s}".format(str(host), str(port)) LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) diff --git a/src/dbscanserving/service/DbscanService.py b/src/dbscanserving/service/DbscanService.py index 3511e4e5fdcf85085dc783dc73688b4422c5736e..8ae1521600c78827d61642d09373941f48938769 100644 --- a/src/dbscanserving/service/DbscanService.py +++ b/src/dbscanserving/service/DbscanService.py @@ -13,79 +13,24 @@ # limitations under the License. import logging -from concurrent import futures -import grpc -from grpc_health.v1.health import OVERALL_HEALTH, HealthServicer -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server - -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) +from common.Constants import ServiceNameEnum from common.proto.dbscanserving_pb2_grpc import add_DetectorServicer_to_server -from dbscanserving.Config import GRPC_SERVICE_PORT +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService + from dbscanserving.service.DbscanServiceServicerImpl import \ DbscanServiceServicerImpl -BIND_ADDRESS = "0.0.0.0" LOGGER = logging.getLogger(__name__) -class DbscanService: - def __init__( - self, - address=BIND_ADDRESS, - port=GRPC_SERVICE_PORT, - grace_period=DEFAULT_GRPC_GRACE_PERIOD, - max_workers=DEFAULT_GRPC_MAX_WORKERS, - ): - - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.dbscan_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = "{:s}:{:s}".format(str(self.address), str(self.port)) - LOGGER.debug( - "Starting Service (tentative endpoint: {:s}, max_workers: {:s})...".format( - str(self.endpoint), str(self.max_workers) - ) - ) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) +class DbscanService(GenericGrpcService): + def __init__(self, cls_name: str = __name__): + port = get_service_port_grpc(ServiceNameEnum.DBSCANSERVING) + super().__init__(port, cls_name=cls_name) self.dbscan_servicer = DbscanServiceServicerImpl() - add_DetectorServicer_to_server(self.dbscan_servicer, self.server) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, - experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1), - ) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = "{:s}:{:s}".format(str(self.address), str(port)) - LOGGER.info("Listening on {:s}...".format(self.endpoint)) - self.server.start() - self.health_servicer.set( - OVERALL_HEALTH, HealthCheckResponse.SERVING - ) # pylint: disable=maybe-no-member - LOGGER.debug("Service started") - - def stop(self): - LOGGER.debug( - "Stopping service (grace period {:s} seconds)...".format( - str(self.grace_period) - ) - ) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug("Service stopped") + def install_servicers(self): + add_DetectorServicer_to_server(self.dbscan_servicer, self.server) diff --git a/src/dbscanserving/service/DbscanServiceServicerImpl.py b/src/dbscanserving/service/DbscanServiceServicerImpl.py index 3eeb7aa8be70ebc92f7ff67e8623d74f48212f43..ac4f4c90b51f31de36c6c3b05cf5064840786018 100644 --- a/src/dbscanserving/service/DbscanServiceServicerImpl.py +++ b/src/dbscanserving/service/DbscanServiceServicerImpl.py @@ -17,13 +17,14 @@ import logging import grpc from sklearn.cluster import DBSCAN +from common.method_wrappers.Decorator import (MetricsPool, + safe_and_metered_rpc_method) from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse from common.proto.dbscanserving_pb2_grpc import DetectorServicer -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('DBSCANServing', 'RPC') +METRICS_POOL = MetricsPool("DBSCANServing", "RPC") class DbscanServiceServicerImpl(DetectorServicer): diff --git a/src/dbscanserving/service/__main__.py b/src/dbscanserving/service/__main__.py index 7a1a3be74edaceb09576dc65d1ad0518039ef16c..fc0f8346e01b7ec30c4404990148915981ba1cd7 100644 --- a/src/dbscanserving/service/__main__.py +++ b/src/dbscanserving/service/__main__.py @@ -17,12 +17,9 @@ import signal import sys import threading +from common.Settings import get_log_level, get_metrics_port from prometheus_client import start_http_server -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) -from common.Settings import get_log_level, get_metrics_port, get_setting -from dbscanserving.Config import GRPC_SERVICE_PORT from dbscanserving.service.DbscanService import DbscanService terminate = threading.Event() @@ -41,12 +38,6 @@ def main(): logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) - service_port = get_setting( - "DBSCANSERVICE_SERVICE_PORT_GRPC", default=GRPC_SERVICE_PORT - ) - grace_period = get_setting("GRACE_PERIOD", default=DEFAULT_GRPC_GRACE_PERIOD) - max_workers = get_setting("MAX_WORKERS", default=DEFAULT_GRPC_MAX_WORKERS) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -57,13 +48,11 @@ def main(): start_http_server(metrics_port) # Starting CentralizedCybersecurity service - grpc_service = DbscanService( - port=service_port, max_workers=max_workers, grace_period=grace_period - ) + grpc_service = DbscanService() grpc_service.start() # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=0.1): + while not terminate.wait(timeout=1): pass LOGGER.info("Terminating...") diff --git a/src/dbscanserving/tests/test_unitary.py b/src/dbscanserving/tests/test_unitary.py index a0f92d13a08ed60f976799975033d16ad897f0c5..7349978e51cf5be8aa2f9899a91de91e13d1ae7e 100644 --- a/src/dbscanserving/tests/test_unitary.py +++ b/src/dbscanserving/tests/test_unitary.py @@ -21,6 +21,7 @@ import pytest from common.proto.dbscanserving_pb2 import (DetectionRequest, DetectionResponse, Sample) + from dbscanserving.client.DbscanServingClient import DbscanServingClient from dbscanserving.Config import GRPC_SERVICE_PORT from dbscanserving.service.DbscanService import DbscanService diff --git a/src/opticalattackdetector/Config.py b/src/opticalattackdetector/Config.py index f5812acaf938e3e183fea704f7e7ad845daa526a..38d04994fb0fa1951fb465bc127eb72659dc2eaf 100644 --- a/src/opticalattackdetector/Config.py +++ b/src/opticalattackdetector/Config.py @@ -11,17 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# gRPC settings -GRPC_SERVICE_PORT = 10006 - -# service settings -MONITORING_INTERVAL = 10 # monitoring interval in seconds - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/opticalattackdetector/client/OpticalAttackDetectorClient.py b/src/opticalattackdetector/client/OpticalAttackDetectorClient.py index 50f303b5d08beb86b3101154179951566cba372c..3b95227ade65cdfb4a47830dee461c57bfc7dbfb 100644 --- a/src/opticalattackdetector/client/OpticalAttackDetectorClient.py +++ b/src/opticalattackdetector/client/OpticalAttackDetectorClient.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import logging + +import grpc from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import delay_exponential, retry -from common.tools.grpc.Tools import grpc_message_to_json from common.proto.context_pb2 import Empty from common.proto.optical_attack_detector_pb2_grpc import \ OpticalAttackDetectorServiceStub +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import delay_exponential, retry +from common.tools.grpc.Tools import grpc_message_to_json LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -34,8 +36,10 @@ RETRY_DECORATOR = retry( class OpticalAttackDetectorClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.OPTICALATTACKDETECTOR) - if not port: port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKDETECTOR) + if not host: + host = get_service_host(ServiceNameEnum.OPTICALATTACKDETECTOR) + if not port: + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKDETECTOR) self.endpoint = "{:s}:{:s}".format(str(host), str(port)) LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) self.channel = None diff --git a/src/opticalattackdetector/requirements.in b/src/opticalattackdetector/requirements.in index 7c2cf4fa6866387dd05592ff19391583bed69572..b69d4dd786500686fef4e04fd5b6096d5648cfc6 100644 --- a/src/opticalattackdetector/requirements.in +++ b/src/opticalattackdetector/requirements.in @@ -1,2 +1,16 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + numpy -redis \ No newline at end of file +redis diff --git a/src/opticalattackdetector/service/OpticalAttackDetectorService.py b/src/opticalattackdetector/service/OpticalAttackDetectorService.py index ad91d71098a1cfdf6852426679216344382d2f65..2128d79b4dcdaf4fa0f12cd1fd96ba56669d48f9 100644 --- a/src/opticalattackdetector/service/OpticalAttackDetectorService.py +++ b/src/opticalattackdetector/service/OpticalAttackDetectorService.py @@ -13,82 +13,26 @@ # limitations under the License. import logging -from concurrent import futures -import grpc -from grpc_health.v1.health import OVERALL_HEALTH, HealthServicer -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server - -from common.Constants import (DEFAULT_GRPC_BIND_ADDRESS, - DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) +from common.Constants import ServiceNameEnum from common.proto.optical_attack_detector_pb2_grpc import \ add_OpticalAttackDetectorServiceServicer_to_server -from opticalattackdetector.Config import GRPC_SERVICE_PORT +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService + from opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl import \ OpticalAttackDetectorServiceServicerImpl LOGGER = logging.getLogger(__name__) -class OpticalAttackDetectorService: - def __init__( - self, - address=DEFAULT_GRPC_BIND_ADDRESS, - port=GRPC_SERVICE_PORT, - max_workers=DEFAULT_GRPC_MAX_WORKERS, - grace_period=DEFAULT_GRPC_GRACE_PERIOD, - ): - - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.attack_detector_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = "{:s}:{:s}".format(str(self.address), str(self.port)) - LOGGER.debug( - "Starting Service (tentative endpoint: {:s}, max_workers: {:s})...".format( - str(self.endpoint), str(self.max_workers) - ) - ) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) +class OpticalAttackDetectorService(GenericGrpcService): + def __init__(self, cls_name: str = __name__): + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKDETECTOR) + super().__init__(port, cls_name=cls_name) + self.opticalattackdetector_servicer = OpticalAttackDetectorServiceServicerImpl() - self.attack_detector_servicer = OpticalAttackDetectorServiceServicerImpl() + def install_servicers(self): add_OpticalAttackDetectorServiceServicer_to_server( - self.attack_detector_servicer, self.server - ) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, - experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1), - ) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = "{:s}:{:s}".format(str(self.address), str(port)) - LOGGER.info("Listening on {:s}...".format(self.endpoint)) - self.server.start() - self.health_servicer.set( - OVERALL_HEALTH, HealthCheckResponse.SERVING - ) # pylint: disable=maybe-no-member - - LOGGER.debug("Service started") - - def stop(self): - LOGGER.debug( - "Stopping service (grace period {:s} seconds)...".format( - str(self.grace_period) - ) + self.opticalattackdetector_servicer, self.server ) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug("Service stopped") diff --git a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py index 6a3ee2243b20720434742677ad536cab79f6bab5..00c244c45f3d8499d13b2eae319f8d7fe87f38c4 100644 --- a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py +++ b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py @@ -19,9 +19,9 @@ import random import grpc import numpy as np import redis -from prometheus_client import Histogram - from common.Constants import ServiceNameEnum +from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, + safe_and_metered_rpc_method) from common.proto import dbscanserving_pb2 as dbscan from common.proto import optical_attack_detector_pb2 as oad from common.proto.context_pb2 import Empty @@ -30,33 +30,48 @@ from common.proto.optical_attack_detector_pb2_grpc import \ OpticalAttackDetectorServiceServicer from common.proto.optical_attack_mitigator_pb2 import (AttackDescription, AttackResponse) -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.Settings import get_service_host, get_setting from common.tools.timestamp.Converters import timestamp_utcnow_to_float from dbscanserving.client.DbscanServingClient import DbscanServingClient from monitoring.client.MonitoringClient import MonitoringClient from opticalattackmitigator.client.OpticalAttackMitigatorClient import \ OpticalAttackMitigatorClient +from prometheus_client import Histogram LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('OpticalAttackDetector', 'RPC') +METRICS_POOL = MetricsPool("OpticalAttackDetector", "RPC") + +METRICS_POOL_DETAILS = MetricsPool( + "OpticalAttackDetector", + "execution", + labels={ + "operation": "", + "step": "", + }, +) + + +metric_labels = dict(operation="detect") +histogram_duration: Histogram = METRICS_POOL_DETAILS.get_or_create( + "details", MetricTypeEnum.HISTOGRAM_DURATION +) + DETECTION_RESPONSE_TIME = Histogram( "tfs_opticalattackdetector_inference_response_time", - "Time taken by the inference component to reply" + "Time taken by the inference component to reply", ) MONITORING_RESPONSE_TIME = Histogram( "tfs_opticalattackdetector_monitoring_response_time", - "Time taken by the monitoring component to reply" + "Time taken by the monitoring component to reply", ) MITIGATION_RESPONSE_TIME = Histogram( "tfs_opticalattackdetector_mitigation_response_time", - "Time taken by the attack mitigator to reply" + "Time taken by the attack mitigator to reply", ) CACHE_RESPONSE_TIME = Histogram( - "tfs_opticalattackdetector_cache_response_time", - "Time taken by the cache to reply" + "tfs_opticalattackdetector_cache_response_time", "Time taken by the cache to reply" ) monitoring_client: MonitoringClient = MonitoringClient() @@ -182,7 +197,8 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi detection_sample.append(random.uniform(5000.0, 6000.0)) # adding the sample to the cache and recovering the cache - with CACHE_RESPONSE_TIME.time(): + with histogram_duration.labels(step="cachefetch", **metric_labels).time(): + # with CACHE_RESPONSE_TIME.time(): r.rpush( "opm_{}".format(s_uuid.replace("-", "_")), pickle.dumps(tuple(detection_sample)), @@ -199,7 +215,8 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi detection_sample.features.append(feature) detection_request.samples.append(detection_sample) - with DETECTION_RESPONSE_TIME.time(): + with histogram_duration.labels(step="uldetection", **metric_labels).time(): + # with DETECTION_RESPONSE_TIME.time(): response: dbscan.DetectionResponse = dbscanserving_client.Detect( detection_request ) @@ -219,14 +236,16 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi else: kpi.kpi_value.int32Val = 0 - with MONITORING_RESPONSE_TIME.time(): + with histogram_duration.labels(step="includekpi", **metric_labels).time(): + # with MONITORING_RESPONSE_TIME.time(): monitoring_client.IncludeKpi(kpi) # if -1 in response.cluster_indices: # attack detected if kpi.kpi_value.int32Val == -1: attack = AttackDescription() attack.cs_id.uuid = request.service_id.service_uuid.uuid - with MITIGATION_RESPONSE_TIME.time(): + with histogram_duration.labels(step="mitigation", **metric_labels).time(): + # with MITIGATION_RESPONSE_TIME.time(): response: AttackResponse = attack_mitigator_client.NotifyAttack(attack) # if attack is detected, run the attack mitigator diff --git a/src/opticalattackdetector/service/__main__.py b/src/opticalattackdetector/service/__main__.py index d090dc1d2a4493210ee594817748b6cc6f8a88a2..27706fce44d9635fcdaf1cc64a2dfb6292fc7833 100644 --- a/src/opticalattackdetector/service/__main__.py +++ b/src/opticalattackdetector/service/__main__.py @@ -19,15 +19,14 @@ import threading from prometheus_client import start_http_server -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS, ServiceNameEnum) +from common.Constants import ServiceNameEnum from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, - get_log_level, get_metrics_port, get_setting, + get_log_level, get_metrics_port, wait_for_environment_variables) + from opticalattackdetector.client.OpticalAttackDetectorClient import \ OpticalAttackDetectorClient -from opticalattackdetector.Config import GRPC_SERVICE_PORT from opticalattackdetector.service.OpticalAttackDetectorService import \ OpticalAttackDetectorService @@ -55,11 +54,6 @@ def main(): get_env_var_name( ServiceNameEnum.DBSCANSERVING, ENVVAR_SUFIX_SERVICE_PORT_GRPC ), - ] - ) - - wait_for_environment_variables( - [ get_env_var_name( ServiceNameEnum.OPTICALATTACKMITIGATOR, ENVVAR_SUFIX_SERVICE_HOST ), @@ -69,12 +63,6 @@ def main(): ] ) - service_port = get_setting( - "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC", default=GRPC_SERVICE_PORT - ) - max_workers = get_setting("MAX_WORKERS", default=DEFAULT_GRPC_MAX_WORKERS) - grace_period = get_setting("GRACE_PERIOD", default=DEFAULT_GRPC_GRACE_PERIOD) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -82,16 +70,14 @@ def main(): # Start metrics server metrics_port = get_metrics_port() - start_http_server(metrics_port) # TODO: make sure this is uncommented + start_http_server(metrics_port) # Starting CentralizedCybersecurity service - grpc_service = OpticalAttackDetectorService( - port=service_port, max_workers=max_workers, grace_period=grace_period - ) + grpc_service = OpticalAttackDetectorService() grpc_service.start() # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=0.1): + while not terminate.wait(timeout=1): pass LOGGER.info("Terminating...") diff --git a/src/opticalattackdetector/tests/test_unitary.py b/src/opticalattackdetector/tests/test_unitary.py index b1fc487fe45c73b011ce8b55b4dd14eace4365ad..72e3a4ac124bd034a76461baee316ab0f1139ed9 100644 --- a/src/opticalattackdetector/tests/test_unitary.py +++ b/src/opticalattackdetector/tests/test_unitary.py @@ -17,11 +17,11 @@ import os from unittest.mock import patch import pytest - from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, DEFAULT_GRPC_MAX_WORKERS) from common.proto import dbscanserving_pb2 as dbscan from common.proto.optical_attack_detector_pb2 import DetectionRequest + from opticalattackdetector.client.OpticalAttackDetectorClient import \ OpticalAttackDetectorClient from opticalattackdetector.Config import GRPC_SERVICE_PORT @@ -53,17 +53,9 @@ def optical_attack_detector_service(): max_workers=DEFAULT_GRPC_MAX_WORKERS, grace_period=DEFAULT_GRPC_GRACE_PERIOD, ) - # mocker_context_client = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client') - # mocker_context_client.start() - - # mocker_influx_db = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client') - # mocker_influx_db.start() - _service.start() yield _service _service.stop() - # mocker_context_client.stop() - # mocker_influx_db.stop() @pytest.fixture(scope="session") @@ -106,170 +98,3 @@ def test_detect_attack( mitigator.NotifyAttack.assert_called_once() monitoring.IncludeKpi.assert_called_once() dbscanserving.assert_called_once() - - -# def test_detect_attack_with_context( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb: -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() - - -# def test_detect_attack_with_contexts( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb: -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# cid_list.context_ids.append(ContextId(**CONTEXT_ID_2)) -# context.ListContextIds.return_value = cid_list - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_any_call(cid_list.context_ids[0]) -# context.ListServices.assert_any_call(cid_list.context_ids[1]) -# influxdb.query.assert_called_once() - - -# def test_detect_attack_with_service( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" -# ) as dbscan: - -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# service_list = ServiceList() -# service_list.services.append(Service(**SERVICE_DEV1_DEV2)) -# context.ListServices.return_value = service_list - -# influxdb.query.return_value.get_points.return_value = [(1, 2), (3, 4)] - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() -# dbscan.Detect.assert_called() - - -# def test_detect_attack_no_attack( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" -# ) as dbscan, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" -# ) as mitigator: - -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# service_list = ServiceList() -# service_list.services.append(Service(**SERVICE_DEV1_DEV2)) -# context.ListServices.return_value = service_list - -# # dbscan.Detect.return_value = object() -# dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, 5] - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() -# dbscan.Detect.assert_called() -# mitigator.NotifyAttack.assert_not_called() - - -# def test_detect_attack_with_attack( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# with patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" -# ) as context, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" -# ) as influxdb, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" -# ) as dbscan, patch( -# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" -# ) as mitigator: - -# # setting up the mock -# cid_list = ContextIdList() -# cid_list.context_ids.append(ContextId(**CONTEXT_ID)) -# context.ListContextIds.return_value = cid_list - -# service_list = ServiceList() -# service_list.services.append(Service(**SERVICE_DEV1_DEV2)) -# context.ListServices.return_value = service_list - -# # dbscan.Detect.return_value = object() -# dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, -1] - -# # making the test -# request = Empty() -# optical_attack_detector_client.DetectAttack(request) - -# # checking behavior -# context.ListContextIds.assert_called_once() -# context.ListServices.assert_called_with(cid_list.context_ids[0]) -# influxdb.query.assert_called_once() -# dbscan.Detect.assert_called() -# mitigator.NotifyAttack.assert_called() - - -# def test_report_summarized_kpi( -# optical_attack_detector_client: OpticalAttackDetectorClient, -# ): -# kpi_list = KpiList() -# optical_attack_detector_client.ReportSummarizedKpi(kpi_list) - - -# def test_report_kpi(optical_attack_detector_client: OpticalAttackDetectorClient): -# kpi_list = KpiList() -# optical_attack_detector_client.ReportKpi(kpi_list) diff --git a/src/opticalattackmanager/Config.py b/src/opticalattackmanager/Config.py index 1cf42c6bcbcc42a928a49f21638ea39b08ff68dd..7cbb8423de9118b5e79e96b535e3831aa469b7b2 100644 --- a/src/opticalattackmanager/Config.py +++ b/src/opticalattackmanager/Config.py @@ -12,13 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# service settings MONITORING_INTERVAL = 10 # monitoring interval in seconds - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/opticalattackmanager/requirements.in b/src/opticalattackmanager/requirements.in index 91fa143a93bc7f9d5d2a649d52fc9a52c854a5cd..6f2b2b7d36ddab234f6d1f874ad3eb86173ce654 100644 --- a/src/opticalattackmanager/requirements.in +++ b/src/opticalattackmanager/requirements.in @@ -1,5 +1,19 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + grpcio==1.49.* grpcio-health-checking==1.49.* grpcio-tools==1.49.* grpclib[protobuf] -redis \ No newline at end of file +redis diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py index 2ea79d7146e93ed7a5abb7e485bdfbf3e0dc5af0..7c3c8b0526f1d8b06b92143ad43635d560b5153b 100644 --- a/src/opticalattackmanager/service/__main__.py +++ b/src/opticalattackmanager/service/__main__.py @@ -18,14 +18,12 @@ import math import pickle import signal import sys -import threading import time from concurrent.futures import ProcessPoolExecutor -from multiprocessing import Manager, Process +from multiprocessing import Event, Manager, Process from typing import Dict, List import redis -from grpclib.client import Channel from prometheus_client import Counter, Gauge, Histogram, start_http_server from common.Constants import ServiceNameEnum @@ -51,12 +49,12 @@ from common.Settings import ( ) from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient +from context.client.EventsCollector import EventsCollector from monitoring.client.MonitoringClient import MonitoringClient from opticalattackmanager.Config import MONITORING_INTERVAL -from opticalattackmanager.utils.EventsCollector import EventsCollector from opticalattackmanager.utils.monitor import delegate_services -terminate = threading.Event() +terminate = Event() LOGGER = None # SERVICE_LIST_MODE: @@ -174,14 +172,24 @@ def get_context_updates(terminate, service_list, cache): context_client: ContextClient = ContextClient() monitoring_client: MonitoringClient = MonitoringClient() - events_collector: EventsCollector = EventsCollector(context_client) + events_collector: EventsCollector = EventsCollector( + context_client, + log_events_received=True, + activate_connection_collector=False, + activate_context_collector=False, + activate_device_collector=False, + activate_link_collector=False, + activate_slice_collector=False, + activate_topology_collector=False, + activate_service_collector=True, + ) events_collector.start() LOGGER.info("Connected with components successfully... Waiting for events...") time.sleep(20) - while not terminate.wait(timeout=1): + while not terminate.wait(timeout=5): event = events_collector.get_event(block=True, timeout=1) if event is None: LOGGER.debug("No event received") @@ -245,8 +253,8 @@ def get_number_workers( async def monitor_services(terminate, service_list=None, cache=None): - host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST") - port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC")) + host = get_service_host(ServiceNameEnum.OPTICALATTACKDETECTOR) + port = get_metrics_port(ServiceNameEnum.OPTICALATTACKDETECTOR) cur_number_workers = MIN_NUMBER_WORKERS desired_monitoring_interval = 30 # defaults to 30 seconds @@ -398,7 +406,6 @@ async def monitor_services(terminate, service_list=None, cache=None): ) ) - # TODO: implement logic to only register change when the number changes # calculate new number of workers cur_number_workers = get_number_workers( cur_number_workers, duration, desired_monitoring_interval * 0.9 @@ -428,22 +435,12 @@ def main(): wait_for_environment_variables( [ + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST), get_env_var_name( ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC ), - ] - ) - - wait_for_environment_variables( - [ - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST), - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - ] - ) - - wait_for_environment_variables( - [ get_env_var_name( ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST ), @@ -546,7 +543,7 @@ def main(): # asyncio.create_task(monitor_services(service_list)) # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=0.1): + while not terminate.wait(timeout=1): pass LOGGER.info("Terminating...") diff --git a/src/opticalattackmanager/utils/EventsCollector.py b/src/opticalattackmanager/utils/EventsCollector.py deleted file mode 100644 index f4aabc4c4d82c3187ad8ec815b68c831787f3990..0000000000000000000000000000000000000000 --- a/src/opticalattackmanager/utils/EventsCollector.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import queue -import threading - -import grpc - -from common.proto.context_pb2 import Empty -from common.tools.grpc.Tools import grpc_message_to_json_string -from context.client.ContextClient import ContextClient - -LOGGER = logging.getLogger(__name__) -LOGGER.setLevel(logging.DEBUG) - - -class EventsCollector: - def __init__( - self, context_client_grpc: ContextClient, log_events_received=False - ) -> None: - self._events_queue = queue.Queue() - self._log_events_received = log_events_received - - self._service_stream = context_client_grpc.GetServiceEvents(Empty()) - - self._service_thread = threading.Thread( - target=self._collect, args=(self._service_stream,), daemon=False - ) - - def _collect(self, events_stream) -> None: - try: - for event in events_stream: - if self._log_events_received: - LOGGER.info( - "[_collect] event: {:s}".format( - grpc_message_to_json_string(event) - ) - ) - self._events_queue.put_nowait(event) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member - raise # pragma: no cover - - def start(self): - self._service_thread.start() - - def get_event(self, block: bool = True, timeout: float = 0.1): - try: - return self._events_queue.get(block=block, timeout=timeout) - except queue.Empty: # pylint: disable=catching-non-exception - return None - - def get_events(self, block: bool = True, timeout: float = 0.1, count: int = None): - events = [] - if count is None: - while True: - event = self.get_event(block=block, timeout=timeout) - if event is None: - break - events.append(event) - else: - for _ in range(count): - event = self.get_event(block=block, timeout=timeout) - if event is None: - continue - events.append(event) - return sorted(events, key=lambda e: e.event.timestamp.timestamp) - - def stop(self): - self._service_stream.cancel() - - self._service_thread.join() diff --git a/src/opticalattackmanager/utils/monitor.py b/src/opticalattackmanager/utils/monitor.py index 2a40fe16f45ad102f06a43697a3f06cd4c9b6c0d..22dfa03fa45325138c749ec6256ce2b86f32502f 100644 --- a/src/opticalattackmanager/utils/monitor.py +++ b/src/opticalattackmanager/utils/monitor.py @@ -14,7 +14,6 @@ import asyncio import logging -from typing import List from grpclib.client import Channel from prometheus_client import Counter diff --git a/src/opticalattackmitigator/Config.py b/src/opticalattackmitigator/Config.py index a8c620ab8b22df54c4dd541160276b926c463cd9..38d04994fb0fa1951fb465bc127eb72659dc2eaf 100644 --- a/src/opticalattackmitigator/Config.py +++ b/src/opticalattackmitigator/Config.py @@ -11,14 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - -# General settings -LOG_LEVEL = logging.DEBUG - -# gRPC settings -GRPC_SERVICE_PORT = 10007 - -# Prometheus settings -METRICS_PORT = 9192 diff --git a/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py b/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py index 6851d349231dd2b428245b3ef91c09ce36191ffa..4d283202b95a840a6cbe84daf373a806f5a1f463 100644 --- a/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py +++ b/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import logging + +import grpc from common.Constants import ServiceNameEnum -from common.Settings import get_service_host, get_service_port_grpc -from common.tools.client.RetryDecorator import delay_exponential, retry -from common.tools.grpc.Tools import grpc_message_to_json from common.proto.optical_attack_mitigator_pb2 import (AttackDescription, AttackResponse) from common.proto.optical_attack_mitigator_pb2_grpc import AttackMitigatorStub +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import delay_exponential, retry +from common.tools.grpc.Tools import grpc_message_to_json LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -34,8 +36,10 @@ RETRY_DECORATOR = retry( class OpticalAttackMitigatorClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.OPTICALATTACKMITIGATOR) - if not port: port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKMITIGATOR) + if not host: + host = get_service_host(ServiceNameEnum.OPTICALATTACKMITIGATOR) + if not port: + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKMITIGATOR) self.endpoint = "{:s}:{:s}".format(str(host), str(port)) LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) self.channel = None diff --git a/src/opticalattackmitigator/requirements.in b/src/opticalattackmitigator/requirements.in index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..38d04994fb0fa1951fb465bc127eb72659dc2eaf 100644 --- a/src/opticalattackmitigator/requirements.in +++ b/src/opticalattackmitigator/requirements.in @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py b/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py index a37a3b5ca99d806706d075ebc1026f7c6dc6f124..c5a3ff4cdda1f800a5571b4b975a6427690834c9 100644 --- a/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py +++ b/src/opticalattackmitigator/service/OpticalAttackMitigatorService.py @@ -13,83 +13,27 @@ # limitations under the License. import logging -from concurrent import futures -import grpc -from grpc_health.v1.health import OVERALL_HEALTH, HealthServicer -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server - -from common.Constants import (DEFAULT_GRPC_BIND_ADDRESS, - DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS) +from common.Constants import ServiceNameEnum from common.proto.optical_attack_mitigator_pb2_grpc import \ add_AttackMitigatorServicer_to_server -from opticalattackmitigator.Config import GRPC_SERVICE_PORT +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService + from opticalattackmitigator.service.OpticalAttackMitigatorServiceServicerImpl import \ OpticalAttackMitigatorServiceServicerImpl -BIND_ADDRESS = "0.0.0.0" LOGGER = logging.getLogger(__name__) -class OpticalAttackMitigatorService: +class OpticalAttackMitigatorService(GenericGrpcService): def __init__( self, - address=DEFAULT_GRPC_BIND_ADDRESS, - port=GRPC_SERVICE_PORT, - max_workers=DEFAULT_GRPC_MAX_WORKERS, - grace_period=DEFAULT_GRPC_GRACE_PERIOD, + cls_name=__name__, ): + port = get_service_port_grpc(ServiceNameEnum.OPTICALATTACKMITIGATOR) + super().__init__(port, cls_name=cls_name) + self.opticalattackmitigator_services = OpticalAttackMitigatorServiceServicerImpl() - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.attack_mitigator_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = "{:s}:{:s}".format(str(self.address), str(self.port)) - LOGGER.debug( - "Starting Service (tentative endpoint: {:s}, max_workers: {:s})...".format( - str(self.endpoint), str(self.max_workers) - ) - ) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - - self.attack_mitigator_servicer = OpticalAttackMitigatorServiceServicerImpl() - add_AttackMitigatorServicer_to_server( - self.attack_mitigator_servicer, self.server - ) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, - experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1), - ) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = "{:s}:{:s}".format(str(self.address), str(port)) - LOGGER.info("Listening on {:s}...".format(self.endpoint)) - self.server.start() - self.health_servicer.set( - OVERALL_HEALTH, HealthCheckResponse.SERVING - ) # pylint: disable=maybe-no-member - - LOGGER.debug("Service started") - - def stop(self): - LOGGER.debug( - "Stopping service (grace period {:s} seconds)...".format( - str(self.grace_period) - ) - ) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug("Service stopped") + def install_servicers(self): + add_AttackMitigatorServicer_to_server(self.opticalattackmitigator_services, self.server) diff --git a/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py b/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py index 31fbae28bf89f4b5432cae7de666bd2443a35c98..d6018b733f9dc2078027420cc2d55f627a12c1a7 100644 --- a/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py +++ b/src/opticalattackmitigator/service/OpticalAttackMitigatorServiceServicerImpl.py @@ -15,16 +15,16 @@ import logging import grpc - +from common.method_wrappers.Decorator import (MetricsPool, + safe_and_metered_rpc_method) from common.proto.optical_attack_mitigator_pb2 import (AttackDescription, AttackResponse) from common.proto.optical_attack_mitigator_pb2_grpc import \ AttackMitigatorServicer -from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('OpticalAttackMitigator', 'RPC') +METRICS_POOL = MetricsPool("OpticalAttackMitigator", "RPC") class OpticalAttackMitigatorServiceServicerImpl(AttackMitigatorServicer): diff --git a/src/opticalattackmitigator/service/__main__.py b/src/opticalattackmitigator/service/__main__.py index 6f65c52b6df59eb640d4b7cbb2f1e86a33f4dbef..fc874f08d1e1915e91a1a3e749812cb35390978f 100644 --- a/src/opticalattackmitigator/service/__main__.py +++ b/src/opticalattackmitigator/service/__main__.py @@ -19,8 +19,7 @@ import threading from prometheus_client import start_http_server -from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, - DEFAULT_GRPC_MAX_WORKERS, ServiceNameEnum) +from common.Constants import ServiceNameEnum from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, get_setting, @@ -52,12 +51,6 @@ def main(): ] ) - service_port = get_setting( - "OPTICALATTACKMITIGATORSERVICE_SERVICE_PORT_GRPC", default=GRPC_SERVICE_PORT - ) - max_workers = get_setting("MAX_WORKERS", default=DEFAULT_GRPC_MAX_WORKERS) - grace_period = get_setting("GRACE_PERIOD", default=DEFAULT_GRPC_GRACE_PERIOD) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -68,9 +61,7 @@ def main(): start_http_server(metrics_port) # Starting CentralizedCybersecurity service - grpc_service = OpticalAttackMitigatorService( - port=service_port, max_workers=max_workers, grace_period=grace_period - ) + grpc_service = OpticalAttackMitigatorService() grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/opticalattackmitigator/tests/test_unitary.py b/src/opticalattackmitigator/tests/test_unitary.py index 8149ceb1b1388b20ea92734f73983da584c87ff3..68836c4e81dd8e88b34cf85c9e2a29fd9f5e5678 100644 --- a/src/opticalattackmitigator/tests/test_unitary.py +++ b/src/opticalattackmitigator/tests/test_unitary.py @@ -17,10 +17,10 @@ import os from unittest.mock import patch import pytest - from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, DEFAULT_GRPC_MAX_WORKERS) from common.proto.optical_attack_mitigator_pb2 import AttackDescription + from opticalattackmitigator.client.OpticalAttackMitigatorClient import \ OpticalAttackMitigatorClient from opticalattackmitigator.Config import GRPC_SERVICE_PORT diff --git a/src/tests/scenario3/optical/deploy_specs.sh b/src/tests/scenario3/optical/deploy_specs.sh index 4e998991618bde68e15329fe4431f7a8fd47f919..878013d8b82177e3d70aa432e01583f03eded237 100644 --- a/src/tests/scenario3/optical/deploy_specs.sh +++ b/src/tests/scenario3/optical/deploy_specs.sh @@ -16,27 +16,9 @@ # ----- TeraFlowSDN ------------------------------------------------------------ -# Set the URL of the internal MicroK8s Docker registry where the images will be uploaded to. -export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" - # Set the list of components, separated by spaces, you want to build images for, and deploy. export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" -# Set the tag you want to use for your images. -export TFS_IMAGE_TAG="dev" - -# Set the name of the Kubernetes namespace to deploy TFS to. -export TFS_K8S_NAMESPACE="tfs" - -# Set additional manifest files to be applied after the deployment -export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml" - -# Set the new Grafana admin password -export TFS_GRAFANA_PASSWORD="admin123+" - -# Disable skip-build flag to rebuild the Docker images. -export TFS_SKIP_BUILD="" - # addition for the optical cybersecurity component export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager" export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml" @@ -44,87 +26,16 @@ export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml # ----- CockroachDB ------------------------------------------------------------ -# Set the namespace where CockroackDB will be deployed. -export CRDB_NAMESPACE="crdb" - -# Set the external port CockroackDB Postgre SQL interface will be exposed to. -export CRDB_EXT_PORT_SQL="26257" - -# Set the external port CockroackDB HTTP Mgmt GUI interface will be exposed to. -export CRDB_EXT_PORT_HTTP="8081" - -# Set the database username to be used by Context. -export CRDB_USERNAME="tfs" - -# Set the database user's password to be used by Context. -export CRDB_PASSWORD="tfs123" - -# Set the database name to be used by Context. -export CRDB_DATABASE="tfs" - -# Set CockroachDB installation mode to 'single'. This option is convenient for development and testing. -# See ./deploy/all.sh or ./deploy/crdb.sh for additional details -export CRDB_DEPLOY_MODE="single" # Disable flag for dropping database, if it exists. export CRDB_DROP_DATABASE_IF_EXISTS="YES" -# Disable flag for re-deploying CockroachDB from scratch. -export CRDB_REDEPLOY="" - - -# ----- NATS ------------------------------------------------------------------- - -# Set the namespace where NATS will be deployed. -export NATS_NAMESPACE="nats" - -# Set the external port NATS Client interface will be exposed to. -export NATS_EXT_PORT_CLIENT="4222" - -# Set the external port NATS HTTP Mgmt GUI interface will be exposed to. -export NATS_EXT_PORT_HTTP="8222" - -# Disable flag for re-deploying NATS from scratch. -export NATS_REDEPLOY="" # ----- QuestDB ---------------------------------------------------------------- -# Set the namespace where QuestDB will be deployed. -export QDB_NAMESPACE="qdb" - -# Set the external port QuestDB Postgre SQL interface will be exposed to. -export QDB_EXT_PORT_SQL="8812" - -# Set the external port QuestDB Influx Line Protocol interface will be exposed to. -export QDB_EXT_PORT_ILP="9009" - -# Set the external port QuestDB HTTP Mgmt GUI interface will be exposed to. -export QDB_EXT_PORT_HTTP="9000" - -# Set the database username to be used for QuestDB. -export QDB_USERNAME="admin" - -# Set the database user's password to be used for QuestDB. -export QDB_PASSWORD="quest" - -# Set the table name to be used by Monitoring for KPIs. -export QDB_TABLE_MONITORING_KPIS="tfs_monitoring_kpis" - -# Set the table name to be used by Slice for plotting groups. -export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups" - # Disable flag for dropping tables if they exist. export QDB_DROP_TABLES_IF_EXIST="YES" # Disable flag for re-deploying QuestDB from scratch. export QDB_REDEPLOY="" - - -# ----- K8s Observability ------------------------------------------------------ - -# Set the external port Prometheus Mgmt HTTP GUI interface will be exposed to. -export PROM_EXT_PORT_HTTP="9090" - -# Set the external port Grafana HTTP Dashboards will be exposed to. -export GRAF_EXT_PORT_HTTP="3000" diff --git a/src/tests/scenario3/optical/scaphandre.yaml b/src/tests/scenario3/optical/scaphandre.yaml index 6ec17e423580c57ff3acdbffc494277a8bd0b4b1..9cf98b5ee768055388e933910e9a02b56d231282 100644 --- a/src/tests/scenario3/optical/scaphandre.yaml +++ b/src/tests/scenario3/optical/scaphandre.yaml @@ -1,3 +1,6 @@ +# this file deploys a tool that enables the monitoring of energy consumption +# per component. +# More info: https://github.com/hubblo-org/scaphandre apiVersion: monitoring.coreos.com/v1 kind: ServiceMonitor metadata: