Newer
Older
Carlos Natalino
committed
import asyncio
import logging
import random
import signal
import sys
import threading
import time
Carlos Natalino
committed
from multiprocessing import Manager, Process
from typing import List
Carlos Natalino
committed
from grpclib.client import Channel
from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import (
ContextIdList,
Empty,
EventTypeEnum,
ServiceIdList,
)
Carlos Natalino
committed
from common.proto.kpi_sample_types_pb2 import KpiSampleType
Carlos Natalino
committed
from common.proto.monitoring_pb2 import KpiDescriptor
from common.proto.asyncio.optical_attack_detector_grpc import (
OpticalAttackDetectorServiceStub,
)
from common.proto.asyncio.context_pb2 import (
ServiceId,
)
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST,
ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name,
get_log_level,
get_metrics_port,
get_setting,
wait_for_environment_variables,
)
from context.client.ContextClient import ContextClient
Carlos Natalino
committed
from monitoring.client.MonitoringClient import MonitoringClient
Carlos Natalino
committed
from opticalattackmanager.Config import MONITORING_INTERVAL
from opticalattackmanager.utils.EventsCollector import EventsCollector
from prometheus_client import start_http_server
Carlos Natalino
committed
# Shared stop flag: set by signal_handler(), polled by the monitoring and
# event-processing loops to know when to exit.
terminate = threading.Event()
# Module-level logger; stays None until main() configures logging and assigns it.
LOGGER = None
Carlos Natalino
committed
# For more channel options, please see:
# https://grpc.io/grpc/core/group__grpc__arg__keys.html
# CHANNEL_OPTIONS = [
# ("grpc.lb_policy_name", "pick_first"),
# ("grpc.enable_retries", True),
# ("grpc.keepalive_timeout_ms", 10000),
# ]
Carlos Natalino
committed
def signal_handler(signal, frame):  # pylint: disable=redefined-outer-name
    """Handle a termination signal by requesting a graceful shutdown.

    Logs a warning and sets the module-level ``terminate`` event. Both
    arguments are accepted only to satisfy the handler signature expected by
    ``signal.signal`` and are not used.
    """
    LOGGER.warning("Terminate signal received")
    terminate.set()
Carlos Natalino
committed
async def detect_attack(host: str, port: int, context_id: str, service_id: str) -> None:
    """Ask the optical attack detector to analyse one service.

    Opens a channel to ``host:port``, builds a ``ServiceId`` from the given
    identifiers and awaits ``DetectAttack`` on the detector stub.

    Args:
        host: Host of the optical attack detector service.
        port: gRPC port of the optical attack detector service.
        context_id: UUID of the context the service belongs to.
        service_id: UUID of the service to analyse (converted with ``str``).

    Any ``Exception`` raised while contacting the detector is logged and
    swallowed so that a failure on one service does not abort the monitoring
    of the others.
    """
    try:
        LOGGER.info("Sending request for {}...".format(service_id))
        async with Channel(host, port) as channel:
            # async with grpc.aio.insecure_channel(
            #     target=endpoint, options=CHANNEL_OPTIONS
            # ) as channel:
            stub = OpticalAttackDetectorServiceStub(channel)

            service = ServiceId()
            service.context_id.context_uuid.uuid = context_id
            service.service_uuid.uuid = str(service_id)

            # Timeout in seconds, enforced by the grpclib client call.
            await stub.DetectAttack(service, timeout=10)
        LOGGER.info("Monitoring finished for {}".format(service_id))
    except Exception:  # pylint: disable=broad-except
        # Best-effort by design: keep the WARNING level, but attach the
        # traceback so the cause of the failure is not lost.
        LOGGER.warning(
            "Exception while processing service_id {}".format(service_id),
            exc_info=True,
        )
