Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import grpc, logging
from enum import Enum
from typing import Dict, List
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase
from common.exceptions.ServiceException import ServiceException
class RequestConditionEnum(Enum):
    """Condition of a request that a per-method request counter keeps track of."""

    # Each value is a lower-case word; get_counter_requests() embeds it in the
    # name and in the description of the counter it creates.
    STARTED   = 'started'
    COMPLETED = 'completed'
    FAILED    = 'failed'
def get_counter_requests(method_name : str, request_condition : RequestConditionEnum) -> Counter:
    """Create the counter of requests of `method_name` found in `request_condition`.

    The metric name is `<method>_counter_requests_<condition>`, where every ':'
    of `method_name` is replaced by '_'. The description keeps `method_name`
    exactly as given.
    """
    condition_label = request_condition.value
    metric_prefix = method_name.replace(':', '_')
    metric_name = f'{metric_prefix:s}_counter_requests_{condition_label:s}'
    metric_help = f'{method_name:s} counter of requests {condition_label:s}'
    return Counter(metric_name, metric_help)
def get_histogram_duration(method_name : str) -> Histogram:
    """Create the histogram of request duration for `method_name`.

    The metric name is `<method>_histogram_duration`, where every ':' of
    `method_name` is replaced by '_'. The description keeps `method_name`
    exactly as given.
    """
    metric_prefix = method_name.replace(':', '_')
    metric_name = f'{metric_prefix:s}_histogram_duration'
    metric_help = f'{method_name:s} histogram of request duration'
    return Histogram(metric_name, metric_help)
def _requests_counter_factory(request_condition):
    """Return a one-argument factory creating the requests counter for `request_condition`."""
    def build_counter(method_name):
        return get_counter_requests(method_name, request_condition)
    return build_counter

# Maps a metric-key template (to be formatted with the method name) to the
# one-argument factory that creates the corresponding metric for a method.
METRIC_TEMPLATES = {
    '{:s}_COUNTER_STARTED'   : _requests_counter_factory(RequestConditionEnum.STARTED),
    '{:s}_COUNTER_COMPLETED' : _requests_counter_factory(RequestConditionEnum.COMPLETED),
    '{:s}_COUNTER_FAILED'    : _requests_counter_factory(RequestConditionEnum.FAILED),
    '{:s}_HISTOGRAM_DURATION': get_histogram_duration,
}
def create_metrics(service_name : str, method_names : List[str]) -> Dict[str, MetricWrapperBase]:
    """Create every metric listed in METRIC_TEMPLATES for each method of a service.

    Each metric is stored under its template formatted with the method name and
    upper-cased (e.g. `<METHOD>_COUNTER_STARTED`). The metric itself is created
    for the qualified name `<service_name>:<method_name>`.
    """
    return {
        key_template.format(rpc_name).upper(): build_metric('{:s}:{:s}'.format(service_name, rpc_name))
        for rpc_name in method_names
        for key_template, build_metric in METRIC_TEMPLATES.items()
    }
def safe_and_metered_rpc_method(metrics : Dict[str, MetricWrapperBase], logger : logging.Logger):
    """Build a decorator that meters and exception-guards a gRPC servicer method.

    The decorated method is timed with the `<NAME>_HISTOGRAM_DURATION` metric
    and counted with the `<NAME>_COUNTER_STARTED`, `<NAME>_COUNTER_COMPLETED`
    and `<NAME>_COUNTER_FAILED` metrics, where `<NAME>` is the upper-cased
    `__name__` of the decorated function.

    Args:
        metrics: metric objects indexed by the upper-cased keys listed above.
        logger: receives the request/reply at DEBUG level and the traceback of
            any exception raised by the decorated method.

    Returns:
        A decorator for methods with signature `(self, request, grpc_context)`.
        The wrapped method returns the reply of the original one; when the
        original raises, the failure is logged and counted, and the RPC is
        aborted through `grpc_context.abort()` (with the code/details of a
        ServiceException, or with INTERNAL and the exception text otherwise).

    Raises:
        KeyError: at decoration time, when `metrics` lacks one of the four
            metrics required by the decorated method.
    """
    # Imported here so this function does not depend on edits to the module imports.
    import functools

    def outer_wrapper(func):
        function_name = func.__name__
        logger.debug('function_name %s', function_name)

        def find_metric(key_template):
            # Fail while decorating, naming the missing key, rather than with
            # an AttributeError on None when the metric is first used.
            metric_key = key_template.format(function_name).upper()
            metric = metrics.get(metric_key)
            if metric is None:
                raise KeyError('Metric {:s} required by {:s} not found'.format(metric_key, function_name))
            return metric

        HISTOGRAM_DURATION : Histogram = find_metric('{:s}_HISTOGRAM_DURATION')
        COUNTER_STARTED    : Counter   = find_metric('{:s}_COUNTER_STARTED')
        COUNTER_COMPLETED  : Counter   = find_metric('{:s}_COUNTER_COMPLETED')
        COUNTER_FAILED     : Counter   = find_metric('{:s}_COUNTER_FAILED')

        @HISTOGRAM_DURATION.time()
        @functools.wraps(func)  # keep the name/docstring of the RPC method
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            COUNTER_STARTED.inc()
            try:
                # Lazy %-style arguments: request/reply are only converted to
                # text when DEBUG is enabled, and a conversion error can no
                # longer turn a successful call into a failed one.
                logger.debug('%s request: %s', function_name, request)
                reply = func(self, request, grpc_context)
                logger.debug('%s reply: %s', function_name, reply)
                COUNTER_COMPLETED.inc()
                return reply
            except ServiceException as e:   # pragma: no cover (ServiceException not thrown)
                logger.exception('%s exception', function_name)
                COUNTER_FAILED.inc()
                # NOTE: per the gRPC API, abort() raises to terminate the RPC.
                grpc_context.abort(e.code, e.details)
            except Exception as e:          # pragma: no cover
                logger.exception('%s exception', function_name)
                COUNTER_FAILED.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
        return inner_wrapper
    return outer_wrapper