Skip to content
Snippets Groups Projects
Commit e46bb879 authored by Carlos Natalino Da Silva's avatar Carlos Natalino Da Silva
Browse files

Fixing tests of the opticalattackdetector.

parent 253eece4
No related branches found
No related tags found
No related merge requests found
......@@ -38,7 +38,9 @@ fi
docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$COMPONENT_NAME/Dockerfile .
docker run --name $IMAGE_NAME -d -v "${PWD}/src/${COMPONENT_NAME}/tests:/home/${COMPONENT_NAME}/results" --network=teraflowbridge --rm $IMAGE_NAME:$IMAGE_TAG
# docker run --name $IMAGE_NAME -d -v "${PWD}/src/${COMPONENT_NAME}/tests:/home/${COMPONENT_NAME}/results" --network=teraflowbridge --rm $IMAGE_NAME:$IMAGE_TAG
docker run --name $IMAGE_NAME -d --env-file ./src/$COMPONENT_NAME/.env -v "${PWD}/src/${COMPONENT_NAME}/tests:/home/${COMPONENT_NAME}/results" --network=teraflowbridge --rm $IMAGE_NAME:$IMAGE_TAG
docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $COMPONENT_NAME/tests/ --junitxml=/home/${COMPONENT_NAME}/results/${COMPONENT_NAME}_report.xml"
......
......@@ -16,6 +16,7 @@ import logging
import grpc
from common.proto.context_pb2 import Empty, Service
from common.Settings import get_setting
from common.proto.monitoring_pb2 import KpiList
from common.proto.optical_attack_detector_pb2_grpc import (
OpticalAttackDetectorServiceStub,
......@@ -34,8 +35,16 @@ RETRY_DECORATOR = retry(
class OpticalAttackDetectorClient:
def __init__(self, address, port):
self.endpoint = "{:s}:{:s}".format(str(address), str(port))
def __init__(self, host=None, port=None):
if not host:
host = get_setting(
"OPTICALATTACKDETECTORSERVICE_SERVICE_HOST", default="DBSCANSERVING"
)
if not port:
port = get_setting(
"OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC", default=10007
)
self.endpoint = "{:s}:{:s}".format(str(host), str(port))
LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint)))
self.channel = None
self.stub = None
......
......@@ -29,13 +29,11 @@ from common.rpc_method_wrapper.Decorator import (
safe_and_metered_rpc_method,
)
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from context.client.ContextClient import ContextClient
from dbscanserving.client.DbscanServingClient import DbscanServingClient
from monitoring.client.MonitoringClient import MonitoringClient
from opticalattackmitigator.client.OpticalAttackMitigatorClient import (
OpticalAttackMitigatorClient,
)
from service.client.ServiceClient import ServiceClient
LOGGER = logging.getLogger(__name__)
......@@ -43,10 +41,8 @@ SERVICE_NAME = "OpticalAttackDetector"
METHOD_NAMES = ["DetectAttack"]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
context_client: ContextClient = ContextClient()
monitoring_client: MonitoringClient = MonitoringClient()
dbscanserving_client: DbscanServingClient = DbscanServingClient()
service_client: ServiceClient = ServiceClient()
attack_mitigator_client: OpticalAttackMitigatorClient = OpticalAttackMitigatorClient()
......
......@@ -13,22 +13,16 @@
# limitations under the License.
import logging
import os
from unittest.mock import patch
import pytest
from common.Constants import (
DEFAULT_GRPC_BIND_ADDRESS,
DEFAULT_GRPC_GRACE_PERIOD,
DEFAULT_GRPC_MAX_WORKERS,
)
from common.proto.context_pb2 import (
ContextId,
ContextIdList,
Empty,
Service,
ServiceList,
)
from common.proto.monitoring_pb2 import Kpi, KpiList
from common.proto import dbscanserving_pb2 as dbscan
from common.proto.optical_attack_detector_pb2 import DetectionRequest
from opticalattackdetector.client.OpticalAttackDetectorClient import (
OpticalAttackDetectorClient,
)
......@@ -37,7 +31,7 @@ from opticalattackdetector.service.OpticalAttackDetectorService import (
OpticalAttackDetectorService,
)
from .example_objects import CONTEXT_ID, CONTEXT_ID_2, SERVICE_DEV1_DEV2
# from .example_objects import CONTEXT_ID, CONTEXT_ID_2, SERVICE_DEV1_DEV2
port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
......@@ -47,215 +41,238 @@ LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def optical_attack_detector_service():
_service = OpticalAttackDetectorService(
port=port,
max_workers=DEFAULT_GRPC_MAX_WORKERS,
grace_period=DEFAULT_GRPC_GRACE_PERIOD,
)
# mocker_context_client = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client')
# mocker_context_client.start()
# mocker_influx_db = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client')
# mocker_influx_db.start()
_service.start()
yield _service
_service.stop()
# mocker_context_client.stop()
# mocker_influx_db.stop()
with patch.dict(
os.environ,
{
"OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1",
"OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port),
"DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1",
"DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port),
},
clear=True,
):
_service = OpticalAttackDetectorService(
port=port,
max_workers=DEFAULT_GRPC_MAX_WORKERS,
grace_period=DEFAULT_GRPC_GRACE_PERIOD,
)
# mocker_context_client = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client')
# mocker_context_client.start()
# mocker_influx_db = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client')
# mocker_influx_db.start()
_service.start()
yield _service
_service.stop()
# mocker_context_client.stop()
# mocker_influx_db.stop()
@pytest.fixture(scope="session")
def optical_attack_detector_client(optical_attack_detector_service):
_client = OpticalAttackDetectorClient(address="127.0.0.1", port=port)
yield _client
with patch.dict(
os.environ,
{
"OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1",
"OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port),
"DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1",
"DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port),
},
clear=True,
):
_client = OpticalAttackDetectorClient()
yield _client
_client.close()
def test_notify_service_update(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
service = Service()
optical_attack_detector_client.NotifyServiceUpdate(service)
def test_detect_attack_no_contexts(
def test_detect_attack(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
message = dbscan.DetectionResponse()
message.cluster_indices.extend([0, 1, -1, -1, -1])
with patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
) as context, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
) as influxdb:
request = Empty()
optical_attack_detector_client.DetectAttack(request)
context.ListContextIds.assert_called_once()
influxdb.query.assert_called_once()
context.ListServices.assert_not_called()
def test_detect_attack_with_context(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
with patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
) as context, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
) as influxdb:
# setting up the mock
cid_list = ContextIdList()
cid_list.context_ids.append(ContextId(**CONTEXT_ID))
context.ListContextIds.return_value = cid_list
# making the test
request = Empty()
optical_attack_detector_client.DetectAttack(request)
# checking behavior
context.ListContextIds.assert_called_once()
context.ListServices.assert_called_with(cid_list.context_ids[0])
influxdb.query.assert_called_once()
def test_detect_attack_with_contexts(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
with patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
) as context, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
) as influxdb:
# setting up the mock
cid_list = ContextIdList()
cid_list.context_ids.append(ContextId(**CONTEXT_ID))
cid_list.context_ids.append(ContextId(**CONTEXT_ID_2))
context.ListContextIds.return_value = cid_list
# making the test
request = Empty()
optical_attack_detector_client.DetectAttack(request)
# checking behavior
context.ListContextIds.assert_called_once()
context.ListServices.assert_any_call(cid_list.context_ids[0])
context.ListServices.assert_any_call(cid_list.context_ids[1])
influxdb.query.assert_called_once()
def test_detect_attack_with_service(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
with patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
) as context, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
) as influxdb, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client"
) as dbscan:
# setting up the mock
cid_list = ContextIdList()
cid_list.context_ids.append(ContextId(**CONTEXT_ID))
context.ListContextIds.return_value = cid_list
service_list = ServiceList()
service_list.services.append(Service(**SERVICE_DEV1_DEV2))
context.ListServices.return_value = service_list
influxdb.query.return_value.get_points.return_value = [(1, 2), (3, 4)]
# making the test
request = Empty()
optical_attack_detector_client.DetectAttack(request)
# checking behavior
context.ListContextIds.assert_called_once()
context.ListServices.assert_called_with(cid_list.context_ids[0])
influxdb.query.assert_called_once()
dbscan.Detect.assert_called()
def test_detect_attack_no_attack(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
with patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
) as context, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
) as influxdb, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client"
) as dbscan, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
) as mitigator:
# setting up the mock
cid_list = ContextIdList()
cid_list.context_ids.append(ContextId(**CONTEXT_ID))
context.ListContextIds.return_value = cid_list
service_list = ServiceList()
service_list.services.append(Service(**SERVICE_DEV1_DEV2))
context.ListServices.return_value = service_list
# dbscan.Detect.return_value = object()
dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, 5]
# making the test
request = Empty()
optical_attack_detector_client.DetectAttack(request)
# checking behavior
context.ListContextIds.assert_called_once()
context.ListServices.assert_called_with(cid_list.context_ids[0])
influxdb.query.assert_called_once()
dbscan.Detect.assert_called()
mitigator.NotifyAttack.assert_not_called()
def test_detect_attack_with_attack(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
with patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
) as context, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
) as influxdb, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client"
) as dbscan, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
) as mitigator:
# setting up the mock
cid_list = ContextIdList()
cid_list.context_ids.append(ContextId(**CONTEXT_ID))
context.ListContextIds.return_value = cid_list
service_list = ServiceList()
service_list.services.append(Service(**SERVICE_DEV1_DEV2))
context.ListServices.return_value = service_list
# dbscan.Detect.return_value = object()
dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, -1]
# making the test
request = Empty()
) as mitigator, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client"
) as monitoring, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client.Detect",
# TODO: return dumb object with "cluster_indices" attribute
# idea: create new response object
return_value=message
) as dbscanserving:
request: DetectionRequest = DetectionRequest()
request.service_id.context_id.context_uuid.uuid = ""
request.service_id.service_uuid.uuid = str("")
request.kpi_id.kpi_id.uuid = ""
optical_attack_detector_client.DetectAttack(request)
# checking behavior
context.ListContextIds.assert_called_once()
context.ListServices.assert_called_with(cid_list.context_ids[0])
influxdb.query.assert_called_once()
dbscan.Detect.assert_called()
mitigator.NotifyAttack.assert_called()
def test_report_summarized_kpi(
optical_attack_detector_client: OpticalAttackDetectorClient,
):
kpi_list = KpiList()
optical_attack_detector_client.ReportSummarizedKpi(kpi_list)
def test_report_kpi(optical_attack_detector_client: OpticalAttackDetectorClient):
kpi_list = KpiList()
optical_attack_detector_client.ReportKpi(kpi_list)
mitigator.NotifyAttack.assert_called_once()
monitoring.IncludeKpi.assert_called_once()
dbscanserving.assert_called_once()
# def test_detect_attack_with_context(
# optical_attack_detector_client: OpticalAttackDetectorClient,
# ):
# with patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
# ) as context, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
# ) as influxdb:
# # setting up the mock
# cid_list = ContextIdList()
# cid_list.context_ids.append(ContextId(**CONTEXT_ID))
# context.ListContextIds.return_value = cid_list
# # making the test
# request = Empty()
# optical_attack_detector_client.DetectAttack(request)
# # checking behavior
# context.ListContextIds.assert_called_once()
# context.ListServices.assert_called_with(cid_list.context_ids[0])
# influxdb.query.assert_called_once()
# def test_detect_attack_with_contexts(
# optical_attack_detector_client: OpticalAttackDetectorClient,
# ):
# with patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
# ) as context, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
# ) as influxdb:
# # setting up the mock
# cid_list = ContextIdList()
# cid_list.context_ids.append(ContextId(**CONTEXT_ID))
# cid_list.context_ids.append(ContextId(**CONTEXT_ID_2))
# context.ListContextIds.return_value = cid_list
# # making the test
# request = Empty()
# optical_attack_detector_client.DetectAttack(request)
# # checking behavior
# context.ListContextIds.assert_called_once()
# context.ListServices.assert_any_call(cid_list.context_ids[0])
# context.ListServices.assert_any_call(cid_list.context_ids[1])
# influxdb.query.assert_called_once()
# def test_detect_attack_with_service(
# optical_attack_detector_client: OpticalAttackDetectorClient,
# ):
# with patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
# ) as context, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
# ) as influxdb, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client"
# ) as dbscan:
# # setting up the mock
# cid_list = ContextIdList()
# cid_list.context_ids.append(ContextId(**CONTEXT_ID))
# context.ListContextIds.return_value = cid_list
# service_list = ServiceList()
# service_list.services.append(Service(**SERVICE_DEV1_DEV2))
# context.ListServices.return_value = service_list
# influxdb.query.return_value.get_points.return_value = [(1, 2), (3, 4)]
# # making the test
# request = Empty()
# optical_attack_detector_client.DetectAttack(request)
# # checking behavior
# context.ListContextIds.assert_called_once()
# context.ListServices.assert_called_with(cid_list.context_ids[0])
# influxdb.query.assert_called_once()
# dbscan.Detect.assert_called()
# def test_detect_attack_no_attack(
# optical_attack_detector_client: OpticalAttackDetectorClient,
# ):
# with patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
# ) as context, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
# ) as influxdb, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client"
# ) as dbscan, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
# ) as mitigator:
# # setting up the mock
# cid_list = ContextIdList()
# cid_list.context_ids.append(ContextId(**CONTEXT_ID))
# context.ListContextIds.return_value = cid_list
# service_list = ServiceList()
# service_list.services.append(Service(**SERVICE_DEV1_DEV2))
# context.ListServices.return_value = service_list
# # dbscan.Detect.return_value = object()
# dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, 5]
# # making the test
# request = Empty()
# optical_attack_detector_client.DetectAttack(request)
# # checking behavior
# context.ListContextIds.assert_called_once()
# context.ListServices.assert_called_with(cid_list.context_ids[0])
# influxdb.query.assert_called_once()
# dbscan.Detect.assert_called()
# mitigator.NotifyAttack.assert_not_called()
# def test_detect_attack_with_attack(
# optical_attack_detector_client: OpticalAttackDetectorClient,
# ):
# with patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client"
# ) as context, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client"
# ) as influxdb, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client"
# ) as dbscan, patch(
# "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
# ) as mitigator:
# # setting up the mock
# cid_list = ContextIdList()
# cid_list.context_ids.append(ContextId(**CONTEXT_ID))
# context.ListContextIds.return_value = cid_list
# service_list = ServiceList()
# service_list.services.append(Service(**SERVICE_DEV1_DEV2))
# context.ListServices.return_value = service_list
# # dbscan.Detect.return_value = object()
# dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, -1]
# # making the test
# request = Empty()
# optical_attack_detector_client.DetectAttack(request)
# # checking behavior
# context.ListContextIds.assert_called_once()
# context.ListServices.assert_called_with(cid_list.context_ids[0])
# influxdb.query.assert_called_once()
# dbscan.Detect.assert_called()
# mitigator.NotifyAttack.assert_called()
# def test_report_summarized_kpi(
# optical_attack_detector_client: OpticalAttackDetectorClient,
# ):
# kpi_list = KpiList()
# optical_attack_detector_client.ReportSummarizedKpi(kpi_list)
# def test_report_kpi(optical_attack_detector_client: OpticalAttackDetectorClient):
# kpi_list = KpiList()
# optical_attack_detector_client.ReportKpi(kpi_list)
......@@ -17,6 +17,10 @@ import os
from unittest.mock import patch
import pytest
from common.Constants import (
DEFAULT_GRPC_GRACE_PERIOD,
DEFAULT_GRPC_MAX_WORKERS,
)
from common.proto.optical_attack_mitigator_pb2 import AttackDescription, AttackResponse
from opticalattackmitigator.client.OpticalAttackMitigatorClient import (
OpticalAttackMitigatorClient,
......@@ -37,7 +41,9 @@ LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def optical_attack_mitigator_service():
_service = OpticalAttackMitigatorService(
port=port
port=port,
max_workers=DEFAULT_GRPC_MAX_WORKERS,
grace_period=DEFAULT_GRPC_GRACE_PERIOD,
)
_service.start()
yield _service
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment