Newer
Older
Carlos Natalino
committed
import asyncio
import logging
import signal
import sys
import threading
import time
Carlos Natalino
committed
from multiprocessing import Manager, Process
from typing import List
Carlos Natalino
committed
from common.Constants import ServiceNameEnum
from common.proto.asyncio.optical_attack_detector_grpc import (
OpticalAttackDetectorServiceStub,
)
Carlos Natalino
committed
from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest
from common.proto.context_pb2 import ContextIdList, Empty, EventTypeEnum, ServiceIdList
Carlos Natalino
committed
from common.proto.kpi_sample_types_pb2 import KpiSampleType
Carlos Natalino
committed
from common.proto.monitoring_pb2 import KpiDescriptor
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST,
ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name,
get_log_level,
get_metrics_port,
get_setting,
wait_for_environment_variables,
)
from common.tools.grpc.Tools import grpc_message_to_json_string
Carlos Natalino
committed
from context.client.ContextClient import ContextClient
Carlos Natalino
committed
from grpclib.client import Channel
Carlos Natalino
committed
from monitoring.client.MonitoringClient import MonitoringClient
Carlos Natalino
committed
from opticalattackmanager.Config import MONITORING_INTERVAL
from opticalattackmanager.utils.EventsCollector import EventsCollector
from prometheus_client import start_http_server, Histogram, Counter
Carlos Natalino
committed
# Shutdown flag: set by signal_handler(); the monitoring loops poll it and exit.
terminate = threading.Event()

# Module logger. Assigned in main(); the functions below assume it has been set.
LOGGER = None

# Prometheus metrics, exposed by the HTTP server started in main().
# TODO: adjust histogram buckets to more realistic values
LOOP_TIME = Histogram(
    name='optical_security_loop_seconds',
    documentation='Time taken by each security loop',
)
# Incremented by detect_attack() whenever a detection request fails.
DROP_COUNTER = Counter(
    name='optical_security_dropped_assessments',
    documentation='Dropped assessments due to detector timeout',
)


def signal_handler(signum, frame):  # pylint: disable=unused-argument
    """Request a graceful shutdown when SIGINT/SIGTERM is received.

    Installed through ``signal.signal`` in ``main``; it only logs and sets the
    module-level ``terminate`` event, which the monitoring loops poll.

    Args:
        signum: number of the received signal (unused).
        frame: interrupted stack frame (unused).
    """
    # Parameters must have distinct names: ``def f(_, _)`` is a SyntaxError.
    LOGGER.warning("Terminate signal received")
    terminate.set()


async def detect_attack(
    host: str,
    port: int,
    context_id: str,
    service_id: str,
    kpi_id: str,
    timeout: float = 10.0,
) -> None:
    """Ask the optical attack detector to assess a single service.

    Opens a gRPC channel to ``host:port`` and sends one ``DetectionRequest``
    for the given service and KPI, waiting at most ``timeout`` seconds for the
    reply. Failures are logged and counted in ``DROP_COUNTER`` instead of being
    raised, so one failing service does not abort the monitoring cycle.

    Args:
        host: detector service host.
        port: detector service gRPC port.
        context_id: UUID of the context the service belongs to.
        service_id: UUID of the service to assess (converted with ``str``).
        kpi_id: UUID of the service's security KPI.
        timeout: gRPC call timeout in seconds.
    """
    LOGGER.info("Sending request for {}...".format(service_id))
    try:
        async with Channel(host, port) as channel:
            stub = OpticalAttackDetectorServiceStub(channel)

            request: DetectionRequest = DetectionRequest()
            request.service_id.context_id.context_uuid.uuid = context_id
            request.service_id.service_uuid.uuid = str(service_id)
            request.kpi_id.kpi_id.uuid = kpi_id
            await stub.DetectAttack(request, timeout=timeout)
        LOGGER.info("Monitoring finished for {}".format(service_id))
    except Exception:  # pylint: disable=broad-except
        # Best effort by design: record the drop and keep the loop going, but
        # keep the traceback so timeouts and other errors can be told apart.
        LOGGER.warning(
            "Exception while processing service_id {}".format(service_id),
            exc_info=True,
        )
        DROP_COUNTER.inc()


async def _sleep_unless_terminated(seconds: float, step: float = 1.0) -> None:
    """Sleep up to ``seconds`` without blocking the event loop.

    Wakes every ``step`` seconds to check ``terminate`` so a shutdown request is
    honoured promptly instead of after a whole monitoring interval.
    """
    deadline = time.monotonic() + seconds
    while not terminate.is_set():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return
        await asyncio.sleep(min(step, remaining))


