Commit f4e0a2ac authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'fix/opt-tests' into 'develop'

Fixing the tests of the optical cybersecurity components

See merge request !138
parents e5aa369b 3bbeb584
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ include:
  - local: '/src/dbscanserving/.gitlab-ci.yml'
  - local: '/src/opticalattackmitigator/.gitlab-ci.yml'
  - local: '/src/opticalattackdetector/.gitlab-ci.yml'
  - local: '/src/opticalattackmanager/.gitlab-ci.yml'
  # - local: '/src/opticalattackmanager/.gitlab-ci.yml'
  - local: '/src/automation/.gitlab-ci.yml'
  - local: '/src/policy/.gitlab-ci.yml'
  #- local: '/src/webui/.gitlab-ci.yml'
+1 −1
Original line number Diff line number Diff line
@@ -54,7 +54,7 @@ unit_test dbscanserving:
    - sleep 5
    - docker ps -a
    - docker logs $IMAGE_NAME
    - docker exec ps -a
    - docker ps -a
    - sleep 5
    - docker logs $IMAGE_NAME
    - docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/home/${IMAGE_NAME}/results/${IMAGE_NAME}_report.xml"
+3 −6
Original line number Diff line number Diff line
@@ -23,30 +23,27 @@ from common.proto.dbscanserving_pb2 import (DetectionRequest,
                                            DetectionResponse, Sample)

from dbscanserving.client.DbscanServingClient import DbscanServingClient
from dbscanserving.Config import GRPC_SERVICE_PORT
from dbscanserving.service.DbscanService import DbscanService

port = 10000 + GRPC_SERVICE_PORT  # avoid privileged ports

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


@pytest.fixture(scope="session")
def dbscanserving_service():
    _service = DbscanService(port=port)
    _service = DbscanService()
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope="session")
def dbscanserving_client():
def dbscanserving_client(dbscanserving_service: DbscanService):
    with patch.dict(
        os.environ,
        {
            "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1",
            "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port),
            "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(dbscanserving_service.bind_port),
        },
        clear=True,
    ):
