Skip to content
Snippets Groups Projects
Commit 2977f991 authored by Carlos Natalino Da Silva's avatar Carlos Natalino Da Silva
Browse files

Final tests with the celery queue.

parent 9625a02c
No related branches found
No related tags found
No related merge requests found
......@@ -40,6 +40,11 @@ for COMPONENT in $TFS_COMPONENTS
do
if [ "$COMPONENT" == "automation" ] || [ "$COMPONENT" == "policy" ]; then continue; fi
diff requirements.in src/$COMPONENT/requirements.in | grep '^>' | sed 's/^>\ //' >> requirements.in
echo "Dependencies from $COMPONENT:"
for d in `diff requirements.in src/$COMPONENT/requirements.in | grep '^>' | sed 's/^>\ //'`
do
echo " $d"
done
done
printf "\n"
......
......@@ -28,8 +28,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION,
class DbscanServingClient:
def __init__(self, host=None, port=None):
if not host: host = get_setting('DBSCANSERVINGSERVICE_SERVICE_HOST', default="DBSCANSERVING")
if not port: port = get_setting('DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC', default=10008)
if not host: host = get_setting('DBSCANSERVINGSERVICE_SERVICE_HOST')
if not port: port = get_setting('DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC')
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
......
......@@ -13,8 +13,10 @@
# limitations under the License.
import logging, signal, sys, time, threading, random
from multiprocessing import Manager, Process
from prometheus_client import start_http_server
from celery import Celery
import asyncio
from common.Constants import DEFAULT_GRPC_MAX_WORKERS, DEFAULT_GRPC_GRACE_PERIOD
from common.Settings import get_log_level, get_metrics_port, get_setting
......@@ -60,6 +62,13 @@ def detect_attack_old(monitoring_interval):
LOGGER.debug("Sleeping for {} seconds...".format(monitoring_interval))
time.sleep(monitoring_interval)
async def call_detection_grpc(request):
    """Send one detection request to the DBSCAN-serving gRPC service.

    Opens a fresh DbscanServingClient channel, forwards *request* to the
    Detect RPC, always closes the channel, and returns the resulting
    DetectionResponse.

    Args:
        request: the detection request message to pass to Detect.

    Returns:
        DetectionResponse produced by the DBSCAN-serving service.
    """
    dbscanserving_client: DbscanServingClient = DbscanServingClient()
    try:
        response: DetectionResponse = dbscanserving_client.Detect(request)
    finally:
        # Release the gRPC channel even when Detect raises.
        dbscanserving_client.close()
    # Bug fix: original returned the undefined name `result` (NameError);
    # the RPC result is bound to `response` above.
    return response
def main():
global LOGGER # pylint: disable=global-statement
......@@ -130,12 +139,12 @@ def main():
for __ in range(request.num_features):
grpc_sample.features.append(random.uniform(5000., 6000.))
request.samples.append(grpc_sample)
try:
dbscanserving_client: DbscanServingClient = DbscanServingClient()
response: DetectionResponse = dbscanserving_client.Detect(request)
dbscanserving_client.close()
except Exception as e:
LOGGER.exception(e)
# call the grpc
dbscanserving_client: DbscanServingClient = DbscanServingClient()
# response: DetectionResponse = dbscanserving_client.Detect(request)
dbscanserving_client.connect()
dbscanserving_client.close()
# including KPI
kpi = Kpi()
......@@ -149,11 +158,14 @@ def main():
# attack = AttackDescription()
# # attack.cs_id.uuid = service.service_id.service_uuid.uuid
# response: AttackResponse = attack_mitigator_client.NotifyAttack(attack)
return 0
return "0"
app.worker_main([
'worker',
'--loglevel={}'.format(log_level)
'--loglevel={}'.format(log_level),
'--autoscale',
'1',
'--pool=gevent'
])
# Wait for Ctrl+C or termination signal
......
......@@ -127,7 +127,7 @@ def main():
LOGGER.info('Connecting with REDIS...')
REDIS_PASSWORD = get_setting('REDIS_PASSWORD')
REDIS_HOST = get_setting('CACHINGSERVICE_SERVICE_HOST')
REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT_REDIS')
REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT')
BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0'
BACKEND_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/1'
app = Celery(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment