diff --git a/src/common/method_wrappers/Decorator.py b/src/common/method_wrappers/Decorator.py index 7ee2a919e10f25104d0fa77caaf8bafa11c2b30f..1a384d15ab65eb115f31d5fa157d9e25fdfe991c 100644 --- a/src/common/method_wrappers/Decorator.py +++ b/src/common/method_wrappers/Decorator.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging, threading +import grpc, json, logging, threading from enum import Enum -from typing import Dict, Tuple +from prettytable import PrettyTable +from typing import Any, Dict, List, Set, Tuple from prometheus_client import Counter, Histogram from prometheus_client.metrics import MetricWrapperBase, INF from common.tools.grpc.Tools import grpc_message_to_json_string @@ -83,6 +84,75 @@ class MetricsPool: return histogram_duration, counter_started, counter_completed, counter_failed + def get_pretty_table(self, remove_empty_buckets : bool = True) -> PrettyTable: + with MetricsPool.lock: + method_to_metric_fields : Dict[str, Dict[str, Dict[str, Any]]] = dict() + bucket_bounds : Set[str] = set() + for raw_metric_name,raw_metric_data in MetricsPool.metrics.items(): + if '_COUNTER_' in raw_metric_name: + method_name,metric_name = raw_metric_name.split('_COUNTER_') + elif '_HISTOGRAM_' in raw_metric_name: + method_name,metric_name = raw_metric_name.split('_HISTOGRAM_') + else: + raise Exception('Unsupported metric: {:s}'.format(raw_metric_name)) # pragma: no cover + metric_data = method_to_metric_fields.setdefault(method_name, dict()).setdefault(metric_name, dict()) + for field_name,labels,value,_,_ in raw_metric_data._child_samples(): + if field_name == '_bucket': bucket_bounds.add(labels['le']) + if len(labels) > 0: field_name = '{:s}:{:s}'.format(field_name, json.dumps(labels, sort_keys=True)) + metric_data[field_name] = value + print('method_to_metric_fields', method_to_metric_fields) + + def sort_stats_key(item : List) -> float: + str_duration = str(item[0]) + if str_duration == '---': return 0.0 + return float(str_duration.replace(' ms', '')) + + field_names = ['Method', 'TOT', 'OK', 'ERR', 'avg(Dur)'] + bucket_bounds = sorted(bucket_bounds, key=float) # convert buckets to float to get the key + bucket_column_names = ['<={:s}'.format(bucket_bound) for bucket_bound in bucket_bounds] + field_names.extend(bucket_column_names) + + pt_stats = PrettyTable( + field_names=field_names, sortby='avg(Dur)', sort_key=sort_stats_key, reversesort=True) + for f in field_names: pt_stats.align[f] = 'r' + for f in ['Method']: pt_stats.align[f] = 'l' + + for method_name,metrics in method_to_metric_fields.items(): + counter_started_value = int(metrics['STARTED']['_total']) + if counter_started_value == 0: + #pt_stats.add_row([method_name, '---', '---', '---', '---']) + continue + counter_completed_value = int(metrics['COMPLETED']['_total']) + counter_failed_value = int(metrics['FAILED']['_total']) + duration_count_value = float(metrics['DURATION']['_count']) + duration_sum_value = float(metrics['DURATION']['_sum']) + duration_avg_value = duration_sum_value/duration_count_value + + row = [ + method_name, str(counter_started_value), str(counter_completed_value), str(counter_failed_value), + '{:.3f} ms'.format(1000.0 * duration_avg_value), + ] + + total_count = 0 + for bucket_bound in bucket_bounds: + labels = json.dumps({"le": bucket_bound}, sort_keys=True) + bucket_name = '_bucket:{:s}'.format(labels) + accumulated_count = int(metrics['DURATION'][bucket_name]) + bucket_count = accumulated_count - total_count + row.append(str(bucket_count) if bucket_count > 0 else '') + total_count = accumulated_count + + pt_stats.add_row(row) + + if remove_empty_buckets: + for bucket_column_name in bucket_column_names: + col_index = pt_stats._field_names.index(bucket_column_name) + num_non_empties = sum([1 for row in pt_stats._rows if len(row[col_index]) > 0]) + if num_non_empties > 0: continue + pt_stats.del_column(bucket_column_name) + + return pt_stats + def metered_subclass_method(metrics_pool : MetricsPool): def outer_wrapper(func): metrics = metrics_pool.get_metrics(func.__name__) diff --git a/src/context/service/ContextServiceServicerImpl.py b/src/context/service/ContextServiceServicerImpl.py index 7e72265705e23841413abac19488a0d31754531c..3f1bd9c202550f5aad3ce64408677fe55d344b50 100644 --- a/src/context/service/ContextServiceServicerImpl.py +++ b/src/context/service/ContextServiceServicerImpl.py @@ -53,7 +53,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer self.messagebroker = messagebroker LOGGER.debug('Servicer Created') - def _get_metrics(self): return METRICS_POOL + def _get_metrics(self) -> MetricsPool: return METRICS_POOL # ----- Context ---------------------------------------------------------------------------------------------------- diff --git a/src/context/service/__main__.py b/src/context/service/__main__.py index 9960e94b519c5f4604557a70bab6c8e80eab6fc5..145c91cf098ead558ff9cc5c2ac704fed3bc4dd2 100644 --- a/src/context/service/__main__.py +++ b/src/context/service/__main__.py @@ -25,11 +25,10 @@ LOG_LEVEL = get_log_level() logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) -LOGGER.addHandler(logging.StreamHandler(stream=sys.stderr)) -LOGGER.setLevel(logging.WARNING) +#LOGGER.addHandler(logging.StreamHandler(stream=sys.stderr)) +#LOGGER.setLevel(logging.WARNING) terminate = threading.Event() -LOGGER : logging.Logger = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name LOGGER.warning('Terminate signal received') diff --git a/src/context/tests/conftest.py b/src/context/tests/conftest.py index dc54c8cdcbdc29e8bbdfb316f3214172c1a993b7..25de05842e7596429cb236db0b7589ad7a94213c 100644 --- a/src/context/tests/conftest.py +++ b/src/context/tests/conftest.py @@ -12,17 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json, os, pytest, sqlalchemy +import os, pytest, sqlalchemy from _pytest.config import Config from _pytest.terminal import TerminalReporter -from prettytable import PrettyTable -from typing import Any, Dict, List, Set, Tuple +from typing import Tuple from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name, get_service_port_grpc, get_service_port_http) from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker +from common.method_wrappers.Decorator import MetricsPool from context.client.ContextClient import ContextClient from context.service.ContextService import ContextService from context.service.database.Engine import Engine @@ -47,7 +47,7 @@ def context_db_mb(request) -> Tuple[sqlalchemy.engine.Engine, MessageBroker]: yield _db_engine, _msg_broker _msg_broker.terminate() -RAW_METRICS = dict() +RAW_METRICS : MetricsPool = None @pytest.fixture(scope='session') def context_service( @@ -72,69 +72,6 @@ def pytest_terminal_summary( ): yield - method_to_metric_fields : Dict[str, Dict[str, Dict[str, Any]]] = dict() - bucket_bounds : Set[str] = set() - for raw_metric_name,raw_metric_data in RAW_METRICS.items(): - if '_COUNTER_' in raw_metric_name: - method_name,metric_name = raw_metric_name.split('_COUNTER_') - elif '_HISTOGRAM_' in raw_metric_name: - method_name,metric_name = raw_metric_name.split('_HISTOGRAM_') - else: - raise Exception('Unsupported metric: {:s}'.format(raw_metric_name)) # pragma: no cover - metric_data = method_to_metric_fields.setdefault(method_name, dict()).setdefault(metric_name, dict()) - for field_name,labels,value,_,_ in raw_metric_data._child_samples(): - if field_name == '_bucket': bucket_bounds.add(labels['le']) - if len(labels) > 0: field_name = '{:s}:{:s}'.format(field_name, json.dumps(labels, sort_keys=True)) - metric_data[field_name] = value - #print('method_to_metric_fields', method_to_metric_fields) - - def sort_stats_key(item : List) -> float: - str_duration = str(item[0]) - if str_duration == '---': return 0.0 - return float(str_duration.replace(' ms', '')) - - field_names = ['Method', 'TOT', 'OK', 'ERR', 'avg(Dur)'] - bucket_bounds = sorted(bucket_bounds, key=float) # convert buckets to float to get the key - bucket_column_names = ['<={:s}'.format(bucket_bound) for bucket_bound in bucket_bounds] - field_names.extend(bucket_column_names) - - pt_stats = PrettyTable(field_names=field_names, sortby='avg(Dur)', sort_key=sort_stats_key, reversesort=True) - for f in field_names: pt_stats.align[f] = 'r' - for f in ['Method']: pt_stats.align[f] = 'l' - - for method_name,metrics in method_to_metric_fields.items(): - counter_started_value = int(metrics['STARTED']['_total']) - if counter_started_value == 0: - #pt_stats.add_row([method_name, '---', '---', '---', '---']) - continue - counter_completed_value = int(metrics['COMPLETED']['_total']) - counter_failed_value = int(metrics['FAILED']['_total']) - duration_count_value = float(metrics['DURATION']['_count']) - duration_sum_value = float(metrics['DURATION']['_sum']) - duration_avg_value = duration_sum_value/duration_count_value - - row = [ - method_name, str(counter_started_value), str(counter_completed_value), str(counter_failed_value), - '{:.3f} ms'.format(1000.0 * duration_avg_value), - ] - - total_count = 0 - for bucket_bound in bucket_bounds: - labels = json.dumps({"le": bucket_bound}, sort_keys=True) - bucket_name = '_bucket:{:s}'.format(labels) - accumulated_count = int(metrics['DURATION'][bucket_name]) - bucket_count = accumulated_count - total_count - row.append(str(bucket_count) if bucket_count > 0 else '') - total_count = accumulated_count - - pt_stats.add_row(row) - - for bucket_column_name in bucket_column_names: - col_index = pt_stats._field_names.index(bucket_column_name) - num_non_empties = sum([1 for row in pt_stats._rows if len(row[col_index]) > 0]) - if num_non_empties > 0: continue - pt_stats.del_column(bucket_column_name) - print('') print('Performance Results:') - print(pt_stats.get_string()) + print(RAW_METRICS.get_pretty_table().get_string())