Skip to content
test_unitary.py 4.21 KiB
Newer Older
ldemarcosm's avatar
ldemarcosm committed
import copy
import grpc
import logging
import pytest
import multiprocessing
from time import sleep
from l3_centralizedattackdetector.proto.monitoring_pb2 import Kpi, KpiList
from google.protobuf.json_format import MessageToDict
#from common.database.Factory import get_database, DatabaseEngineEnum
#from common.database.api.Database import Database
#from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
#from common.database.tests.script import populate_example
from common.orm.Factory import get_database_backend as get_database, BackendEnum as DatabaseEngineEnum
#from common.tests.Assertions import validate_empty, validate_service, validate_service_id, \
#    validate_service_list_is_empty, validate_service_list_is_not_empty
from l3_centralizedattackdetector.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from l3_centralizedattackdetector.client.l3_centralizedattackdetectorClient import l3_centralizedattackdetectorClient
from l3_centralizedattackdetector.proto.context_pb2 import Empty
from l3_centralizedattackdetector.proto.service_pb2 import Service
from l3_centralizedattackdetector.service.l3_centralizedattackdetectorService import l3_centralizedattackdetectorService
from l3_centralizedattackdetector.proto.l3_centralizedattackdetector_pb2_grpc import (
    l3_centralizedattackdetectorStub,
)
from l3_centralizedattackdetector.proto.l3_centralizedattackdetector_pb2 import (
    model_input,
        )
#from l3_attackmitigator.service.l3_attackmitigatorService import l3_attackmitigatorService
from l3_centralizedattackdetector.proto.l3_attackmitigator_pb2 import (
    output,
)
from l3_centralizedattackdetector.proto.l3_attackmitigator_pb2_grpc import (
    l3_attackmitigatorStub,
)


# Port used by the services and clients created in this module: the configured
# gRPC service port shifted up by 10000.
port = 10000 + GRPC_SERVICE_PORT  # avoid privileged ports

# Module-level logger named after this module, set to DEBUG level.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


@pytest.fixture(scope='session')
def database():
    """Provide the in-memory database backend, created once per test session."""
    return get_database(engine=DatabaseEngineEnum.INMEMORY)


@pytest.fixture(scope='session')
def l3_centralizedattackdetector_service(database):
    """Start the centralized attack detector service and stop it at session teardown.

    The service is built on top of the ``database`` fixture and listens on the
    module-level ``port``.
    """
    service_options = dict(
        port=port,
        max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD,
    )
    running_service = l3_centralizedattackdetectorService(database, **service_options)
    running_service.start()
    yield running_service
    # Everything after the yield runs as fixture teardown.
    running_service.stop()


@pytest.fixture(scope='session')
def l3_centralizedattackdetector_client(l3_centralizedattackdetector_service):
    """Provide a client pointed at 127.0.0.1 on the module-level ``port``.

    Requesting the service fixture ensures the service has been started before
    the client is created; the client is closed at session teardown.
    """
    detector_client = l3_centralizedattackdetectorClient(port=port, address='127.0.0.1')
    yield detector_client
    detector_client.close()

def test_demo():
    """Placeholder test: performs no checks and always passes."""
    return None

def test_system(database):
    """Run the detector service in a child process and send it one request.

    The service is started with ``multiprocessing`` on the module-level
    ``port``, given a fixed delay to come up, and then called once through
    ``send_input``. The child process is terminated whether or not the call
    succeeds.

    Args:
        database: Value of the ``database`` fixture, passed to the service
            constructor. Requesting it as a parameter is required: the bare
            module-level name ``database`` is the fixture function itself,
            not the backend it returns.

    Raises:
        AssertionError: If sending the request to the service fails.
    """
    inference_information = {
        "n_packets_server_seconds": 5.0,
        "n_packets_client_seconds": 5.0,
        "n_bits_server_seconds": 5.0,
        "n_bits_client_seconds": 5.0,
        "n_bits_server_n_packets_server": 5.0,
        "n_bits_client_n_packets_client": 5.0,
        "n_packets_server_n_packets_client": 5.0,
        "n_bits_server_n_bits_client": 5.0,
        "ip_o": "ipo",
        "port_o": "porto",
        "ip_d": "ipd",
        "port_d": "portd",
        "flow_id": "flowid",
        "protocol": "protocol",
        "time_start": 0.0,
        "time_end": 10.0,
    }

    def open_channel(input_information):
        """Send ``input_information`` as a ``model_input`` and print the reply."""
        # Target the same port the service below is started on, rather than
        # a hard-coded one that may not match ``port``.
        with grpc.insecure_channel("localhost:{}".format(port)) as channel:
            stub = l3_centralizedattackdetectorStub(channel)
            response = stub.send_input(model_input(**input_information))
            print("Cad send_input sent and received: ", response.message)

    # NOTE: starting the attack mitigator alongside the detector is disabled.
    #print('Starting AM')
    #am_service = l3_attackmitigatorService(
    #    database, port=port+1, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
    #p1 = multiprocessing.Process(target=am_service.start, args=())
    #p1.start()
    #sleep(5)
    cad_service = l3_centralizedattackdetectorService(
        database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
    p2 = multiprocessing.Process(
        target=cad_service.start, args=())
    p2.start()
    try:
        # Fixed delay to let the child process bring the gRPC server up.
        sleep(10)
        print('All started!')
        try:
            open_channel(inference_information)
        except Exception as exc:
            # Fail with a readable message while keeping the original error
            # attached as the cause.
            raise AssertionError("Couldn't open channel") from exc
    finally:
        # Always stop the child process, even when the request failed.
        #p1.terminate()
        p2.terminate()
        # Bounded wait so a child that does not exit cannot hang the test run.
        p2.join(timeout=5)
    print('All Done!')