Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Carlos Natalino
committed
import asyncio
import logging
import signal
import sys
import threading
import time
Carlos Natalino
committed
from multiprocessing import Manager, Process
from typing import List
Carlos Natalino
committed
from common.Constants import ServiceNameEnum
from common.proto.asyncio.optical_attack_detector_grpc import (
OpticalAttackDetectorServiceStub,
)
Carlos Natalino
committed
from common.proto.asyncio.optical_attack_detector_pb2 import DetectionRequest
from common.proto.context_pb2 import ContextIdList, Empty, EventTypeEnum, ServiceIdList
Carlos Natalino
committed
from common.proto.kpi_sample_types_pb2 import KpiSampleType
Carlos Natalino
committed
from common.proto.monitoring_pb2 import KpiDescriptor
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST,
ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name,
get_log_level,
get_metrics_port,
get_setting,
wait_for_environment_variables,
)
from common.tools.grpc.Tools import grpc_message_to_json_string
Carlos Natalino
committed
from context.client.ContextClient import ContextClient
Carlos Natalino
committed
from grpclib.client import Channel
Carlos Natalino
committed
from monitoring.client.MonitoringClient import MonitoringClient
Carlos Natalino
committed
from opticalattackmanager.utils.EventsCollector import EventsCollector
from opticalattackmanager.utils.monitor import monitor_services
from prometheus_client import Counter, Histogram, start_http_server
Carlos Natalino
committed
# Shutdown flag: set by signal_handler, polled by get_context_updates and main,
# and handed to monitor_services.
terminate = threading.Event()
# Module-wide logger; remains None until main() configures logging and assigns it.
LOGGER = None
Carlos Natalino
committed
def signal_handler(signal, frame):  # pylint: disable=redefined-outer-name
    """Request a shutdown when a termination signal arrives.

    Registered by main() for SIGINT and SIGTERM. It logs a warning and sets
    the module-level ``terminate`` event; the loops that poll that event are
    responsible for actually stopping.

    Args:
        signal: number of the received signal (unused).
        frame: stack frame that was interrupted (unused).
    """
    LOGGER.warning("Terminate signal received")
    terminate.set()
def create_kpi(client: MonitoringClient, service_id):
    """Create a security-status KPI for a service through the monitoring client.

    Args:
        client: monitoring client used to submit the KPI descriptor.
        service_id: UUID (string) of the service the KPI refers to.

    Returns:
        The object returned by ``client.SetKpi``; callers in this file read
        ``.kpi_id.uuid`` from it.
    """
    descriptor: KpiDescriptor = KpiDescriptor()
    descriptor.kpi_description = "Security status of service {}".format(service_id)
    descriptor.service_id.service_uuid.uuid = service_id
    descriptor.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN

    new_kpi = client.SetKpi(descriptor)

    # The colon used to trail the payload ("Created KPI {...}: "); it now
    # separates the label from the value.
    LOGGER.info("Created KPI: %s", grpc_message_to_json_string(new_kpi))
    return new_kpi
Carlos Natalino
committed
def _apply_service_event(event, service_list, monitoring_client):
    """Add or remove one entry of ``service_list`` according to ``event``."""
    event_type = event.event.event_type

    if event_type == EventTypeEnum.EVENTTYPE_CREATE:
        LOGGER.info(
            "Service created: %s", grpc_message_to_json_string(event.service_id)
        )
        kpi_id = create_kpi(monitoring_client, event.service_id.service_uuid.uuid)
        service_list.append(
            {
                "context": event.service_id.context_id.context_uuid.uuid,
                "service": event.service_id.service_uuid.uuid,
                "kpi": kpi_id.kpi_id.uuid,
            }
        )

    elif event_type == EventTypeEnum.EVENTTYPE_REMOVE:
        LOGGER.info(
            "Service removed: %s", grpc_message_to_json_string(event.service_id)
        )
        service_uuid = event.service_id.service_uuid.uuid
        context_uuid = event.service_id.context_id.context_uuid.uuid
        # Drop the first monitored entry matching both identifiers. Leaving
        # the loop right after the removal avoids iterating a mutated list.
        for service in service_list:
            if service["service"] == service_uuid and service["context"] == context_uuid:
                service_list.remove(service)
                break


