Commit a8046fe5 authored by Carlos Natalino

Intermediate code stashed here for later use.

parent 661ecb43
+21 −20
@@ -67,7 +67,7 @@ SERVICE_LIST_MODE = int(
    get_setting("OPTICALATTACKMANAGER_SERVICE_LIST_MODE", default=1)
)
SERVICE_LIST_KEY = get_setting(
    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services"
    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec-active-services"
)
MIN_NUMBER_WORKERS = int(
    get_setting("OPTICALATTACKMANAGERSERVICE_LOOP_MIN_WORKERS", default=2)
@@ -295,6 +295,7 @@ async def monitor_services(terminate, service_list=None, cache=None):

        current_list = []
        if SERVICE_LIST_MODE == LIST_REDIS_MODE:
+            LOGGER.debug(f"Services at the Redis DB: {cache.llen(SERVICE_LIST_KEY)}")
            current_list.extend(
                [
                    pickle.loads(service)
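
Note: the added debug line queries the Redis-backed service list that the surrounding comprehension deserializes. A minimal sketch of that round trip, assuming redis-py and pickled dict entries (connection parameters and field names here are illustrative):

```python
import pickle

import redis

# hypothetical connection parameters; the real service reads them from settings
cache = redis.Redis(host="localhost", port=6379)
KEY = "opt-sec-active-services"

# producer side: append one pickled service descriptor
service = {"context": "admin", "service": "service-1", "kpi": "kpi-1"}
cache.rpush(KEY, pickle.dumps(service))

# consumer side: length check and bulk read, as in the hunk above
print(f"Services at the Redis DB: {cache.llen(KEY)}")
current_list = [pickle.loads(raw) for raw in cache.lrange(KEY, 0, -1)]
```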
@@ -385,7 +386,6 @@ async def monitor_services(terminate, service_list=None, cache=None):
                (i + 1) * k + min(i + 1, m),  # last index
                host,
                port,
-                DROP_COUNTER,
                desired_monitoring_interval * 0.9,
            )
            for i in range(cur_number_workers)
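
The slice bound shown above is the usual near-equal partition of n services over c workers, where k and m are assumed to be the quotient and remainder of n over c; the matching first-index expression, i * k + min(i, m), lies outside this hunk. A standalone check of the formula:

```python
def chunk_bounds(n: int, workers: int):
    # k items per worker; the first m workers take one extra
    k, m = divmod(n, workers)
    return [
        (i * k + min(i, m), (i + 1) * k + min(i + 1, m))
        for i in range(workers)
    ]

# 10 services over 3 workers -> chunk sizes 4, 3, 3 covering [0, 10)
assert chunk_bounds(10, 3) == [(0, 4), (4, 7), (7, 10)]
```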
@@ -434,22 +434,22 @@ def main():

    logging.getLogger("hpack").setLevel(logging.CRITICAL)

-    wait_for_environment_variables(
-        [
-            get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST),
-            get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
-            get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST),
-            get_env_var_name(
-                ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC
-            ),
-            get_env_var_name(
-                ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST
-            ),
-            get_env_var_name(
-                ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC
-            ),
-        ]
-    )
+    # wait_for_environment_variables(
+    #     [
+    #         get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST),
+    #         get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
+    #         get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST),
+    #         get_env_var_name(
+    #             ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC
+    #         ),
+    #         get_env_var_name(
+    #             ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_HOST
+    #         ),
+    #         get_env_var_name(
+    #             ServiceNameEnum.OPTICALATTACKDETECTOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC
+    #         ),
+    #     ]
+    # )

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
@@ -476,9 +476,10 @@ def main():
    if SERVICE_LIST_MODE == LIST_REDIS_MODE:
        cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
        cache.ping()
+        LOGGER.info(f"Connecting to Redis: host={redis_host}, port={redis_port}, password={redis_password}")

        # clean the existing list that will be populated later on in this function
-        cache.delete(SERVICE_LIST_KEY)
+        # cache.delete(SERVICE_LIST_KEY)
    elif SERVICE_LIST_MODE == LIST_SHARED_MODE:
        # creating a thread-safe list to be shared among threads
        service_list = Manager().list()
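
For the shared-list branch, Manager().list() returns a proxy object that several processes can mutate safely. A minimal sketch of the pattern (the worker function is illustrative):

```python
from multiprocessing import Manager, Process

def worker(shared, item):
    # the proxy forwards this append to the manager process
    shared.append(item)

if __name__ == "__main__":
    shared = Manager().list()
    procs = [Process(target=worker, args=(shared, i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(sorted(shared))  # [0, 1, 2, 3]
```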
@@ -544,7 +545,7 @@ def main():
    # asyncio.create_task(monitor_services(service_list))

    # Wait for Ctrl+C or termination signal
-    while not terminate.wait(timeout=1):
+    while not terminate.wait(timeout=10):
        pass

    LOGGER.info("Terminating...")
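
The main loop above now wakes every 10 seconds to poll the terminate event set by the signal handlers registered earlier in main(). A self-contained sketch of this shutdown pattern:

```python
import signal
import threading

terminate = threading.Event()

def signal_handler(sig, frame):  # mirrors the handlers registered above
    terminate.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# wake every 10 s so the process can react to the event promptly
while not terminate.wait(timeout=10):
    pass
print("Terminating...")
```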
+3 −6
@@ -14,6 +14,7 @@

import asyncio
import logging
+import traceback

from grpclib.client import Channel
from prometheus_client import Counter
@@ -31,7 +32,6 @@ async def detect_attack(
    context_id: str,
    service_id: str,
    kpi_id: str,
-    drop_counter: Counter,
    timeout: float = 20.0,
) -> None:
    try:
@@ -49,10 +49,9 @@ async def detect_attack(
        LOGGER.debug("Monitoring finished for {}/{}".format(service_id, kpi_id))
    except Exception as e:
        LOGGER.warning(
            "Exception while processing service_id {}/{}".format(service_id, kpi_id)
            "Exception while processing service_id {}/{}: {}".format(service_id, kpi_id, e)
        )
-        # LOGGER.exception(e)
-        drop_counter.inc()
+        traceback.print_exc()


def delegate_services(
@@ -61,7 +60,6 @@ def delegate_services(
    end_index: int,
    host: str,
    port: str,
-    drop_counter: Counter,
    monitoring_interval: float,
):
    async def run_internal_loop():
@@ -73,7 +71,6 @@ def delegate_services(
                service["context"],
                service["service"],
                service["kpi"],
-                drop_counter,
                # allow at most 90% of the monitoring interval to succeed
                monitoring_interval * 0.9,
            )
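
With the drop counter removed, each detection call is still capped at 90% of the monitoring interval so one slow call cannot push the loop past its period. A minimal sketch of that budgeting, assuming asyncio-style timeouts (the coroutine body is a stand-in for the real gRPC call):

```python
import asyncio

async def detect_attack(kpi_id: str, timeout: float) -> None:
    # stand-in for the real gRPC detection call
    await asyncio.wait_for(asyncio.sleep(0.1), timeout=timeout)
    print(f"Monitoring finished for {kpi_id}")

async def run_internal_loop():
    monitoring_interval = 30.0
    # allow at most 90% of the monitoring interval per service
    await asyncio.gather(
        *(detect_attack(f"kpi-{i}", monitoring_interval * 0.9) for i in range(3))
    )

asyncio.run(run_internal_loop())
```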
+2 −0
@@ -21,6 +21,8 @@ export TFS_COMPONENTS="context device automation monitoring pathcomp service sli

# addition for the optical cybersecurity component
export TFS_COMPONENTS="${TFS_COMPONENTS} dbscanserving opticalattackmitigator opticalattackdetector opticalattackmanager"

+export TFS_EXTRA_MANIFESTS="manifests/nginx_ingress_http.yaml manifests/servicemonitors.yaml"
+export TFS_EXTRA_MANIFESTS="${TFS_EXTRA_MANIFESTS} manifests/cachingservice.yaml"


+17 −184
@@ -29,15 +29,24 @@ from kubernetes import client, config
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_setting, wait_for_environment_variables

-from configs import base_results_folder, datetime_format, hpa_data

LOGGER = None
+SERVICE_LIST_KEY = get_setting(
+    "OPTICALATTACKMANAGER_SERVICE_LIST_KEY", default="opt-sec:active-services"
+)

# Configs can be set in Configuration class directly or using helper utility
namespace = get_setting("TFS_K8S_NAMESPACE", default="tfs")
config.load_kube_config()
v1 = client.CoreV1Api()

+ret = v1.list_namespaced_endpoints(namespace=namespace, watch=False)
+for item in ret.items:
+    if "caching" in item.metadata.name:
+        for subset in item.subsets:
+            for port in subset.ports:
+                if "redis" in port.name:  # endpoint is ready for being scraped
+                    CACHING_HOST = subset.addresses[0].ip
+                    CACHING_PORT = port.port

logging.getLogger("kubernetes").setLevel(logging.INFO) # avoid lengthy messages

@@ -53,17 +62,11 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
def manage_number_services(terminate, folder):

    # connecting with Redis
-    redis_host = get_service_host(ServiceNameEnum.CACHING)
-    redis_password = None
-    if redis_host is not None:
-        redis_port = int(get_setting("CACHINGSERVICE_SERVICE_PORT_REDIS"))
    redis_password = get_setting("REDIS_PASSWORD")
-    else:
-        LOGGER.fatal("No environment variables set for Redis")
    
    cache = None
    try:
-        cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
+        cache = redis.Redis(host=CACHING_HOST, port=CACHING_PORT, password=redis_password)
    except Exception as e:
        LOGGER.exception(e)
    
@@ -76,40 +79,11 @@ def manage_number_services(terminate, folder):
-    # connecting to the HPA API
-    autoscaling = client.AutoscalingV1Api()

-    # connecting to the custom objects API
-    api = client.CustomObjectsApi()

-    # open the file that will store the information
-    services_file = open(os.path.join(folder, "services.csv"), "wt", encoding="utf-8")
-    services_file.write("# file with number of services\n")
-    services_file.write("timestamp,number_services")
-
-    hpa_file = open(os.path.join(folder, "hpas.csv"), "wt", encoding="utf-8")
-
-    # writing headers for the HPA columns
-    hpas = autoscaling.list_namespaced_horizontal_pod_autoscaler(namespace="tfs")
-
-    for hpa in hpas.items:
-        hpa_file.write(hpa.metadata.name + "\n")
-        for d in hpa_data:
-            services_file.write(f",{hpa.metadata.name}_{d}")
-
-    # monitoring CPU and RAM usage of the single Pods
-    for s in ["cache", "manager"]:
-        for k in ["cpu", "ram"]:
-            services_file.write(f",{s}_{k}")
-
-    services_file.write("\n")
-    services_file.flush()
-
-    hpa_file.flush()
-    hpa_file.close()

    # define number of services
    # 6 values followed by two zeros
    number_services = [0, 10]

-    loads = [120, 240, 480, 960, 1440, 1920, 1922]
+    loads = [120, 240, 480, 960, 1440, 1920, 1921]
    for load in loads:
        number_services.append(int(load/2))
        for _ in range(5):
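
Each load level maps to a target list length via int(load / 2); note that the new final level 1921 yields 960, the same as 1920, so the last two targets coincide (the body of the inner range(5) loop is truncated by the hunk, so its repetition semantics are assumed). A quick check of the resulting schedule:

```python
number_services = [0, 10]
loads = [120, 240, 480, 960, 1440, 1920, 1921]
for load in loads:
    number_services.append(int(load / 2))

print(number_services)  # [0, 10, 60, 120, 240, 480, 720, 960, 960]
```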
@@ -125,41 +99,6 @@ def manage_number_services(terminate, folder):
        if cur_tick % ticks == 0:
            LOGGER.debug("go load!")

-            # getting data from autoscaler
-            hpas = autoscaling.list_namespaced_horizontal_pod_autoscaler(namespace="tfs")
-            # - "cur_utilization"
-            # - "target_utilization"
-            # - "cur_replicas"
-            # - "desired_replicas"
-            hpa_string = ""
-            for hpa in hpas.items:
-                hpa_string += f",{hpa.status.current_cpu_utilization_percentage}"
-                hpa_string += f",{hpa.spec.target_cpu_utilization_percentage}"
-                hpa_string += f",{hpa.status.current_replicas}"
-                hpa_string += f",{hpa.status.desired_replicas}"
-
-            # monitoring resource usage
-            k8s_pods = api.list_cluster_custom_object(
-                "metrics.k8s.io", "v1beta1", "namespaces/tfs/pods"
-            )
-            # - "cache_cpu"
-            # - "cache_ram"
-            # - "manager_cpu"
-            # - "manager_ram"
-            resource_string = ""
-
-            # we use two loops to ensure the same order
-            for stats in k8s_pods["items"]:
-                if "caching" in stats['metadata']['name']:
-                    resource_string += f",{stats['containers'][0]['usage']['cpu']}"
-                    resource_string += f",{stats['containers'][0]['usage']['memory']}"
-                    break
-            for stats in k8s_pods["items"]:
-                if "opticalattackmanager" in stats['metadata']['name']:
-                    resource_string += f",{stats['containers'][0]['usage']['cpu']}"
-                    resource_string += f",{stats['containers'][0]['usage']['memory']}"
-                    break
-
            # calculate the difference between current and expected
            cur_services = cache.llen(SERVICE_LIST_KEY)
            diff_services = cur_services - number_services[cur_tick % len(number_services)]
@@ -169,12 +108,6 @@ def manage_number_services(terminate, folder):
                LOGGER.info("Setting monitoring interval to 60")
                set_to_60 = True

-            # write current number with one second difference
-            cur_datetime = datetime.datetime.now()
-            reported_datetime = cur_datetime - datetime.timedelta(seconds=1)
-            reported_datetime_str = reported_datetime.strftime(datetime_format)
-            services_file.write(f"{reported_datetime_str},{cur_services}{hpa_string}\n")
-
            if diff_services < 0:  # current is lower than expected
                LOGGER.debug(f"inserting <{-diff_services}> services")
                for _ in range(-diff_services):
@@ -193,97 +126,20 @@ def manage_number_services(terminate, folder):
                LOGGER.debug(f"deleting <{diff_services}> services")
                cache.lpop(SERVICE_LIST_KEY, diff_services)

-            # writing the new number with the current time
-            services_file.write(
-                f"{datetime.datetime.now().strftime(datetime_format)},"
-                f"{number_services[cur_tick % len(number_services)]}"
-                f"{hpa_string}{resource_string}\n"
-            )
-
-            assert number_services[cur_tick % len(number_services)] == cache.llen(SERVICE_LIST_KEY)
-
-            services_file.flush()
        else:
            LOGGER.debug("tick load!")
        cur_tick += 1
        if cur_tick > len(number_services) + 1:
            break
-    services_file.flush()
-    services_file.close()

    # make sure we have the correct loop time
    cache.set("MONITORING_INTERVAL", 30)

    LOGGER.info("Finished load!")


-def monitor_endpoints(terminate):
-    LOGGER.info("starting experiment!")
-    v1 = client.CoreV1Api()
-    while not terminate.wait(timeout=30):
-
-        # load base yaml
-        with open("/home/carda/projects/prometheus/prometheus.yml.backup", "rt") as file:
-            current_version = yaml.load(file, Loader=yaml.FullLoader)
-
-        # checking endpoints
-        ret = v1.list_namespaced_endpoints(namespace="tfs", watch=False)
-        for item in ret.items:
-            found = False
-
-            for subset in item.subsets:
-                for p, q in enumerate(subset.ports):
-                    if q.name == "metrics":  # endpoint is ready for being scraped
-                        found = True
-            if not found:
-                continue  # if no `metrics` port, jump!
-
-            found = False  # now look for existing configuration
-            for i in range(len(current_version["scrape_configs"])):
-                if current_version["scrape_configs"][i]["job_name"] == item.metadata.name:
-                    found = True
-                    break # found it! `i` will contain the correct index
-
-            if not found:  # write it from zero!
-                current_version["scrape_configs"].append({})
-                current_version["scrape_configs"][-1]["job_name"] = item.metadata.name
-
-                # set the correct `i` value
-                i = len(current_version["scrape_configs"]) - 1
-
-            if "static_configs" not in current_version["scrape_configs"][i]:
-                current_version["scrape_configs"][i]["static_configs"] = [{"targets": []}]
-            # reset IPs
-            current_version["scrape_configs"][i]["static_configs"][0]["targets"] = []
-            for subset in item.subsets:
-                for p, q in enumerate(subset.ports):
-                    if q.name == "metrics":
-                        for c, a in enumerate(subset.addresses):
-                            print(f"{item.metadata.name}\t{a.ip}:{q.port}")
-                            current_version["scrape_configs"][i]["static_configs"][0]["targets"].append(f"{a.ip}:9192")
-
-        # write yaml
-        with open("/home/carda/projects/prometheus/prometheus.yml", "wt") as file:
-            yaml.dump(current_version, file)
-
-        # reloading configuration
-        # docs: https://www.robustperception.io/reloading-prometheus-configuration/
-        requests.post("http://127.0.0.1:9090/-/reload")
-
-    # resetting prometheus to the original state
-    # load base yaml
-    with open("/home/carda/projects/prometheus/prometheus.yml.backup", "rt") as file:
-        current_version = yaml.load(file, Loader=yaml.FullLoader)
-
-    # write yaml
-    with open("/home/carda/projects/prometheus/prometheus.yml", "wt") as file:
-        yaml.dump(current_version, file)
-    # reloading configuration
-    # docs: https://www.robustperception.io/reloading-prometheus-configuration/
-    requests.post("http://127.0.0.1:9090/-/reload")
-
-    LOGGER.info("Finished experiment!")


if __name__ == "__main__":
    # logging.basicConfig(level="DEBUG")
    logging.basicConfig(
@@ -294,41 +150,18 @@ if __name__ == "__main__":
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables(
-        ["CACHINGSERVICE_SERVICE_PORT_REDIS", "REDIS_PASSWORD"]
+        ["REDIS_PASSWORD"]
    )

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

-    # generate results folder
-    output_folder = os.path.join(base_results_folder, "jocn_" + datetime.datetime.now(
-        datetime.timezone.utc
-    ).strftime("%Y%m%dT%H%M%S.%fUTC"))
-    os.makedirs(output_folder)

-    # start load handler
-    proc_load = multiprocessing.Process(
-        target=manage_number_services,
-        args=(
-            terminate,
-            output_folder,
-        ),
-    )
-    proc_load.start()
-
-    # start experiment monitoring
-    proc_experiment = multiprocessing.Process(
-        target=monitor_endpoints, args=(terminate,)
-    )
-    proc_experiment.start()
+    manage_number_services(terminate=terminate)

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=0.1):
        pass

-    # waits for the processes to finish
-    proc_load.join()
-    proc_experiment.join()

    # exits
    LOGGER.info("Bye!")
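
The deleted monitor_endpoints rewrote prometheus.yml from a backup copy and then POSTed to the /-/reload endpoint, which Prometheus only exposes when started with --web.enable-lifecycle (see the robustperception.io post linked in the removed code). A minimal sketch of that rewrite-and-reload cycle, with file paths assumed:

```python
import requests
import yaml

# illustrative paths; the deleted code used files under ~/projects/prometheus
with open("prometheus.yml.backup", "rt") as file:
    config = yaml.safe_load(file)

config.setdefault("scrape_configs", []).append(
    {"job_name": "example", "static_configs": [{"targets": ["10.0.0.1:9192"]}]}
)

with open("prometheus.yml", "wt") as file:
    yaml.dump(config, file)

# Prometheus must be running with --web.enable-lifecycle for this route
requests.post("http://127.0.0.1:9090/-/reload")
```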
+8 −0
@@ -4,14 +4,22 @@ __Authors__: [Carlos Natalino](https://www.chalmers.se/en/persons/carda/), Lluis

## Executing

All the commands below assume you are at the root of the TFS repository.

First, we need to load the TFS deploy specifications:

```bash
source src/tests/scenario3/optical/deploy_specs.sh
```

Then, we load the environment variables that identify the TFS deployment:

```bash
source tfs_runtime_env_vars.sh
```

Finally, we can run the load generator:

```bash
python src/tests/scenario3/optical/ofc23/run_experiment_demo.py
```
\ No newline at end of file