+20 −1
Original line number Diff line number Diff line
@@ -49,9 +49,26 @@ unit_test opticalattackdetector:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
    - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
    - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi
    - if docker container ls | grep redis; then docker rm -f redis; else echo "redis image is not in the system"; fi
    - if docker container ls | grep dbscanserving; then docker rm -f dbscanserving; else echo "dbscanserving image is not in the system"; fi
  script:
    - export REDIS_PASSWORD=$(uuidgen)
    - docker pull "redis:7.0-alpine"
    - docker run --name redis -d --network=teraflowbridge -p 16379:6379 -e REDIS_PASSWORD=${REDIS_PASSWORD} --rm redis:7.0-alpine redis-server --requirepass ${REDIS_PASSWORD}
    - docker logs redis
    - REDIS_ADDRESS=$(docker inspect redis --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
    - docker pull "$CI_REGISTRY_IMAGE/dbscanserving:$IMAGE_TAG"
    - docker run --name dbscanserving -d -p 10008:10008 --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/dbscanserving:$IMAGE_TAG "python -m dbscanserving.service"
    - docker logs dbscanserving
    - DBSCANSERVING_ADDRESS=$(docker inspect dbscanserving --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
    - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
    - docker run --name $IMAGE_NAME -d -p 10006:10006 --env-file "$PWD/src/$IMAGE_NAME/.env" -v "$PWD/src/$IMAGE_NAME/tests:/home/${IMAGE_NAME}/results" --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    - >
      docker run --name $IMAGE_NAME -d -p 10006:10006 
      -v "$PWD/src/$IMAGE_NAME/tests:/home/${IMAGE_NAME}/results" 
      -e REDIS_PASSWORD=${REDIS_PASSWORD}
      -e DBSCANSERVINGSERVICE_SERVICE_HOST=${DBSCANSERVING_ADDRESS}
      -e CACHINGSERVICE_SERVICE_HOST=${REDIS_ADDRESS}
      --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    - sleep 5
    - docker ps -a
    - docker logs $IMAGE_NAME
@@ -59,6 +76,8 @@ unit_test opticalattackdetector:
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
    - docker rm -f $IMAGE_NAME
    - docker rm -f redis
    - docker rm -f dbscanserving
    - docker network rm teraflowbridge
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
+28 −47
Original line number Diff line number Diff line
@@ -13,88 +13,69 @@
# limitations under the License.

import logging
import os
import uuid
import queue
from unittest.mock import patch

import pytest
from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD,
                              DEFAULT_GRPC_MAX_WORKERS)

from common.proto import dbscanserving_pb2 as dbscan
from common.proto.optical_attack_detector_pb2 import DetectionRequest
from common.tests.MockServicerImpl_Monitoring import MockServicerImpl_Monitoring

from opticalattackdetector.client.OpticalAttackDetectorClient import \
    OpticalAttackDetectorClient
from opticalattackdetector.Config import GRPC_SERVICE_PORT
from opticalattackdetector.service.OpticalAttackDetectorService import \
    OpticalAttackDetectorService

# from .example_objects import CONTEXT_ID, CONTEXT_ID_2, SERVICE_DEV1_DEV2

port = 10000 + GRPC_SERVICE_PORT  # avoid privileged ports

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


@pytest.fixture(scope="session")
def optical_attack_detector_service():
    with patch.dict(
        os.environ,
        {
            "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1",
            "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port),
            "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1",
            "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port),
        },
        clear=True,
    ):
        _service = OpticalAttackDetectorService(
            port=port,
            max_workers=DEFAULT_GRPC_MAX_WORKERS,
            grace_period=DEFAULT_GRPC_GRACE_PERIOD,
        )
    _service = OpticalAttackDetectorService()
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope="session")
def optical_attack_detector_client(optical_attack_detector_service):
    with patch.dict(
        os.environ,
        {
            "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1",
            "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port),
            "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1",
            "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port),
        },
        clear=True,
    ):
        _client = OpticalAttackDetectorClient()
def optical_attack_detector_client(optical_attack_detector_service: OpticalAttackDetectorService):
    _client = OpticalAttackDetectorClient(
        host=optical_attack_detector_service.bind_address,
        port=optical_attack_detector_service.bind_port,
    )
    yield _client
    _client.close()


def test_detect_attack(
    optical_attack_detector_service: OpticalAttackDetectorService,
    optical_attack_detector_client: OpticalAttackDetectorClient,
):
    message = dbscan.DetectionResponse()
    message.cluster_indices.extend([0, 1, -1, -1, -1])

    monitoring_mock = MockServicerImpl_Monitoring(queue_samples = queue.Queue())
    with patch(
        "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
    ) as mitigator, patch(
        "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client"
        "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client",
        monitoring_mock,
    ) as monitoring, patch(
        "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client.Detect",
        # TODO: return dumb object with "cluster_indices" attribute
        # idea: create new response object
        return_value=message,
    ) as dbscanserving:
        for _ in range(10):
            request: DetectionRequest = DetectionRequest()
        request.service_id.context_id.context_uuid.uuid = ""
        request.service_id.service_uuid.uuid = str("")
        request.kpi_id.kpi_id.uuid = ""
            request.service_id.context_id.context_uuid.uuid = str(uuid.uuid4())
            request.service_id.service_uuid.uuid = str(uuid.uuid4())
            request.kpi_id.kpi_id.uuid = "1"
            optical_attack_detector_client.DetectAttack(request)
        mitigator.NotifyAttack.assert_called_once()
        monitoring.IncludeKpi.assert_called_once()
            dbscanserving.assert_called_once()
            monitoring.IncludeKpi.assert_called_once()
        mitigator.NotifyAttack.assert_called()
Loading