async def monitor_services(service_list: List):
    """Periodically ask the attack detector to assess every monitored service.

    Runs until ``terminate`` is set. On each cycle it snapshots
    ``service_list``, sends one detection request per service concurrently,
    records the cycle duration in ``LOOP_TIME`` and sleeps for the rest of the
    monitoring interval.

    Args:
        service_list: list (or ``Manager().list()`` proxy) of dicts with
            ``"context"``, ``"service"`` and ``"kpi"`` keys; may be modified
            concurrently by ``get_context_updates``.

    Raises:
        ValueError: if the configured monitoring interval is not positive.
    """
    monitoring_interval = int(
        get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL)
    )
    if monitoring_interval <= 0:
        # Every cycle divides by the interval; fail early with a clear message.
        raise ValueError(
            "MONITORING_INTERVAL must be positive, got {}".format(monitoring_interval)
        )

    host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST")
    port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC"))

    LOGGER.info("Starting execution of the async loop")

    while not terminate.is_set():
        # One snapshot per cycle (a single round-trip for a Manager proxy), so
        # concurrent additions/removals do not affect the ongoing cycle.
        services = service_list[:]

        if not services:
            LOGGER.debug("No services to monitor...")
            await _sleep_unless_terminated(monitoring_interval)
            continue
        LOGGER.info("Starting new monitoring cycle...")

        start_time = time.monotonic()
        # Await all requests together so they run concurrently; awaiting bare
        # coroutines one at a time would serialise them. detect_attack handles
        # its own errors; return_exceptions keeps any stray failure contained.
        await asyncio.gather(
            *(
                detect_attack(
                    host,
                    port,
                    service["context"],
                    service["service"],
                    service["kpi"],
                    # allow at most 90% of the monitoring interval to succeed
                    monitoring_interval * 0.9,
                )
                for service in services
            ),
            return_exceptions=True,
        )
        time_taken = time.monotonic() - start_time
        LOOP_TIME.observe(time_taken)

        remaining = monitoring_interval - time_taken
        LOGGER.info(
            "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... "
            "Waiting for {:.2f} seconds...".format(
                len(services),
                time_taken,
                (time_taken / monitoring_interval) * 100,
                max(remaining, 0.0),
            )
        )
        if time_taken / monitoring_interval > 0.9:
            LOGGER.warning(
                "Monitoring loop is taking {} % of the desired time "
                "({} seconds)".format(
                    (time_taken / monitoring_interval) * 100, monitoring_interval
                )
            )
        if remaining > 0:
            await _sleep_unless_terminated(remaining)


def create_kpi(client: MonitoringClient, service_id):
    """Register a security-status KPI for a service with the Monitoring component.

    Args:
        client: connected ``MonitoringClient`` used to call ``SetKpi``.
        service_id: service UUID the KPI is attached to; also embedded in the
            KPI description.

    Returns:
        The value returned by ``client.SetKpi`` (callers read
        ``.kpi_id.uuid`` from it).
    """
    descriptor: KpiDescriptor = KpiDescriptor()
    descriptor.kpi_description = "Security status of service {}".format(service_id)
    descriptor.service_id.service_uuid.uuid = service_id
    descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN

    created = client.SetKpi(descriptor)
    LOGGER.info("Created KPI {}: ".format(grpc_message_to_json_string(created)))
    return created


def _find_service_index(service_list, context_uuid, service_uuid):
    """Return the index of the entry matching (context, service) in ``service_list``, or None."""
    # Slice once: for a Manager proxy this is a single round-trip.
    for index, service in enumerate(service_list[:]):
        if service["context"] == context_uuid and service["service"] == service_uuid:
            return index
    return None


def get_context_updates(service_list: List):
    """Keep ``service_list`` in sync with service events coming from Context.

    Meant to run in the background process started by ``main``. For a CREATE
    event, a security KPI is registered and the service is appended, unless it
    is already listed. For a REMOVE event, the matching entry is deleted. Runs
    until ``terminate`` is set. It then stops the events collector and closes
    both gRPC clients.

    Args:
        service_list: list (or ``Manager().list()`` proxy) of dicts with
            ``"context"``, ``"service"`` and ``"kpi"`` keys.
    """
    LOGGER.info("Connecting with context and monitoring components...")

    context_client: ContextClient = ContextClient()
    monitoring_client: MonitoringClient = MonitoringClient()

    events_collector: EventsCollector = EventsCollector(context_client)
    events_collector.start()
    LOGGER.info("Connected successfully... Waiting for events...")

    try:
        # Initial 20 s grace period before consuming events, kept from the
        # original sleep but cut short by a termination request.
        terminate.wait(timeout=20)

        # get_event() already blocks for up to 1 s; the previous
        # terminate.wait(timeout=1) added another second per iteration and
        # throttled event handling.
        while not terminate.is_set():
            event = events_collector.get_event(block=True, timeout=1)
            if event is None:
                # Happens every second when idle; debug level avoids log spam.
                LOGGER.debug("No event received")
                continue

            LOGGER.info("Event received: {}".format(grpc_message_to_json_string(event)))

            # NOTE(review): assumes the collector yields service events (with
            # a .service_id) for CREATE/REMOVE; confirm against EventsCollector.
            event_type = event.event.event_type
            if event_type == EventTypeEnum.EVENTTYPE_CREATE:
                LOGGER.info(
                    "Service created: {}".format(
                        grpc_message_to_json_string(event.service_id)
                    )
                )
                context_uuid = event.service_id.context_id.context_uuid.uuid
                service_uuid = event.service_id.service_uuid.uuid
                if _find_service_index(service_list, context_uuid, service_uuid) is not None:
                    continue  # already monitored; do not add a duplicate entry

                kpi_id = create_kpi(monitoring_client, service_uuid)
                service_list.append(
                    {
                        "context": context_uuid,
                        "service": service_uuid,
                        "kpi": kpi_id.kpi_id.uuid,
                    }
                )

            elif event_type == EventTypeEnum.EVENTTYPE_REMOVE:
                LOGGER.info(
                    "Service removed: {}".format(
                        grpc_message_to_json_string(event.service_id)
                    )
                )
                # find service and remove it from the list of currently monitored
                index = _find_service_index(
                    service_list,
                    event.service_id.context_id.context_uuid.uuid,
                    event.service_id.service_uuid.uuid,
                )
                if index is not None:
                    # After startup this function is the only writer of the
                    # list (main seeds it before starting this process), so
                    # the index is still valid here.
                    del service_list[index]
    finally:
        events_collector.stop()
        monitoring_client.close()
        context_client.close()


