import copy import grpc import logging import pytest import multiprocessing import subprocess import time from l3_attackmitigator.proto.monitoring_pb2 import Kpi, KpiList from common.orm.Factory import get_database_backend as get_database, BackendEnum as DatabaseEngineEnum from l3_attackmitigator.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient from l3_attackmitigator.service.l3_attackmitigatorService import l3_attackmitigatorService from l3_attackmitigator.proto.l3_attackmitigator_pb2 import ( Output, ) from l3_attackmitigator.proto.l3_attackmitigator_pb2_grpc import ( L3AttackmitigatorStub, ) port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @pytest.fixture(scope='session') def l3_attackmitigator_service(): _service = l3_attackmitigatorService( port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD) _service.start() LOGGER.info('Server started on '+str(port)) yield _service _service.stop() @pytest.fixture(scope='session') def l3_attackmitigator_client(l3_attackmitigator_service): _client = l3_attackmitigatorClient(address='127.0.0.1', port=port) yield _client _client.close() def test_demo(): LOGGER.info('Demo Test') pass def test_grpc_server(l3_attackmitigator_service): output_message = { "confidence": 0.8, "timestamp": "date", "ip_o": "randomIP", "tag_name": "HTTP", "tag": 0, "flow_id": "FlowID", "protocol": "RandomProtocol", "port_d": "3456", "ml_id": "RandomForest", "time_start": 0.0, "time_end": 10.0, } def open_channel(input_information): LOGGER.info(str(f"localhost:{port}")) with grpc.insecure_channel(f"localhost:{port}") as channel: stub = L3AttackmitigatorStub(channel) response = stub.SendOutput(input_information) LOGGER.info("Inferencer send_input sent and received: "+str(response.message)) try: open_channel(Output(**output_message)) LOGGER.info('Success!') except: assert 0=="GRPC server failed"