# Skip to content
# Snippets Groups Projects
# Decorator.py 6.14 KiB
# Newer Older
# Lluis Gifre Renom's avatar
# Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools, grpc, logging, threading
from enum import Enum
from typing import Dict, Optional, Tuple
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase, INF
from common.tools.grpc.Tools import grpc_message_to_json_string
from .ServiceExceptions import ServiceException

class MetricTypeEnum(Enum):
    """Kinds of metric kept per metered method.

    Each value is a ``str.format`` template for the metric name; it expects the
    keyword fields ``component``, ``sub_module`` and ``method``.
    """

    # Number of invocations that entered the metered method.
    COUNTER_STARTED = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_started'

    # Number of invocations that returned normally.
    COUNTER_COMPLETED = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_completed'

    # Number of invocations that ended with an error.
    COUNTER_FAILED = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_failed'

    # Distribution of the time spent inside the metered method.
    HISTOGRAM_DURATION = 'tfs_{component:s}_{sub_module:s}_{method:s}_histogram_duration'

# For every metric type: the Prometheus class used to instantiate it and the
# default keyword arguments handed to that class when the caller supplies none.
METRIC_TO_CLASS_PARAMS = {
    MetricTypeEnum.COUNTER_STARTED   : (Counter,   {}),
    MetricTypeEnum.COUNTER_COMPLETED : (Counter,   {}),
    MetricTypeEnum.COUNTER_FAILED    : (Counter,   {}),
    MetricTypeEnum.HISTOGRAM_DURATION: (Histogram, {
        # Histogram bucket upper bounds; the histogram is fed through
        # Histogram.time(), so these are presumably seconds (1 ms .. 7.5 s).
        'buckets': (
            # Alternative, coarser bucket set kept for reference:
            # .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF
            0.0010, 0.0025, 0.0050, 0.0075,
            0.0100, 0.0250, 0.0500, 0.0750,
            0.1000, 0.2500, 0.5000, 0.7500,
            1.0000, 2.5000, 5.0000, 7.5000,
            INF)
    })
}

class MetricsPool:
    """Factory and cache of the Prometheus metrics of one component/sub-module.

    The cache (``metrics``) and its ``lock`` are class attributes, so every
    pool instance in the process shares the same metric objects: asking twice
    for the same metric name returns the same object instead of creating a
    second metric under that name.
    """

    # Serialises look-up/creation of entries in `metrics` across threads.
    lock = threading.Lock()
    # Metrics created so far, keyed by the upper-cased metric name.
    metrics : Dict[str, MetricWrapperBase] = dict()

    def __init__(
        self, component : str, sub_module : str, labels : Optional[Dict[str, str]] = None
    ) -> None:
        """Remember the naming fields and the static labels of this pool.

        Args:
            component: value for the ``component`` field of the metric names.
            sub_module: value for the ``sub_module`` field of the metric names.
            labels: label name -> label value applied to every metric handed
                out by this pool; ``None`` (the default) means no labels.
        """
        self._component = component
        self._sub_module = sub_module
        # Keep a private copy: a shared default dict, or a caller mutating its
        # own dict later, must not change the labels of this pool.
        self._labels : Dict[str, str] = dict(labels) if labels else {}

    def get_or_create(self, method : str, metric_type : MetricTypeEnum, **metric_params) -> MetricWrapperBase:
        """Return the metric of `metric_type` for `method`, creating it once.

        Args:
            method: value for the ``method`` field of the metric name.
            metric_type: which metric to fetch; selects the name template and
                the class/default parameters from METRIC_TO_CLASS_PARAMS.
            **metric_params: keyword arguments for the metric class. Only used
                when the metric is created; when empty, the defaults from
                METRIC_TO_CLASS_PARAMS apply.

        Returns:
            The (unlabelled) metric object stored in the shared cache.

        Raises:
            KeyError: `metric_type` has no entry in METRIC_TO_CLASS_PARAMS.
        """
        metric_name = str(metric_type.value).format(
            component=self._component, sub_module=self._sub_module, method=method).upper()
        with MetricsPool.lock:
            if metric_name not in MetricsPool.metrics:
                metric_class, default_metric_params = METRIC_TO_CLASS_PARAMS[metric_type]
                if not metric_params: metric_params = default_metric_params
                # Label names are sorted so the declaration order is deterministic.
                label_names = sorted(self._labels)
                # The cache key is upper-case; the metric itself is registered lower-case.
                MetricsPool.metrics[metric_name] = metric_class(
                    metric_name.lower(), '', label_names, **metric_params)
            return MetricsPool.metrics[metric_name]

    def get_metrics(
        self, method : str
    ) -> Tuple[MetricWrapperBase, MetricWrapperBase, MetricWrapperBase, MetricWrapperBase]:
        """Return the four metrics used to meter `method`.

        Args:
            method: value for the ``method`` field of the metric names.

        Returns:
            ``(histogram_duration, counter_started, counter_completed,
            counter_failed)``. When the pool has labels, each element is the
            child obtained through ``.labels(**labels)``.
        """
        histogram_duration : Histogram = self.get_or_create(method, MetricTypeEnum.HISTOGRAM_DURATION)
        counter_started    : Counter   = self.get_or_create(method, MetricTypeEnum.COUNTER_STARTED)
        counter_completed  : Counter   = self.get_or_create(method, MetricTypeEnum.COUNTER_COMPLETED)
        counter_failed     : Counter   = self.get_or_create(method, MetricTypeEnum.COUNTER_FAILED)

        if self._labels:
            # Bind the pool's label values so callers can use inc()/time() directly.
            histogram_duration = histogram_duration.labels(**self._labels)
            counter_started    = counter_started.labels(**self._labels)
            counter_completed  = counter_completed.labels(**self._labels)
            counter_failed     = counter_failed.labels(**self._labels)

        return histogram_duration, counter_started, counter_completed, counter_failed

