Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import grpc, logging, threading
from enum import Enum
from typing import Dict, Optional, Tuple
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase, INF
from common.tools.grpc.Tools import grpc_message_to_json_string
from .ServiceExceptions import ServiceException
class MetricTypeEnum(Enum):
    """Kinds of per-method metrics, each mapped to its metric-name template.

    Every value is a ``str.format`` template with the placeholders
    ``component``, ``sub_module`` and ``method``.
    """

    # Number of calls that began executing.
    COUNTER_STARTED = ('tfs_{component:s}_{sub_module:s}_{method:s}'
                       '_counter_requests_started')
    # Number of calls that returned normally.
    COUNTER_COMPLETED = ('tfs_{component:s}_{sub_module:s}_{method:s}'
                         '_counter_requests_completed')
    # Number of calls that ended in an exception.
    COUNTER_FAILED = ('tfs_{component:s}_{sub_module:s}_{method:s}'
                      '_counter_requests_failed')
    # Distribution of call durations.
    HISTOGRAM_DURATION = ('tfs_{component:s}_{sub_module:s}_{method:s}'
                          '_histogram_duration')
# For each metric type: the prometheus_client class used to build it and the
# default constructor keyword arguments (used when the caller passes none).
METRIC_TO_CLASS_PARAMS = {
    MetricTypeEnum.COUNTER_STARTED   : (Counter, {}),
    MetricTypeEnum.COUNTER_COMPLETED : (Counter, {}),
    MetricTypeEnum.COUNTER_FAILED    : (Counter, {}),
    MetricTypeEnum.HISTOGRAM_DURATION: (
        Histogram,
        {
            # Bucket upper bounds (presumably seconds, as recorded by
            # Histogram.time()), dense below 10 ms and extending up to 10,
            # followed by the mandatory +Inf bucket.
            'buckets': (
                0.001, 0.002, 0.003, 0.004, 0.005, 0.0075,
                0.01,  0.025, 0.05,  0.075,
                0.1,   0.25,  0.5,   0.75,
                1.0,   2.5,   5.0,   7.5,
                10.0,
                INF,
            ),
        },
    ),
}
class MetricsPool:
    """Create and cache the Prometheus metrics that instrument one sub-module.

    Every instance shares a single registry (``MetricsPool.metrics``) guarded by a
    single lock (``MetricsPool.lock``). Requesting a metric name that already exists
    therefore returns the cached object instead of constructing a duplicate.
    """

    # Class-level state shared by every MetricsPool instance.
    lock = threading.Lock()
    # Registry of created metrics, keyed by the upper-cased metric name.
    metrics : Dict[str, MetricWrapperBase] = dict()

    def __init__(
        self, component : str, sub_module : str, labels : Optional[Dict[str, str]] = None
    ) -> None:
        """Args:
            component: value substituted for ``{component}`` in metric names.
            sub_module: value substituted for ``{sub_module}`` in metric names.
            labels: label values applied to every metric returned by ``get_metrics``.
                The dict is copied, so later changes made by the caller do not leak in.
        """
        self._component = component
        self._sub_module = sub_module
        # Copy instead of aliasing the caller's dict (or a shared default object).
        # Otherwise the label names used at creation time could silently diverge
        # from the label values used later in get_metrics().
        self._labels : Dict[str, str] = dict(labels) if labels else {}

    def get_or_create(self, method : str, metric_type : MetricTypeEnum, **metric_params) -> MetricWrapperBase:
        """Return the ``metric_type`` metric for ``method``, creating it on first use.

        The name is rendered from ``metric_type.value``. Its upper-cased form is the
        registry key, and the metric itself is created with the lower-cased form, an
        empty description, and this pool's label names (sorted). When no
        ``metric_params`` are given, the defaults from ``METRIC_TO_CLASS_PARAMS``
        are used.

        NOTE(review): the registry is keyed by name only. Two pools with the same
        component/sub_module but different label keys share one metric whose label
        names match only the first pool. Confirm callers never do this.

        Raises:
            KeyError: if ``metric_type`` has no entry in ``METRIC_TO_CLASS_PARAMS``.
        """
        metric_name = str(metric_type.value).format(
            component=self._component, sub_module=self._sub_module, method=method).upper()
        with MetricsPool.lock:
            metric = MetricsPool.metrics.get(metric_name)
            if metric is None:
                # Explicit indexing: an unmapped type fails with a clear KeyError
                # rather than a TypeError from unpacking None.
                metric_class, default_metric_params = METRIC_TO_CLASS_PARAMS[metric_type]
                if not metric_params:
                    metric_params = default_metric_params
                label_names = sorted(self._labels.keys())
                metric = metric_class(metric_name.lower(), '', label_names, **metric_params)
                MetricsPool.metrics[metric_name] = metric
            return metric

    def get_metrics(
        self, method : str
    ) -> Tuple[MetricWrapperBase, MetricWrapperBase, MetricWrapperBase, MetricWrapperBase]:
        """Return ``(histogram_duration, counter_started, counter_completed, counter_failed)``.

        When the pool has labels, each metric is bound to them via
        ``metric.labels(**labels)`` and the labelled children are returned.
        """
        metric_types = (
            MetricTypeEnum.HISTOGRAM_DURATION,
            MetricTypeEnum.COUNTER_STARTED,
            MetricTypeEnum.COUNTER_COMPLETED,
            MetricTypeEnum.COUNTER_FAILED,
        )
        method_metrics = [self.get_or_create(method, metric_type) for metric_type in metric_types]
        if self._labels:
            method_metrics = [metric.labels(**self._labels) for metric in method_metrics]
        return tuple(method_metrics)
