# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os from unittest.mock import patch import pytest from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, DEFAULT_GRPC_MAX_WORKERS) from common.proto import dbscanserving_pb2 as dbscan from common.proto.optical_attack_detector_pb2 import DetectionRequest from opticalattackdetector.client.OpticalAttackDetectorClient import \ OpticalAttackDetectorClient from opticalattackdetector.Config import GRPC_SERVICE_PORT from opticalattackdetector.service.OpticalAttackDetectorService import \ OpticalAttackDetectorService # from .example_objects import CONTEXT_ID, CONTEXT_ID_2, SERVICE_DEV1_DEV2 port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @pytest.fixture(scope="session") def optical_attack_detector_service(): with patch.dict( os.environ, { "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1", "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port), "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1", "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port), }, clear=True, ): _service = OpticalAttackDetectorService( port=port, max_workers=DEFAULT_GRPC_MAX_WORKERS, grace_period=DEFAULT_GRPC_GRACE_PERIOD, ) _service.start() yield _service _service.stop() @pytest.fixture(scope="session") def optical_attack_detector_client(optical_attack_detector_service): with patch.dict( os.environ, { "OPTICALATTACKDETECTORSERVICE_SERVICE_HOST": "127.0.0.1", "OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC": str(port), "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1", "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port), }, clear=True, ): _client = OpticalAttackDetectorClient() yield _client _client.close() def test_detect_attack( optical_attack_detector_client: OpticalAttackDetectorClient, ): message = dbscan.DetectionResponse() message.cluster_indices.extend([0, 1, -1, -1, -1]) with patch( "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client" ) as mitigator, patch( "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client" ) as monitoring, patch( "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client.Detect", # TODO: return dumb object with "cluster_indices" attribute # idea: create new response object return_value=message, ) as dbscanserving: request: DetectionRequest = DetectionRequest() request.service_id.context_id.context_uuid.uuid = "" request.service_id.service_uuid.uuid = str("") request.kpi_id.kpi_id.uuid = "" optical_attack_detector_client.DetectAttack(request) mitigator.NotifyAttack.assert_called_once() monitoring.IncludeKpi.assert_called_once() dbscanserving.assert_called_once()