Commit ed1e9819 authored by Lluis Gifre Renom

Context component:

- corrected logger instantiation
- adapted the performance metrics collector for unit tests
- moved the performance evaluation dump method to MetricsPool (see the usage sketch below)
parent f37d3357
2 merge requests: !54 Release 2.0.0, !34 Context Scalability extensions using CockroachDB + Removal of Stateful database inside Device + other
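For orientation, a minimal sketch of what the third bullet amounts to (illustration only, not code from this commit; `pool` stands for any MetricsPool instance, such as the Context servicer's METRICS_POOL or the RAW_METRICS pool used by the unit tests):

# Illustration: with the dump logic now inside MetricsPool, rendering the
# per-method performance table is a one-liner wherever a pool is available.
print(pool.get_pretty_table(remove_empty_buckets=True).get_string())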
@@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, threading
import grpc, json, logging, threading
from enum import Enum
from typing import Dict, Tuple
from prettytable import PrettyTable
from typing import Any, Dict, List, Set, Tuple
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase, INF
from common.tools.grpc.Tools import grpc_message_to_json_string
@@ -83,6 +84,75 @@ class MetricsPool:
        return histogram_duration, counter_started, counter_completed, counter_failed

    def get_pretty_table(self, remove_empty_buckets : bool = True) -> PrettyTable:
        with MetricsPool.lock:
            method_to_metric_fields : Dict[str, Dict[str, Dict[str, Any]]] = dict()
            bucket_bounds : Set[str] = set()
            for raw_metric_name,raw_metric_data in MetricsPool.metrics.items():
                if '_COUNTER_' in raw_metric_name:
                    method_name,metric_name = raw_metric_name.split('_COUNTER_')
                elif '_HISTOGRAM_' in raw_metric_name:
                    method_name,metric_name = raw_metric_name.split('_HISTOGRAM_')
                else:
                    raise Exception('Unsupported metric: {:s}'.format(raw_metric_name)) # pragma: no cover
                metric_data = method_to_metric_fields.setdefault(method_name, dict()).setdefault(metric_name, dict())
                for field_name,labels,value,_,_ in raw_metric_data._child_samples():
                    if field_name == '_bucket': bucket_bounds.add(labels['le'])
                    if len(labels) > 0: field_name = '{:s}:{:s}'.format(field_name, json.dumps(labels, sort_keys=True))
                    metric_data[field_name] = value
            print('method_to_metric_fields', method_to_metric_fields)

            def sort_stats_key(item : List) -> float:
                str_duration = str(item[0])
                if str_duration == '---': return 0.0
                return float(str_duration.replace(' ms', ''))

            field_names = ['Method', 'TOT', 'OK', 'ERR', 'avg(Dur)']
            bucket_bounds = sorted(bucket_bounds, key=float) # convert buckets to float to get the key
            bucket_column_names = ['<={:s}'.format(bucket_bound) for bucket_bound in bucket_bounds]
            field_names.extend(bucket_column_names)

            pt_stats = PrettyTable(
                field_names=field_names, sortby='avg(Dur)', sort_key=sort_stats_key, reversesort=True)
            for f in field_names: pt_stats.align[f] = 'r'
            for f in ['Method']: pt_stats.align[f] = 'l'

            for method_name,metrics in method_to_metric_fields.items():
                counter_started_value = int(metrics['STARTED']['_total'])
                if counter_started_value == 0:
                    #pt_stats.add_row([method_name, '---', '---', '---', '---'])
                    continue
                counter_completed_value = int(metrics['COMPLETED']['_total'])
                counter_failed_value = int(metrics['FAILED']['_total'])
                duration_count_value = float(metrics['DURATION']['_count'])
                duration_sum_value = float(metrics['DURATION']['_sum'])
                duration_avg_value = duration_sum_value/duration_count_value

                row = [
                    method_name, str(counter_started_value), str(counter_completed_value), str(counter_failed_value),
                    '{:.3f} ms'.format(1000.0 * duration_avg_value),
                ]

                total_count = 0
                for bucket_bound in bucket_bounds:
                    labels = json.dumps({"le": bucket_bound}, sort_keys=True)
                    bucket_name = '_bucket:{:s}'.format(labels)
                    accumulated_count = int(metrics['DURATION'][bucket_name])
                    bucket_count = accumulated_count - total_count
                    row.append(str(bucket_count) if bucket_count > 0 else '')
                    total_count = accumulated_count

                pt_stats.add_row(row)

            if remove_empty_buckets:
                for bucket_column_name in bucket_column_names:
                    col_index = pt_stats._field_names.index(bucket_column_name)
                    num_non_empties = sum([1 for row in pt_stats._rows if len(row[col_index]) > 0])
                    if num_non_empties > 0: continue
                    pt_stats.del_column(bucket_column_name)

        return pt_stats

def metered_subclass_method(metrics_pool : MetricsPool):
    def outer_wrapper(func):
        metrics = metrics_pool.get_metrics(func.__name__)
......
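The bucket columns in the table come from Prometheus histogram samples, whose `le` buckets are cumulative (each bucket counts all observations up to its bound); get_pretty_table turns them into per-bucket counts by subtracting the running total. A standalone sketch of that conversion with made-up numbers (not code from the commit):

# Cumulative histogram buckets, as exported by prometheus_client, converted
# into per-bucket counts by subtracting the running total.
cumulative = {'0.005': 2, '0.01': 5, '0.025': 5, '+Inf': 7}   # made-up sample data
total_count = 0
per_bucket = {}
for bound in sorted(cumulative, key=float):   # '+Inf' sorts last via float('+Inf')
    per_bucket[bound] = cumulative[bound] - total_count
    total_count = cumulative[bound]
print(per_bucket)   # {'0.005': 2, '0.01': 3, '0.025': 0, '+Inf': 2}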
@@ -53,7 +53,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    def _get_metrics(self): return METRICS_POOL
    def _get_metrics(self) -> MetricsPool: return METRICS_POOL

    # ----- Context ----------------------------------------------------------------------------------------------------
......
@@ -25,11 +25,10 @@ LOG_LEVEL = get_log_level()
logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler(stream=sys.stderr))
LOGGER.setLevel(logging.WARNING)
#LOGGER.addHandler(logging.StreamHandler(stream=sys.stderr))
#LOGGER.setLevel(logging.WARNING)
terminate = threading.Event()
LOGGER : logging.Logger = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    LOGGER.warning('Terminate signal received')
......
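Regarding the "corrected logger instantiation" bullet: logging.basicConfig() already installs a stderr handler on the root logger, so the extra addHandler/setLevel calls (now commented out) duplicated every log line and pinned the level to WARNING regardless of LOG_LEVEL, while the later `LOGGER : logging.Logger = None` rebinding, which appears to be dropped in this hunk, would clobber the configured logger before signal_handler used it. A minimal sketch of the corrected setup, assuming it mirrors the surrounding module (illustration, not the committed file):

# Sketch: basicConfig already routes records to stderr, so no extra handler
# or per-logger level override is needed; LOG_LEVEL is assumed to come from
# get_log_level() as in the real module.
import logging
LOG_LEVEL = logging.INFO
logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)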
@@ -12,17 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, os, pytest, sqlalchemy
import os, pytest, sqlalchemy
from _pytest.config import Config
from _pytest.terminal import TerminalReporter
from prettytable import PrettyTable
from typing import Any, Dict, List, Set, Tuple
from typing import Tuple
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
    get_service_port_grpc, get_service_port_http)
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.method_wrappers.Decorator import MetricsPool
from context.client.ContextClient import ContextClient
from context.service.ContextService import ContextService
from context.service.database.Engine import Engine
@@ -47,7 +47,7 @@ def context_db_mb(request) -> Tuple[sqlalchemy.engine.Engine, MessageBroker]:
    yield _db_engine, _msg_broker
    _msg_broker.terminate()
RAW_METRICS = dict()
RAW_METRICS : MetricsPool = None
@pytest.fixture(scope='session')
def context_service(
@@ -72,69 +72,6 @@ def pytest_terminal_summary(
):
    yield

    method_to_metric_fields : Dict[str, Dict[str, Dict[str, Any]]] = dict()
    bucket_bounds : Set[str] = set()
    for raw_metric_name,raw_metric_data in RAW_METRICS.items():
        if '_COUNTER_' in raw_metric_name:
            method_name,metric_name = raw_metric_name.split('_COUNTER_')
        elif '_HISTOGRAM_' in raw_metric_name:
            method_name,metric_name = raw_metric_name.split('_HISTOGRAM_')
        else:
            raise Exception('Unsupported metric: {:s}'.format(raw_metric_name)) # pragma: no cover
        metric_data = method_to_metric_fields.setdefault(method_name, dict()).setdefault(metric_name, dict())
        for field_name,labels,value,_,_ in raw_metric_data._child_samples():
            if field_name == '_bucket': bucket_bounds.add(labels['le'])
            if len(labels) > 0: field_name = '{:s}:{:s}'.format(field_name, json.dumps(labels, sort_keys=True))
            metric_data[field_name] = value
    #print('method_to_metric_fields', method_to_metric_fields)

    def sort_stats_key(item : List) -> float:
        str_duration = str(item[0])
        if str_duration == '---': return 0.0
        return float(str_duration.replace(' ms', ''))

    field_names = ['Method', 'TOT', 'OK', 'ERR', 'avg(Dur)']
    bucket_bounds = sorted(bucket_bounds, key=float) # convert buckets to float to get the key
    bucket_column_names = ['<={:s}'.format(bucket_bound) for bucket_bound in bucket_bounds]
    field_names.extend(bucket_column_names)

    pt_stats = PrettyTable(field_names=field_names, sortby='avg(Dur)', sort_key=sort_stats_key, reversesort=True)
    for f in field_names: pt_stats.align[f] = 'r'
    for f in ['Method']: pt_stats.align[f] = 'l'

    for method_name,metrics in method_to_metric_fields.items():
        counter_started_value = int(metrics['STARTED']['_total'])
        if counter_started_value == 0:
            #pt_stats.add_row([method_name, '---', '---', '---', '---'])
            continue
        counter_completed_value = int(metrics['COMPLETED']['_total'])
        counter_failed_value = int(metrics['FAILED']['_total'])
        duration_count_value = float(metrics['DURATION']['_count'])
        duration_sum_value = float(metrics['DURATION']['_sum'])
        duration_avg_value = duration_sum_value/duration_count_value

        row = [
            method_name, str(counter_started_value), str(counter_completed_value), str(counter_failed_value),
            '{:.3f} ms'.format(1000.0 * duration_avg_value),
        ]

        total_count = 0
        for bucket_bound in bucket_bounds:
            labels = json.dumps({"le": bucket_bound}, sort_keys=True)
            bucket_name = '_bucket:{:s}'.format(labels)
            accumulated_count = int(metrics['DURATION'][bucket_name])
            bucket_count = accumulated_count - total_count
            row.append(str(bucket_count) if bucket_count > 0 else '')
            total_count = accumulated_count

        pt_stats.add_row(row)

    for bucket_column_name in bucket_column_names:
        col_index = pt_stats._field_names.index(bucket_column_name)
        num_non_empties = sum([1 for row in pt_stats._rows if len(row[col_index]) > 0])
        if num_non_empties > 0: continue
        pt_stats.del_column(bucket_column_name)

    print('')
    print('Performance Results:')
    print(pt_stats.get_string())
    print(RAW_METRICS.get_pretty_table().get_string())
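On the conftest side, RAW_METRICS is now typed as a MetricsPool (presumably assigned inside the context_service fixture; the assignment is outside the visible hunks), and the terminal-summary hook shrinks to printing the pool's table. A hedged sketch of the resulting hook under those assumptions (illustration, not the committed file):

# Illustration: the table-building code above moves into MetricsPool, leaving
# the pytest hook as a thin wrapper around get_pretty_table().
import pytest
from common.method_wrappers.Decorator import MetricsPool

RAW_METRICS : MetricsPool = None   # assumed to be set by the context_service fixture

@pytest.hookimpl(hookwrapper=True)
def pytest_terminal_summary(terminalreporter, exitstatus, config):   # hookwrapper form assumed from the 'yield'
    yield
    print('')
    print('Performance Results:')
    print(RAW_METRICS.get_pretty_table().get_string())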