async def monitor_services(service_list: List[ServiceId]):
    """Run attack detection for all monitored services until ``terminate`` is set.

    Each cycle issues one ``detect_attack`` call per entry of ``service_list``
    and then waits for the remainder of the monitoring interval. The interval
    is read from the ``MONITORING_INTERVAL`` setting (falling back to the
    ``MONITORING_INTERVAL`` constant); the detector host and port are read from
    the ``OPTICALATTACKDETECTORSERVICE_SERVICE_*`` settings.

    Args:
        service_list: Shared list of monitored services. Entries are indexed
            with the ``"context"`` and ``"service"`` keys.
    """
    monitoring_interval = int(
        get_setting("MONITORING_INTERVAL", default=MONITORING_INTERVAL)
    )

    host = get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_HOST")
    port = int(get_setting("OPTICALATTACKDETECTORSERVICE_SERVICE_PORT_GRPC"))

    LOGGER.info("Starting execution of the async loop")

    while not terminate.is_set():
        # Take one snapshot per cycle: the list is mutated by another process,
        # so the emptiness check, the requests and the logged count must all
        # refer to the same set of services.
        services = service_list[:]

        if not services:
            LOGGER.debug("No services to monitor...")
            # Non-blocking sleep: never stall the event loop from a coroutine.
            await asyncio.sleep(monitoring_interval)
            continue

        LOGGER.info("Starting new monitoring cycle...")

        start_time = time.monotonic()

        # Issue all requests concurrently. detect_attack() handles its own
        # exceptions, so one failing service does not cancel the others.
        await asyncio.gather(
            *(
                detect_attack(host, port, service["context"], service["service"])
                for service in services
            )
        )

        time_taken = time.monotonic() - start_time
        usage = time_taken / monitoring_interval
        remaining = monitoring_interval - time_taken

        LOGGER.info(
            "Monitoring loop with {} services took {:.3f} seconds ({:.2f}%)... "
            "Waiting for {:.2f} seconds...".format(
                len(services),
                time_taken,
                usage * 100,
                remaining,
            )
        )

        if usage > 0.9:
            LOGGER.warning(
                "Monitoring loop is taking {} % of the desired time "
                "({} seconds)".format(usage * 100, monitoring_interval)
            )

        # Only wait when the cycle finished before the interval elapsed.
        if remaining > 0:
            await asyncio.sleep(remaining)
Carlos Natalino
committed
def create_kpi(client: MonitoringClient, service_id):
    """Register a security-status KPI for a service.

    Builds a ``KpiDescriptor`` that references ``service_id``, submits it
    through ``client.SetKpi`` and logs the result.

    Args:
        client: Monitoring client used to submit the descriptor.
        service_id: UUID of the service the KPI describes.

    Returns:
        The object returned by ``client.SetKpi``; callers read the KPI
        identifier from its ``kpi_id`` attribute.
    """
    descriptor: KpiDescriptor = KpiDescriptor()
    descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN
    descriptor.service_id.service_uuid.uuid = service_id
    descriptor.kpi_description = "Security status of service {}".format(service_id)

    created_kpi = client.SetKpi(descriptor)
    LOGGER.info("Created KPI {}...".format(created_kpi.kpi_id))
    return created_kpi
def get_context_updates(service_list: List[ServiceId]):
    """Keep ``service_list`` in sync with service events from the context component.

    Connects to the context and monitoring components, starts an
    ``EventsCollector`` and then, until ``terminate`` is set, processes the
    collected events: a CREATE event registers a KPI for the new service and
    appends it to ``service_list``; a REMOVE event deletes the matching entry.

    Args:
        service_list: Shared list of monitored services. Entries are dicts with
            the ``"context"``, ``"service"`` and (for entries added here)
            ``"kpi"`` keys.
    """
    # to make sure we are thread safe...
    LOGGER.info("Connecting with context and monitoring components...")
    context_client: ContextClient = ContextClient()
    monitoring_client: MonitoringClient = MonitoringClient()

    events_collector: EventsCollector = EventsCollector(context_client)
    events_collector.start()

    LOGGER.info("Connected successfully... Waiting for events...")

    try:
        # NOTE(review): the original statement read `service["service"]` before
        # `service` was bound (UnboundLocalError). Iterating the services
        # already present in the shared list is the presumed intent — confirm.
        # A slice is used to get a stable snapshot of the shared list.
        for service in service_list[:]:
            create_kpi(monitoring_client, service["service"])

        time.sleep(20)

        while not terminate.wait(timeout=1):
            event = events_collector.get_event(block=True, timeout=1)
            if event is None:
                LOGGER.info("No event received")
                continue  # no event received

            LOGGER.info("Event received: {}".format(event))

            if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
                LOGGER.info("Service created: {}".format(event.service_id))
                kpi_id = create_kpi(
                    monitoring_client, event.service_id.service_uuid.uuid
                )
                service_list.append(
                    {
                        "context": event.service_id.context_id.context_uuid.uuid,
                        "service": event.service_id.service_uuid.uuid,
                        "kpi": kpi_id.kpi_id.uuid,
                    }
                )

            elif event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE:
                LOGGER.info("Service removed: {}".format(event.service_id))
                # find service and remove it from the list of currently monitored
                for service in service_list:
                    if (
                        service["service"] == event.service_id.service_uuid.uuid
                        and service["context"]
                        == event.service_id.context_id.context_uuid.uuid
                    ):
                        service_list.remove(service)
                        # Stop right after mutating the list being iterated.
                        break
    finally:
        # Release resources on every exit path: stop the collector first
        # (it was built on top of the context client), then close the clients.
        events_collector.stop()
        context_client.close()
        monitoring_client.close()
