Skip to content
Decorator.py 4.24 KiB
Newer Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from enum import Enum
from typing import Dict, List
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .ServiceExceptions import ServiceException

class RequestConditionEnum(Enum):
    STARTED   = 'started'
    COMPLETED = 'completed'
    FAILED    = 'failed'

def get_counter_requests(method_name : str, request_condition : RequestConditionEnum) -> Counter:
    str_request_condition = request_condition.value
    name = '{:s}_counter_requests_{:s}'.format(method_name.replace(':', '_'), str_request_condition)
    description = '{:s} counter of requests {:s}'.format(method_name, str_request_condition)
    return Counter(name, description)

def get_histogram_duration(method_name : str) -> Histogram:
    name = '{:s}_histogram_duration'.format(method_name.replace(':', '_'))
    description = '{:s} histogram of request duration'.format(method_name)
    return Histogram(name, description)

METRIC_TEMPLATES = {
    '{:s}_COUNTER_STARTED'   : lambda method_name: get_counter_requests  (method_name, RequestConditionEnum.STARTED),
    '{:s}_COUNTER_COMPLETED' : lambda method_name: get_counter_requests  (method_name, RequestConditionEnum.COMPLETED),
    '{:s}_COUNTER_FAILED'    : lambda method_name: get_counter_requests  (method_name, RequestConditionEnum.FAILED),
    '{:s}_HISTOGRAM_DURATION': lambda method_name: get_histogram_duration(method_name),
}

def create_metrics(service_name : str, method_names : List[str]) -> Dict[str, MetricWrapperBase]:
    metrics = {}
    for method_name in method_names:
        for template_name, template_generator_function in METRIC_TEMPLATES.items():
            metric_name = template_name.format(method_name).upper()
            metrics[metric_name] = template_generator_function('{:s}:{:s}'.format(service_name, method_name))
    return metrics

def safe_and_metered_rpc_method(metrics : Dict[str, MetricWrapperBase], logger : logging.Logger):
    def outer_wrapper(func):
        function_name = func.__name__
        HISTOGRAM_DURATION : Histogram = metrics.get('{:s}_HISTOGRAM_DURATION'.format(function_name).upper())
        COUNTER_STARTED    : Counter   = metrics.get('{:s}_COUNTER_STARTED'   .format(function_name).upper())
        COUNTER_COMPLETED  : Counter   = metrics.get('{:s}_COUNTER_COMPLETED' .format(function_name).upper())
        COUNTER_FAILED     : Counter   = metrics.get('{:s}_COUNTER_FAILED'    .format(function_name).upper())

        @HISTOGRAM_DURATION.time()
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            COUNTER_STARTED.inc()
            try:
                logger.debug('{:s} request: {:s}'.format(function_name, str(request)))
                reply = func(self, request, grpc_context)
                logger.debug('{:s} reply: {:s}'.format(function_name, str(reply)))
                COUNTER_COMPLETED.inc()
                return reply
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            except ServiceException as e:   # pragma: no cover (ServiceException not thrown)
                if e.code not in [grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS]:
                    # Assume not found or already exists is just a condition, not an error
                    logger.exception('{:s} exception'.format(function_name))
                    COUNTER_FAILED.inc()
                grpc_context.abort(e.code, e.details)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            except Exception as e:          # pragma: no cover, pylint: disable=broad-except
                logger.exception('{:s} exception'.format(function_name))
                COUNTER_FAILED.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
        return inner_wrapper
    return outer_wrapper