Skip to content
Snippets Groups Projects
Metrics.py 3.53 KiB
Newer Older
import grpc, logging
from enum import Enum
from typing import Dict, List
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase
from common.exceptions.ServiceException import ServiceException

class RequestConditionEnum(Enum):
    """Conditions of a request for which a separate counter is created.

    The value of each member is the lowercase text that get_counter_requests()
    places in the counter's name and description.
    """

    STARTED = 'started'
    COMPLETED = 'completed'
    FAILED = 'failed'

def get_counter_requests(method_name : str, request_condition : RequestConditionEnum) -> Counter:
    """Create the Counter for requests of one method in one condition.

    The metric name is ``<method_name>_counter_requests_<condition>`` where every
    ':' in ``method_name`` has been replaced by '_'. The description keeps
    ``method_name`` exactly as given.

    :param method_name: name of the method the counter belongs to.
    :param request_condition: condition whose ``value`` is used as the suffix.
    :return: a newly constructed ``Counter``.
    """
    condition_text = request_condition.value
    metric_prefix = method_name.replace(':', '_')
    return Counter(
        '{:s}_counter_requests_{:s}'.format(metric_prefix, condition_text),
        '{:s} counter of requests {:s}'.format(method_name, condition_text),
    )

def get_histogram_duration(method_name : str) -> Histogram:
    """Create the Histogram that records request durations of one method.

    The metric name is ``<method_name>_histogram_duration`` where every ':' in
    ``method_name`` has been replaced by '_'. The description keeps
    ``method_name`` exactly as given.

    :param method_name: name of the method the histogram belongs to.
    :return: a newly constructed ``Histogram``.
    """
    metric_prefix = method_name.replace(':', '_')
    return Histogram(
        '{:s}_histogram_duration'.format(metric_prefix),
        '{:s} histogram of request duration'.format(method_name),
    )

# Maps a metric-key template (formatted with a method name) to a factory that,
# given a method name, builds the corresponding metric object. The factories are
# lambdas so the helpers and enum members are only looked up when invoked.
METRIC_TEMPLATES = {
    # One counter per request condition.
    '{:s}_COUNTER_STARTED': (
        lambda method_name: get_counter_requests(method_name, RequestConditionEnum.STARTED)
    ),
    '{:s}_COUNTER_COMPLETED': (
        lambda method_name: get_counter_requests(method_name, RequestConditionEnum.COMPLETED)
    ),
    '{:s}_COUNTER_FAILED': (
        lambda method_name: get_counter_requests(method_name, RequestConditionEnum.FAILED)
    ),
    # One histogram for the request duration.
    '{:s}_HISTOGRAM_DURATION': (
        lambda method_name: get_histogram_duration(method_name)
    ),
}

def create_metrics(service_name : str, method_names : List[str]) -> Dict[str, MetricWrapperBase]:
    """Build every metric listed in METRIC_TEMPLATES for each given method.

    For each method name and each template, the dictionary key is the template
    formatted with the bare method name and upper-cased, while the metric itself
    is generated from the qualified name ``<service_name>:<method_name>``.

    :param service_name: prefix used to qualify the method names.
    :param method_names: methods for which metrics are created.
    :return: dictionary from upper-cased metric key to the generated metric.
    """
    return {
        key_template.format(rpc_name).upper(): make_metric('{:s}:{:s}'.format(service_name, rpc_name))
        for rpc_name in method_names
        for key_template, make_metric in METRIC_TEMPLATES.items()
    }

def safe_and_metered_rpc_method(metrics : Dict[str, MetricWrapperBase], logger : logging.Logger):
    """Build a decorator adding metrics, logging and error handling to an RPC method.

    The decorated function is expected to have the signature
    ``func(self, request, grpc_context)``. For a function named ``Foo`` the
    following keys are read from ``metrics`` (the same keys create_metrics()
    produces): ``FOO_HISTOGRAM_DURATION``, ``FOO_COUNTER_STARTED``,
    ``FOO_COUNTER_COMPLETED`` and ``FOO_COUNTER_FAILED``.

    On every call the wrapper:
      * times the call with the duration histogram;
      * increments the "started" counter;
      * logs request and reply at DEBUG level;
      * increments the "completed" counter and returns the reply on success;
      * on ``ServiceException`` logs the traceback, increments the "failed"
        counter and calls ``grpc_context.abort(e.code, e.details)``;
      * on any other ``Exception`` does the same but aborts with
        ``grpc.StatusCode.INTERNAL`` and ``str(e)``.

    :param metrics: dictionary of metrics indexed by upper-cased metric key.
    :param logger: logger used for the debug and exception messages.
    :return: decorator to apply to the RPC method.
    :raises KeyError: at decoration time, when one of the four required
        metrics of the decorated function is missing from ``metrics``.
    """
    # Function-scope import so this block does not depend on a file-level import.
    import functools

    def outer_wrapper(func):
        function_name = func.__name__

        def get_required_metric(key_template):
            # Fail at decoration time with an explicit message instead of
            # letting a None metric surface later as an AttributeError.
            metric_key = key_template.format(function_name).upper()
            metric = metrics.get(metric_key)
            if metric is None:
                raise KeyError('Metric({:s}) not found for method({:s})'.format(metric_key, function_name))
            return metric

        HISTOGRAM_DURATION : Histogram = get_required_metric('{:s}_HISTOGRAM_DURATION')
        COUNTER_STARTED    : Counter   = get_required_metric('{:s}_COUNTER_STARTED')
        COUNTER_COMPLETED  : Counter   = get_required_metric('{:s}_COUNTER_COMPLETED')
        COUNTER_FAILED     : Counter   = get_required_metric('{:s}_COUNTER_FAILED')

        @HISTOGRAM_DURATION.time()
        @functools.wraps(func)  # keep the name and docstring of the wrapped method
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            COUNTER_STARTED.inc()
            try:
                # Lazy %-style arguments: request/reply are only converted to
                # text when DEBUG logging is actually enabled.
                logger.debug('%s request: %s', function_name, request)
                reply = func(self, request, grpc_context)
                logger.debug('%s reply: %s', function_name, reply)
                COUNTER_COMPLETED.inc()
                return reply
            except ServiceException as e:                               # pragma: no cover (ServiceException not thrown)
                logger.exception('%s exception', function_name)
                COUNTER_FAILED.inc()
                # Report the status code and details carried by the exception.
                grpc_context.abort(e.code, e.details)
            except Exception as e:                                      # pragma: no cover
                logger.exception('%s exception', function_name)
                COUNTER_FAILED.inc()
                # Anything unexpected is reported to the client as INTERNAL.
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
        return inner_wrapper
    return outer_wrapper