Skip to content
Snippets Groups Projects
Commit 9625a02c authored by Carlos Natalino Da Silva's avatar Carlos Natalino Da Silva
Browse files

Initial implementation of the work queue.

parent 2caba6a6
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@ export TFS_REGISTRY_IMAGE="http://localhost:32000/tfs/"
# interdomain slice pathcomp dlt
# dbscanserving opticalattackmitigator opticalcentralizedattackdetector
# l3_attackmitigator l3_centralizedattackdetector l3_distributedattackdetector
export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmanager opticalattackmitigator opticalattackdetector "
export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmitigator" # opticalattackmanager opticalattackdetector
# Set the tag you want to use for your images.
export TFS_IMAGE_TAG="dev"
......
......@@ -109,8 +109,8 @@ message KpiDescriptorList {
message SubsDescriptor{
SubscriptionID subs_id = 1;
KpiId kpi_id = 2;
float sampling_duration_s = 3;
float sampling_interval_s = 4;
float monitoring_window_s = 3;
float sampling_rate_s = 4;
context.Timestamp start_timestamp = 5; // used when you want something like "get the samples since X date/time"
context.Timestamp end_timestamp = 6; // used when you want something like "get the samples until X date/time"
// Pending add field to reflect Available Device Protocols
......
......@@ -14,11 +14,13 @@
from email.policy import default
import grpc, logging
from common.Settings import get_setting
from common.Settings import get_log_level, get_setting
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse
from common.proto.dbscanserving_pb2_grpc import DetectorStub
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
......@@ -46,7 +48,10 @@ class DbscanServingClient:
@RETRY_DECORATOR
def Detect(self, request : DetectionRequest) -> DetectionResponse:
LOGGER.debug('Detect request')
LOGGER.debug('Detect request with {} samples and {} features'.format(
request.num_samples,
request.num_features
))
response = self.stub.Detect(request)
LOGGER.debug('Detect result: {:s}'.format(str(response)))
return response
......@@ -45,8 +45,8 @@ def start_monitoring():
# Create Monitor Kpi Requests
monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
monitor_kpi_request.sampling_duration_s = 86400
monitor_kpi_request.sampling_interval_s = 30
monitor_kpi_request.monitoring_window_s = 86400
monitor_kpi_request.sampling_rate_s = 30
events_collector._monitoring_client.MonitorKpi(monitor_kpi_request)
else:
# Terminate is set, looping terminates
......
......@@ -22,17 +22,6 @@ GRPC_SERVICE_PORT = 10006
# service settings
MONITORING_INTERVAL = 10 # monitoring interval in seconds
#TODO: adjust the addresses below for the specific case
MONITORING_SERVICE_ADDRESS = 'monitoringservice' # address/name of the monitoring service
# MONITORING_SERVICE_ADDRESS = '10.99.41.20' # address/name of the monitoring service
CONTEXT_SERVICE_ADDRESS = 'contextservice' # address/name of the monitoring service
# CONTEXT_SERVICE_ADDRESS = '10.107.199.65' # address/name of the monitoring service
SERVICE_SERVICE_ADDRESS = 'serviceservice' # address/name of the service service
# SERVICE_SERVICE_ADDRESS = '10.99.234.88' # address/name of the service service
# INFERENCE_SERVICE_ADDRESS = '10.108.113.78' # address/name of the inference service
INFERENCE_SERVICE_ADDRESS = 'dbscanservingservice' # address/name of the inference service
# ATTACK_MITIGATOR_SERVICE_ADDRESS = '10.96.248.167'
ATTACK_MITIGATOR_SERVICE_ADDRESS = 'opticalattackmitigatorservice'
# Prometheus settings
METRICS_PORT = 9192
......@@ -18,6 +18,7 @@ from celery import Celery
from common.Constants import DEFAULT_GRPC_MAX_WORKERS, DEFAULT_GRPC_GRACE_PERIOD
from common.Settings import get_log_level, get_metrics_port, get_setting
from common.tools.timestamp.Converters import timestamp_utcnow_to_float
from opticalattackdetector.Config import (
GRPC_SERVICE_PORT, MONITORING_INTERVAL)
from common.proto.context_pb2 import (Empty,
......@@ -79,7 +80,6 @@ def main():
metrics_port = get_metrics_port()
start_http_server(metrics_port)
dbscanserving_client: DbscanServingClient = DbscanServingClient()
attack_mitigator_client: OpticalAttackMitigatorClient = OpticalAttackMitigatorClient()
monitoring_client: MonitoringClient = MonitoringClient()
......@@ -95,12 +95,13 @@ def main():
LOGGER.info('Connecting with REDIS...')
REDIS_PASSWORD = get_setting('REDIS_PASSWORD')
REDIS_HOST = get_setting('CACHINGSERVICE_SERVICE_HOST')
REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT_REDIS')
REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT')
BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0'
BACKEND_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/1'
app = Celery(
'cybersecurity',
broker=BROKER_URL,
backend=BROKER_URL
backend=BACKEND_URL
)
LOGGER.info('Connected to REDIS...')
......@@ -110,38 +111,44 @@ def main():
alien_samples = random.randint(2, 10)
# run attack detection for every service
request: DetectionRequest = DetectionRequest()
request.num_samples = 300 + alien_samples
request.num_samples = 200 + alien_samples
request.num_features = 20
request.eps = 100.5
request.min_samples = 5
for _ in range(200):
grpc_sample = Sample()
for __ in range(20):
for __ in range(request.num_features):
grpc_sample.features.append(random.uniform(0., 10.))
request.samples.append(grpc_sample)
for _ in range(100):
grpc_sample = Sample()
for __ in range(20):
grpc_sample.features.append(random.uniform(50., 60.))
request.samples.append(grpc_sample)
# for _ in range(100):
# grpc_sample = Sample()
# for __ in range(20):
# grpc_sample.features.append(random.uniform(50., 60.))
# request.samples.append(grpc_sample)
for _ in range(alien_samples):
grpc_sample = Sample()
for __ in range(20):
for __ in range(request.num_features):
grpc_sample.features.append(random.uniform(5000., 6000.))
request.samples.append(grpc_sample)
response: DetectionResponse = dbscanserving_client.Detect(request)
try:
dbscanserving_client: DbscanServingClient = DbscanServingClient()
response: DetectionResponse = dbscanserving_client.Detect(request)
dbscanserving_client.close()
except Exception as e:
LOGGER.exception(e)
# including KPI
kpi = Kpi()
kpi.kpi_id.kpi_id.uuid = kpi_id
kpi.timestamp = Timestamp()
kpi.kpi_value.int32Val = response.cluster_indices[-1]
monitoring_client.IncludeKpi(kpi)
if -1 in response.cluster_indices: # attack detected
attack = AttackDescription()
# attack.cs_id.uuid = service.service_id.service_uuid.uuid
response: AttackResponse = attack_mitigator_client.NotifyAttack(attack)
kpi.timestamp.timestamp = timestamp_utcnow_to_float()
# kpi.kpi_value.int32Val = response.cluster_indices[-1]
kpi.kpi_value.int32Val = 1
# monitoring_client.IncludeKpi(kpi)
# if -1 in response.cluster_indices: # attack detected
# attack = AttackDescription()
# # attack.cs_id.uuid = service.service_id.service_uuid.uuid
# response: AttackResponse = attack_mitigator_client.NotifyAttack(attack)
return 0
app.worker_main([
......
......@@ -16,9 +16,11 @@ FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ && \
apt-get --yes --quiet --quiet install wget g++ nano && \
rm -rf /var/lib/apt/lists/*
# TODO: remove nano from installation
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
......
"""Manual test driver for the cybersecurity work queue.

Builds a synthetic DBSCAN detection request (two benign clusters plus a
random number of far-away "alien" outlier samples), runs it through the
DBSCAN serving client, then enqueues a 'detect_attack' task on the
Celery/Redis work queue and waits for its result.
"""
from celery import Celery
import random
from common.Settings import get_log_level, get_metrics_port, get_setting
from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse, Sample
from dbscanserving.client.DbscanServingClient import DbscanServingClient

dbscanserving_client: DbscanServingClient = DbscanServingClient()

# Random number of outlier ("alien") samples appended after the benign ones.
alien_samples = random.randint(2, 20)

request: DetectionRequest = DetectionRequest()
request.num_samples = 300 + alien_samples  # 200 benign + 100 intermediate + aliens
request.num_features = 20
request.eps = 100.5
request.min_samples = 5

# 200 benign samples with features in [0, 10).
for _ in range(200):
    grpc_sample = Sample()
    for __ in range(request.num_features):
        grpc_sample.features.append(random.uniform(0., 10.))
    request.samples.append(grpc_sample)

# 100 intermediate samples with features in [50, 60).
for _ in range(100):
    grpc_sample = Sample()
    for __ in range(request.num_features):
        grpc_sample.features.append(random.uniform(50., 60.))
    request.samples.append(grpc_sample)

# Alien samples far away from both clusters above; DBSCAN should mark
# these as noise (cluster index -1).
for _ in range(alien_samples):
    grpc_sample = Sample()
    for __ in range(request.num_features):
        grpc_sample.features.append(random.uniform(5000., 6000.))
    request.samples.append(grpc_sample)

response: DetectionResponse = dbscanserving_client.Detect(request)

# Redis connection settings come from the environment (Kubernetes service vars).
REDIS_PASSWORD = get_setting('REDIS_PASSWORD')
REDIS_HOST = get_setting('CACHINGSERVICE_SERVICE_HOST')
REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT')
BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0'
# Results live in a separate Redis database (db 1), matching the worker
# configuration; otherwise result.get() below polls the wrong database.
BACKEND_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/1'
app = Celery(
    'cybersecurity',
    broker=BROKER_URL,
    backend=BACKEND_URL,
)

# Enqueue one detection task for a fixed sample service and block on the result.
service = {'context': 'admin', 'service': '23bb5c96-e377-4943-a47a-4db9c54104cc', 'kpi': '1'}
result = app.send_task('detect_attack', (service['context'], service['service'], service['kpi']))
print('waiting for result...')
print('Result:', result.get())
\ No newline at end of file
......@@ -129,10 +129,11 @@ def main():
REDIS_HOST = get_setting('CACHINGSERVICE_SERVICE_HOST')
REDIS_PORT = get_setting('CACHINGSERVICE_SERVICE_PORT_REDIS')
BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0'
BACKEND_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/1'
app = Celery(
'cybersecurity',
broker=BROKER_URL,
backend=BROKER_URL
backend=BACKEND_URL
)
LOGGER.info('Connected to REDIS...')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment