Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import json
import logging
import threading
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple

import grpc
from prettytable import PrettyTable
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase, INF

from common.tools.grpc.Tools import grpc_message_to_json_string
from .ServiceExceptions import ServiceException
class MetricTypeEnum(Enum):
    """Name templates of the metrics kept for every metered method.

    Each value is a `str.format` template taking the `component`, `sub_module` and `method`
    fields; all templates share the same `tfs_<component>_<sub_module>_<method>` prefix.
    """
    COUNTER_STARTED = (
        'tfs_{component:s}_{sub_module:s}_{method:s}'
        '_counter_requests_started'
    )
    COUNTER_COMPLETED = (
        'tfs_{component:s}_{sub_module:s}_{method:s}'
        '_counter_requests_completed'
    )
    COUNTER_FAILED = (
        'tfs_{component:s}_{sub_module:s}_{method:s}'
        '_counter_requests_failed'
    )
    HISTOGRAM_DURATION = (
        'tfs_{component:s}_{sub_module:s}_{method:s}'
        '_histogram_duration'
    )
# For every metric type: the prometheus_client class that implements it and the keyword
# arguments passed to that class when the caller does not provide any.
METRIC_TO_CLASS_PARAMS = {
    MetricTypeEnum.COUNTER_STARTED   : (Counter, dict()),
    MetricTypeEnum.COUNTER_COMPLETED : (Counter, dict()),
    MetricTypeEnum.COUNTER_FAILED    : (Counter, dict()),
    MetricTypeEnum.HISTOGRAM_DURATION: (Histogram, dict(
        # Bucket upper bounds, in seconds: nine steps per decade from 1 ms up to 9 s,
        # then 10-second steps up to one minute, then the catch-all infinity bucket.
        #
        # Alternative, coarser layouts kept for reference:
        #   .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, INF
        #   0.0010, 0.0025, 0.0050, 0.0075,
        #   0.0100, 0.0250, 0.0500, 0.0750,
        #   0.1000, 0.2500, 0.5000, 0.7500,
        #   1.0000, 2.5000, 5.0000, 7.5000,
        buckets=(
            0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009,  # 1~9 ms
            0.010, 0.020, 0.030, 0.040, 0.050, 0.060, 0.070, 0.080, 0.090,  # 10~90 ms
            0.100, 0.200, 0.300, 0.400, 0.500, 0.600, 0.700, 0.800, 0.900,  # 100~900 ms
            1.000, 2.000, 3.000, 4.000, 5.000, 6.000, 7.000, 8.000, 9.000,  # 1~9 sec
            10.0, 20.0, 30.0, 40.0, 50.0, 60.0, INF,                        # 10~60 sec & Infinity
        ),
    )),
}
class MetricsPool:
    """Factory and cache of the per-method metrics of one component/sub-module.

    The cache (`metrics`) and its `lock` are class attributes, so every MetricsPool instance
    shares the same set of metric objects; a metric is instantiated only the first time its
    name is requested.
    """
    lock = threading.Lock()                          # guards every access to `metrics`
    metrics : Dict[str, MetricWrapperBase] = dict()  # upper-cased metric name => metric object

    def __init__(
        self, component : str, sub_module : str, labels : Optional[Dict[str, str]] = None,
        default_metric_params : Optional[Dict[MetricTypeEnum, Dict]] = None
    ) -> None:
        """Store the naming context used to build metric names.

        Args:
            component: value of the `component` field of the metric name templates.
            sub_module: value of the `sub_module` field of the metric name templates.
            labels: label name => label value applied to every metric handed out by
                `get_metrics`; no labels when omitted.
            default_metric_params: per metric type, keyword arguments that take precedence
                over the ones in METRIC_TO_CLASS_PARAMS; none when omitted.
        """
        self._component = component
        self._sub_module = sub_module
        # A fresh dict per instance: a mutable default argument would be shared by all pools.
        self._labels = dict() if labels is None else labels
        self._default_metric_params = dict() if default_metric_params is None else default_metric_params

    def get_or_create(self, method : str, metric_type : MetricTypeEnum, **metric_params) -> MetricWrapperBase:
        """Return the shared metric of `metric_type` for `method`, creating it on first use.

        Constructor arguments are resolved in this order: explicit `metric_params`, then this
        pool's `default_metric_params` for the metric type, then METRIC_TO_CLASS_PARAMS. They
        are only used when the metric does not exist yet.
        """
        metric_name = str(metric_type.value).format(
            component=self._component, sub_module=self._sub_module, method=method).upper()
        with MetricsPool.lock:
            if metric_name not in MetricsPool.metrics:
                metric_tuple : Tuple[MetricWrapperBase, Dict] = METRIC_TO_CLASS_PARAMS.get(metric_type)
                metric_class, default_metric_params = metric_tuple
                if len(metric_params) == 0: metric_params = self._default_metric_params.get(metric_type, {})
                if len(metric_params) == 0: metric_params = default_metric_params
                # Label names are sorted so the declaration order does not depend on dict order.
                labels = sorted(self._labels.keys())
                MetricsPool.metrics[metric_name] = metric_class(metric_name.lower(), '', labels, **metric_params)
            return MetricsPool.metrics[metric_name]

    def get_metrics(
        self, method : str
    ) -> Tuple[MetricWrapperBase, MetricWrapperBase, MetricWrapperBase, MetricWrapperBase]:
        """Return (duration histogram, started, completed, failed counters) for `method`.

        When the pool was built with labels, each returned metric is already bound to the
        pool's label values.
        """
        metric_types = (
            MetricTypeEnum.HISTOGRAM_DURATION,
            MetricTypeEnum.COUNTER_STARTED,
            MetricTypeEnum.COUNTER_COMPLETED,
            MetricTypeEnum.COUNTER_FAILED,
        )
        metrics = [self.get_or_create(method, metric_type) for metric_type in metric_types]
        if len(self._labels) > 0:
            metrics = [metric.labels(**(self._labels)) for metric in metrics]
        histogram_duration, counter_started, counter_completed, counter_failed = metrics
        return histogram_duration, counter_started, counter_completed, counter_failed

    @staticmethod
    def _split_metric_name(raw_metric_name : str) -> Tuple[str, str]:
        """Split a pooled metric name into (method part, metric part) at its type marker.

        The rightmost marker is used, so a method whose own name contains `_COUNTER_` or
        `_HISTOGRAM_` is still split at the marker that belongs to the metric type.
        """
        position, marker = max(
            (raw_metric_name.rfind(candidate), candidate) for candidate in ('_COUNTER_', '_HISTOGRAM_'))
        if position < 0:
            raise Exception('Unsupported metric: {:s}'.format(raw_metric_name)) # pragma: no cover
        return raw_metric_name[:position], raw_metric_name[position + len(marker):]

    def get_pretty_table(self, remove_empty_buckets : bool = True) -> PrettyTable:
        """Summarise all pooled metrics as a table with one row per method.

        Columns are the method, the started/completed/failed counters, the average duration in
        milliseconds and one column per histogram bucket holding the number of observations
        that fell in that bucket. Methods with no started request are skipped, and rows are
        sorted by decreasing average duration.

        Args:
            remove_empty_buckets: drop the bucket columns that are empty in every row.
        """
        method_to_metric_fields : Dict[str, Dict[str, Dict[str, Any]]] = dict()
        bucket_bounds : Set[str] = set()

        # Snapshot the sample values into plain dictionaries while holding the pool lock.
        # NOTE(review): samples are read through the private `_child_samples()` of the pooled
        # metric objects -- confirm this also works for pools created with labels.
        with MetricsPool.lock:
            for raw_metric_name,raw_metric_data in MetricsPool.metrics.items():
                method_name,metric_name = MetricsPool._split_metric_name(raw_metric_name)
                metric_data = method_to_metric_fields.setdefault(method_name, dict()).setdefault(metric_name, dict())
                for sample in raw_metric_data._child_samples():
                    # Only name, labels and value are needed; slicing tolerates sample tuples
                    # that carry a different number of trailing fields.
                    field_name,labels,value = sample[:3]
                    if field_name == '_bucket': bucket_bounds.add(labels['le'])
                    if len(labels) > 0: field_name = '{:s}:{:s}'.format(field_name, json.dumps(labels, sort_keys=True))
                    metric_data[field_name] = value

        def sort_stats_key(item : List) -> float:
            # item[0] holds the value of the 'avg(Dur)' column; '---' (no data) sorts as zero.
            str_duration = str(item[0])
            if str_duration == '---': return 0.0
            return float(str_duration.replace(' ms', ''))

        field_names = ['Method', 'TOT', 'OK', 'ERR', 'avg(Dur)']
        bucket_bounds = sorted(bucket_bounds, key=float) # bounds are strings; order them numerically
        bucket_column_names = ['<={:s}'.format(bucket_bound) for bucket_bound in bucket_bounds]
        field_names.extend(bucket_column_names)

        pt_stats = PrettyTable(
            field_names=field_names, sortby='avg(Dur)', sort_key=sort_stats_key, reversesort=True)
        for f in field_names: pt_stats.align[f] = 'r'
        for f in ['Method']: pt_stats.align[f] = 'l'

        for method_name,metrics in method_to_metric_fields.items():
            counter_started_value = int(metrics['REQUESTS_STARTED']['_total'])
            if counter_started_value == 0: continue
            counter_completed_value = int(metrics['REQUESTS_COMPLETED']['_total'])
            counter_failed_value = int(metrics['REQUESTS_FAILED']['_total'])
            duration_count_value = float(metrics['DURATION']['_count'])
            duration_sum_value = float(metrics['DURATION']['_sum'])
            if duration_count_value > 0.0:
                duration_avg_value = duration_sum_value/duration_count_value
                str_duration_avg = '{:.3f} ms'.format(1000.0 * duration_avg_value)
            else:
                # A request was started but no duration has been observed yet (e.g. the first
                # call is still running): report "no data" instead of dividing by zero.
                str_duration_avg = '---'

            row = [
                method_name, str(counter_started_value), str(counter_completed_value), str(counter_failed_value),
                str_duration_avg,
            ]

            # Bucket values are cumulative; subtract the previous bound to get per-bucket counts.
            total_count = 0
            for bucket_bound in bucket_bounds:
                labels = json.dumps({"le": bucket_bound}, sort_keys=True)
                bucket_name = '_bucket:{:s}'.format(labels)
                # A histogram created with other buckets lacks this bound: treat it as empty.
                accumulated_count = int(metrics['DURATION'].get(bucket_name, total_count))
                bucket_count = accumulated_count - total_count
                row.append(str(bucket_count) if bucket_count > 0 else '')
                total_count = accumulated_count

            pt_stats.add_row(row)

        if remove_empty_buckets:
            for bucket_column_name in bucket_column_names:
                col_index = pt_stats._field_names.index(bucket_column_name)
                if any(len(row[col_index]) > 0 for row in pt_stats._rows): continue
                pt_stats.del_column(bucket_column_name)

        return pt_stats
