diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py
index af38e02a0c4a3098ce9684822654a2494611661d..85b1039fcb5a8594f70e054d683be31e6c585110 100644
--- a/src/opticalattackmanager/service/__main__.py
+++ b/src/opticalattackmanager/service/__main__.py
@@ -67,7 +67,7 @@ SERVICE_LIST_MODE = int(
     get_setting("OPTICALATTACKMANAGER_SERVICE_LIST_MODE", default=1)
 )
 SERVICE_LIST_KEY = get_setting(
-    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services"
+    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec-active-services"
 )
 MIN_NUMBER_WORKERS = int(
     get_setting("OPTICALATTACKMANAGERSERVICE_LOOP_MIN_WORKERS", default=2)
 )
@@ -295,6 +295,7 @@ async def monitor_services(terminate, service_list=None, cache=None):
 
         current_list = []
         if SERVICE_LIST_MODE == LIST_REDIS_MODE:
+            LOGGER.debug(f"Services at the Redis DB: {cache.llen(SERVICE_LIST_KEY)}")
             current_list.extend(
                 [
                     pickle.loads(service)
@@ -385,7 +386,6 @@
                     (i + 1) * k + min(i + 1, m),  # last index
                     host,
                     port,
-                    DROP_COUNTER,
                     desired_monitoring_interval * 0.9,
                 )
                 for i in range(cur_number_workers)
@@ -434,22 +434,22 @@ def main():
     logging.getLogger("hpack").setLevel(logging.CRITICAL)
 
-    wait_for_environment_variables(
-        [
-            get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST),
-            get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
-            get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST),
-            get_env_var_name(
-                ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC
-            ),
-            get_env_var_name(
-                ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST
-            ),
-            get_env_var_name(
-                ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC
-            ),
-        ]
-    )
+    # wait_for_environment_variables(
+    #     [
+    #         get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST),
+    #         get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
+    #         get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST),
+    #         get_env_var_name(
+    #             ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC
+    #         ),
+    #         get_env_var_name(
+    #             ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST
+    #         ),
+    #         get_env_var_name(
+    #             ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC
+    #         ),
+    #     ]
+    # )
 
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
@@ -476,9 +476,10 @@ def main():
     if SERVICE_LIST_MODE == LIST_REDIS_MODE:
         cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
         cache.ping()
+        LOGGER.info(f"Connecting to Redis: host={redis_host}, port={redis_port}, password={redis_password}")
 
         # clean the existing list that will be populated later on in this function
-        cache.delete(SERVICE_LIST_KEY)
+        # cache.delete(SERVICE_LIST_KEY)
     elif SERVICE_LIST_MODE == LIST_SHARED_MODE:
         # creating a thread-safe list to be shared among threads
         service_list = Manager().list()
@@ -544,7 +545,7 @@ def main():
     # asyncio.create_task(monitor_services(service_list))
 
     # Wait for Ctrl+C or termination signal
-    while not terminate.wait(timeout=1):
+    while not terminate.wait(timeout=10):
         pass
 
     LOGGER.info("Terminating...")
diff --git a/src/opticalattackmanager/utils/monitor.py b/src/opticalattackmanager/utils/monitor.py
index 0d37cd0effdbb9cb10ca5454ef75d138a3d5fb4a..c34ac217c0d29b086101e7fafc7d6cefef1fa114 100644
--- a/src/opticalattackmanager/utils/monitor.py
+++ b/src/opticalattackmanager/utils/monitor.py
@@ -14,6 +14,7 @@
 import asyncio
 import logging
+import traceback
 
 from grpclib.client import Channel
 from prometheus_client import Counter
 
@@ -31,7 +32,6 @@ async def detect_attack(
     context_id: str,
     service_id: str,
     kpi_id: str,
-    drop_counter: Counter,
     timeout: float = 20.0,
 ) -> None:
     try:
@@ -49,10 +49,9 @@ async def detect_attack(
         LOGGER.debug("Monitoring finished for {}/{}".format(service_id, kpi_id))
     except Exception as e:
         LOGGER.warning(
-            "Exception while processing service_id {}/{}".format(service_id, kpi_id)
+            "Exception while processing service_id {}/{}: {}".format(service_id, kpi_id, e)
         )
-        # LOGGER.exception(e)
-        drop_counter.inc()
+        traceback.print_exc()
 
 
 def delegate_services(
@@ -61,7 +60,6 @@
     end_index: int,
     host: str,
     port: str,
-    drop_counter: Counter,
     monitoring_interval: float,
 ):
     async def run_internal_loop():
@@ -73,7 +71,6 @@
                     service["context"],
                     service["service"],
                     service["kpi"],
-                    drop_counter,
                     # allow at most 90% of the monitoring interval to succeed
                     monitoring_interval * 0.9,
                 )
diff --git a/src/tests/scenario3/optical/deploy_specs.sh b/src/tests/scenario3/optical/deploy_specs.sh
index 878013d8b82177e3d70aa432e01583f03eded237..73c9112254c3e95eea4155abd7afd26ee358e368 100644
--- a/src/tests/scenario3/optical/deploy_specs.sh
+++ b/src/tests/scenario3/optical/deploy_specs.sh
@@ -21,6 +21,8 @@ export TFS_COMPONENTS="context device automation monitoring pathcomp service sli
 
 # addition for the optical cybersecurity component
 export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager"
+
+export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml"
 
 export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml"
 
diff --git a/src/tests/scenario3/optical/jocn/run_experiment.py b/src/tests/scenario3/optical/jocn/run_experiment.py
index 4ebf5ea80baecc89a66fef5bcb0f240bd61f1f5a..e522d1a2fbcaf2c81045155a1642be636ceb9fd2 100644
--- a/src/tests/scenario3/optical/jocn/run_experiment.py
+++ b/src/tests/scenario3/optical/jocn/run_experiment.py
@@ -29,15 +29,24 @@
 from kubernetes import client, config
 
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_host, get_setting, wait_for_environment_variables
-from configs import base_results_folder, datetime_format, hpa_data
-
 LOGGER = None
 SERVICE_LIST_KEY = get_setting(
     "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services"
 )
 
 # Configs can be set in Configuration class directly or using helper utility
+namespace = get_setting("TFS_K8S_NAMESPACE", default="tfs")
 config.load_kube_config()
+v1 = client.CoreV1Api()
+
+ret = v1.list_namespaced_endpoints(namespace=namespace, watch=False)
+for item in ret.items:
+    if "caching" in item.metadata.name:
+        for subset in item.subsets:
+            for port in subset.ports:
+                if "redis" in port.name:  # endpoint is ready for being scraped
+                    CACHING_HOST = subset.addresses[0].ip
+                    CACHING_PORT = port.port
 
 logging.getLogger("kubernetes").setLevel(logging.INFO)  # avoid lengthy messages
@@ -53,17 +62,11 @@ def signal_handler(signal, frame):  # pylint: disable=redefined-outer-name
 
 def manage_number_services(terminate, folder):
     # connecting with Redis
-    redis_host = get_service_host(ServiceNameEnum.CACHING)
-    redis_password = None
-    if redis_host is not None:
-        redis_port = int(get_setting("CACHINGSERVICE_SERVICE_PORT_REDIS"))
-        redis_password = get_setting("REDIS_PASSWORD")
-    else:
-        LOGGER.fatal("No environment variables set for Redis")
+    redis_password = get_setting("REDIS_PASSWORD")
 
     cache = None
     try:
-        cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
+        cache = redis.Redis(host=CACHING_HOST, port=CACHING_PORT, password=redis_password)
     except Exception as e:
         LOGGER.exception(e)
 
@@ -76,40 +79,11 @@
     # connecting to the HPA API
     autoscaling = client.AutoscalingV1Api()
 
-    # connecting to the custom objects API
-    api = client.CustomObjectsApi()
-
-    # open the file that will store the information
-    services_file = open(os.path.join(folder, "services.csv"), "wt", encoding="utf-8")
-    services_file.write("# file with number of services\n")
-    services_file.write("timestamp,number_services")
-
-    hpa_file = open(os.path.join(folder, "hpas.csv"), "wt", encoding="utf-8")
-
-    # writing headers for the HPA columns
-    hpas = autoscaling.list_namespaced_horizontal_pod_autoscaler(namespace="tfs")
-
-    for hpa in hpas.items:
-        hpa_file.write(hpa.metadata.name + "\n")
-        for d in hpa_data:
-            services_file.write(f",{hpa.metadata.name}_{d}")
-
-    # monitoring CPU and RAM usage of the single Pods
-    for s in ["cache", "manager"]:
-        for k in ["cpu", "ram"]:
-            services_file.write(f",{s}_{k}")
-
-    services_file.write("\n")
-    services_file.flush()
-
-    hpa_file.flush()
-    hpa_file.close()
-
     # define number of services
     # 6 values followed by two zeros
     number_services = [0, 10]
-    loads = [120, 240, 480, 960, 1440, 1920, 1922]
+    loads = [120, 240, 480, 960, 1440, 1920, 1921]
     for load in loads:
         number_services.append(int(load/2))
         for _ in range(5):
@@ -125,41 +99,6 @@
         if cur_tick % ticks == 0:
             LOGGER.debug("go load!")
 
-            # getting data from autoscaler
-            hpas = autoscaling.list_namespaced_horizontal_pod_autoscaler(namespace="tfs")
-            # - "cur_utilization"
-            # - "target_utilization"
-            # - "cur_replicas"
-            # - "desired_replicas"
-            hpa_string = ""
-            for hpa in hpas.items:
-                hpa_string += f",{hpa.status.current_cpu_utilization_percentage}"
-                hpa_string += f",{hpa.spec.target_cpu_utilization_percentage}"
-                hpa_string += f",{hpa.status.current_replicas}"
-                hpa_string += f",{hpa.status.desired_replicas}"
-
-            # monitoring resource usage
-            k8s_pods = api.list_cluster_custom_object(
-                "metrics.k8s.io", "v1beta1", "namespaces/tfs/pods"
-            )
-            # - "cache_cpu"
-            # - "cache_ram"
-            # - "manager_cpu"
-            # - "manager_ram"
-            resource_string = ""
-
-            # we use two loops to ensure the same order
-            for stats in k8s_pods["items"]:
-                if "caching" in stats['metadata']['name']:
-                    resource_string += f",{stats['containers'][0]['usage']['cpu']}"
-                    resource_string += f",{stats['containers'][0]['usage']['memory']}"
-                    break
-            for stats in k8s_pods["items"]:
-                if "opticalattackmanager" in stats['metadata']['name']:
-                    resource_string += f",{stats['containers'][0]['usage']['cpu']}"
-                    resource_string += f",{stats['containers'][0]['usage']['memory']}"
-                    break
-
             # calculate the difference between current and expected
             cur_services = cache.llen(SERVICE_LIST_KEY)
             diff_services = cur_services - number_services[cur_tick % len(number_services)]
@@ -169,12 +108,6 @@
                 LOGGER.info("Setting monitoring interval to 60")
                 set_to_60 = True
 
-            # write current number with one second difference
-            cur_datetime = datetime.datetime.now()
-            reported_datetime = cur_datetime - datetime.timedelta(seconds=1)
-            reported_datetime_str = reported_datetime.strftime(datetime_format)
-            services_file.write(f"{reported_datetime_str},{cur_services}{hpa_string}\n")
-
             if diff_services < 0:  # current is lower than expected
                 LOGGER.debug(f"inserting <{-diff_services}> services")
                 for _ in range(-diff_services):
@@ -193,97 +126,20 @@
                 LOGGER.debug(f"deleting <{diff_services}> services")
                 cache.lpop(SERVICE_LIST_KEY, diff_services)
 
-            # writing the new number with the current time
-            services_file.write(
-                f"{datetime.datetime.now().strftime(datetime_format)},"
-                f"{number_services[cur_tick % len(number_services)]}"
-                f"{hpa_string}{resource_string}\n"
-            )
-            assert number_services[cur_tick % len(number_services)] == cache.llen(SERVICE_LIST_KEY)
-            services_file.flush()
         else:
             LOGGER.debug("tick load!")
         cur_tick += 1
         if cur_tick > len(number_services) + 1:
             break
 
-    services_file.flush()
-    services_file.close()
+
     # make sure we have the correct loop time
     cache.set("MONITORING_INTERVAL", 30)
 
     LOGGER.info("Finished load!")
 
 
-def monitor_endpoints(terminate):
-    LOGGER.info("starting experiment!")
-    v1 = client.CoreV1Api()
-    while not terminate.wait(timeout=30):
-
-        # load base yaml
-        with open("/home/carda/projects/prometheus/prometheus.yml.backup", "rt") as file:
-            current_version = yaml.load(file, Loader=yaml.FullLoader)
-
-        # checking endpoints
-        ret = v1.list_namespaced_endpoints(namespace="tfs", watch=False)
-        for item in ret.items:
-            found = False
-
-            for subset in item.subsets:
-                for p, q in enumerate(subset.ports):
-                    if q.name == "metrics":  # endpoint is ready for being scraped
-                        found = True
-            if not found:
-                continue  # if no `metrics` port, jump!
-
-            found = False  # now look for existing configuration
-            for i in range(len(current_version["scrape_configs"])):
-                if current_version["scrape_configs"][i]["job_name"] == item.metadata.name:
-                    found = True
-                    break  # found it! `i` will contain the correct index
-
-            if not found:  # write it from zero!
-                current_version["scrape_configs"].append({})
-                current_version["scrape_configs"][-1]["job_name"] = item.metadata.name
-
-                # set the correct `i` value
-                i = len(current_version["scrape_configs"]) - 1
-
-            if "static_configs" not in current_version["scrape_configs"][i]:
-                current_version["scrape_configs"][i]["static_configs"] = [{"targets": []}]
-            # reset IPs
-            current_version["scrape_configs"][i]["static_configs"][0]["targets"] = []
-            for subset in item.subsets:
-                for p, q in enumerate(subset.ports):
-                    if q.name == "metrics":
-                        for c, a in enumerate(subset.addresses):
-                            print(f"{item.metadata.name}\t{a.ip}:{q.port}")
-                            current_version["scrape_configs"][i]["static_configs"][0]["targets"].append(f"{a.ip}:9192")
-
-        # write yaml
-        with open("/home/carda/projects/prometheus/prometheus.yml", "wt") as file:
-            yaml.dump(current_version, file)
-
-        # reloading configuration
-        # docs: https://www.robustperception.io/reloading-prometheus-configuration/
-        requests.post("http://127.0.0.1:9090/-/reload")
-
-    # resetting prometheus to the original state
-    # load base yaml
-    with open("/home/carda/projects/prometheus/prometheus.yml.backup", "rt") as file:
-        current_version = yaml.load(file, Loader=yaml.FullLoader)
-
-    # write yaml
-    with open("/home/carda/projects/prometheus/prometheus.yml", "wt") as file:
-        yaml.dump(current_version, file)
-    # reloading configuration
-    # docs: https://www.robustperception.io/reloading-prometheus-configuration/
-    requests.post("http://127.0.0.1:9090/-/reload")
-
-    LOGGER.info("Finished experiment!")
-
-
 if __name__ == "__main__":
     # logging.basicConfig(level="DEBUG")
     logging.basicConfig(
@@ -294,41 +150,18 @@
     LOGGER = logging.getLogger(__name__)
 
     wait_for_environment_variables(
-        ["CACHINGSERVICE_SERVICE_PORT_REDIS", "REDIS_PASSWORD"]
+        ["REDIS_PASSWORD"]
     )
 
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
-    # generate results folder
-    output_folder = os.path.join(base_results_folder, "jocn_" + datetime.datetime.now(
-        datetime.timezone.utc
-    ).strftime("%Y%m%dT%H%M%S.%fUTC"))
-    os.makedirs(output_folder)
-
-    # start load handler
-    proc_load = multiprocessing.Process(
-        target=manage_number_services,
-        args=(
-            terminate,
-            output_folder,
-        ),
-    )
-    proc_load.start()
-
-    # start experiment monitoring
-    proc_experiment = multiprocessing.Process(
-        target=monitor_endpoints, args=(terminate,)
-    )
-    proc_experiment.start()
+    manage_number_services(terminate=terminate)
 
     # Wait for Ctrl+C or termination signal
     while not terminate.wait(timeout=0.1):
         pass
 
-    # waits for the processes to finish
-    proc_load.join()
-    proc_experiment.join()
 
     # exits
     LOGGER.info("Bye!")
diff --git a/src/tests/scenario3/optical/ofc23/README.md b/src/tests/scenario3/optical/ofc23/README.md
index f517ab728906fa7640143a7b27d76418c5ee7960..32d5eb25216dfe17b5b80ca0430da4eee69f26f5 100644
--- a/src/tests/scenario3/optical/ofc23/README.md
+++ b/src/tests/scenario3/optical/ofc23/README.md
@@ -4,14 +4,22 @@ __Authors__: [Carlos Natalino](https://www.chalmers.se/en/persons/carda/), Lluis
 
 ## Executing
 
+All the commands here assume you are in the TFS home folder.
+
+First, we need to load the TFS deploy specifications:
+
 ```bash
 source src/tests/scenario3/optical/deploy_specs.sh
 ```
 
+Then, we load the environment variables that identify the TFS deployment:
+
 ```bash
 source tfs_runtime_env_vars.sh
 ```
 
+Then, we are able to execute the load generator:
+
 ```bash
 python src/tests/scenario3/optical/ofc23/run_experiment_demo.py
 ```
\ No newline at end of file
diff --git a/src/tests/scenario3/optical/ofc23/run_experiment_demo.py b/src/tests/scenario3/optical/ofc23/run_experiment_demo.py
index 27151695b82426cc76c97971bbbb1749d4ebbbf5..16ddb9c315784c39258aaf0f342ebd8da35b17e8 100644
--- a/src/tests/scenario3/optical/ofc23/run_experiment_demo.py
+++ b/src/tests/scenario3/optical/ofc23/run_experiment_demo.py
@@ -29,16 +29,19 @@
 namespace = get_setting("TFS_K8S_NAMESPACE", default="tfs")
 config.load_kube_config()
 v1 = client.CoreV1Api()
 
-caching_pod = None
-pods = v1.list_namespaced_pod(namespace=namespace)
-for pod in pods.items:
-    # print(pod.metadata)
-    if "app" in pod.metadata.labels and "caching" in pod.metadata.labels["app"]:
-        caching_pod = pod
+ret = v1.list_namespaced_endpoints(namespace=namespace, watch=False)
+for item in ret.items:
+    if "caching" in item.metadata.name:
+        for subset in item.subsets:
+            for port in subset.ports:
+                print(item.metadata.name, port)
+                if "redis" in port.name:  # endpoint is ready for being scraped
+                    CACHING_HOST = subset.addresses[0].ip
+                    CACHING_PORT = port.port
 
 LOGGER = None
 SERVICE_LIST_KEY = get_setting(
-    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services"
+    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec-active-services"
 )
 
@@ -53,19 +56,12 @@ def signal_handler(signal, frame):  # pylint: disable=redefined-outer-name
 
 def manage_number_services(terminate):
     # connecting with Redis
-    redis_host = caching_pod.status.pod_ip
-    redis_password = None
-    if redis_host is not None:
-        redis_port = int(get_setting("CACHINGSERVICE_SERVICE_PORT_REDIS", default=6379))
-        redis_password = get_setting("REDIS_PASSWORD")
-        LOGGER.info(f"Redis password: {redis_password}")
-    else:
-        LOGGER.fatal("No environment variables set for Redis")
+    redis_password = get_setting("REDIS_PASSWORD")
 
     cache = None
     try:
-        LOGGER.info(f"Connecting to Redis: host={redis_host}, port={redis_port}, password={redis_password}")
-        cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
+        LOGGER.info(f"Connecting to Redis: host={CACHING_HOST}, port={CACHING_PORT}, password={redis_password}")
+        cache = redis.Redis(host=CACHING_HOST, port=CACHING_PORT, password=redis_password)
         cache.ping()
         LOGGER.info("Connected to Redis")
     except Exception as e:
@@ -78,10 +74,11 @@
     cache.set("MONITORING_INTERVAL", 30)
 
     # define number of services
+    next_kpi_id = 0
 
     print("Starting load!")
     while not terminate.wait(timeout=1):  # timeout=300
-        print("\no -> sets the number services currently monitored")
+        print("\n\no -> sets the number services currently monitored")
         print("p -> sets the loop period")
         print("m <1=SL / 2=UL> -> sets the ML model used")
         print("q -> exit")
@@ -101,16 +98,20 @@
         if diff_services < 0:  # current is lower than expected
             LOGGER.debug(f"\tinserting <{-diff_services}> services")
             for _ in range(-diff_services):
+                next_kpi_id += 1
                 cache.lpush(
                     SERVICE_LIST_KEY,
                     pickle.dumps(
                         {
                             "context": str(uuid.uuid4()),
                             "service": str(uuid.uuid4()),
-                            "kpi": str(uuid.uuid4()),
+                            # "kpi": str(uuid.uuid4()),
+                            "kpi": str(next_kpi_id),
                         }
                     ),
                 )
+            LOGGER.debug(f"Services at the Redis DB: {cache.llen(SERVICE_LIST_KEY)}")
+
         elif diff_services > 0:  # current is greater than expected
             # delete services
             LOGGER.debug(f"\tdeleting <{diff_services}> services")
diff --git a/src/tests/scenario3/optical/test_prom_timestamp.py b/src/tests/scenario3/optical/test_prom_timestamp.py
new file mode 100644
index 0000000000000000000000000000000000000000..452e633ac22898e6c325b00c160eed795b46496d
--- /dev/null
+++ b/src/tests/scenario3/optical/test_prom_timestamp.py
@@ -0,0 +1,15 @@
+import requests
+
+from kubernetes import client, config
+
+config.load_kube_config()
+v1 = client.CoreV1Api()
+
+ret = v1.list_namespaced_endpoints(namespace=namespace, watch=False)
+for item in ret.items:
+    if "caching" in item.metadata.name:
+        for subset in item.subsets:
+            for port in subset.ports:
+                if "redis" in port.name:  # endpoint is ready for being scraped
+                    CACHING_HOST = subset.addresses[0].ip
+                    CACHING_PORT = port.port
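
The same caching-service discovery block appears in `run_experiment.py`, `run_experiment_demo.py`, and the new `test_prom_timestamp.py` above. The following is a minimal, self-contained sketch of that pattern wrapped into a reusable helper; it is illustrative only and not part of the patch. It assumes the namespace comes from `TFS_K8S_NAMESPACE` (default `tfs`) and the Redis password from `REDIS_PASSWORD`, as the scripts above do, and the helper name `find_caching_redis` is hypothetical.

```python
# Illustrative sketch (not part of the patch): consolidates the endpoint-discovery
# and Redis-connection code repeated in the scripts above.
import pickle
import uuid

import redis
from kubernetes import client, config

from common.Settings import get_setting

# key under which the manager keeps the list of monitored services
SERVICE_LIST_KEY = get_setting(
    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec-active-services"
)


def find_caching_redis() -> redis.Redis:
    """Locate the caching service endpoint via the Kubernetes API and connect to it."""
    namespace = get_setting("TFS_K8S_NAMESPACE", default="tfs")
    config.load_kube_config()
    v1 = client.CoreV1Api()

    caching_host, caching_port = None, None
    ret = v1.list_namespaced_endpoints(namespace=namespace, watch=False)
    for item in ret.items:
        if "caching" not in item.metadata.name:
            continue
        for subset in item.subsets or []:
            for port in subset.ports or []:
                if port.name and "redis" in port.name:
                    caching_host = subset.addresses[0].ip
                    caching_port = port.port

    if caching_host is None:
        raise RuntimeError("no ready Redis endpoint found for the caching service")

    cache = redis.Redis(
        host=caching_host, port=caching_port, password=get_setting("REDIS_PASSWORD")
    )
    cache.ping()  # fail early if the endpoint is not reachable
    return cache


if __name__ == "__main__":
    cache = find_caching_redis()
    # push one fake monitored service, the same structure the load generators use
    cache.lpush(
        SERVICE_LIST_KEY,
        pickle.dumps(
            {
                "context": str(uuid.uuid4()),
                "service": str(uuid.uuid4()),
                "kpi": str(uuid.uuid4()),
            }
        ),
    )
    print("services in the list:", cache.llen(SERVICE_LIST_KEY))
```

Resolving the caching endpoint object (rather than a pod IP, as the removed `list_namespaced_pod` code did) keeps the connection working when the caching pod is rescheduled, since the endpoints list always reflects the ready backends of the service.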