Skip to content
Snippets Groups Projects
test_unitary.py 2.76 KiB
Newer Older
ldemarcosm's avatar
ldemarcosm committed
import copy
import grpc
import logging
import pytest
import multiprocessing
import time
from l3_attackmitigator.proto.monitoring_pb2 import Kpi, KpiList
from common.orm.Factory import get_database_backend as get_database, BackendEnum as DatabaseEngineEnum
from l3_attackmitigator.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient
from l3_attackmitigator.proto.context_pb2 import Empty
from l3_attackmitigator.proto.service_pb2 import Service
from l3_attackmitigator.service.l3_attackmitigatorService import l3_attackmitigatorService
from l3_attackmitigator.proto.l3_attackmitigator_pb2 import (
ldemarcosm's avatar
ldemarcosm committed
    Output,
ldemarcosm's avatar
ldemarcosm committed
)
from l3_attackmitigator.proto.l3_attackmitigator_pb2_grpc import (
ldemarcosm's avatar
ldemarcosm committed
    L3AttackmitigatorStub,
ldemarcosm's avatar
ldemarcosm committed
)

port = 10000 + GRPC_SERVICE_PORT  # avoid privileged ports

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


@pytest.fixture(scope='session')
def database():
    _database = get_database(engine=DatabaseEngineEnum.INMEMORY)
    return _database


@pytest.fixture(scope='session')
def l3_attackmitigator_service(database):
    _service = l3_attackmitigatorService(
        database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
    _service.start()
    yield _service
    _service.stop()


@pytest.fixture(scope='session')
def l3_attackmitigator_client(l3_attackmitigator_service):
    _client = l3_attackmitigatorClient(address='127.0.0.1', port=port)
    yield _client
    _client.close()

def test_demo():
    print('Demo Test')
    pass

def test_grpc_server(database):
    print('Starting AM')
    _service = l3_attackmitigatorService(
        database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
    p1 = multiprocessing.Process(target=_service.start, args=())
    #_service.start()
    p1.start()
    print('Test Started')
    time.sleep(10)
    output_message = {
            "confidence": 0.8,
            "timestamp": "date",
            "ip_o": "randomIP",
            "tag_name": "HTTP",
            "tag": 0,
            "flow_id": "FlowID",
            "protocol": "RandomProtocol",
            "port_d": "3456",
            "ml_id": "RandomForest",
            "time_start": 0.0,
            "time_end": 10.0,
        }
    
    def open_channel(input_information):
        with grpc.insecure_channel("localhost:10002") as channel:
ldemarcosm's avatar
ldemarcosm committed
            stub = L3AttackmitigatorStub(channel)
ldemarcosm's avatar
ldemarcosm committed
            response = stub.SendOutput(input_information)
ldemarcosm's avatar
ldemarcosm committed
            print("Inferencer send_input sent and received: ",response.message)
    try:
ldemarcosm's avatar
ldemarcosm committed
        open_channel(Output(**output_message))
ldemarcosm's avatar
ldemarcosm committed
    except:
        p1.terminate()
        assert 0=="Couldn't open channel"

    p1.terminate()
    print('Test Stopped')
    #_service.stop()