import copy import grpc import logging import pytest import multiprocessing import time from l3_attackmitigator.proto.monitoring_pb2 import Kpi, KpiList from common.orm.Factory import get_database_backend as get_database, BackendEnum as DatabaseEngineEnum from l3_attackmitigator.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD from l3_attackmitigator.client.l3_attackmitigatorClient import l3_attackmitigatorClient from l3_attackmitigator.proto.context_pb2 import Empty from l3_attackmitigator.proto.service_pb2 import Service from l3_attackmitigator.service.l3_attackmitigatorService import l3_attackmitigatorService from l3_attackmitigator.proto.l3_attackmitigator_pb2 import ( Output, ) from l3_attackmitigator.proto.l3_attackmitigator_pb2_grpc import ( L3AttackmitigatorStub, ) port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @pytest.fixture(scope='session') def database(): _database = get_database(engine=DatabaseEngineEnum.INMEMORY) return _database @pytest.fixture(scope='session') def l3_attackmitigator_service(database): _service = l3_attackmitigatorService( database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD) _service.start() yield _service _service.stop() @pytest.fixture(scope='session') def l3_attackmitigator_client(l3_attackmitigator_service): _client = l3_attackmitigatorClient(address='127.0.0.1', port=port) yield _client _client.close() def test_demo(): print('Demo Test') pass def test_grpc_server(database): print('Starting AM') _service = l3_attackmitigatorService( database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD) p1 = multiprocessing.Process(target=_service.start, args=()) #_service.start() p1.start() print('Test Started') time.sleep(10) output_message = { "confidence": 0.8, "timestamp": "date", "ip_o": "randomIP", "tag_name": "HTTP", "tag": 0, "flow_id": "FlowID", "protocol": "RandomProtocol", "port_d": "3456", "ml_id": "RandomForest", "time_start": 0.0, "time_end": 10.0, } def open_channel(input_information): with grpc.insecure_channel("localhost:10002") as channel: stub = L3AttackmitigatorStub(channel) response = stub.SendOutput(input_information) print("Inferencer send_input sent and received: ",response.message) try: open_channel(Output(**output_message)) except: p1.terminate() assert 0=="Couldn't open channel" p1.terminate() print('Test Stopped') #_service.stop()