Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, threading
from enum import Enum
from typing import Dict, Tuple
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase, INF
from common.tools.grpc.Tools import grpc_message_to_json_string
from .ServiceExceptions import ServiceException
class MetricTypeEnum(Enum):
COUNTER_STARTED = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_started'
COUNTER_COMPLETED = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_completed'
COUNTER_FAILED = 'tfs_{component:s}_{sub_module:s}_{method:s}_counter_requests_failed'
HISTOGRAM_DURATION = 'tfs_{component:s}_{sub_module:s}_{method:s}_histogram_duration'
METRIC_TO_CLASS_PARAMS = {
MetricTypeEnum.COUNTER_STARTED : (Counter, {}),
MetricTypeEnum.COUNTER_COMPLETED : (Counter, {}),
MetricTypeEnum.COUNTER_FAILED : (Counter, {}),
MetricTypeEnum.HISTOGRAM_DURATION: (Histogram, {
'buckets': (
# .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF
0.0010, 0.0025, 0.0050, 0.0075,
0.0100, 0.0250, 0.0500, 0.0750,
0.1000, 0.2500, 0.5000, 0.7500,
1.0000, 2.5000, 5.0000, 7.5000,
INF)
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
})
}
class MetricsPool:
lock = threading.Lock()
metrics : Dict[str, MetricWrapperBase] = dict()
def __init__(self, component : str, sub_module : str, labels : Dict[str, str] = {}) -> None:
self._component = component
self._sub_module = sub_module
self._labels = labels
def get_or_create(self, method : str, metric_type : MetricTypeEnum, **metric_params) -> MetricWrapperBase:
metric_name = str(metric_type.value).format(
component=self._component, sub_module=self._sub_module, method=method).upper()
with MetricsPool.lock:
if metric_name not in MetricsPool.metrics:
metric_tuple : Tuple[MetricWrapperBase, Dict] = METRIC_TO_CLASS_PARAMS.get(metric_type)
metric_class, default_metric_params = metric_tuple
if len(metric_params) == 0: metric_params = default_metric_params
labels = sorted(self._labels.keys())
MetricsPool.metrics[metric_name] = metric_class(metric_name.lower(), '', labels, **metric_params)
return MetricsPool.metrics[metric_name]
def get_metrics(
self, method : str
) -> Tuple[MetricWrapperBase, MetricWrapperBase, MetricWrapperBase, MetricWrapperBase]:
histogram_duration : Histogram = self.get_or_create(method, MetricTypeEnum.HISTOGRAM_DURATION)
counter_started : Counter = self.get_or_create(method, MetricTypeEnum.COUNTER_STARTED)
counter_completed : Counter = self.get_or_create(method, MetricTypeEnum.COUNTER_COMPLETED)
counter_failed : Counter = self.get_or_create(method, MetricTypeEnum.COUNTER_FAILED)
if len(self._labels) > 0:
histogram_duration = histogram_duration.labels(**(self._labels))
counter_started = counter_started.labels(**(self._labels))
counter_completed = counter_completed.labels(**(self._labels))
counter_failed = counter_failed.labels(**(self._labels))
return histogram_duration, counter_started, counter_completed, counter_failed
def metered_subclass_method(metrics_pool : MetricsPool):
def outer_wrapper(func):
metrics = metrics_pool.get_metrics(func.__name__)
histogram_duration, counter_started, counter_completed, counter_failed = metrics
@histogram_duration.time()
def inner_wrapper(self, *args, **kwargs):
counter_started.inc()
try:
reply = func(self, *args, **kwargs)
counter_completed.inc()
return reply
except KeyboardInterrupt: # pylint: disable=try-except-raise
raise
except Exception: # pylint: disable=broad-except
counter_failed.inc()
return inner_wrapper
return outer_wrapper
def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Logger):
def outer_wrapper(func):
method_name = func.__name__
metrics = metrics_pool.get_metrics(method_name)
histogram_duration, counter_started, counter_completed, counter_failed = metrics
@histogram_duration.time()
def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
counter_started.inc()
try:
logger.debug('{:s} request: {:s}'.format(method_name, grpc_message_to_json_string(request)))
reply = func(self, request, grpc_context)
logger.debug('{:s} reply: {:s}'.format(method_name, grpc_message_to_json_string(reply)))
counter_completed.inc()
return reply
except ServiceException as e: # pragma: no cover (ServiceException not thrown)
if e.code not in [grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS]:
# Assume not found or already exists is just a condition, not an error
logger.exception('{:s} exception'.format(method_name))
counter_failed.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover, pylint: disable=broad-except
logger.exception('{:s} exception'.format(method_name))
counter_failed.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
return inner_wrapper
return outer_wrapper