def metered_subclass_method(metrics_pool : MetricsPool):
    """Build a decorator that meters a method with the metrics of `metrics_pool`.

    The metrics are looked up once, at decoration time, under the decorated function's
    name. Every call is timed with the duration histogram and increments the "started"
    counter; it then increments "completed" when the method returns, or "failed" when it
    raises. The exception is re-raised so callers still see the failure.

    Args:
        metrics_pool: pool providing `get_metrics(method_name)`.

    Returns:
        A decorator for methods taking `self` plus arbitrary arguments.
    """
    def outer_wrapper(func):
        metrics = metrics_pool.get_metrics(func.__name__)
        histogram_duration, counter_started, counter_completed, counter_failed = metrics

        @histogram_duration.time()
        @functools.wraps(func)  # keep the name/docstring of the decorated method
        def inner_wrapper(self, *args, **kwargs):
            counter_started.inc()
            try:
                reply = func(self, *args, **kwargs)
                counter_completed.inc()
                return reply
            except Exception:           # pylint: disable=broad-except
                # KeyboardInterrupt is not an Exception subclass, so it propagates untouched
                # and is not counted as a failure.
                counter_failed.inc()
                # Metering must not hide the error: returning None here would make callers
                # fail later with a misleading error.
                raise

        return inner_wrapper
    return outer_wrapper
