diff --git a/manifests/load_generatorservice.yaml b/manifests/load_generatorservice.yaml index 3f65c2c857a39f2b7a5ebeaccd9ddfd4916f2487..7cc6f19122573a612ddca774c3a785bff93f8b38 100644 --- a/manifests/load_generatorservice.yaml +++ b/manifests/load_generatorservice.yaml @@ -33,6 +33,7 @@ spec: imagePullPolicy: Always ports: - containerPort: 50052 + - containerPort: 9192 env: - name: LOG_LEVEL value: "INFO" @@ -65,3 +66,7 @@ spec: protocol: TCP port: 50052 targetPort: 50052 + - name: metrics + protocol: TCP + port: 9192 + targetPort: 9192 diff --git a/manifests/servicemonitors.yaml b/manifests/servicemonitors.yaml index f5da08182a4665b21607987ea97d9bf3cc5b7e21..ec929f757cdf5468a7db7a7c1f1e755611d5327b 100644 --- a/manifests/servicemonitors.yaml +++ b/manifests/servicemonitors.yaml @@ -330,3 +330,32 @@ spec: any: false matchNames: - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-load-generatorservice-metric + labels: + app: load-generatorservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) 
+spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: load-generatorservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running diff --git a/src/load_generator/load_gen/RequestGenerator.py b/src/load_generator/load_gen/RequestGenerator.py index cf56e221db0fbbe3f080d01af45cd36fc4ef56a0..791ff740713f193e0187ee34a7980031c1389195 100644 --- a/src/load_generator/load_gen/RequestGenerator.py +++ b/src/load_generator/load_gen/RequestGenerator.py @@ -186,11 +186,11 @@ class RequestGenerator: self._used_device_endpoints.setdefault(device_uuid, dict()).pop(endpoint_uuid, None) self._available_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid) - def compose_request(self) -> Tuple[bool, Optional[Dict]]: # completed, request + def compose_request(self) -> Tuple[bool, Optional[Dict], Optional[str]]: # completed, request, request_type with self._lock: if not self.infinite_loop and (self._num_generated >= self._parameters.num_requests): LOGGER.info('Generation Done!') - return True, None # completed + return True, None, None # completed self._num_generated += 1 num_request = self._num_generated @@ -203,9 +203,9 @@ class RequestGenerator: if request_type in { RequestType.SERVICE_L2NM, RequestType.SERVICE_L3NM, RequestType.SERVICE_TAPI, RequestType.SERVICE_MW }: - return False, self._compose_service(num_request, request_uuid, request_type) + return False, self._compose_service(num_request, request_uuid, request_type), request_type elif request_type in {RequestType.SLICE_L2NM, RequestType.SLICE_L3NM}: - return False, self._compose_slice(num_request, request_uuid, request_type) + return False, self._compose_slice(num_request, request_uuid, request_type), request_type def _compose_service(self, num_request : int, request_uuid : str, 
request_type : str) -> Optional[Dict]: # choose source endpoint diff --git a/src/load_generator/load_gen/RequestScheduler.py b/src/load_generator/load_gen/RequestScheduler.py index 773a37eac258f8b3c16c966464ced124d3c77c85..6cee90bc1c70b17e543c70d219a60e5561b4f52b 100644 --- a/src/load_generator/load_gen/RequestScheduler.py +++ b/src/load_generator/load_gen/RequestScheduler.py @@ -18,7 +18,9 @@ from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime, timedelta from typing import Dict, Optional +from common.method_wrappers.Decorator import MetricsPool from common.proto.context_pb2 import Service, ServiceId, Slice, SliceId +from common.tools.grpc.Tools import grpc_message_to_json_string from service.client.ServiceClient import ServiceClient from slice.client.SliceClient import SliceClient from .Constants import MAX_WORKER_THREADS @@ -31,6 +33,10 @@ logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING) LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('LoadGen', 'Requests', labels={ + 'request_type': '' +}) + class RequestScheduler: def __init__( self, parameters : Parameters, generator : RequestGenerator, scheduler_class=BlockingScheduler @@ -64,11 +70,12 @@ class RequestScheduler: self._scheduler.add_job( self._request_setup, trigger='date', run_date=run_date, timezone=pytz.utc) - def _schedule_request_teardown(self, request : Dict) -> None: + def _schedule_request_teardown(self, request : Dict, request_type : str) -> None: ht = random.expovariate(1.0 / self._parameters.holding_time) run_date = datetime.utcnow() + timedelta(seconds=ht) + args = (request, request_type) self._scheduler.add_job( - self._request_teardown, args=(request,), trigger='date', run_date=run_date, timezone=pytz.utc) + self._request_teardown, args=args, trigger='date', run_date=run_date, timezone=pytz.utc) def start(self): self._running.set() @@ -80,7 +87,7 @@ class 
RequestScheduler: self._running.clear() def _request_setup(self) -> None: - completed,request = self._generator.compose_request() + completed, request, request_type = self._generator.compose_request() if completed: LOGGER.info('Generation Done!') #self._scheduler.shutdown() @@ -101,7 +108,7 @@ class RequestScheduler: dst_endpoint_uuid = request['service_endpoint_ids'][1]['endpoint_uuid']['uuid'] LOGGER.info('Setup Service: uuid=%s src=%s:%s dst=%s:%s', service_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid) - self._create_update(service=request) + self._create_update(request_type, service=request) elif 'slice_id' in request: slice_uuid = request['slice_id']['slice_uuid']['uuid'] @@ -111,12 +118,12 @@ class RequestScheduler: dst_endpoint_uuid = request['slice_endpoint_ids'][1]['endpoint_uuid']['uuid'] LOGGER.info('Setup Slice: uuid=%s src=%s:%s dst=%s:%s', slice_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid) - self._create_update(slice_=request) + self._create_update(request_type, slice_=request) if self._parameters.do_teardown: - self._schedule_request_teardown(request) + self._schedule_request_teardown(request, request_type) - def _request_teardown(self, request : Dict) -> None: + def _request_teardown(self, request : Dict, request_type : str) -> None: if 'service_id' in request: service_uuid = request['service_id']['service_uuid']['uuid'] src_device_uuid = request['service_endpoint_ids'][0]['device_id']['device_uuid']['uuid'] @@ -125,7 +132,7 @@ class RequestScheduler: dst_endpoint_uuid = request['service_endpoint_ids'][1]['endpoint_uuid']['uuid'] LOGGER.info('Teardown Service: uuid=%s src=%s:%s dst=%s:%s', service_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid) - self._delete(service_id=ServiceId(**(request['service_id']))) + self._delete(request_type, service_id=ServiceId(**(request['service_id']))) elif 'slice_id' in request: slice_uuid = 
request['slice_id']['slice_uuid']['uuid'] @@ -135,33 +142,64 @@ class RequestScheduler: dst_endpoint_uuid = request['slice_endpoint_ids'][1]['endpoint_uuid']['uuid'] LOGGER.info('Teardown Slice: uuid=%s src=%s:%s dst=%s:%s', slice_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid) - self._delete(slice_id=SliceId(**(request['slice_id']))) + self._delete(request_type, slice_id=SliceId(**(request['slice_id']))) self._generator.release_request(request) - def _create_update(self, service : Optional[Dict] = None, slice_ : Optional[Dict] = None) -> None: + def _create_update( + self, request_type : str, service : Optional[Dict] = None, slice_ : Optional[Dict] = None + ) -> None: if self._parameters.dry_mode: return + metrics = METRICS_POOL.get_metrics('setup', labels={'request_type': request_type}) + histogram_duration, counter_started, counter_completed, counter_failed = metrics + service_id = None if service is not None: + service_client = ServiceClient() + service_add = copy.deepcopy(service) service_add['service_endpoint_ids'] = [] service_add['service_constraints'] = [] service_add['service_config'] = {'config_rules': []} + service_add = Service(**service_add) + service = Service(**service) + + with histogram_duration.time(): + try: + counter_started.inc() + service_id = service_client.CreateService(service_add) + service_id = service_client.UpdateService(service) + counter_completed.inc() + except: # pylint: disable=bare-except + counter_failed.inc() + MSG = 'Exception Setting Up Service {:s}' + LOGGER.exception(MSG.format(grpc_message_to_json_string(service))) - service_client = ServiceClient() - service_id = service_client.CreateService(Service(**service_add)) service_client.close() slice_id = None if slice_ is not None: + slice_client = SliceClient() + slice_add = copy.deepcopy(slice_) slice_add['slice_endpoint_ids'] = [] slice_add['slice_constraints'] = [] slice_add['slice_config'] = {'config_rules': []} + slice_add = 
Slice(**slice_add) + slice_ = Slice(**slice_) + + with histogram_duration.time(): + try: + counter_started.inc() + slice_id = slice_client.CreateSlice(slice_add) + slice_id = slice_client.UpdateSlice(slice_) + counter_completed.inc() + except: # pylint: disable=bare-except + counter_failed.inc() + MSG = 'Exception Setting Up Slice {:s}' + LOGGER.exception(MSG.format(grpc_message_to_json_string(slice_))) - slice_client = SliceClient() - slice_id = slice_client.CreateSlice(Slice(**slice_add)) slice_client.close() if self._parameters.record_to_dlt: @@ -171,41 +209,47 @@ class RequestScheduler: slices_to_record=slices_to_record, services_to_record=services_to_record, devices_to_record=devices_to_record, delete=False) - service_id = None - if service is not None: - service_client = ServiceClient() - service_id = service_client.UpdateService(Service(**service)) - service_client.close() + def _delete( + self, request_type : str, service_id : Optional[ServiceId] = None, slice_id : Optional[SliceId] = None + ) -> None: + if self._parameters.dry_mode: return - slice_id = None - if slice_ is not None: - slice_client = SliceClient() - slice_id = slice_client.UpdateSlice(Slice(**slice_)) - slice_client.close() + metrics = METRICS_POOL.get_metrics('teardown', labels={'request_type': request_type}) + histogram_duration, counter_started, counter_completed, counter_failed = metrics if self._parameters.record_to_dlt: entities_to_record = explore_entities_to_record(slice_id=slice_id, service_id=service_id) slices_to_record, services_to_record, devices_to_record = entities_to_record - record_entities( - slices_to_record=slices_to_record, services_to_record=services_to_record, - devices_to_record=devices_to_record, delete=False) - def _delete(self, service_id : Optional[ServiceId] = None, slice_id : Optional[SliceId] = None) -> None: - if self._parameters.dry_mode: return + if service_id is not None: + service_client = ServiceClient() - if self._parameters.record_to_dlt: - 
entities_to_record = explore_entities_to_record(slice_id=slice_id, service_id=service_id) - slices_to_record, services_to_record, devices_to_record = entities_to_record + with histogram_duration.time(): + try: + counter_started.inc() + service_client.DeleteService(service_id) + counter_completed.inc() + except: # pylint: disable=bare-except + counter_failed.inc() + MSG = 'Exception Tearing Down Service {:s}' + LOGGER.exception(MSG.format(grpc_message_to_json_string(service_id))) + + service_client.close() if slice_id is not None: slice_client = SliceClient() - slice_client.DeleteSlice(slice_id) - slice_client.close() - if service_id is not None: - service_client = ServiceClient() - service_client.DeleteService(service_id) - service_client.close() + with histogram_duration.time(): + try: + counter_started.inc() + slice_client.DeleteSlice(slice_id) + counter_completed.inc() + except: # pylint: disable=bare-except + counter_failed.inc() + MSG = 'Exception Tearing Down Slice {:s}' + LOGGER.exception(MSG.format(grpc_message_to_json_string(slice_id))) + + slice_client.close() if self._parameters.record_to_dlt: record_entities( diff --git a/src/load_generator/service/__main__.py b/src/load_generator/service/__main__.py index 227099c59aa57f420c842a6210f3b8b146b23cda..054a6245d6ddc321a061cb02be655d5feb894eef 100644 --- a/src/load_generator/service/__main__.py +++ b/src/load_generator/service/__main__.py @@ -13,14 +13,15 @@ # limitations under the License. 
import logging, signal, sys, threading +from prometheus_client import start_http_server from common.Constants import ServiceNameEnum from common.Settings import ( - ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables) from .LoadGeneratorService import LoadGeneratorService -log_level = get_log_level() -logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") +LOG_LEVEL = get_log_level() +logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) terminate = threading.Event() @@ -39,10 +40,14 @@ def main(): get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) + LOGGER.info('Starting...') signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - LOGGER.info('Starting...') + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + # Starting load generator service grpc_service = LoadGeneratorService()