diff --git a/src/opticalattackmanager/service/__main__.py b/src/opticalattackmanager/service/__main__.py index af38e02a0c4a3098ce9684822654a2494611661d..6ea636f42eb4b992bb7d722188ccbc945a6a153d 100644 --- a/src/opticalattackmanager/service/__main__.py +++ b/src/opticalattackmanager/service/__main__.py @@ -67,7 +67,7 @@ SERVICE_LIST_MODE = int( get_setting("OPTICALATTACKMANAGER_SERVICE_LIST_MODE", default=1) ) SERVICE_LIST_KEY = get_setting( - "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services" + "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec-active-services" ) MIN_NUMBER_WORKERS = int( get_setting("OPTICALATTACKMANAGERSERVICE_LOOP_MIN_WORKERS", default=2) @@ -295,6 +295,7 @@ async def monitor_services(terminate, service_list=None, cache=None): current_list = [] if SERVICE_LIST_MODE == LIST_REDIS_MODE: + LOGGER.debug(f"Services at the Redis DB: {cache.llen(SERVICE_LIST_KEY)}") current_list.extend( [ pickle.loads(service) @@ -332,22 +333,6 @@ async def monitor_services(terminate, service_list=None, cache=None): ) ) - # start standard implementation - # tasks = [] - # for service in current_list: - # aw = detect_attack( - # host, - # port, - # service["context"], - # service["service"], - # service["kpi"], - # # allow at most 90% of the monitoring interval to succeed - # monitoring_interval * 0.9, - # ) - # tasks.append(aw) - # [await aw for aw in tasks] - # end standard implementation - # start pool implementation if len(current_list) == 0: # guard clause to re-check if services still there LOGGER.info( @@ -385,7 +370,6 @@ async def monitor_services(terminate, service_list=None, cache=None): (i + 1) * k + min(i + 1, m), # last index host, port, - DROP_COUNTER, desired_monitoring_interval * 0.9, ) for i in range(cur_number_workers) @@ -476,6 +460,7 @@ def main(): if SERVICE_LIST_MODE == LIST_REDIS_MODE: cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password) cache.ping() + LOGGER.info(f"Connecting to Redis: host={redis_host}, port={redis_port}, password={redis_password}") # clean the existing list that will be populated later on in this function cache.delete(SERVICE_LIST_KEY) @@ -544,7 +529,7 @@ def main(): # asyncio.create_task(monitor_services(service_list)) # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=1): + while not terminate.wait(timeout=10): pass LOGGER.info("Terminating...") diff --git a/src/opticalattackmanager/utils/monitor.py b/src/opticalattackmanager/utils/monitor.py index 0d37cd0effdbb9cb10ca5454ef75d138a3d5fb4a..c34ac217c0d29b086101e7fafc7d6cefef1fa114 100644 --- a/src/opticalattackmanager/utils/monitor.py +++ b/src/opticalattackmanager/utils/monitor.py @@ -14,6 +14,7 @@ import asyncio import logging +import traceback from grpclib.client import Channel from prometheus_client import Counter @@ -31,7 +32,6 @@ async def detect_attack( context_id: str, service_id: str, kpi_id: str, - drop_counter: Counter, timeout: float = 20.0, ) -> None: try: @@ -49,10 +49,9 @@ async def detect_attack( LOGGER.debug("Monitoring finished for {}/{}".format(service_id, kpi_id)) except Exception as e: LOGGER.warning( - "Exception while processing service_id {}/{}".format(service_id, kpi_id) + "Exception while processing service_id {}/{}: {}".format(service_id, kpi_id, e) ) - # LOGGER.exception(e) - drop_counter.inc() + traceback.print_exc() def delegate_services( @@ -61,7 +60,6 @@ def delegate_services( end_index: int, host: str, port: str, - drop_counter: Counter, monitoring_interval: float, ): async def run_internal_loop(): @@ -73,7 +71,6 @@ def delegate_services( service["context"], service["service"], service["kpi"], - drop_counter, # allow at most 90% of the monitoring interval to succeed monitoring_interval * 0.9, ) diff --git a/src/tests/scenario3/optical/dashboard.json b/src/tests/scenario3/optical/dashboard.json index 990ab47e95f9db5bc021ca91333f4c5fe61f7ff7..0ec738eacd422138c4a93d5a75b1864279173132 100644 --- a/src/tests/scenario3/optical/dashboard.json +++ b/src/tests/scenario3/optical/dashboard.json @@ -66,6 +66,7 @@ "liveNow": false, "panels": [ { + "collapsed": false, "gridPos": { "h": 1, "w": 24, @@ -73,6 +74,7 @@ "y": 0 }, "id": 12, + "panels": [], "title": "General status", "type": "row" }, @@ -216,7 +218,7 @@ "options": { "legend": { "calcs": [], - "displayMode": "list", + "displayMode": "hidden", "placement": "bottom" }, "tooltip": { @@ -230,7 +232,11 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "expr": "tfs_opticalattackmanager_dropped_assessments_created", + "editorMode": "code", + "exemplar": false, + "expr": "rate(tfs_opticalattackmanager_dropped_assessments_created[1m])", + "instant": false, + "range": true, "refId": "A" } ], @@ -238,6 +244,7 @@ "type": "timeseries" }, { + "collapsed": false, "gridPos": { "h": 1, "w": 24, @@ -245,6 +252,7 @@ "y": 9 }, "id": 10, + "panels": [], "title": "Pipeline", "type": "row" }, @@ -253,6 +261,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "description": "", "fieldConfig": { "defaults": { "color": { @@ -309,11 +318,11 @@ "x": 0, "y": 10 }, - "id": 16, + "id": 14, "options": { "legend": { "calcs": [], - "displayMode": "list", + "displayMode": "hidden", "placement": "bottom" }, "tooltip": { @@ -328,25 +337,13 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackmanager_loop_seconds_bucket[$__rate_interval])) by (le))", - "legendFormat": "Measured", + "expr": "tfs_opticalattackmanager_active_services", + "legendFormat": "Active services", "range": true, "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "editorMode": "code", - "expr": "tfs_opticalattackmanager_desired_monitoring_interval", - "hide": false, - "legendFormat": "Desired", - "range": true, - "refId": "B" } ], - "title": "Loop time", + "title": "Number of active optical services", "type": "timeseries" }, { @@ -354,7 +351,6 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "description": "", "fieldConfig": { "defaults": { "color": { @@ -411,7 +407,7 @@ "x": 12, "y": 10 }, - "id": 14, + "id": 16, "options": { "legend": { "calcs": [], @@ -430,13 +426,25 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "tfs_opticalattackmanager_active_services", - "legendFormat": "Active services", + "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackmanager_loop_seconds_bucket[$__rate_interval])) by (le))", + "legendFormat": "Measured", "range": true, "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "tfs_opticalattackmanager_desired_monitoring_interval", + "hide": false, + "legendFormat": "Desired", + "range": true, + "refId": "B" } ], - "title": "Number of active optical services", + "title": "Loop time", "type": "timeseries" }, { @@ -543,7 +551,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackdetector_inference_response_time_bucket[$__rate_interval])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackdetector_execution_details_histogram_duration_bucket{step=\"uldetection\"}[$__rate_interval])) by (le))", "hide": false, "legendFormat": "UL Inference", "range": true, @@ -555,7 +563,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackdetector_cache_response_time_bucket[$__rate_interval])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackdetector_execution_details_histogram_duration_bucket{step=\"cachefetch\"}[$__rate_interval])) by (le))", "hide": false, "interval": "", "legendFormat": "Cache", @@ -568,7 +576,7 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackdetector_mitigation_response_time_bucket[$__rate_interval])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(tfs_opticalattackdetector_execution_details_histogram_duration_bucket{step=\"mitigation\"}[$__rate_interval])) by (le))", "hide": false, "interval": "", "legendFormat": "Mitigator", @@ -1353,7 +1361,7 @@ "type": "timeseries" }, { - "collapsed": false, + "collapsed": true, "gridPos": { "h": 1, "w": 24, @@ -1361,163 +1369,164 @@ "y": 44 }, "id": 2, - "panels": [], - "title": "General status", - "type": "row" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 29, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "normal" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "mwatt" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 45 - }, - "id": 33, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "right" - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "editorMode": "code", - "exemplar": false, - "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+opticalattackmanager.+\"})/1000", - "instant": false, - "legendFormat": "Manager", - "range": true, - "refId": "A" - }, + "panels": [ { "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "editorMode": "code", - "exemplar": false, - "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+opticalattackdetector.+\"})/1000", - "hide": false, - "instant": false, - "legendFormat": "Detector", - "range": true, - "refId": "B" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 29, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "mwatt" + }, + "overrides": [] }, - "editorMode": "code", - "exemplar": false, - "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+dbscan.+\"})/1000", - "hide": false, - "instant": false, - "interval": "", - "legendFormat": "UL Inference", - "range": true, - "refId": "C" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 45 }, - "editorMode": "code", - "exemplar": false, - "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\"redis-server.+\"})/1000", - "hide": false, - "instant": false, - "interval": "", - "legendFormat": "Cache", - "range": true, - "refId": "D" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" + "id": 33, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right" + }, + "tooltip": { + "mode": "single", + "sort": "none" + } }, - "editorMode": "code", - "exemplar": false, - "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+opticalattackmitigator.+\"})/1000", - "hide": false, - "instant": false, - "interval": "", - "legendFormat": "Mitigator", - "range": true, - "refId": "E" + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+opticalattackmanager.+\"})/1000", + "instant": false, + "legendFormat": "Manager", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+opticalattackdetector.+\"})/1000", + "hide": false, + "instant": false, + "legendFormat": "Detector", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+dbscan.+\"})/1000", + "hide": false, + "instant": false, + "interval": "", + "legendFormat": "UL Inference", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\"redis-server.+\"})/1000", + "hide": false, + "instant": false, + "interval": "", + "legendFormat": "Cache", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(scaph_process_power_consumption_microwatts{namespace=\"tfs\", cmdline=~\".+opticalattackmitigator.+\"})/1000", + "hide": false, + "instant": false, + "interval": "", + "legendFormat": "Mitigator", + "range": true, + "refId": "E" + } + ], + "title": "Energy consumption", + "type": "timeseries" } ], - "title": "Energy consumption", - "type": "timeseries" + "title": "General status", + "type": "row" } ], - "refresh": "5s", + "refresh": "", "schemaVersion": 36, "style": "dark", "tags": [], @@ -1525,13 +1534,13 @@ "list": [] }, "time": { - "from": "now-5m", + "from": "now-30m", "to": "now" }, "timepicker": {}, "timezone": "", "title": "TFS / Optical cybersecurity", "uid": "-Q-B-AsVk", - "version": 1, + "version": 5, "weekStart": "" } \ No newline at end of file diff --git a/src/tests/scenario3/optical/deploy_specs.sh b/src/tests/scenario3/optical/deploy_specs.sh index 878013d8b82177e3d70aa432e01583f03eded237..73c9112254c3e95eea4155abd7afd26ee358e368 100644 --- a/src/tests/scenario3/optical/deploy_specs.sh +++ b/src/tests/scenario3/optical/deploy_specs.sh @@ -21,6 +21,8 @@ export TFS_COMPONENTS="context device automation monitoring pathcomp service sli # addition for the optical cybersecurity component export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager" + +export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml" export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml" diff --git a/src/tests/scenario3/optical/jocn/README.md b/src/tests/scenario3/optical/jocn/README.md index 0981331111d00ff7e6a6245c20186d4dc60f55f8..dfc3d373b626e8f68a52ecd86ea6834568c82f00 100644 --- a/src/tests/scenario3/optical/jocn/README.md +++ b/src/tests/scenario3/optical/jocn/README.md @@ -2,4 +2,6 @@ __Authors__: [Carlos Natalino](https://www.chalmers.se/en/persons/carda/), Lluis Gifre Renom, Francisco-Javier Moreno-Muro, Sergio Gonzalez Diaz, Ricard Vilalta, Raul Muñoz, Paolo Monti, and Marija Furdek -Experiments from the JOCN paper. \ No newline at end of file +The paper has been published as open access: https://doi.org/10.1364/JOCN.482932 + +This folder contains the experiments published in the JOCN paper. diff --git a/src/tests/scenario3/optical/jocn/run_experiment.py b/src/tests/scenario3/optical/jocn/run_experiment.py index 4ebf5ea80baecc89a66fef5bcb0f240bd61f1f5a..d9cfb9e4b4d7daa55f2b399c90c52a35715d279c 100644 --- a/src/tests/scenario3/optical/jocn/run_experiment.py +++ b/src/tests/scenario3/optical/jocn/run_experiment.py @@ -29,15 +29,24 @@ from kubernetes import client, config from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_setting, wait_for_environment_variables -from configs import base_results_folder, datetime_format, hpa_data - LOGGER = None SERVICE_LIST_KEY = get_setting( "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services" ) # Configs can be set in Configuration class directly or using helper utility +namespace = get_setting("TFS_K8S_NAMESPACE", default="tfs") config.load_kube_config() +v1 = client.CoreV1Api() + +ret = v1.list_namespaced_endpoints(namespace=namespace, watch=False) +for item in ret.items: + if "caching" in item.metadata.name: + for subset in item.subsets: + for port in subset.ports: + if "redis" in port.name: # endpoint is ready for being scraped + CACHING_HOST = subset.addresses[0].ip + CACHING_PORT = port.port logging.getLogger("kubernetes").setLevel(logging.INFO) # avoid lengthy messages @@ -53,17 +62,11 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def manage_number_services(terminate, folder): # connecting with Redis - redis_host = get_service_host(ServiceNameEnum.CACHING) - redis_password = None - if redis_host is not None: - redis_port = int(get_setting("CACHINGSERVICE_SERVICE_PORT_REDIS")) - redis_password = get_setting("REDIS_PASSWORD") - else: - LOGGER.fatal("No environment variables set for Redis") + redis_password = get_setting("REDIS_PASSWORD") cache = None try: - cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password) + cache = redis.Redis(host=CACHING_HOST, port=CACHING_PORT, password=redis_password) except Exception as e: LOGGER.exception(e) @@ -73,43 +76,11 @@ def manage_number_services(terminate, folder): # make sure we have the correct loop time cache.set("MONITORING_INTERVAL", 30) - # connecting to the HPA API - autoscaling = client.AutoscalingV1Api() - - # connecting to the custom objects API - api = client.CustomObjectsApi() - - # open the file that will store the information - services_file = open(os.path.join(folder, "services.csv"), "wt", encoding="utf-8") - services_file.write("# file with number of services\n") - services_file.write("timestamp,number_services") - - hpa_file = open(os.path.join(folder, "hpas.csv"), "wt", encoding="utf-8") - - # writing headers for the HPA columns - hpas = autoscaling.list_namespaced_horizontal_pod_autoscaler(namespace="tfs") - - for hpa in hpas.items: - hpa_file.write(hpa.metadata.name + "\n") - for d in hpa_data: - services_file.write(f",{hpa.metadata.name}_{d}") - - # monitoring CPU and RAM usage of the single Pods - for s in ["cache", "manager"]: - for k in ["cpu", "ram"]: - services_file.write(f",{s}_{k}") - - services_file.write("\n") - services_file.flush() - - hpa_file.flush() - hpa_file.close() - # define number of services # 6 values followed by two zeros number_services = [0, 10] - loads = [120, 240, 480, 960, 1440, 1920, 1922] + loads = [120, 240, 480, 960, 1440, 1920, 1921] for load in loads: number_services.append(int(load/2)) for _ in range(5): @@ -125,41 +96,6 @@ def manage_number_services(terminate, folder): if cur_tick % ticks == 0: LOGGER.debug("go load!") - # getting data from autoscaler - hpas = autoscaling.list_namespaced_horizontal_pod_autoscaler(namespace="tfs") - # - "cur_utilization" - # - "target_utilization" - # - "cur_replicas" - # - "desired_replicas" - hpa_string = "" - for hpa in hpas.items: - hpa_string += f",{hpa.status.current_cpu_utilization_percentage}" - hpa_string += f",{hpa.spec.target_cpu_utilization_percentage}" - hpa_string += f",{hpa.status.current_replicas}" - hpa_string += f",{hpa.status.desired_replicas}" - - # monitoring resource usage - k8s_pods = api.list_cluster_custom_object( - "metrics.k8s.io", "v1beta1", "namespaces/tfs/pods" - ) - # - "cache_cpu" - # - "cache_ram" - # - "manager_cpu" - # - "manager_ram" - resource_string = "" - - # we use two loops to ensure the same order - for stats in k8s_pods["items"]: - if "caching" in stats['metadata']['name']: - resource_string += f",{stats['containers'][0]['usage']['cpu']}" - resource_string += f",{stats['containers'][0]['usage']['memory']}" - break - for stats in k8s_pods["items"]: - if "opticalattackmanager" in stats['metadata']['name']: - resource_string += f",{stats['containers'][0]['usage']['cpu']}" - resource_string += f",{stats['containers'][0]['usage']['memory']}" - break - # calculate the difference between current and expected cur_services = cache.llen(SERVICE_LIST_KEY) diff_services = cur_services - number_services[cur_tick % len(number_services)] @@ -169,12 +105,6 @@ def manage_number_services(terminate, folder): LOGGER.info("Setting monitoring interval to 60") set_to_60 = True - # write current number with one second difference - cur_datetime = datetime.datetime.now() - reported_datetime = cur_datetime - datetime.timedelta(seconds=1) - reported_datetime_str = reported_datetime.strftime(datetime_format) - services_file.write(f"{reported_datetime_str},{cur_services}{hpa_string}\n") - if diff_services < 0: # current is lower than expected LOGGER.debug(f"inserting <{-diff_services}> services") for _ in range(-diff_services): @@ -193,97 +123,20 @@ def manage_number_services(terminate, folder): LOGGER.debug(f"deleting <{diff_services}> services") cache.lpop(SERVICE_LIST_KEY, diff_services) - # writing the new number with the current time - services_file.write( - f"{datetime.datetime.now().strftime(datetime_format)}," - f"{number_services[cur_tick % len(number_services)]}" - f"{hpa_string}{resource_string}\n" - ) - assert number_services[cur_tick % len(number_services)] == cache.llen(SERVICE_LIST_KEY) - services_file.flush() else: LOGGER.debug("tick load!") cur_tick += 1 if cur_tick > len(number_services) + 1: break - services_file.flush() - services_file.close() + # make sure we have the correct loop time cache.set("MONITORING_INTERVAL", 30) LOGGER.info("Finished load!") -def monitor_endpoints(terminate): - LOGGER.info("starting experiment!") - v1 = client.CoreV1Api() - while not terminate.wait(timeout=30): - - # load base yaml - with open("/home/carda/projects/prometheus/prometheus.yml.backup", "rt") as file: - current_version = yaml.load(file, Loader=yaml.FullLoader) - - # checking endpoints - ret = v1.list_namespaced_endpoints(namespace="tfs", watch=False) - for item in ret.items: - found = False - - for subset in item.subsets: - for p, q in enumerate(subset.ports): - if q.name == "metrics": # endpoint is ready for being scraped - found = True - if not found: - continue # if no `metrics` port, jump! - - found = False # now look for existing configuration - for i in range(len(current_version["scrape_configs"])): - if current_version["scrape_configs"][i]["job_name"] == item.metadata.name: - found = True - break # found it! `i` will contain the correct index - - if not found: # write it from zero! - current_version["scrape_configs"].append({}) - current_version["scrape_configs"][-1]["job_name"] = item.metadata.name - - # set the correct `i` value - i = len(current_version["scrape_configs"]) - 1 - - if "static_configs" not in current_version["scrape_configs"][i]: - current_version["scrape_configs"][i]["static_configs"] = [{"targets": []}] - # reset IPs - current_version["scrape_configs"][i]["static_configs"][0]["targets"] = [] - for subset in item.subsets: - for p, q in enumerate(subset.ports): - if q.name == "metrics": - for c, a in enumerate(subset.addresses): - print(f"{item.metadata.name}\t{a.ip}:{q.port}") - current_version["scrape_configs"][i]["static_configs"][0]["targets"].append(f"{a.ip}:9192") - - # write yaml - with open("/home/carda/projects/prometheus/prometheus.yml", "wt") as file: - yaml.dump(current_version, file) - - # reloading configuration - # docs: https://www.robustperception.io/reloading-prometheus-configuration/ - requests.post("http://127.0.0.1:9090/-/reload") - - # resetting prometheus to the original state - # load base yaml - with open("/home/carda/projects/prometheus/prometheus.yml.backup", "rt") as file: - current_version = yaml.load(file, Loader=yaml.FullLoader) - - # write yaml - with open("/home/carda/projects/prometheus/prometheus.yml", "wt") as file: - yaml.dump(current_version, file) - # reloading configuration - # docs: https://www.robustperception.io/reloading-prometheus-configuration/ - requests.post("http://127.0.0.1:9090/-/reload") - - LOGGER.info("Finished experiment!") - - if __name__ == "__main__": # logging.basicConfig(level="DEBUG") logging.basicConfig( @@ -294,41 +147,18 @@ if __name__ == "__main__": LOGGER = logging.getLogger(__name__) wait_for_environment_variables( - ["CACHINGSERVICE_SERVICE_PORT_REDIS", "REDIS_PASSWORD"] + ["REDIS_PASSWORD"] ) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - # generate results folder - output_folder = os.path.join(base_results_folder, "jocn_" + datetime.datetime.now( - datetime.timezone.utc - ).strftime("%Y%m%dT%H%M%S.%fUTC")) - os.makedirs(output_folder) - # start load handler - proc_load = multiprocessing.Process( - target=manage_number_services, - args=( - terminate, - output_folder, - ), - ) - proc_load.start() - - # start experiment monitoring - proc_experiment = multiprocessing.Process( - target=monitor_endpoints, args=(terminate,) - ) - proc_experiment.start() + manage_number_services(terminate=terminate) # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass - # waits for the processes to finish - proc_load.join() - proc_experiment.join() - # exits LOGGER.info("Bye!") diff --git a/src/tests/scenario3/optical/ofc23/README.md b/src/tests/scenario3/optical/ofc23/README.md index f517ab728906fa7640143a7b27d76418c5ee7960..32d5eb25216dfe17b5b80ca0430da4eee69f26f5 100644 --- a/src/tests/scenario3/optical/ofc23/README.md +++ b/src/tests/scenario3/optical/ofc23/README.md @@ -4,14 +4,22 @@ __Authors__: [Carlos Natalino](https://www.chalmers.se/en/persons/carda/), Lluis ## Executing +All the commands here assume you are in the TFS home folder. + +First, we need to load the TFS deploy specifications: + ```bash source src/tests/scenario3/optical/deploy_specs.sh ``` +Then, we load the environment variables that identify the TFS deployment: + ```bash source tfs_runtime_env_vars.sh ``` +Then, we are able to execute the load generator: + ```bash python src/tests/scenario3/optical/ofc23/run_experiment_demo.py ``` \ No newline at end of file diff --git a/src/tests/scenario3/optical/ofc23/dashboard.json b/src/tests/scenario3/optical/ofc23/dashboard.json index d62fe95a0407287168cd03195f3d48457d7ec288..98328dfd4d0a03dd8be710532c483e9e92aa13d4 100644 --- a/src/tests/scenario3/optical/ofc23/dashboard.json +++ b/src/tests/scenario3/optical/ofc23/dashboard.json @@ -725,7 +725,7 @@ }, "timepicker": {}, "timezone": "", - "title": "Scalable and Efficient Pipeline for ML-based Optical Network Monitoring", + "title": "Scalable and Efficient Pipeline for ML-based Optical Network Monitoring - No longer maintained!", "uid": "IYQSZX0Vk", "version": 4, "weekStart": "" diff --git a/src/tests/scenario3/optical/ofc23/run_experiment_demo.py b/src/tests/scenario3/optical/ofc23/run_experiment_demo.py index 27151695b82426cc76c97971bbbb1749d4ebbbf5..16ddb9c315784c39258aaf0f342ebd8da35b17e8 100644 --- a/src/tests/scenario3/optical/ofc23/run_experiment_demo.py +++ b/src/tests/scenario3/optical/ofc23/run_experiment_demo.py @@ -29,16 +29,19 @@ namespace = get_setting("TFS_K8S_NAMESPACE", default="tfs") config.load_kube_config() v1 = client.CoreV1Api() -caching_pod = None -pods = v1.list_namespaced_pod(namespace=namespace) -for pod in pods.items: - # print(pod.metadata) - if "app" in pod.metadata.labels and "caching" in pod.metadata.labels["app"]: - caching_pod = pod +ret = v1.list_namespaced_endpoints(namespace=namespace, watch=False) +for item in ret.items: + if "caching" in item.metadata.name: + for subset in item.subsets: + for port in subset.ports: + print(item.metadata.name, port) + if "redis" in port.name: # endpoint is ready for being scraped + CACHING_HOST = subset.addresses[0].ip + CACHING_PORT = port.port LOGGER = None SERVICE_LIST_KEY = get_setting( - "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services" + "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec-active-services" ) # setting up graceful shutdown @@ -53,19 +56,12 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def manage_number_services(terminate): # connecting with Redis - redis_host = caching_pod.status.pod_ip - redis_password = None - if redis_host is not None: - redis_port = int(get_setting("CACHINGSERVICE_SERVICE_PORT_REDIS", default=6379)) - redis_password = get_setting("REDIS_PASSWORD") - LOGGER.info(f"Redis password: {redis_password}") - else: - LOGGER.fatal("No environment variables set for Redis") + redis_password = get_setting("REDIS_PASSWORD") cache = None try: - LOGGER.info(f"Connecting to Redis: host={redis_host}, port={redis_port}, password={redis_password}") - cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password) + LOGGER.info(f"Connecting to Redis: host={CACHING_HOST}, port={CACHING_PORT}, password={redis_password}") + cache = redis.Redis(host=CACHING_HOST, port=CACHING_PORT, password=redis_password) cache.ping() LOGGER.info("Connected to Redis") except Exception as e: @@ -78,10 +74,11 @@ def manage_number_services(terminate): cache.set("MONITORING_INTERVAL", 30) # define number of services + next_kpi_id = 0 print("Starting load!") while not terminate.wait(timeout=1): # timeout=300 - print("\no <number> -> sets the number services currently monitored") + print("\n\no <number> -> sets the number services currently monitored") print("p <seconds> -> sets the loop period") print("m <1=SL / 2=UL> -> sets the ML model used") print("q -> exit") @@ -101,16 +98,20 @@ def manage_number_services(terminate): if diff_services < 0: # current is lower than expected LOGGER.debug(f"\tinserting <{-diff_services}> services") for _ in range(-diff_services): + next_kpi_id += 1 cache.lpush( SERVICE_LIST_KEY, pickle.dumps( { "context": str(uuid.uuid4()), "service": str(uuid.uuid4()), - "kpi": str(uuid.uuid4()), + # "kpi": str(uuid.uuid4()), + "kpi": str(next_kpi_id), } ), ) + LOGGER.debug(f"Services at the Redis DB: {cache.llen(SERVICE_LIST_KEY)}") + elif diff_services > 0: # current is greater than expected # delete services LOGGER.debug(f"\tdeleting <{diff_services}> services") diff --git a/src/tests/scenario3/optical/scaphandre.yaml b/src/tests/scenario3/optical/scaphandre.yaml deleted file mode 100644 index 3b348cb8a91ee257eaa9aa73e96108d20931e2a2..0000000000000000000000000000000000000000 --- a/src/tests/scenario3/optical/scaphandre.yaml +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# this file deploys a tool that enables the monitoring of energy consumption -# per component. -# More info: https://github.com/hubblo-org/scaphandre - ---- -apiVersion: monitoring.coreos.com/v1 -kind: ServiceMonitor -metadata: - namespace: monitoring # namespace where prometheus is running - name: tfs-scaph-metric - labels: - app.kubernetes.io/name: scaphandre - #release: prometheus - #release: prom # name of the release - # ( VERY IMPORTANT: You need to know the correct release name by viewing - # the servicemonitor of Prometheus itself: Without the correct name, - # Prometheus cannot identify the metrics of the Flask app as the target.) -spec: - selector: - matchLabels: - # Target app service - #namespace: tfs - app.kubernetes.io/name: scaphandre - #release: prometheus # same as above - endpoints: - - port: metrics # named port in target app - scheme: http - path: /metrics # path to scrape - interval: 5s # scrape interval - namespaceSelector: - any: false - matchNames: - - tfs # namespace where the app is running \ No newline at end of file diff --git a/src/webui/grafana_db_mon_kpis_psql.json b/src/webui/grafana_db_mon_kpis_psql.json index 750e5254ea1e4e689d92fc39cedd22a5ee619e03..845ed4296605b6a0d15f38c9a20576a93195543e 100644 --- a/src/webui/grafana_db_mon_kpis_psql.json +++ b/src/webui/grafana_db_mon_kpis_psql.json @@ -169,7 +169,7 @@ "hide": false, "metricColumn": "kpi_value", "rawQuery": true, - "rawSql": "SELECT\r\n $__time(timestamp), kpi_value AS metric, device_name, endpoint_name, kpi_sample_type\r\nFROM\r\n tfs_monitoring_kpis\r\nWHERE\r\n $__timeFilter(timestamp) AND device_name IN (${device_name}) AND endpoint_name IN (${endpoint_name}) AND kpi_sample_type IN (${kpi_sample_type})\r\nGROUP BY\r\n device_name, endpoint_name, kpi_sample_type\r\nORDER BY\r\n timestamp", + "rawSql": "SELECT\r\n $__time(timestamp), kpi_value AS metric, device_name, endpoint_name, kpi_sample_type\r\nFROM\r\n tfs_monitoring_kpis\r\nWHERE\r\n $__timeFilter(timestamp) AND device_name IN (${device_name}) AND endpoint_name IN (${endpoint_name}) AND kpi_sample_type IN (${kpi_sample_type})\r\nORDER BY\r\n timestamp", "refId": "A", "select": [ [