diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 399d3a248414955d2bfbd1a67a4082cadd0c22d6..0fe2f60f922f9002a5448587480c85925325fc48 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -169,11 +169,17 @@ kubectl create secret generic qdb-data --namespace ${TFS_K8S_NAMESPACE} --type=' --from-literal=METRICSDB_PASSWORD=${QDB_PASSWORD} printf "\n" +echo "Create Redis secret..." +REDIS_PASSWORD=`uuidgen` +kubectl create secret generic redis-secrets --namespace=$TFS_K8S_NAMESPACE \ + --from-literal=REDIS_PASSWORD=$REDIS_PASSWORD + echo "Deploying components and collecting environment variables..." ENV_VARS_SCRIPT=tfs_runtime_env_vars.sh echo "# Environment variables for TeraFlowSDN deployment" > $ENV_VARS_SCRIPT PYTHONPATH=$(pwd)/src echo "export PYTHONPATH=${PYTHONPATH}" >> $ENV_VARS_SCRIPT +echo "export REDIS_PASSWORD=${REDIS_PASSWORD}" >> $ENV_VARS_SCRIPT for COMPONENT in $TFS_COMPONENTS; do echo "Processing '$COMPONENT' component..." diff --git a/manifests/opticalattackmanagerservice.yaml b/manifests/opticalattackmanagerservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..e7b010f8cfac66ffe295086805e1ae6a12b4bc16 --- /dev/null +++ b/manifests/opticalattackmanagerservice.yaml @@ -0,0 +1,86 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: opticalattackmanagerservice +spec: + selector: + matchLabels: + app: opticalattackmanagerservice + template: + metadata: + labels: + app: opticalattackmanagerservice + spec: + terminationGracePeriodSeconds: 5 + containers: + - name: server + image: labs.etsi.org:5050/tfs/controller/opticalattackmanager:latest + imagePullPolicy: Always + ports: + - containerPort: 10005 + - containerPort: 9192 + env: + - name: LOG_LEVEL + value: "DEBUG" + - name: MONITORING_INTERVAL + value: "30" + - name: OPTICALATTACKMANAGERSERVICE_LOOP_MIN_WORKERS + value: "2" # remember to align this with the resource limits + - name: OPTICALATTACKMANAGERSERVICE_LOOP_MAX_WORKERS + value: "10" # remember to align this with the resource limits + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-secrets + key: REDIS_PASSWORD + resources: + requests: + cpu: 250m + memory: 128Mi + limits: + cpu: 10000m + memory: 10240Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: opticalattackmanagerservice + labels: + app: opticalattackmanagerservice +spec: + type: ClusterIP + selector: + app: opticalattackmanagerservice + ports: + - name: grpc + port: 10005 + targetPort: 10005 + - name: metrics + port: 9192 + targetPort: 9192 +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: opticalattackdetectorservice-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: opticalattackdetectorservice + minReplicas: 1 + maxReplicas: 1 diff --git a/my_deploy.sh b/my_deploy.sh index d6f3513e9b2090905b7814c4563644ecda7bd2c6..bfe2978a6720bf995d320ba788f813bfa83757f6 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -22,6 +22,9 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" +# addition for the optical cybersecurity component +export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager" + # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" @@ -29,7 +32,7 @@ export TFS_IMAGE_TAG="dev" export TFS_K8S_NAMESPACE="tfs" # Set additional manifest files to be applied after the deployment -export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml" +export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml manifests/cachingservice.yaml" # Set the new Grafana admin password export TFS_GRAFANA_PASSWORD="admin123+" @@ -63,7 +66,7 @@ export CRDB_DATABASE="tfs" export CRDB_DEPLOY_MODE="single" # Disable flag for dropping database, if it exists. -export CRDB_DROP_DATABASE_IF_EXISTS="" +export CRDB_DROP_DATABASE_IF_EXISTS="YES" # Disable flag for re-deploying CockroachDB from scratch. export CRDB_REDEPLOY="" @@ -111,7 +114,7 @@ export QDB_TABLE_MONITORING_KPIS="tfs_monitoring_kpis" export QDB_TABLE_SLICE_GROUPS="tfs_slice_groups" # Disable flag for dropping tables if they exist. -export QDB_DROP_TABLES_IF_EXIST="" +export QDB_DROP_TABLES_IF_EXIST="YES" # Disable flag for re-deploying QuestDB from scratch. export QDB_REDEPLOY="" diff --git a/src/opticalattackmanager/.gitlab-ci.yml b/src/opticalattackmanager/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..06aa9eb92defa77c46a92f339b6a07fff2d7b984 --- /dev/null +++ b/src/opticalattackmanager/.gitlab-ci.yml @@ -0,0 +1,105 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build, tag, and push the Docker image to the GitLab Docker registry +build opticalattackmanager: + variables: + IMAGE_NAME: 'opticalattackmanager' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: build + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + script: + - docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile . + - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + after_script: + - docker images --filter="dangling=true" --quiet | xargs -r docker rmi + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/**/*.{py,in,yml} + - src/$IMAGE_NAME/Dockerfile + - src/$IMAGE_NAME/tests/*.py + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + +# Apply unit test to the component +unit test opticalattackmanager: + variables: + IMAGE_NAME: 'opticalattackmanager' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: unit_test + needs: + - build opticalattackmanager + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi + - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi + script: + - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + - docker run --name $IMAGE_NAME -d -p 10005:10005 -v "$PWD/src/$IMAGE_NAME/tests:/home/teraflow/$IMAGE_NAME/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker ps -a + - docker logs $IMAGE_NAME + - docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/home/teraflow/$IMAGE_NAME/results/${IMAGE_NAME}_report.xml" + - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" + coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' + after_script: + - docker rm -f $IMAGE_NAME + - docker network rm teraflowbridge + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/common/**/*.py + - proto/*.proto + - src/$IMAGE_NAME/**/*.{py,in,yml} + - src/$IMAGE_NAME/Dockerfile + - src/$IMAGE_NAME/tests/*.py + - src/$IMAGE_NAME/tests/Dockerfile + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml + artifacts: + when: always + reports: + junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml + +# Deployment of the service in Kubernetes Cluster +deploy opticalattackmanager: + variables: + IMAGE_NAME: 'opticalattackmanager' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: deploy + needs: + - unit test opticalattackmanager + # - integ_test execute + script: + - 'sed -i "s/$IMAGE_NAME:.*/$IMAGE_NAME:$IMAGE_TAG/" manifests/${IMAGE_NAME}service.yaml' + - kubectl version + - kubectl get all + - kubectl apply -f "manifests/${IMAGE_NAME}service.yaml" + - kubectl get all + # environment: + # name: test + # url: https://example.com + # kubernetes: + # namespace: test + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + when: manual + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + when: manual diff --git a/src/opticalattackmanager/Config.py b/src/opticalattackmanager/Config.py new file mode 100644 index 0000000000000000000000000000000000000000..a3222dc475732687849962784d25deedb0d75544 --- /dev/null +++ b/src/opticalattackmanager/Config.py @@ -0,0 +1,24 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +# General settings +LOG_LEVEL = logging.DEBUG + +# service settings +MONITORING_INTERVAL = 10 # monitoring interval in seconds + +# Prometheus settings +METRICS_PORT = 9192 diff --git a/src/opticalattackmanager/Dockerfile b/src/opticalattackmanager/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..746c65bf04f04442653627b864b02c8308163301 --- /dev/null +++ b/src/opticalattackmanager/Dockerfile @@ -0,0 +1,93 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ nano && \ + rm -rf /var/lib/apt/lists/* + +# TODO: remove nano from installation + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 +ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Creating a user for security reasons +RUN groupadd -r teraflow && useradd -u 1001 --no-log-init -r -m -g teraflow teraflow +USER teraflow + +# set working directory +RUN mkdir -p /home/teraflow/controller/common/ +WORKDIR /home/teraflow/controller + +# Get Python packages per module +ENV VIRTUAL_ENV=/home/teraflow/venv +RUN python3 -m venv ${VIRTUAL_ENV} +ENV PATH="${VIRTUAL_ENV}/bin:${PATH}" + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +COPY --chown=teraflow:teraflow common_requirements.in common_requirements.in +COPY --chown=teraflow:teraflow src/opticalattackmanager/requirements.in opticalattackmanager/requirements.in +RUN sed -i '/protobuf/d' common_requirements.in && sed -i '/grpc/d' common_requirements.in +RUN pip-compile --output-file=common_requirements.txt common_requirements.in opticalattackmanager/requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Get Python packages per module +# COPY --chown=opticalattackmanager:opticalattackmanager src/opticalattackmanager/requirements.in opticalattackmanager/requirements.in +# RUN pip-compile --quiet --output-file=opticalattackmanager/requirements.txt opticalattackmanager/requirements.in +# RUN python3 -m pip install -r opticalattackmanager/requirements.txt + +# Add common files into working directory +WORKDIR /home/teraflow/controller/common +COPY --chown=teraflow:teraflow src/common/. ./ + +# Create proto sub-folder, copy .proto files, and generate Python code +WORKDIR /home/teraflow/controller/common/proto +RUN touch __init__.py +RUN mkdir -p /home/teraflow/controller/common/proto/asyncio +RUN touch asyncio/__init__.py +COPY --chown=teraflow:teraflow proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +# new line added to generate protobuf for the `grpclib` library +RUN python3 -m grpc_tools.protoc -I=./ --python_out=./asyncio --grpclib_python_out=./asyncio *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; + +# Create module sub-folders +RUN mkdir -p /home/teraflow/controller/opticalattackmanager +WORKDIR /home/teraflow/controller + +# Add files into working directory +COPY --chown=teraflow:teraflow src/context/. context +COPY --chown=teraflow:teraflow src/monitoring/. monitoring +COPY --chown=teraflow:teraflow src/dbscanserving/. dbscanserving +COPY --chown=teraflow:teraflow src/opticalattackdetector/. opticalattackdetector +COPY --chown=teraflow:teraflow src/opticalattackmitigator/. opticalattackmitigator +COPY --chown=teraflow:teraflow src/opticalattackmanager/. opticalattackmanager + +# Start opticalattackmanager service +ENTRYPOINT ["python", "-m", "opticalattackmanager.service"] diff --git a/src/opticalattackmanager/__init__.py b/src/opticalattackmanager/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..9953c820575d42fa88351cc8de022d880ba96e6a --- /dev/null +++ b/src/opticalattackmanager/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/opticalattackmanager/requirements.in b/src/opticalattackmanager/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..91fa143a93bc7f9d5d2a649d52fc9a52c854a5cd --- /dev/null +++ b/src/opticalattackmanager/requirements.in @@ -0,0 +1,5 @@ +grpcio==1.49.* +grpcio-health-checking==1.49.* +grpcio-tools==1.49.* +grpclib[protobuf] +redis \ No newline at end of file diff --git a/src/opticalattackmanager/service/__init__.py b/src/opticalattackmanager/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..9953c820575d42fa88351cc8de022d880ba96e6a --- /dev/null +++ b/src/opticalattackmanager/service/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..08c0c7f77994879b44ccba956897bcd06103e4f3 --- /dev/null +++ b/src/opticalattackmanager/service/__main__.py @@ -0,0 +1,572 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import math +import pickle +import signal +import sys +import threading +import time +from concurrent.futures import ProcessPoolExecutor +from multiprocessing import Manager, Process +from typing import Dict, List + +import redis +from grpclib.client import Channel +from prometheus_client import Counter, Gauge, Histogram, start_http_server + +from common.Constants import ServiceNameEnum +from common.proto.context_pb2 import ( + ContextIdList, + Empty, + EventTypeEnum, + Service, + ServiceIdList, + ServiceTypeEnum, +) +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.monitoring_pb2 import KpiDescriptor +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, + ENVVAR_SUFIX_SERVICE_PORT_GRPC, + get_env_var_name, + get_log_level, + get_metrics_port, + get_service_host, + get_setting, + wait_for_environment_variables, +) +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from monitoring.client.MonitoringClient import MonitoringClient +from opticalattackmanager.Config import MONITORING_INTERVAL +from opticalattackmanager.utils.EventsCollector import EventsCollector +from opticalattackmanager.utils.monitor import delegate_services + +terminate = threading.Event() +LOGGER = None + +# SERVICE_LIST_MODE: +# 1 => use Redis +LIST_REDIS_MODE = 1 +# 2 => use shared list +LIST_SHARED_MODE = 2 +SERVICE_LIST_MODE = int( + get_setting("OPTICALATTACKMANAGER_SERVICE_LIST_MODE", default=1) +) +SERVICE_LIST_KEY = get_setting( + "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services" +) +MIN_NUMBER_WORKERS = int( + get_setting("OPTICALATTACKMANAGERSERVICE_LOOP_MIN_WORKERS", default=2) +) +MAX_NUMBER_WORKERS = int( + get_setting("OPTICALATTACKMANAGERSERVICE_LOOP_MAX_WORKERS", default=10) +) + +# Create a metric to track time spent and requests made. +# TODO: adjust histogram buckets to more realistic values +LOOP_TIME = Histogram( + "OPTICAL_ATTACK_MANAGER_loop_seconds", + "Time taken by each security loop", + buckets=( + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 12.5, + 15.0, + 17.5, + 20.0, + 22.5, + 25.0, + 27.5, + 30.0, + 32.5, + 35.0, + 37.5, + 40.0, + 42.5, + 45.0, + 47.5, + 50.0, + 52.5, + 55.0, + 57.5, + 60.0, + float("inf"), + ), +) + +CURRENT_SERVICES = Gauge( + "OPTICAL_ATTACK_MANAGER_active_services", + "Active optical services currently in the network", +) + +NUMBER_WORKERS = Gauge( + "OPTICAL_ATTACK_MANAGER_number_workers", + "Number of workers being used by the loop", +) + +DESIRED_MONITORING_INTERVAL = Gauge( + "OPTICAL_ATTACK_MANAGER_desired_monitoring_interval", + "Desired loop monitoring interval", +) + +global service_list +global cache + + +def append_service( + info: Dict[str, str], service_list: List = None, cache: redis.Redis = None +) -> None: + if SERVICE_LIST_MODE == LIST_REDIS_MODE: + cache.lpush(SERVICE_LIST_KEY, pickle.dumps(info)) + elif SERVICE_LIST_MODE == LIST_SHARED_MODE: + service_list.append(info) + + +def delete_service( + info: Dict[str, str], service_list: List = None, cache: redis.Redis = None +) -> None: + # here we need to test if the service exists in the list because it has been + # already deleted and there is not way of knowing if it is optical or not + if SERVICE_LIST_MODE == LIST_REDIS_MODE: + service_list = cache.lrange(SERVICE_LIST_KEY, 0, -1) + for encoded in service_list: + service = pickle.loads(encoded) + if ( + service["service"] == info["service"] + and service["context"] == info["context"] + ): + cache.lrem(SERVICE_LIST_KEY, 1, encoded) + break + elif SERVICE_LIST_MODE == LIST_SHARED_MODE: + # find service and remove it from the list of currently monitored + for service in service_list: + if ( + service["service"] == info["service"] + and service["context"] == info["context"] + ): + service_list.remove(service) + break + + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning("Terminate signal received") + terminate.set() + + +def create_kpi(client: MonitoringClient, service_id): + # create kpi + kpi_description: KpiDescriptor = KpiDescriptor() + kpi_description.kpi_description = "Security status of service {}".format(service_id) + kpi_description.service_id.service_uuid.uuid = service_id + kpi_description.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN + new_kpi = client.SetKpi(kpi_description) + LOGGER.debug("Created KPI {}: ".format(grpc_message_to_json_string(new_kpi))) + return new_kpi + + +def get_context_updates(terminate, service_list, cache): + # to make sure we are thread safe... + LOGGER.info("Connecting with context and monitoring components...") + context_client: ContextClient = ContextClient() + monitoring_client: MonitoringClient = MonitoringClient() + + events_collector: EventsCollector = EventsCollector(context_client) + events_collector.start() + + LOGGER.info("Connected with components successfully... Waiting for events...") + + time.sleep(20) + + while not terminate.wait(timeout=1): + event = events_collector.get_event(block=True, timeout=1) + if event is None: + LOGGER.debug("No event received") + continue # no event received + LOGGER.debug("Event received: {}".format(grpc_message_to_json_string(event))) + if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: + service: Service = context_client.GetService(event.service_id) + # only add if service is of type TAPI + if ( + service.service_type + == ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE + ): + LOGGER.info( + "Service created: {}".format( + grpc_message_to_json_string(event.service_id) + ) + ) + kpi_id = create_kpi( + monitoring_client, event.service_id.service_uuid.uuid + ) + + append_service( + { + "context": event.service_id.context_id.context_uuid.uuid, + "service": event.service_id.service_uuid.uuid, + "kpi": kpi_id.kpi_id.uuid, + }, + service_list=service_list, + cache=cache, + ) + + elif event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE: + # cannot fetch service details because it does not exist anymore + LOGGER.info( + "Service removed: {}".format( + grpc_message_to_json_string(event.service_id) + ) + ) + delete_service( + { + "service": event.service_id.service_uuid.uuid, + "context": event.service_id.context_id.context_uuid.uuid, + }, + service_list=service_list, + cache=cache, + ) + + events_collector.stop() + + +def get_number_workers( + cur_value: int, cur_duration: float, desired_duration: float +) -> int: + factor = cur_duration / desired_duration + desired_number = cur_value * factor + new_value = min( + MAX_NUMBER_WORKERS, max(MIN_NUMBER_WORKERS, math.ceil(desired_number)) + ) + return new_value + + +async def monitor_services(terminate, service_list=None, cache=None): + + host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST") + port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC")) + + cur_number_workers = MIN_NUMBER_WORKERS + desired_monitoring_interval = 30 # defaults to 30 seconds + DESIRED_MONITORING_INTERVAL.set(desired_monitoring_interval) + + event_loop = asyncio.get_running_loop() + + LOGGER.info("Starting execution of the async loop") + + while not terminate.is_set(): + # we account the entire set of procedures as the loop time + start_time = time.time() + + # obtain desired monitoring interval + temp = cache.get("MONITORING_INTERVAL") + if temp is not None: + new_desired_monitoring_interval = int(temp) + else: + # if not set in Redis, fallback to the environment variable + new_desired_monitoring_interval = int( + get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL) + ) + cache.set("MONITORING_INTERVAL", new_desired_monitoring_interval) + + # only reports when changes happen + if desired_monitoring_interval != new_desired_monitoring_interval: + LOGGER.info( + "Changing monitoring interval from {} [sec.] to {} [sec.]".format( + desired_monitoring_interval, new_desired_monitoring_interval + ) + ) + desired_monitoring_interval = new_desired_monitoring_interval + DESIRED_MONITORING_INTERVAL.set(desired_monitoring_interval) + + pool_executor = ProcessPoolExecutor(max_workers=cur_number_workers) + NUMBER_WORKERS.set(cur_number_workers) + + current_list = [] + if SERVICE_LIST_MODE == LIST_REDIS_MODE: + current_list.extend( + [ + pickle.loads(service) + for service in cache.lrange(SERVICE_LIST_KEY, 0, -1) + ] + ) + elif SERVICE_LIST_MODE == LIST_SHARED_MODE: + current_list.extend(service_list) + + CURRENT_SERVICES.set(len(current_list)) + + if len(current_list) == 0: + LOGGER.info( + f"No services to monitor... {desired_monitoring_interval} [sec.] / {cur_number_workers} workers" + ) + + duration = time.time() - start_time + + # calculate new number of workers + cur_number_workers = get_number_workers( + cur_number_workers, duration, desired_monitoring_interval + ) + + LOOP_TIME.observe(0) # ignore internal procedure time + time.sleep(desired_monitoring_interval - duration) + continue + + LOGGER.info( + "Starting new monitoring cycle for {} sec. with {} for {} " + "services with {} workers...".format( + desired_monitoring_interval, + "REDIS" if SERVICE_LIST_MODE == 1 else "local", + len(current_list), + cur_number_workers, + ) + ) + + # start standard implementation + # tasks = [] + # for service in current_list: + # aw = detect_attack( + # host, + # port, + # service["context"], + # service["service"], + # service["kpi"], + # # allow at most 90% of the monitoring interval to succeed + # monitoring_interval * 0.9, + # ) + # tasks.append(aw) + # [await aw for aw in tasks] + # end standard implementation + + # start pool implementation + if len(current_list) == 0: # guard clause to re-check if services still there + LOGGER.info( + f"No services to monitor... " + f"{desired_monitoring_interval} / {cur_number_workers}" + ) + + duration = time.time() - start_time + + # calculate new number of workers + cur_number_workers = get_number_workers( + cur_number_workers, duration, desired_monitoring_interval + ) + + LOOP_TIME.observe(0) + time.sleep(desired_monitoring_interval - duration) + continue + + # process to get (close to) equal slices: + # https://stackoverflow.com/questions/2130016/splitting-a-list-into-n-parts-of-approximately-equal-length + k, m = divmod(len(current_list), cur_number_workers) + if k == 0: # happens when a single process is working + k = m + m = 0 + # dividing async work across multiple processes: + # https://stackoverflow.com/questions/69729488/how-to-run-multiple-asyncio-loops-inside-syncrhonous-sub-processes-inside-a-main + # https://stackoverflow.com/questions/65557258/typeerror-cant-pickle-coroutine-objects-when-i-am-using-asyncio-loop-run-in-ex + # https://stackoverflow.com/questions/69741177/run-multiple-async-loops-in-separate-processes-within-a-main-async-app + tasks = [ + event_loop.run_in_executor( + pool_executor, + delegate_services, + current_list, + i * k + min(i, m), # first index + (i + 1) * k + min(i + 1, m), # last index + host, + port, + desired_monitoring_interval * 0.9, + ) + for i in range(cur_number_workers) + ] + # await for all tasks to finish + await asyncio.gather(*tasks) + + end_time = time.time() + + duration = end_time - start_time + LOOP_TIME.observe(duration) + LOGGER.info( + "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... " + "Waiting for {:.2f} seconds...".format( + len(current_list), + duration, + (duration / desired_monitoring_interval) * 100, + desired_monitoring_interval - duration, + ) + ) + + # TODO: implement logic to only register change when the number changes + # calculate new number of workers + cur_number_workers = get_number_workers( + cur_number_workers, duration, desired_monitoring_interval * 0.9 + ) + LOGGER.info(f"New number of workers: {cur_number_workers}") + + if duration / desired_monitoring_interval > 0.9: + LOGGER.warning( + "Monitoring loop is taking {} % of the desired time " + "({} seconds)".format( + (duration / desired_monitoring_interval) * 100, + desired_monitoring_interval, + ) + ) + if desired_monitoring_interval - duration > 0: + time.sleep(desired_monitoring_interval - duration) + + +def main(): + global LOGGER # pylint: disable=global-statement + + log_level = get_log_level() + logging.basicConfig(level=log_level) + LOGGER = logging.getLogger(__name__) + + logging.getLogger("hpack").setLevel(logging.CRITICAL) + + wait_for_environment_variables( + [ + get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name( + ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC + ), + ] + ) + + wait_for_environment_variables( + [ + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + ] + ) + + wait_for_environment_variables( + [ + get_env_var_name( + ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST + ), + get_env_var_name( + ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC + ), + ] + ) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info("Starting...") + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + if SERVICE_LIST_MODE not in [1, 2]: + LOGGER.critical( + "Service mode has wrong configuration. Value: {}.".format(SERVICE_LIST_MODE) + ) + + redis_host = get_service_host(ServiceNameEnum.CACHING) + if redis_host is not None: + redis_port = int(get_setting("CACHINGSERVICE_SERVICE_PORT_REDIS")) + redis_password = get_setting("REDIS_PASSWORD") + + service_list = None + cache = None + if SERVICE_LIST_MODE == LIST_REDIS_MODE: + cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password) + + # clean the existing list that will be populated later on in this function + cache.delete(SERVICE_LIST_KEY) + elif SERVICE_LIST_MODE == LIST_SHARED_MODE: + # creating a thread-safe list to be shared among threads + service_list = Manager().list() + + LOGGER.info("Connecting with context component...") + context_client: ContextClient = ContextClient() + monitoring_client: MonitoringClient = MonitoringClient() + LOGGER.info("Connected successfully...") + + if get_setting("TESTING", default=False): + # if testing, create dummy services + kpi_id = create_kpi(monitoring_client, "1213") + append_service( + {"context": "admin", "service": "1213", "kpi": kpi_id.kpi_id.uuid}, + service_list=service_list, + cache=cache, + ) + kpi_id = create_kpi(monitoring_client, "1456") + append_service( + {"context": "admin", "service": "1456", "kpi": kpi_id.kpi_id.uuid}, + service_list=service_list, + cache=cache, + ) + + context_ids: ContextIdList = context_client.ListContextIds(Empty()) + + # populate with initial services + for context_id in context_ids.context_ids: + context_services: ServiceIdList = context_client.ListServiceIds(context_id) + for service_id in context_services.service_ids: + service: Service = context_client.GetService(service_id) + if ( + service.service_type + == ServiceTypeEnum.SERVICETYPE_TAPI_CONNECTIVITY_SERVICE + ): + # in case of a service restart, monitoring component will not duplicate KPIs + # but rather return the existing KPI if that's the case + kpi_id = create_kpi(monitoring_client, service_id.service_uuid.uuid) + append_service( + { + "context": context_id.context_uuid.uuid, + "service": service_id.service_uuid.uuid, + "kpi": kpi_id.kpi_id.uuid, + }, + service_list=service_list, + cache=cache, + ) + + context_client.close() + monitoring_client.close() + + # starting background process to monitor service addition/removal + process_context = Process( + target=get_context_updates, args=(terminate, service_list, cache) + ) + process_context.start() + + time.sleep(5) # wait for the context updates to startup + + # runs the async loop in the background + loop = asyncio.get_event_loop() + loop.run_until_complete(monitor_services(terminate, service_list, cache)) + # asyncio.create_task(monitor_services(service_list)) + + # Wait for Ctrl+C or termination signal + while not terminate.wait(timeout=0.1): + pass + + LOGGER.info("Terminating...") + process_context.kill() + # process_security_loop.kill() + + LOGGER.info("Bye") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/opticalattackmanager/tests/__init__.py b/src/opticalattackmanager/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/opticalattackmanager/tests/test_unitary.py b/src/opticalattackmanager/tests/test_unitary.py new file mode 100644 index 0000000000000000000000000000000000000000..7fa13960a2d1df09b89249ffbf7954d5e1ec5841 --- /dev/null +++ b/src/opticalattackmanager/tests/test_unitary.py @@ -0,0 +1,120 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +from unittest.mock import patch + +import pytest + +from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, + DEFAULT_GRPC_MAX_WORKERS, + DEFAULT_SERVICE_GRPC_PORTS, ServiceNameEnum) +from common.proto import dbscanserving_pb2 as dbscan +from common.proto.optical_attack_detector_pb2 import DetectionRequest +from opticalattackdetector.client.OpticalAttackDetectorClient import \ + OpticalAttackDetectorClient +from opticalattackdetector.Config import GRPC_SERVICE_PORT +from opticalattackdetector.service.OpticalAttackDetectorService import \ + OpticalAttackDetectorService + +# from .example_objects import CONTEXT_ID, CONTEXT_ID_2, SERVICE_DEV1_DEV2 + +port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +@pytest.fixture(scope="session") +def optical_attack_detector_service(): + with patch.dict( + os.environ, + { + "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1", + "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str( + 1000 + + DEFAULT_SERVICE_GRPC_PORTS.get( + ServiceNameEnum.OPTICALATTACKDETECTOR.value + ) + ), + "OPTICALATTACKMITIGATORSERVICE_SERVICE_HOST": "127.0.0.1", + "OPTICALATTACKMITIGATORSERVICE_SERVICE_PORT_GRPC": str( + 1000 + + DEFAULT_SERVICE_GRPC_PORTS.get( + ServiceNameEnum.OPTICALATTACKMITIGATOR.value + ) + ), + "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1", + "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str( + 1000 + + DEFAULT_SERVICE_GRPC_PORTS.get(ServiceNameEnum.DBSCANSERVING.value) + ), + }, + clear=True, + ): + _service = OpticalAttackDetectorService( + port=port, + max_workers=DEFAULT_GRPC_MAX_WORKERS, + grace_period=DEFAULT_GRPC_GRACE_PERIOD, + ) + # mocker_context_client = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client') + # mocker_context_client.start() + + # mocker_influx_db = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client') + # mocker_influx_db.start() + + _service.start() + yield _service + _service.stop() + # mocker_context_client.stop() + # mocker_influx_db.stop() + + +@pytest.fixture(scope="session") +def optical_attack_detector_client(optical_attack_detector_service): + with patch.dict( + os.environ, + { + "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1", + "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str( + 1000 + + DEFAULT_SERVICE_GRPC_PORTS.get( + ServiceNameEnum.OPTICALATTACKDETECTOR.value + ) + ), + "OPTICALATTACKMITIGATORSERVICE_SERVICE_HOST": "127.0.0.1", + "OPTICALATTACKMITIGATORSERVICE_SERVICE_PORT_GRPC": str( + 1000 + + DEFAULT_SERVICE_GRPC_PORTS.get( + ServiceNameEnum.OPTICALATTACKMITIGATOR.value + ) + ), + "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1", + "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str( + 1000 + + DEFAULT_SERVICE_GRPC_PORTS.get(ServiceNameEnum.DBSCANSERVING.value) + ), + }, + clear=True, + ): + _client = OpticalAttackDetectorClient() + yield _client + _client.close() + + +def test_detect_attack( + optical_attack_detector_client: OpticalAttackDetectorClient, +): + LOGGER.info("placeholder") diff --git a/src/opticalattackmanager/utils/EventsCollector.py b/src/opticalattackmanager/utils/EventsCollector.py new file mode 100644 index 0000000000000000000000000000000000000000..2515ed82d7b30914047a091f9370ebfc50a167b7 --- /dev/null +++ b/src/opticalattackmanager/utils/EventsCollector.py @@ -0,0 +1,84 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import queue +import threading + +import grpc + +from common.proto.context_pb2 import Empty +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +class EventsCollector: + def __init__( + self, context_client_grpc: ContextClient, log_events_received=False + ) -> None: + self._events_queue = queue.Queue() + self._log_events_received = log_events_received + + self._service_stream = context_client_grpc.GetServiceEvents(Empty()) + + self._service_thread = threading.Thread( + target=self._collect, args=(self._service_stream,), daemon=False + ) + + def _collect(self, events_stream) -> None: + try: + for event in events_stream: + if self._log_events_received: + LOGGER.info( + "[_collect] event: {:s}".format( + grpc_message_to_json_string(event) + ) + ) + self._events_queue.put_nowait(event) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.CANCELLED: # pylint: disable=no-member + raise # pragma: no cover + + def start(self): + self._service_thread.start() + + def get_event(self, block: bool = True, timeout: float = 0.1): + try: + return self._events_queue.get(block=block, timeout=timeout) + except queue.Empty: # pylint: disable=catching-non-exception + return None + + def get_events(self, block: bool = True, timeout: float = 0.1, count: int = None): + events = [] + if count is None: + while True: + event = self.get_event(block=block, timeout=timeout) + if event is None: + break + events.append(event) + else: + for _ in range(count): + event = self.get_event(block=block, timeout=timeout) + if event is None: + continue + events.append(event) + return sorted(events, key=lambda e: e.event.timestamp.timestamp) + + def stop(self): + self._service_stream.cancel() + + self._service_thread.join() diff --git a/src/opticalattackmanager/utils/__init__.py b/src/opticalattackmanager/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..9953c820575d42fa88351cc8de022d880ba96e6a --- /dev/null +++ b/src/opticalattackmanager/utils/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/opticalattackmanager/utils/monitor.py b/src/opticalattackmanager/utils/monitor.py new file mode 100644 index 0000000000000000000000000000000000000000..79c37a3191d774da44e385ecefc6d72d6b4018f1 --- /dev/null +++ b/src/opticalattackmanager/utils/monitor.py @@ -0,0 +1,64 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from typing import List + +from grpclib.client import Channel +from prometheus_client import Counter + +from common.proto.asyncio.optical_attack_detector_grpc import \ + OpticalAttackDetectorServiceStub +from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest +from common.Settings import get_log_level, get_setting + +DROP_COUNTER = Counter( + "optical_security_dropped_assessments", + "Dropped assessments due to detector timeout", +) + + +log_level = get_log_level() +logging.basicConfig(level=log_level) +LOGGER = logging.getLogger(__name__) + + +async def detect_attack( + host: str, + port: int, + context_id: str, + service_id: str, + kpi_id: str, + timeout: float = 20.0, +) -> None: + try: + LOGGER.debug("Sending request for {}...".format(service_id)) + async with Channel(host, port) as channel: + stub = OpticalAttackDetectorServiceStub(channel) + + request: DetectionRequest = DetectionRequest() + request.service_id.context_id.context_uuid.uuid = context_id + request.service_id.service_uuid.uuid = str(service_id) + + request.kpi_id.kpi_id.uuid = kpi_id + + await stub.DetectAttack(request, timeout=timeout) + LOGGER.debug("Monitoring finished for {}/{}".format(service_id, kpi_id)) + except Exception as e: + LOGGER.warning( + "Exception while processing service_id {}/{}".format(service_id, kpi_id) + ) + LOGGER.exception(e) + DROP_COUNTER.inc() diff --git a/src/opticalattackmitigator/Dockerfile b/src/opticalattackmitigator/Dockerfile index c736ed5d9d594fd7dff27ee2f4e2647927e363e1..18f9945addad12f21914605ea95dc6833ad370ae 100644 --- a/src/opticalattackmitigator/Dockerfile +++ b/src/opticalattackmitigator/Dockerfile @@ -32,7 +32,7 @@ RUN groupadd -r teraflow && useradd -u 1001 --no-log-init -r -m -g teraflow tera USER teraflow # set working directory -RUN mkdir -p /home/teraflow/controller/common/proto +RUN mkdir -p /home/teraflow/controller/common/ WORKDIR /home/teraflow/controller # Get Python packages per module