Skip to content
Snippets Groups Projects
Commit 885962b9 authored by Carlos Natalino Da Silva's avatar Carlos Natalino Da Silva
Browse files

Working version of the security loop. The new message between manager and...

Working version of the security loop. The new message between manager and detector is implemented and tested.
parent d9b1c554
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@ export TFS_REGISTRY_IMAGE="http://localhost:32000/tfs/"
# interdomain slice pathcomp dlt
# dbscanserving opticalattackmitigator opticalcentralizedattackdetector
# l3_attackmitigator l3_centralizedattackdetector l3_distributedattackdetector
export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmitigator opticalattackmanager opticalattackdetector"
export TFS_COMPONENTS="context device automation service compute monitoring webui dbscanserving opticalattackmitigator" # opticalattackmanager opticalattackdetector
# Set the tag you want to use for your images.
export TFS_IMAGE_TAG="dev"
......
......@@ -19,6 +19,9 @@ cd $(dirname $0)
mkdir -p src/python
rm -rf src/python/*.py
mkdir -p src/python/asyncio
rm -rf src/python/asyncio/*.py
tee src/python/__init__.py << EOF > /dev/null
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
......@@ -35,6 +38,22 @@ tee src/python/__init__.py << EOF > /dev/null
# limitations under the License.
EOF
tee src/python/asyncio/__init__.py << EOF > /dev/null
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
EOF
# Generate Python code
python3 -m grpc_tools.protoc -I=./ --python_out=src/python/ --grpc_python_out=src/python/ *.proto
......
......@@ -27,6 +27,6 @@ service OpticalAttackDetectorService {
}
message DetectionRequest {
KpiId kpi_id = 1;
context.ServiceId service_id = 7;
context.ServiceId service_id = 1;
monitoring.KpiId kpi_id = 2;
}
......@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from email.policy import default
from typing import Counter
import grpc
from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse
......@@ -63,10 +63,11 @@ class DbscanServingClient:
request.num_samples, request.num_features
)
)
response = self.stub.Detect(request)
response: DetectionResponse = self.stub.Detect(request)
LOGGER.debug(
"Detect result with {} cluster indices".format(
len(response.cluster_indices)
"Detect result with {} cluster indices [{}]".format(
len(response.cluster_indices),
Counter(response.cluster_indices)
)
)
return response
......@@ -13,7 +13,6 @@
# limitations under the License.
import logging
import os
import grpc
from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse
......
......@@ -19,6 +19,7 @@ class ManagementDB():
self.client = sl.connect(database, check_same_thread=False)
self.create_monitoring_table()
self.create_subscription_table()
self.create_alarm_table()
def create_monitoring_table(self):
self.client.execute("""
......@@ -45,9 +46,24 @@ class ManagementDB():
);
""")
def create_alarm_table(self):
self.client.execute("""
CREATE TABLE IF NOT EXISTS alarm(
alarm_id INTEGER PRIMARY KEY AUTOINCREMENT,
alarm_description TEXT,
alarm_name TEXT,
kpi_id INTEGER,
kpi_min_value REAL,
kpi_max_value REAL,
in_range INTEGER,
include_min_value INTEGER,
include_max_value INTEGER
);
""")
def insert_KPI(self,kpi_description,kpi_sample_type,device_id,endpoint_id,service_id):
c = self.client.cursor()
c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ?",(device_id,kpi_sample_type,endpoint_id))
c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ? AND endpoint_id is ? AND service_id is ?",(device_id,kpi_sample_type,endpoint_id,service_id))
data=c.fetchone()
if data is None:
c.execute("INSERT INTO kpi (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id) VALUES (?,?,?,?,?)", (kpi_description,kpi_sample_type,device_id,endpoint_id,service_id))
......@@ -68,14 +84,26 @@ class ManagementDB():
print("already exists")
return data[0]
def delete_KPI(self,device_id,kpi_sample_type):
def insert_alarm(self,alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value):
c = self.client.cursor()
c.execute("SELECT alarm_id FROM alarm WHERE alarm_description is ? AND alarm_name is ? AND kpi_id is ? AND kpi_min_value is ? AND kpi_max_value is ? AND in_range is ? AND include_min_value is ? AND include_max_value is ?",(alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value))
data=c.fetchone()
if data is None:
c.execute("INSERT INTO alarm (alarm_description, alarm_name, kpi_id, kpi_min_value, kpi_max_value, in_range, include_min_value, include_max_value) VALUES (?,?,?,?,?,?,?,?)", (alarm_description,alarm_name,kpi_id,kpi_min_value,kpi_max_value,in_range,include_min_value,include_max_value))
self.client.commit()
return c.lastrowid
else:
print("already exists")
return data[0]
def delete_KPI(self,kpi_id):
c = self.client.cursor()
c.execute("SELECT kpi_id FROM kpi WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type))
c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,))
data=c.fetchone()
if data is None:
return False
else:
c.execute("DELETE FROM kpi WHERE device_id is ? AND kpi_sample_type is ?",(device_id,kpi_sample_type))
c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,))
self.client.commit()
return True
......@@ -90,14 +118,14 @@ class ManagementDB():
self.client.commit()
return True
def delete_kpid_id(self,kpi_id):
def delete_alarm(self,alarm_id):
c = self.client.cursor()
c.execute("SELECT * FROM kpi WHERE kpi_id is ?",(kpi_id,))
c.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,))
data=c.fetchone()
if data is None:
return False
else:
c.execute("DELETE FROM kpi WHERE kpi_id is ?",(kpi_id,))
c.execute("DELETE FROM alarm WHERE alarm_id is ?",(alarm_id,))
self.client.commit()
return True
......@@ -108,17 +136,19 @@ class ManagementDB():
def get_subscription(self,subs_id):
data = self.client.execute("SELECT * FROM subscription WHERE subs_id is ?",(subs_id,))
return data.fetchone()
def get_alarm(self,alarm_id):
data = self.client.execute("SELECT * FROM alarm WHERE alarm_id is ?",(alarm_id,))
return data.fetchone()
def get_KPIS(self):
data = self.client.execute("SELECT * FROM kpi")
#print("\n")
#for row in data:
# print(row)
return data.fetchall()
def get_subscriptions(self):
data = self.client.execute("SELECT * FROM subscription")
#print("\n")
#for row in data:
# print(row)
return data.fetchall()
def get_alarms(self):
data = self.client.execute("SELECT * FROM alarm")
return data.fetchall()
\ No newline at end of file
......@@ -16,13 +16,14 @@ import logging
import random
import grpc
from common.proto.context_pb2 import Empty, ServiceId
from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse, Sample
from common.proto.context_pb2 import Empty
from common.proto import dbscanserving_pb2 as dbscan
from common.proto.monitoring_pb2 import Kpi
from common.proto.optical_attack_detector_pb2_grpc import (
OpticalAttackDetectorServiceServicer,
)
from common.proto.optical_attack_mitigator_pb2 import AttackDescription, AttackResponse
from common.proto import optical_attack_detector_pb2 as oad
from common.rpc_method_wrapper.Decorator import (
create_metrics,
safe_and_metered_rpc_method,
......@@ -56,35 +57,30 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DetectAttack(
self, service_id: ServiceId, context: grpc.ServicerContext
self, request: oad.DetectionRequest, context: grpc.ServicerContext
) -> Empty:
LOGGER.debug(
"Received request for {}/{}...".format(
service_id.context_id.context_uuid.uuid, service_id.service_uuid.uuid
)
)
# run attack detection for every service
request: DetectionRequest = DetectionRequest()
request.num_samples = 310
request.num_features = 100
request.eps = 100.5
request.min_samples = 5
detection_request: dbscan.DetectionRequest = dbscan.DetectionRequest()
detection_request.num_samples = 310
detection_request.num_features = 100
detection_request.eps = 100.5
detection_request.min_samples = 5
for _ in range(200):
grpc_sample = Sample()
grpc_sample = dbscan.Sample()
for __ in range(100):
grpc_sample.features.append(random.uniform(0.0, 10.0))
request.samples.append(grpc_sample)
detection_request.samples.append(grpc_sample)
for _ in range(100):
grpc_sample = Sample()
grpc_sample = dbscan.Sample()
for __ in range(100):
grpc_sample.features.append(random.uniform(50.0, 60.0))
request.samples.append(grpc_sample)
detection_request.samples.append(grpc_sample)
for _ in range(10):
grpc_sample = Sample()
grpc_sample = dbscan.Sample()
for __ in range(100):
grpc_sample.features.append(random.uniform(5000.0, 6000.0))
request.samples.append(grpc_sample)
response: DetectionResponse = dbscanserving_client.Detect(request)
detection_request.samples.append(grpc_sample)
response: dbscan.DetectionResponse = dbscanserving_client.Detect(detection_request)
# including KPI
# TODO: set kpi_id and kpi_value according to the service
......@@ -96,7 +92,7 @@ class OpticalAttackDetectorServiceServicerImpl(OpticalAttackDetectorServiceServi
if -1 in response.cluster_indices: # attack detected
attack = AttackDescription()
attack.cs_id.uuid = service_id.service_uuid.uuid
attack.cs_id.uuid = request.service_id.service_uuid.uuid
response: AttackResponse = attack_mitigator_client.NotifyAttack(attack)
# if attack is detected, run the attack mitigator
......
......@@ -8,23 +8,13 @@ import time
from multiprocessing import Manager, Process
from typing import List
from grpclib.client import Channel
from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import (
ContextIdList,
Empty,
EventTypeEnum,
ServiceIdList,
)
from common.proto.asyncio.optical_attack_detector_grpc import OpticalAttackDetectorServiceStub
from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest
from common.proto.context_pb2 import ContextIdList, Empty, EventTypeEnum, ServiceIdList
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiDescriptor
from common.proto.asyncio.optical_attack_detector_grpc import (
OpticalAttackDetectorServiceStub,
)
from common.proto.asyncio.context_pb2 import (
ServiceId,
)
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST,
ENVVAR_SUFIX_SERVICE_PORT_GRPC,
......@@ -35,6 +25,7 @@ from common.Settings import (
wait_for_environment_variables,
)
from context.client.ContextClient import ContextClient
from grpclib.client import Channel
from monitoring.client.MonitoringClient import MonitoringClient
from opticalattackmanager.Config import MONITORING_INTERVAL
from opticalattackmanager.utils.EventsCollector import EventsCollector
......@@ -43,44 +34,41 @@ from prometheus_client import start_http_server
terminate = threading.Event()
LOGGER = None
# For more channel options, please see:
# https://grpc.io/grpc/core/group__grpc__arg__keys.html
# CHANNEL_OPTIONS = [
# ("grpc.lb_policy_name", "pick_first"),
# ("grpc.enable_retries", True),
# ("grpc.keepalive_timeout_ms", 10000),
# ]
# TODO: configure retries
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning("Terminate signal received")
terminate.set()
async def detect_attack(host: str, port: int, context_id: str, service_id: str) -> None:
async def detect_attack(
host: str,
port: int,
context_id: str,
service_id: str,
kpi_id: str,
timeout: float = 10.0,
) -> None:
try:
LOGGER.info("Sending request for {}...".format(service_id))
async with Channel(host, port) as channel:
# async with grpc.aio.insecure_channel(
# target=endpoint, options=CHANNEL_OPTIONS
# ) as channel:
stub = OpticalAttackDetectorServiceStub(channel)
service = ServiceId()
service.context_id.context_uuid.uuid = context_id
service.service_uuid.uuid = str(service_id)
# Timeout in seconds.
# Please refer gRPC Python documents for more detail.
# https://grpc.io/grpc/python/grpc.html
await stub.DetectAttack(service, timeout=10)
request: DetectionRequest = DetectionRequest()
request.service_id.context_id.context_uuid.uuid = context_id
request.service_id.service_uuid.uuid = str(service_id)
request.kpi_id.kpi_id.uuid = kpi_id
await stub.DetectAttack(request, timeout=timeout)
LOGGER.info("Monitoring finished for {}".format(service_id))
except Exception as e:
LOGGER.warning("Exception while processing service_id {}".format(service_id))
LOGGER.exception(e)
# TODO: create prometheus metric to be increased here that counts the number
# of dropped detections
async def monitor_services(service_list: List[ServiceId]):
async def monitor_services(service_list: List):
monitoring_interval = int(
get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL)
......@@ -104,7 +92,15 @@ async def monitor_services(service_list: List[ServiceId]):
tasks = []
for service in service_list:
aw = detect_attack(host, port, service["context"], service["service"])
aw = detect_attack(
host,
port,
service["context"],
service["service"],
service["kpi"],
# allow at most 90% of the monitoring interval to succeed
monitoring_interval * 0.9,
)
tasks.append(aw)
[await aw for aw in tasks]
......@@ -139,11 +135,11 @@ def create_kpi(client: MonitoringClient, service_id):
kpi_description.service_id.service_uuid.uuid = service_id
kpi_description.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN
new_kpi = client.SetKpi(kpi_description)
LOGGER.info("Created KPI {}...".format(new_kpi.kpi_id))
LOGGER.info("Created KPI {}: ".format(grpc_message_to_json_string(new_kpi)))
return new_kpi
def get_context_updates(service_list: List[ServiceId]):
def get_context_updates(service_list: List):
# to make sure we are thread safe...
LOGGER.info("Connecting with context and monitoring components...")
context_client: ContextClient = ContextClient()
......@@ -161,9 +157,9 @@ def get_context_updates(service_list: List[ServiceId]):
if event is None:
LOGGER.info("No event received")
continue # no event received
LOGGER.info("Event received: {}".format(event))
LOGGER.info("Event received: {}".format(grpc_message_to_json_string(event)))
if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
LOGGER.info("Service created: {}".format(event.service_id))
LOGGER.info("Service created: {}".format(grpc_message_to_json_string(event.service_id)))
kpi_id = create_kpi(monitoring_client, event.service_id.service_uuid.uuid)
service_list.append(
{
......@@ -174,7 +170,7 @@ def get_context_updates(service_list: List[ServiceId]):
)
elif event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE:
LOGGER.info("Service removed: {}".format(event.service_id))
LOGGER.info("Service removed: {}".format(grpc_message_to_json_string(event.service_id)))
# find service and remove it from the list of currently monitored
for service in service_list:
if (
......@@ -195,11 +191,11 @@ def main():
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
logging.getLogger("hpack").setLevel(logging.CRITICAL)
wait_for_environment_variables(
[
get_env_var_name(
ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST
),
get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST),
get_env_var_name(
ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC
),
......@@ -208,12 +204,8 @@ def main():
wait_for_environment_variables(
[
get_env_var_name(
ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST
),
get_env_var_name(
ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC
),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
]
)
......@@ -245,9 +237,13 @@ def main():
# creating a thread-safe list to be shared among threads
service_list = Manager().list()
kpi_id = create_kpi(monitoring_client, "1213")
service_list.append({"context": "admin", "service": "1213", "kpi": kpi_id.kpi_id.uuid})
service_list.append(
{"context": "admin", "service": "1213", "kpi": kpi_id.kpi_id.uuid}
)
kpi_id = create_kpi(monitoring_client, "1456")
service_list.append({"context": "admin", "service": "1456", "kpi": kpi_id.kpi_id.uuid})
service_list.append(
{"context": "admin", "service": "1456", "kpi": kpi_id.kpi_id.uuid}
)
context_ids: ContextIdList = context_client.ListContextIds(Empty())
......@@ -255,6 +251,8 @@ def main():
for context_id in context_ids.context_ids:
context_services: ServiceIdList = context_client.ListServiceIds(context_id)
for service in context_services.service_ids:
# in case of a service restart, monitoring component will not duplicate KPIs
# but rather return the existing KPI if that's the case
kpi_id = create_kpi(monitoring_client, service.service_uuid.uuid)
service_list.append(
{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment