# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os from unittest.mock import patch import pytest from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, DEFAULT_GRPC_MAX_WORKERS) from common.proto import dbscanserving_pb2 as dbscan from common.proto.optical_attack_detector_pb2 import DetectionRequest from opticalattackdetector.client.OpticalAttackDetectorClient import \ OpticalAttackDetectorClient from opticalattackdetector.Config import GRPC_SERVICE_PORT from opticalattackdetector.service.OpticalAttackDetectorService import \ OpticalAttackDetectorService # from .example_objects import CONTEXT_ID, CONTEXT_ID_2, SERVICE_DEV1_DEV2 port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @pytest.fixture(scope="session") def optical_attack_detector_service(): with patch.dict( os.environ, { "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1", "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port), "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1", "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port), }, clear=True, ): _service = OpticalAttackDetectorService( port=port, max_workers=DEFAULT_GRPC_MAX_WORKERS, grace_period=DEFAULT_GRPC_GRACE_PERIOD, ) # mocker_context_client = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client') # mocker_context_client.start() # mocker_influx_db = mock.patch('opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client') # mocker_influx_db.start() _service.start() yield _service _service.stop() # mocker_context_client.stop() # mocker_influx_db.stop() @pytest.fixture(scope="session") def optical_attack_detector_client(optical_attack_detector_service): with patch.dict( os.environ, { "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1", "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port), "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1", "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port), }, clear=True, ): _client = OpticalAttackDetectorClient() yield _client _client.close() def test_detect_attack( optical_attack_detector_client: OpticalAttackDetectorClient, ): message = dbscan.DetectionResponse() message.cluster_indices.extend([0, 1, -1, -1, -1]) with patch( "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" ) as mitigator, patch( "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client" ) as monitoring, patch( "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client.Detect", # TODO: return dumb object with "cluster_indices" attribute # idea: create new response object return_value=message, ) as dbscanserving: request: DetectionRequest = DetectionRequest() request.service_id.context_id.context_uuid.uuid = "" request.service_id.service_uuid.uuid = str("") request.kpi_id.kpi_id.uuid = "" optical_attack_detector_client.DetectAttack(request) mitigator.NotifyAttack.assert_called_once() monitoring.IncludeKpi.assert_called_once() dbscanserving.assert_called_once() # def test_detect_attack_with_context( # optical_attack_detector_client: OpticalAttackDetectorClient, # ): # with patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" # ) as context, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" # ) as influxdb: # # setting up the mock # cid_list = ContextIdList() # cid_list.context_ids.append(ContextId(**CONTEXT_ID)) # context.ListContextIds.return_value = cid_list # # making the test # request = Empty() # optical_attack_detector_client.DetectAttack(request) # # checking behavior # context.ListContextIds.assert_called_once() # context.ListServices.assert_called_with(cid_list.context_ids[0]) # influxdb.query.assert_called_once() # def test_detect_attack_with_contexts( # optical_attack_detector_client: OpticalAttackDetectorClient, # ): # with patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" # ) as context, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" # ) as influxdb: # # setting up the mock # cid_list = ContextIdList() # cid_list.context_ids.append(ContextId(**CONTEXT_ID)) # cid_list.context_ids.append(ContextId(**CONTEXT_ID_2)) # context.ListContextIds.return_value = cid_list # # making the test # request = Empty() # optical_attack_detector_client.DetectAttack(request) # # checking behavior # context.ListContextIds.assert_called_once() # context.ListServices.assert_any_call(cid_list.context_ids[0]) # context.ListServices.assert_any_call(cid_list.context_ids[1]) # influxdb.query.assert_called_once() # def test_detect_attack_with_service( # optical_attack_detector_client: OpticalAttackDetectorClient, # ): # with patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" # ) as context, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" # ) as influxdb, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" # ) as dbscan: # # setting up the mock # cid_list = ContextIdList() # cid_list.context_ids.append(ContextId(**CONTEXT_ID)) # context.ListContextIds.return_value = cid_list # service_list = ServiceList() # service_list.services.append(Service(**SERVICE_DEV1_DEV2)) # context.ListServices.return_value = service_list # influxdb.query.return_value.get_points.return_value = [(1, 2), (3, 4)] # # making the test # request = Empty() # optical_attack_detector_client.DetectAttack(request) # # checking behavior # context.ListContextIds.assert_called_once() # context.ListServices.assert_called_with(cid_list.context_ids[0]) # influxdb.query.assert_called_once() # dbscan.Detect.assert_called() # def test_detect_attack_no_attack( # optical_attack_detector_client: OpticalAttackDetectorClient, # ): # with patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" # ) as context, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" # ) as influxdb, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" # ) as dbscan, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" # ) as mitigator: # # setting up the mock # cid_list = ContextIdList() # cid_list.context_ids.append(ContextId(**CONTEXT_ID)) # context.ListContextIds.return_value = cid_list # service_list = ServiceList() # service_list.services.append(Service(**SERVICE_DEV1_DEV2)) # context.ListServices.return_value = service_list # # dbscan.Detect.return_value = object() # dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, 5] # # making the test # request = Empty() # optical_attack_detector_client.DetectAttack(request) # # checking behavior # context.ListContextIds.assert_called_once() # context.ListServices.assert_called_with(cid_list.context_ids[0]) # influxdb.query.assert_called_once() # dbscan.Detect.assert_called() # mitigator.NotifyAttack.assert_not_called() # def test_detect_attack_with_attack( # optical_attack_detector_client: OpticalAttackDetectorClient, # ): # with patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.context_client" # ) as context, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.influxdb_client" # ) as influxdb, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client" # ) as dbscan, patch( # "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" # ) as mitigator: # # setting up the mock # cid_list = ContextIdList() # cid_list.context_ids.append(ContextId(**CONTEXT_ID)) # context.ListContextIds.return_value = cid_list # service_list = ServiceList() # service_list.services.append(Service(**SERVICE_DEV1_DEV2)) # context.ListServices.return_value = service_list # # dbscan.Detect.return_value = object() # dbscan.Detect.return_value.cluster_indices = [0, 1, 2, 3, 4, -1] # # making the test # request = Empty() # optical_attack_detector_client.DetectAttack(request) # # checking behavior # context.ListContextIds.assert_called_once() # context.ListServices.assert_called_with(cid_list.context_ids[0]) # influxdb.query.assert_called_once() # dbscan.Detect.assert_called() # mitigator.NotifyAttack.assert_called() # def test_report_summarized_kpi( # optical_attack_detector_client: OpticalAttackDetectorClient, # ): # kpi_list = KpiList() # optical_attack_detector_client.ReportSummarizedKpi(kpi_list) # def test_report_kpi(optical_attack_detector_client: OpticalAttackDetectorClient): # kpi_list = KpiList() # optical_attack_detector_client.ReportKpi(kpi_list)