def metered_subclass_method(metrics_pool : MetricsPool):
    """Build a decorator that meters a method with metrics from `metrics_pool`.

    The decorated method is timed with the duration histogram and updates the
    started/completed/failed counters obtained from
    ``metrics_pool.get_metrics(<method name>)``.

    Args:
        metrics_pool: pool that provides the metrics; they are looked up once,
            at decoration time, using the decorated function's ``__name__``.

    Returns:
        A decorator for methods with signature ``(self, *args, **kwargs)``.
        The wrapper returns whatever the method returns and re-raises whatever
        it raises, after counting the failure.
    """
    def outer_wrapper(func):
        metrics = metrics_pool.get_metrics(func.__name__)
        histogram_duration, counter_started, counter_completed, counter_failed = metrics

        @histogram_duration.time()
        @functools.wraps(func)  # keep the name/docstring of the metered method
        def inner_wrapper(self, *args, **kwargs):
            counter_started.inc()
            try:
                reply = func(self, *args, **kwargs)
            except Exception:           # pylint: disable=broad-except
                # Count the failure but never hide it from the caller.
                # KeyboardInterrupt is not an Exception, so it propagates uncounted.
                counter_failed.inc()
                raise
            else:
                counter_completed.inc()
                return reply

        return inner_wrapper
    return outer_wrapper

def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Logger):
    """Build a decorator that meters a gRPC servicer method and maps errors to gRPC status.

    The decorated method is timed with the duration histogram and updates the
    started/completed/failed counters obtained from
    ``metrics_pool.get_metrics(<method name>)``. Errors are handled as follows:

    * ``ServiceException``: the RPC is aborted with the exception's ``code``
      and ``details``. It is logged and counted as failed unless the code is
      ``NOT_FOUND`` or ``ALREADY_EXISTS``.
    * any other ``Exception``: logged, counted as failed, and the RPC is
      aborted with ``StatusCode.INTERNAL`` and ``str(exception)``.

    Args:
        metrics_pool: pool that provides the metrics; they are looked up once,
            at decoration time, using the decorated function's ``__name__``.
        logger: logger used for the request/reply debug traces and for the
            exception reports.

    Returns:
        A decorator for methods with signature ``(self, request, grpc_context)``.
    """
    def outer_wrapper(func):
        method_name = func.__name__
        metrics = metrics_pool.get_metrics(method_name)
        histogram_duration, counter_started, counter_completed, counter_failed = metrics

        @histogram_duration.time()
        @functools.wraps(func)  # keep the name/docstring of the RPC method
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            counter_started.inc()
            try:
                # Only serialise request/reply when the trace will be emitted.
                debug_enabled = logger.isEnabledFor(logging.DEBUG)
                if debug_enabled:
                    logger.debug('{:s} request: {:s}'.format(method_name, grpc_message_to_json_string(request)))
                reply = func(self, request, grpc_context)
                if debug_enabled:
                    logger.debug('{:s} reply: {:s}'.format(method_name, grpc_message_to_json_string(reply)))
                counter_completed.inc()
                return reply
            except ServiceException as e:   # pragma: no cover (ServiceException not thrown)
                if e.code not in [grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS]:
                    # Assume not found or already exists is just a condition, not an error
                    logger.exception('{:s} exception'.format(method_name))
                    counter_failed.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:          # pragma: no cover, pylint: disable=broad-except
                logger.exception('{:s} exception'.format(method_name))
                counter_failed.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
        return inner_wrapper
    return outer_wrapper