Newer
Older
import copy
import grpc
import logging
import pytest
from l3_centralizedattackdetector.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from l3_centralizedattackdetector.client.l3_centralizedattackdetectorClient import l3_centralizedattackdetectorClient
from l3_centralizedattackdetector.service.l3_centralizedattackdetectorService import l3_centralizedattackdetectorService
from l3_centralizedattackdetector.proto.l3_centralizedattackdetector_pb2_grpc import (
)
from l3_centralizedattackdetector.proto.l3_centralizedattackdetector_pb2 import (
)
port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope='session')
port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
_service.start()
yield _service
_service.stop()
@pytest.fixture(scope='session')
def l3_centralizedattackdetector_client(l3_centralizedattackdetector_service):
_client = l3_centralizedattackdetectorClient(address='127.0.0.1', port=port)
yield _client
_client.close()
def test_demo():
inference_information = {
"n_packets_server_seconds": 5.0,
"n_packets_client_seconds": 5.0,
"n_bits_server_seconds": 5.0,
"n_bits_client_seconds": 5.0,
"n_bits_server_n_packets_server": 5.0,
"n_bits_client_n_packets_client": 5.0,
"n_packets_server_n_packets_client": 5.0,
"n_bits_server_n_bits_client": 5.0,
"ip_o": "ipo",
"port_o": "porto",
"ip_d": "ipd",
"port_d": "portd",
"flow_id": "flowid",
"protocol": "protocol",
"time_start": 0.0,
"time_end": 10.0,
}
def open_channel(input_information):
LOGGER.info(str(f"localhost:{port}"))
with grpc.insecure_channel(f"localhost:{port}") as channel:
response = stub.SendInput(input_information)
LOGGER.info("Cad send_input sent and received: "+str(response.message))