From 2977f99165006c73a6d57758f9db96bdeaee4082 Mon Sep 17 00:00:00 2001 From: Carlos Natalino <carlos.natalino@chalmers.se> Date: Tue, 27 Sep 2022 18:34:47 +0200 Subject: [PATCH] Final tests with the celery queue. --- install_requirements.sh | 5 ++++ .../client/DbscanServingClient.py | 4 +-- src/opticalattackdetector/service/__main__.py | 28 +++++++++++++------ src/opticalattackmanager/service/__main__.py | 2 +- 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/install_requirements.sh b/install_requirements.sh index ea9385729..5d4e95bf4 100755 --- a/install_requirements.sh +++ b/install_requirements.sh @@ -40,6 +40,11 @@ for COMPONENT in $TFS_COMPONENTS do if [ "$COMPONENT" == "automation" ] || [ "$COMPONENT" == "policy" ]; then continue; fi diff requirements.in src/$COMPONENT/requirements.in | grep '^>' | sed 's/^>\ //' >> requirements.in + echo "Dependencies from $COMPONENT:" + for d in `diff requirements.in src/$COMPONENT/requirements.in | grep '^>' | sed 's/^>\ //'` + do + echo " $d" + done done printf "\n" diff --git a/src/dbscanserving/client/DbscanServingClient.py b/src/dbscanserving/client/DbscanServingClient.py index fd2642d92..54a9ce9f2 100644 --- a/src/dbscanserving/client/DbscanServingClient.py +++ b/src/dbscanserving/client/DbscanServingClient.py @@ -28,8 +28,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, class DbscanServingClient: def __init__(self, host=None, port=None): - if not host: host = get_setting('DBSCANSERVINGSERVICE_SERVICE_HOST', default="DBSCANSERVING") - if not port: port = get_setting('DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC', default=10008) + if not host: host = get_setting('DBSCANSERVINGSERVICE_SERVICE_HOST') + if not port: port = get_setting('DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC') self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None diff --git a/src/opticalattackdetector/service/__main__.py b/src/opticalattackdetector/service/__main__.py index 75d170de8..3ae6b7f04 100644 --- a/src/opticalattackdetector/service/__main__.py +++ b/src/opticalattackdetector/service/__main__.py @@ -13,8 +13,10 @@ # limitations under the License. import logging, signal, sys, time, threading, random +from multiprocessing import Manager, Process from prometheus_client import start_http_server from celery import Celery +import asyncio from common.Constants import DEFAULT_GRPC_MAX_WORKERS, DEFAULT_GRPC_GRACE_PERIOD from common.Settings import get_log_level, get_metrics_port, get_setting @@ -60,6 +62,13 @@ def detect_attack_old(monitoring_interval): LOGGER.debug("Sleeping for {} seconds...".format(monitoring_interval)) time.sleep(monitoring_interval) + +async def call_detection_grpc(request): + dbscanserving_client: DbscanServingClient = DbscanServingClient() + response: DetectionResponse = dbscanserving_client.Detect(request) + dbscanserving_client.close() + return result + def main(): global LOGGER # pylint: disable=global-statement @@ -130,12 +139,12 @@ def main(): for __ in range(request.num_features): grpc_sample.features.append(random.uniform(5000., 6000.)) request.samples.append(grpc_sample) - try: - dbscanserving_client: DbscanServingClient = DbscanServingClient() - response: DetectionResponse = dbscanserving_client.Detect(request) - dbscanserving_client.close() - except Exception as e: - LOGGER.exception(e) + + # call the grpc + dbscanserving_client: DbscanServingClient = DbscanServingClient() + # response: DetectionResponse = dbscanserving_client.Detect(request) + dbscanserving_client.connect() + dbscanserving_client.close() # including KPI kpi = Kpi() @@ -149,11 +158,14 @@ def main(): # attack = AttackDescription() # # attack.cs_id.uuid = service.service_id.service_uuid.uuid # response: AttackResponse = attack_mitigator_client.NotifyAttack(attack) - return 0 + return "0" app.worker_main([ 'worker', - '--loglevel={}'.format(log_level) + '--loglevel={}'.format(log_level), + '--autoscale', + '1', + '--pool=gevent' ]) # Wait for Ctrl+C or termination signal diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py index 135cb257f..167f05361 100644 --- a/src/opticalattackmanager/service/__main__.py +++ b/src/opticalattackmanager/service/__main__.py @@ -127,7 +127,7 @@ def main(): LOGGER.info('Connecting with REDIS...') REDIS_PASSWORD = get_setting('REDIS_PASSWORD') REDIS_HOST = get_setting('CACHINGSERVICE_SERVICE_HOST') - REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT_REDIS') + REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT') BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0' BACKEND_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/1' app = Celery( -- GitLab