diff --git a/deploy.sh b/deploy.sh index ab7ab6555996e869df7fb8a93bef71aa2ca6dcb1..3ae4bbcaa0e7d665ff7f60e839f90e38a936feaf 100755 --- a/deploy.sh +++ b/deploy.sh @@ -58,17 +58,17 @@ kubectl delete namespace $TFS_K8S_NAMESPACE kubectl create namespace $TFS_K8S_NAMESPACE printf "\n" -if [[ "$TFS_COMPONENTS" == *"monitoring"* ]]; then - echo "Creating secrets for InfluxDB..." - #TODO: make sure to change this when having a production deployment - kubectl create secret generic influxdb-secrets --namespace=$TFS_K8S_NAMESPACE \ - --from-literal=INFLUXDB_DB="monitoring" --from-literal=INFLUXDB_ADMIN_USER="teraflow" \ - --from-literal=INFLUXDB_ADMIN_PASSWORD="teraflow" --from-literal=INFLUXDB_HTTP_AUTH_ENABLED="True" - kubectl create secret generic monitoring-secrets --namespace=$TFS_K8S_NAMESPACE \ - --from-literal=INFLUXDB_DATABASE="monitoring" --from-literal=INFLUXDB_USER="teraflow" \ - --from-literal=INFLUXDB_PASSWORD="teraflow" --from-literal=INFLUXDB_HOSTNAME="localhost" - printf "\n" -fi +# if [[ "$TFS_COMPONENTS" == *"monitoring"* ]]; then +# echo "Creating secrets for InfluxDB..." +# #TODO: make sure to change this when having a production deployment +# kubectl create secret generic influxdb-secrets --namespace=$TFS_K8S_NAMESPACE \ +# --from-literal=INFLUXDB_DB="monitoring" --from-literal=INFLUXDB_ADMIN_USER="teraflow" \ +# --from-literal=INFLUXDB_ADMIN_PASSWORD="teraflow" --from-literal=INFLUXDB_HTTP_AUTH_ENABLED="True" +# kubectl create secret generic monitoring-secrets --namespace=$TFS_K8S_NAMESPACE \ +# --from-literal=INFLUXDB_DATABASE="monitoring" --from-literal=INFLUXDB_USER="teraflow" \ +# --from-literal=INFLUXDB_PASSWORD="teraflow" --from-literal=INFLUXDB_HOSTNAME="localhost" +# printf "\n" +# fi echo "Deploying components and collecting environment variables..." ENV_VARS_SCRIPT=tfs_runtime_env_vars.sh @@ -76,6 +76,13 @@ echo "# Environment variables for TeraFlowSDN deployment" > $ENV_VARS_SCRIPT PYTHONPATH=$(pwd)/src echo "export PYTHONPATH=${PYTHONPATH}" >> $ENV_VARS_SCRIPT +# more info: https://www.containiq.com/post/deploy-redis-cluster-on-kubernetes +# generating password for Redis +REDIS_PASSWORD=`uuidgen` +kubectl create secret generic redis-secrets --namespace=$TFS_K8S_NAMESPACE \ + --from-literal=REDIS_PASSWORD=$REDIS_PASSWORD +echo "export REDIS_PASSWORD=${REDIS_PASSWORD}" >> $ENV_VARS_SCRIPT + for COMPONENT in $TFS_COMPONENTS; do echo "Processing '$COMPONENT' component..." IMAGE_NAME="$COMPONENT:$TFS_IMAGE_TAG" @@ -169,12 +176,12 @@ if [[ "$TFS_COMPONENTS" == *"webui"* ]] && [[ "$TFS_COMPONENTS" == *"monitoring" echo "Configuring WebUI DataStores and Dashboards..." sleep 3 - INFLUXDB_HOST="monitoringservice" - INFLUXDB_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service/monitoringservice -o jsonpath='{.spec.ports[?(@.name=="influxdb")].port}') - INFLUXDB_URL="http://${INFLUXDB_HOST}:${INFLUXDB_PORT}" - INFLUXDB_USER=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_ADMIN_USER}' | base64 --decode) - INFLUXDB_PASSWORD=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_ADMIN_PASSWORD}' | base64 --decode) - INFLUXDB_DATABASE=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_DB}' | base64 --decode) + # INFLUXDB_HOST="monitoringservice" + # INFLUXDB_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service/monitoringservice -o jsonpath='{.spec.ports[?(@.name=="influxdb")].port}') + # INFLUXDB_URL="http://${INFLUXDB_HOST}:${INFLUXDB_PORT}" + # INFLUXDB_USER=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_ADMIN_USER}' | base64 --decode) + # INFLUXDB_PASSWORD=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_ADMIN_PASSWORD}' | base64 --decode) + # INFLUXDB_DATABASE=$(kubectl --namespace $TFS_K8S_NAMESPACE get secrets influxdb-secrets -o jsonpath='{.data.INFLUXDB_DB}' | base64 --decode) # Exposed through the ingress controller "tfs-ingress" GRAFANA_HOSTNAME="127.0.0.1" diff --git a/manifests/cachingservice.yaml b/manifests/cachingservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..16e8cb5b6215b1fc5a13755fe6d576119f0c8357 --- /dev/null +++ b/manifests/cachingservice.yaml @@ -0,0 +1,57 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: cachingservice +spec: + selector: + matchLabels: + app: cachingservice + template: + metadata: + labels: + app: cachingservice + spec: + containers: + - name: redis + image: redis:7.0-alpine + env: + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-secrets + key: REDIS_PASSWORD + ports: + - containerPort: 6379 + name: client + command: ["redis-server"] + args: + - --requirepass + - $(REDIS_PASSWORD) +--- +apiVersion: v1 +kind: Service +metadata: + name: cachingservice +spec: + type: ClusterIP + selector: + app: cachingservice + ports: + - name: redis + port: 6379 + targetPort: 6379 \ No newline at end of file diff --git a/manifests/dbscanservingservice.yaml b/manifests/dbscanservingservice.yaml index 9553ed556bddaa437d89881f0c4220ae6e418239..883cc138e6a42aef7f2a1669b9b85d4c4712bd5b 100644 --- a/manifests/dbscanservingservice.yaml +++ b/manifests/dbscanservingservice.yaml @@ -31,16 +31,16 @@ spec: image: registry.gitlab.com/teraflow-h2020/controller/dbscanserving:latest imagePullPolicy: Always ports: - - containerPort: 10006 + - containerPort: 10008 env: - name: LOG_LEVEL value: "DEBUG" readinessProbe: exec: - command: ["/bin/grpc_health_probe", "-addr=:10006"] + command: ["/bin/grpc_health_probe", "-addr=:10008"] livenessProbe: exec: - command: ["/bin/grpc_health_probe", "-addr=:10006"] + command: ["/bin/grpc_health_probe", "-addr=:10008"] resources: requests: cpu: 250m @@ -59,5 +59,5 @@ spec: app: dbscanservingservice ports: - name: grpc - port: 10006 - targetPort: 10006 + port: 10008 + targetPort: 10008 diff --git a/manifests/opticalattackdetectorservice.yaml b/manifests/opticalattackdetectorservice.yaml index f19953df7b58b2ccbce725d1c4532dd65e7f4e11..ee806865552bf03470b6e0daaae1fccd9be1ad0f 100644 --- a/manifests/opticalattackdetectorservice.yaml +++ b/manifests/opticalattackdetectorservice.yaml @@ -32,15 +32,20 @@ spec: imagePullPolicy: Always ports: - containerPort: 10005 - envFrom: - - secretRef: - name: monitoring-secrets + env: + - name: LOG_LEVEL + value: "DEBUG" + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-secrets + key: REDIS_PASSWORD readinessProbe: exec: - command: ["/bin/grpc_health_probe", "-addr=:10005"] + command: ["/bin/grpc_health_probe", "-addr=:10006"] livenessProbe: exec: - command: ["/bin/grpc_health_probe", "-addr=:10005"] + command: ["/bin/grpc_health_probe", "-addr=:10006"] resources: requests: cpu: 250m @@ -59,5 +64,5 @@ spec: app: opticalattackdetectorservice ports: - name: grpc - port: 10005 - targetPort: 10005 + port: 10006 + targetPort: 10006 diff --git a/manifests/opticalattackmanagerservice.yaml b/manifests/opticalattackmanagerservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..a9e60ed8ac3190a441b681af0ce42bf200610941 --- /dev/null +++ b/manifests/opticalattackmanagerservice.yaml @@ -0,0 +1,62 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: opticalattackmanagerservice +spec: + selector: + matchLabels: + app: opticalattackmanagerservice + template: + metadata: + labels: + app: opticalattackmanagerservice + spec: + terminationGracePeriodSeconds: 5 + containers: + - name: server + image: registry.gitlab.com/teraflow-h2020/controller/opticalattackmanager:latest + imagePullPolicy: Always + ports: + - containerPort: 10005 + env: + - name: LOG_LEVEL + value: "DEBUG" + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-secrets + key: REDIS_PASSWORD + resources: + requests: + cpu: 250m + memory: 512Mi + limits: + cpu: 700m + memory: 1024Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: opticalattackmanagerservice +spec: + type: ClusterIP + selector: + app: opticalattackmanagerservice + ports: + - name: grpc + port: 10005 + targetPort: 10005 diff --git a/my_deploy.sh b/my_deploy.sh index dd0bb41c34e8f33b38d36e8cfc731390b4a6d85b..e0ae334ae4ce63356806ac3358ec49f6473a18f7 100644 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -7,7 +7,7 @@ export TFS_REGISTRY_IMAGE="http://localhost:32000/tfs/" # interdomain slice pathcomp dlt # dbscanserving opticalattackmitigator opticalcentralizedattackdetector # l3_attackmitigator l3_centralizedattackdetector l3_distributedattackdetector -export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmitigator opticalattackdetector" +export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmanager opticalattackmitigator opticalattackdetector " # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" @@ -16,7 +16,7 @@ export TFS_IMAGE_TAG="dev" export TFS_K8S_NAMESPACE="tfs" # Set additional manifest files to be applied after the deployment -export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml" +export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/cachingservice.yaml" # Set the neew Grafana admin password export TFS_GRAFANA_PASSWORD="admin123+" diff --git a/src/dbscanserving/Config.py b/src/dbscanserving/Config.py index 6604abe6cda5bd203621de9f09f48ca5f6c8256e..d3140b29373b0110c8571440db6816534131c482 100644 --- a/src/dbscanserving/Config.py +++ b/src/dbscanserving/Config.py @@ -18,7 +18,7 @@ import logging LOG_LEVEL = logging.DEBUG # gRPC settings -GRPC_SERVICE_PORT = 10006 +GRPC_SERVICE_PORT = 10008 GRPC_MAX_WORKERS = 10 GRPC_GRACE_PERIOD = 60 diff --git a/src/dbscanserving/client/DbscanServingClient.py b/src/dbscanserving/client/DbscanServingClient.py index 4e77cfb60d2c871b72966ab321506dc9e2bf8c69..0458deeb7465c8149ad6195956893bf2f880f49c 100644 --- a/src/dbscanserving/client/DbscanServingClient.py +++ b/src/dbscanserving/client/DbscanServingClient.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from email.policy import default import grpc, logging +from common.Settings import get_setting from common.tools.client.RetryDecorator import retry, delay_exponential from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse from common.proto.dbscanserving_pb2_grpc import DetectorStub @@ -23,8 +25,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class DbscanServingClient: - def __init__(self, address, port): - self.endpoint = '{:s}:{:s}'.format(str(address), str(port)) + def __init__(self, host=None, port=None): + if not host: host = get_setting('DBSCANSERVINGSERVICE_SERVICE_HOST', default="DBSCANSERVING") + if not port: port = get_setting('DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC', default=10008) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None self.stub = None @@ -42,7 +46,7 @@ class DbscanServingClient: @RETRY_DECORATOR def Detect(self, request : DetectionRequest) -> DetectionResponse: - LOGGER.debug('Detect request: {:s}'.format(str(request))) + LOGGER.debug('Detect request') response = self.stub.Detect(request) LOGGER.debug('Detect result: {:s}'.format(str(response))) return response diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index 14815e8f93efedc8f4aa0706819a62226cc8080d..21b47a7fb4605d81c5e174d218aa8e1b405381e8 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -42,31 +42,31 @@ DRIVERS.append( ], } ]), - (OpenConfigDriver, [ - { - # Real Packet Router, specifying OpenConfig Driver => use OpenConfigDriver - FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER, - FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, - } - ]), - (TransportApiDriver, [ - { - # Real OLS, specifying TAPI Driver => use TransportApiDriver - FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.OPTICAL_LINE_SYSTEM, - FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.TRANSPORT_API, - } - ]), - (P4Driver, [ - { - # Real P4 Switch, specifying P4 Driver => use P4Driver - FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.P4_SWITCH, - FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.P4, - } - ]), - (IETFApiDriver, [ - { - FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MICROVAWE_RADIO_SYSTEM, - FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.IETF_NETWORK_TOPOLOGY, - } - ]), -] + # (OpenConfigDriver, [ + # { + # # Real Packet Router, specifying OpenConfig Driver => use OpenConfigDriver + # FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER, + # FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, + # } + # ]), + # (TransportApiDriver, [ + # { + # # Real OLS, specifying TAPI Driver => use TransportApiDriver + # FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.OPTICAL_LINE_SYSTEM, + # FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.TRANSPORT_API, + # } + # ]), + # (P4Driver, [ + # { + # # Real P4 Switch, specifying P4 Driver => use P4Driver + # FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.P4_SWITCH, + # FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.P4, + # } + # ]), + # (IETFApiDriver, [ + # { + # FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.MICROVAWE_RADIO_SYSTEM, + # FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.IETF_NETWORK_TOPOLOGY, + # } + # ]), +) diff --git a/src/opticalattackdetector/Config.py b/src/opticalattackdetector/Config.py index f959cbfd40b57bb01fe2fdd183ab94607449bacf..ddc2843a673183117f8af8e1b1fcad3b650c6aaf 100644 --- a/src/opticalattackdetector/Config.py +++ b/src/opticalattackdetector/Config.py @@ -18,7 +18,7 @@ import logging LOG_LEVEL = logging.DEBUG # gRPC settings -GRPC_SERVICE_PORT = 10005 +GRPC_SERVICE_PORT = 10006 # service settings MONITORING_INTERVAL = 10 # monitoring interval in seconds diff --git a/src/opticalattackdetector/requirements.in b/src/opticalattackdetector/requirements.in index 30775bb658ff7d707fc15555b6d4c4b55f523949..d15b73f9d6671db272bba060d65f82c8ea3b9a3b 100644 --- a/src/opticalattackdetector/requirements.in +++ b/src/opticalattackdetector/requirements.in @@ -1 +1 @@ -influxdb +celery[redis] diff --git a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py index 73c5c6abfb0b656b33469c1ba486c3add68d9d56..6488539a921fec505db712bc5dae9463747fa5f9 100644 --- a/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py +++ b/src/opticalattackdetector/service/OpticalAttackDetectorServiceServicerImpl.py @@ -13,17 +13,14 @@ # limitations under the License. import os, grpc, logging, random -from influxdb import InfluxDBClient from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from context.client.ContextClient import ContextClient from monitoring.client.MonitoringClient import MonitoringClient from service.client.ServiceClient import ServiceClient from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse, Sample from dbscanserving.client.DbscanServingClient import DbscanServingClient -from dbscanserving.Config import GRPC_SERVICE_PORT as DBSCANSERVING_GRPC_SERVICE_PORT from opticalattackmitigator.client.OpticalAttackMitigatorClient import OpticalAttackMitigatorClient from common.proto.optical_attack_mitigator_pb2 import AttackDescription, AttackResponse -from opticalattackmitigator.Config import GRPC_SERVICE_PORT as ATTACK_MITIGATOR_GRPC_SERVICE_PORT from common.proto.context_pb2 import (Empty, Context, ContextId, ContextIdList, ContextList, Service, ServiceId, ServiceIdList, ServiceList @@ -31,8 +28,6 @@ from common.proto.context_pb2 import (Empty, from common.proto.monitoring_pb2 import KpiList from common.proto.optical_attack_detector_pb2_grpc import ( OpticalAttackDetectorServiceServicer) -from opticalattackdetector.Config import ( - INFERENCE_SERVICE_ADDRESS, MONITORING_SERVICE_ADDRESS, ATTACK_MITIGATOR_SERVICE_ADDRESS) LOGGER = logging.getLogger(__name__) @@ -41,20 +36,11 @@ SERVICE_NAME = 'OpticalAttackDetector' METHOD_NAMES = ['NotifyServiceUpdate', 'DetectAttack', 'ReportSummarizedKpi', 'ReportKpi'] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") context_client: ContextClient = ContextClient() -influxdb_client: InfluxDBClient = InfluxDBClient( - host=MONITORING_SERVICE_ADDRESS, port=8086, username=INFLUXDB_USER, password=INFLUXDB_PASSWORD, - database=INFLUXDB_DATABASE) monitoring_client: MonitoringClient = MonitoringClient() -dbscanserving_client: DbscanServingClient = DbscanServingClient( - address=INFERENCE_SERVICE_ADDRESS, port=DBSCANSERVING_GRPC_SERVICE_PORT) +dbscanserving_client: DbscanServingClient = DbscanServingClient() service_client: ServiceClient = ServiceClient() -attack_mitigator_client: OpticalAttackMitigatorClient = OpticalAttackMitigatorClient( - address=ATTACK_MITIGATOR_SERVICE_ADDRESS, port=ATTACK_MITIGATOR_GRPC_SERVICE_PORT) +attack_mitigator_client: OpticalAttackMitigatorClient = OpticalAttackMitigatorClient() class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServicer): @@ -78,18 +64,10 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi for service in context_services.services: services.append(service) - # get monitoring data for each of the current services - results = influxdb_client.query('select * from samples;') - for service in services: for endpoint in service.service_endpoint_ids: # get instant KPI for this endpoint LOGGER.warning(f'service: {service.service_id.service_uuid.uuid}\t endpoint: {endpoint.endpoint_uuid.uuid}\tdevice: {endpoint.device_id.device_uuid.uuid}') - # how to get all KPIs for a particular device? - points = results.get_points(tags={'device_id': endpoint.device_id.device_uuid.uuid}) - print('points:', points) - for point in points: - print('\t', point) # run attack detection for every service request: DetectionRequest = DetectionRequest() diff --git a/src/opticalattackdetector/service/__main__.py b/src/opticalattackdetector/service/__main__.py index 4b7e9627af35d8e2048a96d22721d325c2d027d2..c0bea72ebc9d64618cc0caced378dbc390828351 100644 --- a/src/opticalattackdetector/service/__main__.py +++ b/src/opticalattackdetector/service/__main__.py @@ -12,30 +12,39 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, signal, sys, time, threading +import logging, signal, sys, time, threading, random from prometheus_client import start_http_server +from celery import Celery -from common.Constants import DEFAULT_GRPC_BIND_ADDRESS, DEFAULT_GRPC_MAX_WORKERS, DEFAULT_GRPC_GRACE_PERIOD +from common.Constants import DEFAULT_GRPC_MAX_WORKERS, DEFAULT_GRPC_GRACE_PERIOD from common.Settings import get_log_level, get_metrics_port, get_setting from opticalattackdetector.Config import ( GRPC_SERVICE_PORT, MONITORING_INTERVAL) from common.proto.context_pb2 import (Empty, Context, ContextId, ContextIdList, ContextList, - Service, ServiceId, ServiceIdList, ServiceList + Service, ServiceId, ServiceIdList, ServiceList, Timestamp ) +from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse, Sample +from common.proto.attack_mitigator_pb2 import AttackDescription, AttackResponse +from dbscanserving.client.DbscanServingClient import DbscanServingClient +from opticalattackmitigator.client.OpticalAttackMitigatorClient import OpticalAttackMitigatorClient from opticalattackdetector.service.OpticalAttackDetectorService import OpticalAttackDetectorService from opticalattackdetector.client.OpticalAttackDetectorClient import OpticalAttackDetectorClient +from monitoring.client.MonitoringClient import MonitoringClient +from common.proto.monitoring_pb2 import Kpi terminate = threading.Event() LOGGER = None client: OpticalAttackDetectorClient = None + def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') terminate.set() -def detect_attack(monitoring_interval): + +def detect_attack_old(monitoring_interval): time.sleep(10) # wait for the service to start LOGGER.info("Starting the attack detection loop") client = OpticalAttackDetectorClient(address='localhost', port=GRPC_SERVICE_PORT) @@ -60,7 +69,6 @@ def main(): service_port = get_setting('OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT) max_workers = get_setting('MAX_WORKERS', default=DEFAULT_GRPC_MAX_WORKERS ) grace_period = get_setting('GRACE_PERIOD', default=DEFAULT_GRPC_GRACE_PERIOD) - monitoring_interval = get_setting('MONITORING_INTERVAL', default=MONITORING_INTERVAL ) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -71,6 +79,10 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) + dbscanserving_client: DbscanServingClient = DbscanServingClient() + attack_mitigator_client: OpticalAttackMitigatorClient = OpticalAttackMitigatorClient() + monitoring_client: MonitoringClient = MonitoringClient() + # Starting CentralizedCybersecurity service grpc_service = OpticalAttackDetectorService( port=service_port, max_workers=max_workers, grace_period=grace_period) @@ -78,7 +90,64 @@ def main(): # p = multiprocessing.Process(target=detect_attack, args=(monitoring_interval, )) # p.start() - detect_attack(monitoring_interval) + # detect_attack(monitoring_interval) + + LOGGER.info('Connecting with REDIS...') + REDIS_PASSWORD = get_setting('REDIS_PASSWORD') + REDIS_HOST = get_setting('CACHINGSERVICE_SERVICE_HOST') + REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT_REDIS') + BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0' + app = Celery( + 'cybersecurity', + broker=BROKER_URL, + backend=BROKER_URL + ) + LOGGER.info('Connected to REDIS...') + + @app.task(name='detect_attack') + def detect_attack(context_id, service_id, kpi_id): + LOGGER.info('Detecting attack for {}/{}'.format(context_id, service_id)) + alien_samples = random.randint(2, 10) + # run attack detection for every service + request: DetectionRequest = DetectionRequest() + request.num_samples = 300 + alien_samples + request.num_features = 20 + request.eps = 100.5 + request.min_samples = 5 + for _ in range(200): + grpc_sample = Sample() + for __ in range(20): + grpc_sample.features.append(random.uniform(0., 10.)) + request.samples.append(grpc_sample) + for _ in range(100): + grpc_sample = Sample() + for __ in range(20): + grpc_sample.features.append(random.uniform(50., 60.)) + request.samples.append(grpc_sample) + for _ in range(alien_samples): + grpc_sample = Sample() + for __ in range(20): + grpc_sample.features.append(random.uniform(5000., 6000.)) + request.samples.append(grpc_sample) + response: DetectionResponse = dbscanserving_client.Detect(request) + + # including KPI + kpi = Kpi() + kpi.kpi_id.kpi_id.uuid = kpi_id + kpi.timestamp = Timestamp() + kpi.kpi_value.int32Val = response.cluster_indices[-1] + monitoring_client.IncludeKpi(kpi) + + if -1 in response.cluster_indices: # attack detected + attack = AttackDescription() + # attack.cs_id.uuid = service.service_id.service_uuid.uuid + response: AttackResponse = attack_mitigator_client.NotifyAttack(attack) + return 0 + + app.worker_main([ + 'worker', + '--loglevel={}'.format(log_level) + ]) # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass diff --git a/src/opticalattackmanager/.gitlab-ci.yml b/src/opticalattackmanager/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..6ca3b1807432733dda490b18ff3ea8d51dbea270 --- /dev/null +++ b/src/opticalattackmanager/.gitlab-ci.yml @@ -0,0 +1,110 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build, tag, and push the Docker image to the GitLab Docker registry +build opticalattackmanager: + variables: + IMAGE_NAME: 'opticalattackmanager' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: build + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + script: + - docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile . + - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + after_script: + - docker images --filter="dangling=true" --quiet | xargs -r docker rmi + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/**/*.{py,in,yml} + - src/$IMAGE_NAME/Dockerfile + - src/$IMAGE_NAME/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + +# Apply unit test to the component +unit test opticalattackmanager: + variables: + IMAGE_NAME: 'opticalattackmanager' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: unit_test + needs: + - build opticalattackmanager + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi + - if docker container ls | grep redis; then docker rm -f redis; else echo "redis image is not in the system"; fi + - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi + script: + - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + - docker pull "redis:6.2" + - docker run --name redis -d --network=teraflowbridge redis:6.2 + - sleep 10 + - docker run --name $IMAGE_NAME -d -p 1010:1010 --env "DB_BACKEND=redis" --env "REDIS_SERVICE_HOST=redis" --env "REDIS_SERVICE_PORT=6379" --env "REDIS_DATABASE_ID=0" -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker ps -a + - docker logs $IMAGE_NAME + - docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml" + - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" + coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' + after_script: + - docker rm -f $IMAGE_NAME + - docker rm -f redis + - docker network rm teraflowbridge + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/**/*.{py,in,yml} + - src/$IMAGE_NAME/Dockerfile + - src/$IMAGE_NAME/tests/*.py + - src/$IMAGE_NAME/tests/Dockerfile + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + artifacts: + when: always + reports: + junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml + +# Deployment of the service in Kubernetes Cluster +deploy opticalattackmanager: + variables: + IMAGE_NAME: 'opticalattackmanager' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: deploy + needs: + - unit test opticalattackmanager + # - integ_test execute + script: + - 'sed -i "s/$IMAGE_NAME:.*/$IMAGE_NAME:$IMAGE_TAG/" manifests/${IMAGE_NAME}service.yaml' + - kubectl version + - kubectl get all + - kubectl apply -f "manifests/${IMAGE_NAME}service.yaml" + - kubectl get all + # environment: + # name: test + # url: https://example.com + # kubernetes: + # namespace: test + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + when: manual + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + when: manual diff --git a/src/opticalattackmanager/Config.py b/src/opticalattackmanager/Config.py new file mode 100644 index 0000000000000000000000000000000000000000..a3222dc475732687849962784d25deedb0d75544 --- /dev/null +++ b/src/opticalattackmanager/Config.py @@ -0,0 +1,24 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +# General settings +LOG_LEVEL = logging.DEBUG + +# service settings +MONITORING_INTERVAL = 10 # monitoring interval in seconds + +# Prometheus settings +METRICS_PORT = 9192 diff --git a/src/opticalattackmanager/Dockerfile b/src/opticalattackmanager/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..bbd680a5a4692178a1ee5121ce0fba9c9691270c --- /dev/null +++ b/src/opticalattackmanager/Dockerfile @@ -0,0 +1,87 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 +ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Creating a user for security reasons +RUN groupadd -r opticalattackmanager && useradd -u 1001 --no-log-init -r -m -g opticalattackmanager opticalattackmanager +USER opticalattackmanager + +# set working directory +RUN mkdir -p /home/opticalattackmanager/teraflow/common/proto +WORKDIR /home/opticalattackmanager/teraflow + +# Get Python packages per module +ENV VIRTUAL_ENV=/home/opticalattackmanager/venv +RUN python3 -m venv ${VIRTUAL_ENV} +ENV PATH="${VIRTUAL_ENV}/bin:${PATH}" + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +WORKDIR /home/opticalattackmanager/teraflow +COPY --chown=opticalattackmanager:opticalattackmanager common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /home/opticalattackmanager/teraflow/common +COPY --chown=opticalattackmanager:opticalattackmanager src/common/. ./ + +# Create proto sub-folder, copy .proto files, and generate Python code +RUN mkdir -p /home/opticalattackmanager/teraflow/common/proto +WORKDIR /home/opticalattackmanager/teraflow/common/proto +RUN touch __init__.py +COPY --chown=opticalattackmanager:opticalattackmanager proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; + +# Create module sub-folders +RUN mkdir -p /home/opticalattackmanager/teraflow/opticalattackmanager +WORKDIR /home/opticalattackmanager/teraflow + +# Get Python packages per module +COPY --chown=opticalattackmanager:opticalattackmanager src/opticalattackmanager/requirements.in opticalattackmanager/requirements.in +RUN pip-compile --quiet --output-file=opticalattackmanager/requirements.txt opticalattackmanager/requirements.in +RUN python3 -m pip install -r opticalattackmanager/requirements.txt + +# Add files into working directory +COPY --chown=opticalattackmanager:opticalattackmanager src/context/. context +COPY --chown=opticalattackmanager:opticalattackmanager src/monitoring/. monitoring +COPY --chown=opticalattackmanager:opticalattackmanager src/service/. service +COPY --chown=opticalattackmanager:opticalattackmanager src/dbscanserving/. dbscanserving +COPY --chown=opticalattackmanager:opticalattackmanager src/opticalattackmitigator/. opticalattackmitigator +COPY --chown=opticalattackmanager:opticalattackmanager src/opticalattackmanager/. opticalattackmanager + +# Start opticalattackmanager service +ENTRYPOINT ["python", "-m", "opticalattackmanager.service"] diff --git a/src/opticalattackmanager/__init__.py b/src/opticalattackmanager/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/opticalattackmanager/requirements.in b/src/opticalattackmanager/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..1ce978e8401f43b5ce1b5a046d9fb3b650c303e5 --- /dev/null +++ b/src/opticalattackmanager/requirements.in @@ -0,0 +1 @@ +celery[redis] \ No newline at end of file diff --git a/src/opticalattackmanager/service/__init__.py b/src/opticalattackmanager/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..3652022d67250b8ae3aeab89fdeadaf7c715635a --- /dev/null +++ b/src/opticalattackmanager/service/__main__.py @@ -0,0 +1,173 @@ +import logging, signal, sys, time, threading +from multiprocessing import Manager, Process +from typing import List +from prometheus_client import start_http_server +from celery import Celery + +from common.Settings import get_log_level, get_metrics_port, get_setting +from common.proto.context_pb2 import ContextIdList, Empty, EventTypeEnum, ServiceId, ServiceIdList +from context.client.ContextClient import ContextClient +from opticalattackmanager.Config import MONITORING_INTERVAL +from common.proto.monitoring_pb2 import KpiDescriptor +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from monitoring.client.MonitoringClient import MonitoringClient + +terminate = threading.Event() +LOGGER = None + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning('Terminate signal received') + terminate.set() + + +def monitor_services(app: Celery, service_list: List[ServiceId]): + + monitoring_interval = get_setting('MONITORING_INTERVAL', default=MONITORING_INTERVAL) + + while not terminate.is_set(): + + time.sleep(monitoring_interval) + + LOGGER.info('Starting new monitoring cycle...') + + start_time = time.time() + + try: + tasks = [] + + for service in service_list: + LOGGER.debug('Scheduling service: {}'.format(service)) + tasks.append( + app.send_task('detect_attack', (service['context'], service['service'], service['kpi'])) + ) + + for task in tasks: + LOGGER.debug('Waiting for task {}...'.format(task)) + result = task.get() + LOGGER.debug('Result for task {} is {}...'.format(task, result)) + except Exception as e: + LOGGER.exception(e) + + end_time = time.time() + + diff = end_time - start_time + LOGGER.info('Monitoring loop with {} services took {} seconds...'.format(len(service_list), diff)) + + if diff / monitoring_interval > 0.9: + LOGGER.warning('Monitoring loop is taking {} % of the desired time ({} seconds)'.format((diff / monitoring_interval) * 100, monitoring_interval)) + + time.sleep(monitoring_interval - diff) + + +def create_kpi(client: MonitoringClient, service_id): + # create kpi + kpi_description: KpiDescriptor = KpiDescriptor() + kpi_description.kpi_description = 'Security status of service {}'.format(service_id) + kpi_description.service_id.service_uuid.uuid = service_id + kpi_description.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN + new_kpi = client.SetKpi(kpi_description) + LOGGER.info('Created KPI {}...'.format(new_kpi.kpi_id)) + return new_kpi + + +def get_context_updates(service_list: List[ServiceId]): + # to make sure we are thread safe... + LOGGER.info('Connecting with context and monitoring components...') + context_client: ContextClient = ContextClient() + monitoring_client: MonitoringClient = MonitoringClient() + LOGGER.info('Connected successfully... Waiting for events...') + + time.sleep(20) + + for event in context_client.GetServiceEvents(Empty()): + LOGGER.info('Event received: {}'.format(event)) + if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: + LOGGER.info('Service created: {}'.format(event.service_id)) + kpi_id = create_kpi(monitoring_client, event.service_id.service_uuid.uuid) + service_list.append({'context': event.service_id.context_id.context_uuid.uuid, 'service': event.service_id.service_uuid.uuid, 'kpi': kpi_id.kpi_id.uuid}) + + elif event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE: + LOGGER.info('Service removed: {}'.format(event.service_id)) + # find service and remove it from the list of currently monitored + for service in service_list: + if service['service'] == event.service_id.service_uuid.uuid and service['context'] == event.service_id.context_id.context_uuid.uuid: + service_list.remove(service) + break + # service_list.remove({'context': event.service_id.context_id.context_uuid.uuid, 'service': event.service_id.service_uuid.uuid}) + + if terminate.is_set(): # if terminate is set + LOGGER.warning('Stopping execution of the get_context_updates...') + context_client.close() + monitoring_client.close() + break # break the while and stop execution + LOGGER.debug('Waiting for next event...') + + +def main(): + global LOGGER # pylint: disable=global-statement + + log_level = get_log_level() + logging.basicConfig(level=log_level) + LOGGER = logging.getLogger(__name__) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info('Starting...') + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + LOGGER.info('Connecting with context component...') + context_client: ContextClient = ContextClient() + context_client.connect() + LOGGER.info('Connected successfully...') + + LOGGER.info('Connecting with REDIS...') + REDIS_PASSWORD = get_setting('REDIS_PASSWORD') + REDIS_HOST = get_setting('CACHINGSERVICE_SERVICE_HOST') + REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT_REDIS') + BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0' + app = Celery( + 'cybersecurity', + broker=BROKER_URL, + backend=BROKER_URL + ) + LOGGER.info('Connected to REDIS...') + + # creating a thread-safe list to be shared among threads + service_list = Manager().list() + + context_ids: ContextIdList = context_client.ListContextIds(Empty()) + + # populate with initial services + for context_id in context_ids.context_ids: + context_services: ServiceIdList = context_client.ListServiceIds(context_id) + for service in context_services.service_ids: + kpi_id = create_kpi(service.service_uuid.uuid) + service_list.append({'context': context_id.context_uuid.uuid, 'service': service.service_uuid.uuid, 'kpi': kpi_id}) + + context_client.close() + + # starting background process to monitor service addition/removal + process_context = Process(target=get_context_updates, args=(service_list,)) + process_context.start() + + monitor_services(app, service_list) + + # process_security_loop = Process(target=monitor_services, args=(app, service_list)) + # process_security_loop.start() + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=0.1): pass + + LOGGER.info('Terminating...') + process_context.kill() + # process_security_loop.kill() + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py b/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py index fe290772d6b33f171b092dbb16975583ee69ff56..ad2ff7928af0564a7f8cd2c79a491807a57e7a74 100644 --- a/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py +++ b/src/opticalattackmitigator/client/OpticalAttackMitigatorClient.py @@ -13,6 +13,7 @@ # limitations under the License. import grpc, logging +from common.Settings import get_setting from common.tools.client.RetryDecorator import retry, delay_exponential from common.proto.optical_attack_mitigator_pb2 import AttackDescription, AttackResponse from common.proto.optical_attack_mitigator_pb2_grpc import AttackMitigatorStub @@ -23,8 +24,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class OpticalAttackMitigatorClient: - def __init__(self, address, port): - self.endpoint = '{:s}:{:s}'.format(str(address), str(port)) + def __init__(self, host=None, port=None): + if not host: host = get_setting('OPTICALATTACKMITIGATORSERVICE_SERVICE_HOST', default="DBSCANSERVING") + if not port: port = get_setting('OPTICALATTACKMITIGATORSERVICE_SERVICE_PORT_GRPC', default=10007) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None self.stub = None