import functools
import logging
from enum import Enum
from typing import Dict, List

import grpc
from prometheus_client import Counter, Histogram
from prometheus_client.metrics import MetricWrapperBase

from common.exceptions.ServiceException import ServiceException


class RequestConditionEnum(Enum):
    """Request outcomes that each get a dedicated counter metric."""

    STARTED = 'started'
    COMPLETED = 'completed'
    FAILED = 'failed'


def get_counter_requests(method_name: str, request_condition: RequestConditionEnum) -> Counter:
    """Create the request counter of `method_name` for one request condition.

    Args:
        method_name: Method identifier; every ':' is replaced by '_' when
            building the metric name, while the description keeps it as given.
        request_condition: Condition whose value is appended to the metric
            name and description.

    Returns:
        A newly constructed `Counter`.
    """
    str_request_condition = request_condition.value
    name = '{:s}_counter_requests_{:s}'.format(method_name.replace(':', '_'), str_request_condition)
    description = '{:s} counter of requests {:s}'.format(method_name, str_request_condition)
    return Counter(name, description)


def get_histogram_duration(method_name: str) -> Histogram:
    """Create the request-duration histogram of `method_name`.

    Args:
        method_name: Method identifier; every ':' is replaced by '_' when
            building the metric name, while the description keeps it as given.

    Returns:
        A newly constructed `Histogram`.
    """
    name = '{:s}_histogram_duration'.format(method_name.replace(':', '_'))
    description = '{:s} histogram of request duration'.format(method_name)
    return Histogram(name, description)


# Maps a metric-key template (formatted with the bare method name) to the
# factory that builds the metric from the '<service>:<method>' identifier.
METRIC_TEMPLATES = {
    '{:s}_COUNTER_STARTED'   : lambda method_name: get_counter_requests(method_name, RequestConditionEnum.STARTED),
    '{:s}_COUNTER_COMPLETED' : lambda method_name: get_counter_requests(method_name, RequestConditionEnum.COMPLETED),
    '{:s}_COUNTER_FAILED'    : lambda method_name: get_counter_requests(method_name, RequestConditionEnum.FAILED),
    '{:s}_HISTOGRAM_DURATION': get_histogram_duration,
}


def create_metrics(service_name: str, method_names: List[str]) -> Dict[str, MetricWrapperBase]:
    """Build every metric in `METRIC_TEMPLATES` for each listed method.

    Args:
        service_name: Prefix joined to each method name with ':' before it is
            handed to the metric factories.
        method_names: Names of the methods to instrument.

    Returns:
        Dict keyed by the upper-cased template key (for example
        '<METHOD>_COUNTER_STARTED') holding the created metric objects. These
        are the keys `safe_and_metered_rpc_method` looks up.
    """
    metrics = {}
    for method_name in method_names:
        full_method_name = '{:s}:{:s}'.format(service_name, method_name)
        for template_name, template_generator_function in METRIC_TEMPLATES.items():
            metric_name = template_name.format(method_name).upper()
            metrics[metric_name] = template_generator_function(full_method_name)
    return metrics


def _get_required_metric(metrics: Dict[str, MetricWrapperBase], key_template: str, function_name: str):
    """Return the metric registered for `function_name`, failing with a clear error.

    Raises:
        KeyError: If `metrics` holds no entry for the derived key.
    """
    metric_key = key_template.format(function_name).upper()
    metric = metrics.get(metric_key)
    if metric is None:
        raise KeyError(
            'Metric {:s} is not defined for RPC method {:s}'.format(metric_key, function_name))
    return metric


def safe_and_metered_rpc_method(metrics: Dict[str, MetricWrapperBase], logger: logging.Logger):
    """Decorator factory that meters, logs and guards a gRPC servicer method.

    The decorated method must accept `(self, request, grpc_context)`. Every
    call increments the STARTED counter, logs request and reply at DEBUG level,
    and increments the COMPLETED counter on success. Any exception is logged
    with its traceback, counted as FAILED and reported to the client through
    `grpc_context.abort`: a `ServiceException` supplies its own `code` and
    `details`, anything else is reported as `grpc.StatusCode.INTERNAL`.
    The wrapper is additionally decorated with the duration histogram's timer.

    Args:
        metrics: Metrics keyed as produced by `create_metrics`; the four
            entries for the decorated method's name must be present.
        logger: Logger receiving the debug and exception records.

    Returns:
        A decorator to apply to the servicer method.

    Raises:
        KeyError: At decoration time, when a metric for the decorated method
            is missing from `metrics`.
    """
    def outer_wrapper(func):
        function_name = func.__name__

        # Resolve all metrics up front so a method missing from `metrics`
        # fails at decoration time with the offending key in the message.
        HISTOGRAM_DURATION : Histogram = _get_required_metric(metrics, '{:s}_HISTOGRAM_DURATION', function_name)
        COUNTER_STARTED    : Counter   = _get_required_metric(metrics, '{:s}_COUNTER_STARTED',    function_name)
        COUNTER_COMPLETED  : Counter   = _get_required_metric(metrics, '{:s}_COUNTER_COMPLETED',  function_name)
        COUNTER_FAILED     : Counter   = _get_required_metric(metrics, '{:s}_COUNTER_FAILED',     function_name)

        # `wraps` is applied first (innermost) so the timed wrapper exposes the
        # RPC method's own name and docstring instead of 'inner_wrapper'.
        @HISTOGRAM_DURATION.time()
        @functools.wraps(func)
        def inner_wrapper(self, request, grpc_context : grpc.ServicerContext):
            COUNTER_STARTED.inc()
            try:
                # Lazy %-style arguments: request/reply are only stringified
                # when DEBUG logging is actually enabled.
                logger.debug('%s request: %s', function_name, request)
                reply = func(self, request, grpc_context)
                logger.debug('%s reply: %s', function_name, reply)
                COUNTER_COMPLETED.inc()
                return reply
            except ServiceException as e:   # pragma: no cover (ServiceException not thrown)
                logger.exception('%s exception', function_name)
                COUNTER_FAILED.inc()
                # abort() raises to terminate the RPC, so nothing is returned here.
                grpc_context.abort(e.code, e.details)
            except Exception as e:          # pragma: no cover
                logger.exception('%s exception', function_name)
                COUNTER_FAILED.inc()
                grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
        return inner_wrapper
    return outer_wrapper