def safe_and_metered_rpc_method(metrics_pool : MetricsPool, logger : logging.Logger):
    """Build a decorator that meters a gRPC servicer method and converts errors into aborts.

    The metrics are looked up once, at decoration time, under the decorated function's
    name. Every call is timed with the duration histogram, increments the "started"
    counter, and logs the request and the reply at debug level. Exceptions never escape
    as-is: they are reported to the client through `grpc_context.abort(...)`.

    Args:
        metrics_pool: pool providing `get_metrics(method_name)`.
        logger: logger receiving the request/reply traces and the exception reports.

    Returns:
        A decorator for methods with the `(self, request, grpc_context)` signature.
    """
    def outer_wrapper(func):
        method_name = func.__name__
        metrics = metrics_pool.get_metrics(method_name)
        histogram_duration, counter_started, counter_completed, counter_failed = metrics

        @histogram_duration.time()
        @functools.wraps(func)  # keep the name/docstring of the decorated RPC method
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            counter_started.inc()
            try:
                logger.debug('{:s} request: {:s}'.format(method_name, grpc_message_to_json_string(request)))
                reply = func(self, request, grpc_context)
                logger.debug('{:s} reply: {:s}'.format(method_name, grpc_message_to_json_string(reply)))
                counter_completed.inc()
                return reply
            except ServiceException as e:   # pragma: no cover (ServiceException not thrown)
                # NOTE(review): nesting reconstructed from a source without indentation --
                # confirm that the failure counter is meant to be skipped for the two
                # status codes below.
                if e.code not in [grpc.StatusCode.NOT_FOUND, grpc.StatusCode.ALREADY_EXISTS]:
                    # Assume not found or already exists is just a condition, not an error
                    logger.exception('{:s} exception'.format(method_name))
                    counter_failed.inc()
                grpc_context.abort(e.code, e.details)
            except Exception as e:          # pragma: no cover, pylint: disable=broad-except
                # Anything unexpected is logged and reported to the client as INTERNAL.
                logger.exception('{:s} exception'.format(method_name))
                counter_failed.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))

        return inner_wrapper
    return outer_wrapper