def metered_subclass_method(metrics_pool : MetricsPool):
    """Build a decorator that instruments a method with Prometheus metrics.

    Metrics are obtained from ``metrics_pool.get_metrics`` using the decorated
    function's name, once, at decoration time. On each call:

    * the "started" counter is incremented;
    * the "completed" counter is incremented if the method returns normally;
    * the "failed" counter is incremented if it raises an ``Exception``, which is
      then re-raised unchanged so callers still observe the failure;
    * the call is wrapped by ``histogram_duration.time()`` to record its duration.

    ``BaseException`` subclasses that are not ``Exception`` (e.g.
    ``KeyboardInterrupt``) propagate without touching the failed counter.
    """
    def outer_wrapper(func):
        histogram_duration, counter_started, counter_completed, counter_failed = \
            metrics_pool.get_metrics(func.__name__)

        @histogram_duration.time()
        @functools.wraps(func)
        def inner_wrapper(self, *args, **kwargs):
            counter_started.inc()
            try:
                reply = func(self, *args, **kwargs)
            except Exception:   # pylint: disable=broad-except
                counter_failed.inc()
                # Re-raise: swallowing here would make the method silently return None.
                raise
            else:
                counter_completed.inc()
                return reply
        return inner_wrapper
    return outer_wrapper
def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Logger):
    """Build a decorator for gRPC servicer methods: metrics, debug logging, error mapping.

    Metrics are obtained from ``metrics_pool.get_metrics`` using the decorated
    function's name, once, at decoration time. On each call the "started" counter
    is incremented and the call is wrapped by ``histogram_duration.time()``.

    * Success: request/reply are logged at DEBUG level as JSON, the "completed"
      counter is incremented, and the reply is returned.
    * ``ServiceException``: logged with traceback unless its code is NOT_FOUND or
      ALREADY_EXISTS, counted as failed, then ``grpc_context.abort(e.code, e.details)``.
    * Any other ``Exception``: logged with traceback, counted as failed, then
      ``grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))``.

    Args:
        metrics_pool: pool that provides the per-method metrics.
        logger: logger used for debug traces and exception reports.
    """
    def outer_wrapper(func):
        method_name = func.__name__
        histogram_duration, counter_started, counter_completed, counter_failed = \
            metrics_pool.get_metrics(method_name)

        @histogram_duration.time()
        @functools.wraps(func)
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            counter_started.inc()
            try:
                # Converting protobuf messages to JSON is not free; skip it entirely
                # when DEBUG records would be discarded anyway.
                debug_enabled = logger.isEnabledFor(logging.DEBUG)
                if debug_enabled:
                    logger.debug('{:s} request: {:s}'.format(method_name, grpc_message_to_json_string(request)))
                reply = func(self, request, grpc_context)
                if debug_enabled:
                    logger.debug('{:s} reply: {:s}'.format(method_name, grpc_message_to_json_string(reply)))
                counter_completed.inc()
                return reply
            except ServiceException as e:   # pragma: no cover (ServiceException not thrown)
                # Assume not found or already exists is just a condition, not an error,
                # so no traceback is logged for those codes (still counted as failed).
                if e.code not in (grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS):
                    logger.exception('{:s} exception'.format(method_name))
                counter_failed.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:          # pragma: no cover, pylint: disable=broad-except
                logger.exception('{:s} exception'.format(method_name))
                counter_failed.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
        return inner_wrapper
    return outer_wrapper