def _populate_initial_services(service_list) -> None:
    """Seed ``service_list`` with the dummy services and every service known to Context."""
    LOGGER.info("Connecting with context component...")
    context_client: ContextClient = ContextClient()
    monitoring_client: MonitoringClient = MonitoringClient()
    LOGGER.info("Connected successfully...")

    try:
        # TODO: comment the lines below to stop monitoring dummy services
        for dummy_service_uuid in ("1213", "1456"):
            kpi_id = create_kpi(monitoring_client, dummy_service_uuid)
            service_list.append(
                {
                    "context": "admin",
                    "service": dummy_service_uuid,
                    "kpi": kpi_id.kpi_id.uuid,
                }
            )

        # populate with initial services
        context_ids: ContextIdList = context_client.ListContextIds(Empty())
        for context_id in context_ids.context_ids:
            context_services: ServiceIdList = context_client.ListServiceIds(context_id)
            for service in context_services.service_ids:
                # in case of a service restart, monitoring component will not
                # duplicate KPIs but rather return the existing KPI if that's the case
                kpi_id = create_kpi(monitoring_client, service.service_uuid.uuid)
                service_list.append(
                    {
                        "context": context_id.context_uuid.uuid,
                        "service": service.service_uuid.uuid,
                        "kpi": kpi_id.kpi_id.uuid,
                    }
                )
    finally:
        context_client.close()
        monitoring_client.close()


def _stop_process(process: Process, timeout: float = 5.0) -> None:
    """Stop ``process`` with SIGTERM, falling back to SIGKILL after ``timeout`` seconds."""
    # With the fork start method the child inherits signal_handler, so SIGTERM
    # lets get_context_updates leave its loop and stop its events collector.
    process.terminate()
    process.join(timeout)
    if process.is_alive():
        LOGGER.warning(
            "Context updates process did not stop within {} seconds; killing it".format(
                timeout
            )
        )
        process.kill()
        process.join()


def main():
    """Run the optical attack manager until SIGINT/SIGTERM is received.

    Configures logging, waits for the Monitoring, Context and
    OpticalAttackDetector endpoint variables, installs the signal handlers,
    starts the Prometheus metrics server and seeds a shared service list. It
    then starts ``get_context_updates`` in a background process and runs
    ``monitor_services`` until ``terminate`` is set. On exit the background
    process and the multiprocessing manager are shut down.

    Returns:
        Process exit status (0).
    """
    global LOGGER  # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    LOGGER = logging.getLogger(__name__)

    logging.getLogger("hpack").setLevel(logging.CRITICAL)

    # Block until host/port variables of each dependency are available
    # (one wait per dependency, in the original order).
    for dependency in (
        ServiceNameEnum.MONITORING,
        ServiceNameEnum.CONTEXT,
        ServiceNameEnum.OPTICALATTACKDETECTOR,
    ):
        wait_for_environment_variables(
            [
                get_env_var_name(dependency, ENVVAR_SUFIX_SERVICE_HOST),
                get_env_var_name(dependency, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
            ]
        )

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info("Starting...")

    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)

    # Keep an explicit reference to the manager so its server process can be
    # shut down deterministically instead of relying on garbage collection.
    manager = Manager()
    try:
        # process-safe list shared with the context-updates process
        service_list = manager.list()
        _populate_initial_services(service_list)

        # starting background process to monitor service addition/removal
        process_context = Process(target=get_context_updates, args=(service_list,))
        process_context.start()
        try:
            time.sleep(5)  # wait for the context updates to startup
            # Runs the monitoring loop until a termination signal is received.
            # asyncio.run() replaces get_event_loop().run_until_complete(),
            # which is deprecated outside a running loop in recent Pythons.
            asyncio.run(monitor_services(service_list))
        finally:
            LOGGER.info("Terminating...")
            _stop_process(process_context)
    finally:
        manager.shutdown()

    LOGGER.info("Bye")
    return 0


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())