# for event in context_client.GetServiceEvents(Empty()):
# LOGGER.info("Event received: {}".format(event))
# if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
# LOGGER.info("Service created: {}".format(event.service_id))
# kpi_id = create_kpi(monitoring_client, event.service_id.service_uuid.uuid)
# service_list.append(
# {
# "context": event.service_id.context_id.context_uuid.uuid,
# "service": event.service_id.service_uuid.uuid,
# "kpi": kpi_id.kpi_id.uuid,
# }
# )
# elif event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE:
# LOGGER.info("Service removed: {}".format(event.service_id))
# # find service and remove it from the list of currently monitored
# for service in service_list:
# if (
# service["service"] == event.service_id.service_uuid.uuid
# and service["context"]
# == event.service_id.context_id.context_uuid.uuid
# ):
# service_list.remove(service)
# break
# if terminate.is_set(): # if terminate is set
# LOGGER.warning("Stopping execution of the get_context_updates...")
# context_client.close()
# monitoring_client.close()
# break # break the while and stop execution
# LOGGER.debug("Waiting for next event...")
Carlos Natalino
committed
def main():
    """Entry point of the optical attack manager.

    Configures logging, waits for the attack detector environment variables,
    installs the SIGINT/SIGTERM handlers, seeds the shared service list from
    the context component, starts a background process that tracks service
    creation/removal, and runs the monitoring loop until ``terminate`` is set.

    Returns:
        ``0`` after a graceful shutdown.
    """
    global LOGGER  # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables(
        [
            get_env_var_name(
                ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST
            ),
            get_env_var_name(
                ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC
            ),
        ]
    )

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info("Starting...")

    # Start metrics server
    metrics_port = get_metrics_port()
    # start_http_server(metrics_port)  # TODO: uncomment this line

    LOGGER.info("Connecting with context component...")
    context_client: ContextClient = ContextClient()
    monitoring_client: MonitoringClient = MonitoringClient()
    LOGGER.info("Connected successfully...")

    # creating a thread-safe list to be shared among threads
    service_list = Manager().list()

    # NOTE(review): these two entries are hard-coded and carry no "kpi" key —
    # presumably test fixtures; confirm whether they should remain.
    service_list.append({"context": "admin", "service": "1213"})
    service_list.append({"context": "admin", "service": "1456"})

    try:
        context_ids: ContextIdList = context_client.ListContextIds(Empty())

        # populate with initial services
        for context_id in context_ids.context_ids:
            context_services: ServiceIdList = context_client.ListServiceIds(
                context_id
            )
            for service in context_services.service_ids:
                kpi_id = create_kpi(monitoring_client, service.service_uuid.uuid)
                service_list.append(
                    {
                        "context": context_id.context_uuid.uuid,
                        "service": service.service_uuid.uuid,
                        "kpi": kpi_id.kpi_id.uuid,
                    }
                )
    finally:
        # The clients are only needed for the initial population; close them
        # even if a listing or KPI creation call fails.
        context_client.close()
        monitoring_client.close()

    # starting background process to monitor service addition/removal
    process_context = Process(target=get_context_updates, args=(service_list,))
    process_context.start()

    try:
        time.sleep(5)  # wait for the context updates to startup

        # Run the monitoring coroutine to completion on a fresh event loop;
        # it returns once `terminate` has been set.
        asyncio.run(monitor_services(service_list))

        # Wait for Ctrl+C or termination signal
        while not terminate.wait(timeout=0.1):
            pass
    finally:
        LOGGER.info("Terminating...")
        # Always stop and reap the child, including when the monitoring loop
        # fails, so that no orphan or zombie process is left behind.
        process_context.kill()
        process_context.join()
        # process_security_loop.kill()

    LOGGER.info("Bye")
    return 0
Carlos Natalino
committed
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)