Skip to content
Snippets Groups Projects
test_unitary.py 2.69 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
    
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import logging
    
    import uuid
    import queue
    
    from unittest.mock import patch
    
    import pytest
    
    from common.proto import dbscanserving_pb2 as dbscan
    from common.proto.optical_attack_detector_pb2 import DetectionRequest
    
    from opticalattackdetector.client.OpticalAttackDetectorClient import \
        OpticalAttackDetectorClient
    from opticalattackdetector.service.OpticalAttackDetectorService import \
        OpticalAttackDetectorService
    
    # from .example_objects import CONTEXT_ID, CONTEXT_ID_2, SERVICE_DEV1_DEV2
    
    LOGGER = logging.getLogger(__name__)
    
    
    @pytest.fixture(scope="session")
    def optical_attack_detector_service():
    
        _service = OpticalAttackDetectorService()
        _service.start()
    
        yield _service
        _service.stop()
    
    
    
    @pytest.fixture(scope="session")
    
    def optical_attack_detector_client(optical_attack_detector_service: OpticalAttackDetectorService):
        _client = OpticalAttackDetectorClient(
    
            port=optical_attack_detector_service.bind_port,
        )
    
        _client.close()
    
    
    def test_detect_attack(
    
        optical_attack_detector_service: OpticalAttackDetectorService,
    
        optical_attack_detector_client: OpticalAttackDetectorClient,
    ):
        message = dbscan.DetectionResponse()
    
        message.cluster_indices.extend([0, 1] * 5 + [-1] * 10)
    
        with patch(
            "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.attack_mitigator_client"
        ) as mitigator, patch(
    
            "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.monitoring_client.IncludeKpi",
    
        ) as monitoring, patch(
            "opticalattackdetector.service.OpticalAttackDetectorServiceServicerImpl.dbscanserving_client.Detect",
            return_value=message,
        ) as dbscanserving:
    
            request: DetectionRequest = DetectionRequest()
            request.service_id.context_id.context_uuid.uuid = str(uuid.uuid4())
            request.service_id.service_uuid.uuid = str(uuid.uuid4())
            request.kpi_id.kpi_id.uuid = "1"
            optical_attack_detector_client.DetectAttack(request)
            dbscanserving.assert_called()
            monitoring.assert_called()