def get_context_updates(service_list: List):
    """Keep ``service_list`` in sync with service creation/removal events.

    main() runs this function as the target of a background process. It
    opens its own context and monitoring clients, starts an events collector
    and, until the ``terminate`` event is set, appends an entry (with a newly
    created KPI) for every created service and removes the entry of every
    removed service.

    The collector is stopped and both clients are closed on the way out, also
    when the loop is left because of an exception.

    Args:
        service_list: shared list of ``{"context", "service", "kpi"}`` dicts
            describing the services currently monitored.
    """
    LOGGER.info("Connecting with context and monitoring components...")
    # Clients are created here rather than received from the caller (the
    # original note on this spot cites thread safety).
    context_client: ContextClient = ContextClient()
    monitoring_client: MonitoringClient = MonitoringClient()
    try:
        events_collector: EventsCollector = EventsCollector(context_client)
        events_collector.start()
        try:
            LOGGER.info("Connected successfully... Waiting for events...")

            # Initial grace period. Waiting on the flag instead of sleeping
            # means a shutdown request is not held up for the full 20 s.
            terminate.wait(timeout=20)

            while not terminate.wait(timeout=1):
                event = events_collector.get_event(block=True, timeout=1)
                if event is None:
                    LOGGER.info("No event received")
                    continue  # no event received

                LOGGER.info(
                    "Event received: %s", grpc_message_to_json_string(event)
                )
                _apply_service_event(event, service_list, monitoring_client)
        finally:
            events_collector.stop()
    finally:
        context_client.close()
        monitoring_client.close()
Carlos Natalino
committed
def main():
    """Run the optical attack manager until a termination signal arrives.

    Steps, in order:

    1. configure logging and wait for the host/port environment variables of
       the monitoring, context and optical attack detector services;
    2. install the SIGINT/SIGTERM handlers and start the metrics HTTP server;
    3. build the shared list of monitored services (two dummy services plus
       every service currently listed by the context component), creating a
       KPI for each one;
    4. start a background process running ``get_context_updates`` to keep
       that list up to date;
    5. run ``monitor_services`` and then wait for the ``terminate`` event;
    6. stop and reap the background process.

    Returns:
        0 once the shutdown sequence has completed.
    """
    global LOGGER  # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    LOGGER = logging.getLogger(__name__)

    # Only CRITICAL records of the "hpack" logger are let through.
    logging.getLogger("hpack").setLevel(logging.CRITICAL)

    # Same three waits as before, in the same order, one service at a time.
    for service_name in (
        ServiceNameEnum.MONITORING,
        ServiceNameEnum.CONTEXT,
        ServiceNameEnum.OPTICALATTACKDETECTOR,
    ):
        wait_for_environment_variables(
            [
                get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_HOST),
                get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
            ]
        )

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info("Starting...")

    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)

    LOGGER.info("Connecting with context component...")
    context_client: ContextClient = ContextClient()
    monitoring_client: MonitoringClient = MonitoringClient()
    try:
        LOGGER.info("Connected successfully...")

        # Manager-backed list proxy so the background process sees the same
        # list as this process.
        service_list = Manager().list()

        # TODO: comment the lines below to stop monitoring dummy services
        for dummy_service in ("1213", "1456"):
            kpi_id = create_kpi(monitoring_client, dummy_service)
            service_list.append(
                {
                    "context": "admin",
                    "service": dummy_service,
                    "kpi": kpi_id.kpi_id.uuid,
                }
            )

        # populate with initial services
        context_ids: ContextIdList = context_client.ListContextIds(Empty())
        for context_id in context_ids.context_ids:
            context_services: ServiceIdList = context_client.ListServiceIds(
                context_id
            )
            for service in context_services.service_ids:
                # in case of a service restart, monitoring component will not
                # duplicate KPIs but rather return the existing KPI if that's
                # the case
                kpi_id = create_kpi(monitoring_client, service.service_uuid.uuid)
                service_list.append(
                    {
                        "context": context_id.context_uuid.uuid,
                        "service": service.service_uuid.uuid,
                        "kpi": kpi_id.kpi_id.uuid,
                    }
                )
    finally:
        # Release both connections even when the initial population fails.
        context_client.close()
        monitoring_client.close()

    # starting background process to monitor service addition/removal
    process_context = Process(target=get_context_updates, args=(service_list,))
    process_context.start()
    try:
        time.sleep(5)  # wait for the context updates to startup

        # NOTE(review): asyncio.get_event_loop() without a running loop is
        # deprecated in recent Python versions. Consider asyncio.run() once it
        # is confirmed that monitor_services does not depend on the default
        # loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(monitor_services(terminate, service_list))

        # Wait for Ctrl+C or termination signal
        while not terminate.wait(timeout=0.1):
            pass

        LOGGER.info("Terminating...")
    finally:
        # Ask the child to stop with SIGTERM first so it gets a chance to run
        # its own cleanup; SIGKILL is only the fallback. Joining reaps the
        # child so no zombie process is left behind.
        process_context.terminate()
        process_context.join(timeout=5)
        if process_context.is_alive():
            process_context.kill()
            process_context.join()

    LOGGER.info("Bye")
    return 0
Carlos Natalino
committed
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())