Skip to content
Snippets Groups Projects

Fixing issues with the WebUI and the optical cybersecurity module

Merged Carlos Natalino Da Silva requested to merge fix/webui into develop
1 file
+ 2
0
Compare changes
  • Side-by-side
  • Inline
@@ -15,13 +15,13 @@
@@ -15,13 +15,13 @@
import logging
import logging
import uuid
import uuid
import queue
import queue
 
import time
from unittest.mock import patch
from unittest.mock import patch
import pytest
import pytest
from common.proto import dbscanserving_pb2 as dbscan
from common.proto import dbscanserving_pb2 as dbscan
from common.proto.optical_attack_detector_pb2 import DetectionRequest
from common.proto.optical_attack_detector_pb2 import DetectionRequest
from common.tests.MockServicerImpl_Monitoring import MockServicerImpl_Monitoring
from opticalattackdetector.client.OpticalAttackDetectorClient import \
from opticalattackdetector.client.OpticalAttackDetectorClient import \
OpticalAttackDetectorClient
OpticalAttackDetectorClient
@@ -37,6 +37,7 @@ LOGGER = logging.getLogger(__name__)
@@ -37,6 +37,7 @@ LOGGER = logging.getLogger(__name__)
def optical_attack_detector_service():
def optical_attack_detector_service():
_service = OpticalAttackDetectorService()
_service = OpticalAttackDetectorService()
_service.start()
_service.start()
 
time.sleep(2)
yield _service
yield _service
_service.stop()
_service.stop()
@@ -44,7 +45,7 @@ def optical_attack_detector_service():
@@ -44,7 +45,7 @@ def optical_attack_detector_service():
@pytest.fixture(scope="session")
@pytest.fixture(scope="session")
def optical_attack_detector_client(optical_attack_detector_service: OpticalAttackDetectorService):
def optical_attack_detector_client(optical_attack_detector_service: OpticalAttackDetectorService):
_client = OpticalAttackDetectorClient(
_client = OpticalAttackDetectorClient(
host=optical_attack_detector_service.bind_address,
host="127.0.0.1",
port=optical_attack_detector_service.bind_port,
port=optical_attack_detector_service.bind_port,
)
)
yield _client
yield _client
@@ -56,26 +57,20 @@ def test_detect_attack(
@@ -56,26 +57,20 @@ def test_detect_attack(
optical_attack_detector_client: OpticalAttackDetectorClient,
optical_attack_detector_client: OpticalAttackDetectorClient,
):
):
message = dbscan.DetectionResponse()
message = dbscan.DetectionResponse()
message.cluster_indices.extend([0, 1, -1, -1, -1])
message.cluster_indices.extend([0, 1] * 5 + [-1] * 10)
monitoring_mock = MockServicerImpl_Monitoring(queue_samples = queue.Queue())
with patch(
with patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
) as mitigator, patch(
) as mitigator, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client",
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client.IncludeKpi",
monitoring_mock,
) as monitoring, patch(
) as monitoring, patch(
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client.Detect",
"opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client.Detect",
# TODO: return dumb object with "cluster_indices" attribute
# idea: create new response object
return_value=message,
return_value=message,
) as dbscanserving:
) as dbscanserving:
for _ in range(10):
request: DetectionRequest = DetectionRequest()
request: DetectionRequest = DetectionRequest()
request.service_id.context_id.context_uuid.uuid = str(uuid.uuid4())
request.service_id.context_id.context_uuid.uuid = str(uuid.uuid4())
request.service_id.service_uuid.uuid = str(uuid.uuid4())
request.service_id.service_uuid.uuid = str(uuid.uuid4())
request.kpi_id.kpi_id.uuid = "1"
request.kpi_id.kpi_id.uuid = "1"
optical_attack_detector_client.DetectAttack(request)
optical_attack_detector_client.DetectAttack(request)
dbscanserving.assert_called()
dbscanserving.assert_called_once()
monitoring.assert_called()
monitoring.IncludeKpi.assert_called_once()
mitigator.NotifyAttack.assert_called()
Loading