Commit 0482f489 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common:

- unified RPC method wrapper with performance evaluation
- migrated components to new method wrapping API
- added new grafana dashboard backups
parent 96a253cf
Loading
Loading
Loading
Loading
+127 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging, threading
from enum import Enum
from typing import Dict, Tuple
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase, INF
from common.tools.grpc.Tools import grpc_message_to_json_string
from .ServiceExceptions import ServiceException

class MetricTypeEnum(Enum):
    COUNTER_STARTED    = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_started'
    COUNTER_COMPLETED  = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_completed'
    COUNTER_FAILED     = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_failed'
    HISTOGRAM_DURATION = 'tfs_{component:s}_{sub_module:s}_{method:s}_histogram_duration'

METRIC_TO_CLASS_PARAMS = {
    MetricTypeEnum.COUNTER_STARTED   : (Counter,   {}),
    MetricTypeEnum.COUNTER_COMPLETED : (Counter,   {}),
    MetricTypeEnum.COUNTER_FAILED    : (Counter,   {}),
    MetricTypeEnum.HISTOGRAM_DURATION: (Histogram, {
        'buckets': (
            # .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF
            0.001, 0.002, 0.003, 0.004, 0.005, 0.0075,
            0.010, 0.025, 0.050, 0.075,
            0.100, 0.250, 0.500, 0.750,
            1.000, 2.500, 5.000, 7.500,
            10.00, INF)
    })
}

class MetricsPool:
    lock = threading.Lock()
    metrics : Dict[str, MetricWrapperBase] = dict()

    def __init__(self, component : str, sub_module : str, labels : Dict[str, str] = {}) -> None:
        self._component = component
        self._sub_module = sub_module
        self._labels = labels

    def get_or_create(self, method : str, metric_type : MetricTypeEnum, **metric_params) -> MetricWrapperBase:
        metric_name = str(metric_type.value).format(
            component=self._component, sub_module=self._sub_module, method=method).upper()
        with MetricsPool.lock:
            if metric_name not in MetricsPool.metrics:
                metric_tuple : Tuple[MetricWrapperBase, Dict] = METRIC_TO_CLASS_PARAMS.get(metric_type)
                metric_class, default_metric_params = metric_tuple
                if len(metric_params) == 0: metric_params = default_metric_params
                labels = sorted(self._labels.keys())
                MetricsPool.metrics[metric_name] = metric_class(metric_name.lower(), '', labels, **metric_params)
            return MetricsPool.metrics[metric_name]

    def get_metrics(
        self, method : str
    ) -> Tuple[MetricWrapperBase, MetricWrapperBase, MetricWrapperBase, MetricWrapperBase]:
        histogram_duration : Histogram = self.get_or_create(method, MetricTypeEnum.HISTOGRAM_DURATION)
        counter_started    : Counter   = self.get_or_create(method, MetricTypeEnum.COUNTER_STARTED)
        counter_completed  : Counter   = self.get_or_create(method, MetricTypeEnum.COUNTER_COMPLETED)
        counter_failed     : Counter   = self.get_or_create(method, MetricTypeEnum.COUNTER_FAILED)

        if len(self._labels) > 0:
            histogram_duration = histogram_duration.labels(**(self._labels))
            counter_started    = counter_started.labels(**(self._labels))
            counter_completed  = counter_completed.labels(**(self._labels))
            counter_failed     = counter_failed.labels(**(self._labels))

        return histogram_duration, counter_started, counter_completed, counter_failed

def metered_subclass_method(metrics_pool : MetricsPool):
    def outer_wrapper(func):
        metrics = metrics_pool.get_metrics(func.__name__)
        histogram_duration, counter_started, counter_completed, counter_failed = metrics

        @histogram_duration.time()
        def inner_wrapper(self, *args, **kwargs):
            counter_started.inc()
            try:
                reply = func(self, *args, **kwargs)
                counter_completed.inc()
                return reply
            except KeyboardInterrupt:   # pylint: disable=try-except-raise
                raise
            except Exception:           # pylint: disable=broad-except
                counter_failed.inc()

        return inner_wrapper
    return outer_wrapper

def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Logger):
    def outer_wrapper(func):
        method_name = func.__name__
        metrics = metrics_pool.get_metrics(method_name)
        histogram_duration, counter_started, counter_completed, counter_failed = metrics

        @histogram_duration.time()
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            counter_started.inc()
            try:
                logger.debug('{:s} request: {:s}'.format(method_name, grpc_message_to_json_string(request)))
                reply = func(self, request, grpc_context)
                logger.debug('{:s} reply: {:s}'.format(method_name, grpc_message_to_json_string(reply)))
                counter_completed.inc()
                return reply
            except ServiceException as e:   # pragma: no cover (ServiceException not thrown)
                if e.code not in [grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS]:
                    # Assume not found or already exists is just a condition, not an error
                    logger.exception('{:s} exception'.format(method_name))
                    counter_failed.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:          # pragma: no cover, pylint: disable=broad-except
                logger.exception('{:s} exception'.format(method_name))
                counter_failed.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
        return inner_wrapper
    return outer_wrapper
+4 −3
Original line number Diff line number Diff line
@@ -10,14 +10,14 @@ tfs@tfs-vm:~/tfs-ctrl$ microk8s.status --wait-ready

- deploy as:
```
tfs@tfs-vm:~/tfs-ctrl$ source src/common/perf_eval_method_wrapper/tests/deploy_specs.sh 
tfs@tfs-vm:~/tfs-ctrl$ source src/common/method_wrappers/tests/deploy_specs.sh 
tfs@tfs-vm:~/tfs-ctrl$ ./deploy.sh 
```

- expose prometheus and grafana

  - terminal 1 (prometheus UI): `kubectl port-forward -n monitoring service/prometheus-k8s --address 0.0.0.0 9090:9090`
  - terminal 2 (grafana UI): `kubectl port-forward -n monitoring service/grafana --address 0.0.0.0 3000:3000`
  - terminal 2 (grafana UI): `kubectl port-forward -n monitoring service/grafana --address 0.0.0.0 3001:3000`
  - terminal 3 (alertmanager UI): `kubectl port-forward -n monitoring service/alertmanager-main --address 0.0.0.0 9093:9093`

- if using remote server/VM for running MicroK8s and VSCode, forward ports 9090, 3000, 9093
@@ -25,7 +25,7 @@ tfs@tfs-vm:~/tfs-ctrl$ ./deploy.sh
terminal 4 (tun test_set):
```
export PYTHONPATH=/home/tfs/tfs-ctrl/src
python -m common.perf_eval_method_wrapper.tests.test_set
python -m common.method_wrappers.tests.test_set
```

- log into grafana:
@@ -48,3 +48,4 @@ python -m common.perf_eval_method_wrapper.tests.test_set
- [Prometheus Operator - ServiceMonitor example 1](https://stackoverflow.com/questions/45613660/how-do-you-add-scrape-targets-to-a-prometheus-server-that-was-installed-with-kub)
- [Prometheus Operator - ServiceMonitor example 2](https://stackoverflow.com/questions/52991038/how-to-create-a-servicemonitor-for-prometheus-operator)
- [How to visualize Prometheus histograms in Grafana](https://grafana.com/blog/2020/06/23/how-to-visualize-prometheus-histograms-in-grafana/)
- [Prometheus Histograms with Grafana Heatmaps](https://towardsdatascience.com/prometheus-histograms-with-grafana-heatmaps-d556c28612c7)
Loading