# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
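"""
Pytest fixtures and reporting hooks for the Context component unit tests:
they spin up a Context service instance backed by a freshly rebuilt database
and an in-memory message broker, expose a ContextClient, and print a per-RPC
performance summary at the end of the test session.
"""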
import json, os, pytest, sqlalchemy
from typing import Any, Dict, List, Set, Tuple
from _pytest.config import Config
from _pytest.terminal import TerminalReporter
from prettytable import PrettyTable
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
    get_service_port_grpc, get_service_port_http)
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from context.client.ContextClient import ContextClient
from context.service.ContextService import ContextService
from context.service.Engine import Engine
from context.service.database.models._Base import rebuild_database
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT)) # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT)) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
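
# The Context service under test binds to localhost on the non-privileged
# ports computed above; the environment variables make ContextClient resolve
# the same endpoint.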
@pytest.fixture(scope='session')
def context_db_mb(request) -> Tuple[sqlalchemy.engine.Engine, MessageBroker]: # pylint: disable=unused-argument
    # Recreate the database from scratch and create an in-memory message broker
    _db_engine = Engine.get_engine()
    Engine.drop_database(_db_engine)
    Engine.create_database(_db_engine)
    rebuild_database(_db_engine)
    _msg_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY))
    yield _db_engine, _msg_broker
    _msg_broker.terminate()
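
# NOTE: with scope='session', context_db_mb drops and rebuilds the database
# once per pytest session, not once per test.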
RAW_METRICS = None

@pytest.fixture(scope='session')
def context_service(context_db_mb : Tuple[sqlalchemy.engine.Engine, MessageBroker]): # pylint: disable=redefined-outer-name
    global RAW_METRICS # pylint: disable=global-statement
    _service = ContextService(context_db_mb[0], context_db_mb[1])
    # Capture the servicer's Prometheus metrics so pytest_terminal_summary can report them
    RAW_METRICS = _service.context_servicer._get_metrics()
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def context_client(context_service : ContextService): # pylint: disable=redefined-outer-name,unused-argument
    _client = ContextClient()
    yield _client
    _client.close()
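
# Minimal usage sketch (hypothetical test, not part of this conftest): any test
# module in this package can request the fixtures above by name. The 'Empty'
# message and the RPC name are assumed here for illustration:
#
#   from common.proto.context_pb2 import Empty
#
#   def test_list_context_ids(context_client : ContextClient):
#       response = context_client.ListContextIds(Empty())
#       assert len(response.context_ids) == 0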
@pytest.hookimpl(hookwrapper=True)
def pytest_terminal_summary(
    terminalreporter : TerminalReporter, exitstatus : int, config : Config # pylint: disable=unused-argument
):
    yield

    if RAW_METRICS is None: return  # no test requested the context_service fixture; nothing to report
    # Metric names follow '<METHOD>_COUNTER_<NAME>' or '<METHOD>_HISTOGRAM_<NAME>'
    method_to_metric_fields : Dict[str, Dict[str, Dict[str, Any]]] = dict()
    bucket_bounds : Set[str] = set()
    for raw_metric_name,raw_metric_data in RAW_METRICS.items():
        if '_COUNTER_' in raw_metric_name:
            method_name,metric_name = raw_metric_name.split('_COUNTER_')
        elif '_HISTOGRAM_' in raw_metric_name:
            method_name,metric_name = raw_metric_name.split('_HISTOGRAM_')
        else:
            raise Exception('Unsupported metric: {:s}'.format(raw_metric_name)) # pragma: no cover
        metric_data = method_to_metric_fields.setdefault(method_name, dict()).setdefault(metric_name, dict())
        for field_name,labels,value,_,_ in raw_metric_data._child_samples():
            if field_name == '_bucket': bucket_bounds.add(labels['le'])
            if len(labels) > 0: field_name = '{:s}:{:s}'.format(field_name, json.dumps(labels, sort_keys=True))
            metric_data[field_name] = value
    #print('method_to_metric_fields', method_to_metric_fields)
    def sort_stats_key(item : List) -> float:
        # Sort rows by average duration; rows without data ('---') sort last
        str_duration = str(item[0])
        if str_duration == '---': return 0.0
        return float(str_duration.replace(' ms', ''))

    field_names = ['Method', 'TOT', 'OK', 'ERR', 'avg(Dur)']
    bucket_bounds = sorted(bucket_bounds, key=lambda b: float(b))  # sort histogram bounds numerically
    bucket_column_names = ['<={:s}'.format(bucket_bound) for bucket_bound in bucket_bounds]
    field_names.extend(bucket_column_names)

    pt_stats = PrettyTable(field_names=field_names, sortby='avg(Dur)', sort_key=sort_stats_key, reversesort=True)
    for f in field_names: pt_stats.align[f] = 'r'
    for f in ['Method']: pt_stats.align[f] = 'l'
    for method_name,metrics in method_to_metric_fields.items():
        counter_started_value = int(metrics['STARTED']['_total'])
        if counter_started_value == 0:
            #pt_stats.add_row([method_name, '---', '---', '---', '---'])
            continue
        counter_completed_value = int(metrics['COMPLETED']['_total'])
        counter_failed_value = int(metrics['FAILED']['_total'])
        duration_count_value = float(metrics['DURATION']['_count'])
        duration_sum_value = float(metrics['DURATION']['_sum'])
        duration_avg_value = duration_sum_value/duration_count_value
        row = [
            method_name, str(counter_started_value), str(counter_completed_value), str(counter_failed_value),
            '{:.3f} ms'.format(1000.0 * duration_avg_value),
        ]
        # Histogram buckets are cumulative; subtract the running total to get per-bucket counts
        total_count = 0
        for bucket_bound in bucket_bounds:
            labels = json.dumps({"le": bucket_bound}, sort_keys=True)
            bucket_name = '_bucket:{:s}'.format(labels)
            accumulated_count = int(metrics['DURATION'][bucket_name])
            bucket_count = accumulated_count - total_count
            row.append(str(bucket_count) if bucket_count > 0 else '')
            total_count = accumulated_count

        pt_stats.add_row(row)
    # Drop bucket columns that are empty for all rows to keep the table compact
    for bucket_column_name in bucket_column_names:
        col_index = pt_stats._field_names.index(bucket_column_name)
        num_non_empties = sum([1 for row in pt_stats._rows if len(row[col_index]) > 0])
        if num_non_empties > 0: continue
        pt_stats.del_column(bucket_column_name)

    print('')
    print('Performance Results:')
    print(pt_stats.get_string())
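
# The printed table has one row per gRPC method with columns
# Method | TOT | OK | ERR | avg(Dur), plus one '<=bound' column per histogram
# bucket that received at least one sample.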