Commit d43ad45f authored by Lluis Gifre Renom

Load Generator component:

- Added metrics port to manifest
- Added service monitor manifest
- Added metrics server
- Extended RequestGenerator to also return the request_type of each composed request
- Extended RequestScheduler to monitor setup and teardown requests, labeled by request type (see the sketch below)
parent 8362b0d2
2 merge requests: !142 Release TeraFlowSDN 2.1, !92 New features in Load Generator
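In outline, the change threads a request_type value from RequestGenerator.compose_request(), which now returns a (completed, request, request_type) 3-tuple, through RequestScheduler and into per-type Prometheus metrics. A minimal sketch of that flow with plain prometheus_client primitives (the DummyGenerator class and the metric name are illustrative stand-ins, not the actual classes; the component creates its metrics through the MetricsPool helper shown in the diff):

import random
from typing import Dict, Optional, Tuple

from prometheus_client import Counter

# Illustrative metric; the real component derives metric names via MetricsPool.
REQUEST_COUNTER = Counter('loadgen_requests_total', 'Requests generated', ['request_type'])

class DummyGenerator:
    def compose_request(self) -> Tuple[bool, Optional[Dict], Optional[str]]:
        # Mirrors the new contract: completed, request, request_type
        request_type = random.choice(['SERVICE_L2NM', 'SLICE_L3NM'])
        return False, {'uuid': 'example-request'}, request_type

completed, request, request_type = DummyGenerator().compose_request()
if not completed:
    # The scheduler uses request_type as a Prometheus label on setup/teardown metrics.
    REQUEST_COUNTER.labels(request_type=request_type).inc()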
@@ -33,6 +33,7 @@ spec:
         imagePullPolicy: Always
         ports:
         - containerPort: 50052
+        - containerPort: 9192
         env:
         - name: LOG_LEVEL
           value: "INFO"
@@ -65,3 +66,7 @@ spec:
     protocol: TCP
     port: 50052
     targetPort: 50052
+  - name: metrics
+    protocol: TCP
+    port: 9192
+    targetPort: 9192
@@ -330,3 +330,32 @@ spec:
     any: false
     matchNames:
     - tfs # namespace where the app is running
+---
+apiVersion: monitoring.coreos.com/v1
+kind: ServiceMonitor
+metadata:
+  namespace: monitoring # namespace where prometheus is running
+  name: tfs-load-generatorservice-metric
+  labels:
+    app: load-generatorservice
+    #release: prometheus
+    #release: prom # name of the release
+    # ( VERY IMPORTANT: You need to know the correct release name by viewing
+    #   the servicemonitor of Prometheus itself: Without the correct name,
+    #   Prometheus cannot identify the metrics of the Flask app as the target.)
+spec:
+  selector:
+    matchLabels:
+      # Target app service
+      #namespace: tfs
+      app: load-generatorservice # same as above
+      #release: prometheus # same as above
+  endpoints:
+  - port: metrics # named port in target app
+    scheme: http
+    path: /metrics # path to scrape
+    interval: 5s # scrape interval
+  namespaceSelector:
+    any: false
+    matchNames:
+    - tfs # namespace where the app is running
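Note that the ServiceMonitor's endpoints entry references the Service port by its name (metrics), so it must match the named port added to the Service manifest above, and the pod must actually serve Prometheus text format on that port at /metrics. A minimal sketch of the server side, assuming port 9192 as in the manifests (start_http_server is the same prometheus_client call used in the __main__ diff further below):

import time
from prometheus_client import start_http_server

if __name__ == '__main__':
    # Serve /metrics on 0.0.0.0:9192; the ServiceMonitor scrapes it every 5s.
    start_http_server(9192)
    while True:
        time.sleep(1)  # keep the process alive; the real component runs its gRPC service here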
@@ -186,11 +186,11 @@ class RequestGenerator:
             self._used_device_endpoints.setdefault(device_uuid, dict()).pop(endpoint_uuid, None)
             self._available_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
 
-    def compose_request(self) -> Tuple[bool, Optional[Dict]]: # completed, request
+    def compose_request(self) -> Tuple[bool, Optional[Dict], str]: # completed, request
         with self._lock:
             if not self.infinite_loop and (self._num_generated >= self._parameters.num_requests):
                 LOGGER.info('Generation Done!')
-                return True, None # completed
+                return True, None, None # completed
             self._num_generated += 1
             num_request = self._num_generated
@@ -203,9 +203,9 @@ class RequestGenerator:
             if request_type in {
                 RequestType.SERVICE_L2NM, RequestType.SERVICE_L3NM, RequestType.SERVICE_TAPI, RequestType.SERVICE_MW
             }:
-                return False, self._compose_service(num_request, request_uuid, request_type)
+                return False, self._compose_service(num_request, request_uuid, request_type), request_type
             elif request_type in {RequestType.SLICE_L2NM, RequestType.SLICE_L3NM}:
-                return False, self._compose_slice(num_request, request_uuid, request_type)
+                return False, self._compose_slice(num_request, request_uuid, request_type), request_type
 
     def _compose_service(self, num_request : int, request_uuid : str, request_type : str) -> Optional[Dict]:
         # choose source endpoint
...
@@ -18,7 +18,9 @@ from apscheduler.jobstores.memory import MemoryJobStore
 from apscheduler.schedulers.blocking import BlockingScheduler
 from datetime import datetime, timedelta
 from typing import Dict, Optional
+from common.method_wrappers.Decorator import MetricsPool
 from common.proto.context_pb2 import Service, ServiceId, Slice, SliceId
+from common.tools.grpc.Tools import grpc_message_to_json_string
 from service.client.ServiceClient import ServiceClient
 from slice.client.SliceClient import SliceClient
 from .Constants import MAX_WORKER_THREADS
@@ -31,6 +33,10 @@ logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING)
 LOGGER = logging.getLogger(__name__)
 
+METRICS_POOL = MetricsPool('LoadGen', 'Requests', labels={
+    'request_type': ''
+})
+
 class RequestScheduler:
     def __init__(
         self, parameters : Parameters, generator : RequestGenerator, scheduler_class=BlockingScheduler
@@ -64,11 +70,12 @@ class RequestScheduler:
         self._scheduler.add_job(
             self._request_setup, trigger='date', run_date=run_date, timezone=pytz.utc)
 
-    def _schedule_request_teardown(self, request : Dict) -> None:
+    def _schedule_request_teardown(self, request : Dict, request_type : str) -> None:
         ht = random.expovariate(1.0 / self._parameters.holding_time)
         run_date = datetime.utcnow() + timedelta(seconds=ht)
+        args = (request, request_type)
         self._scheduler.add_job(
-            self._request_teardown, args=(request,), trigger='date', run_date=run_date, timezone=pytz.utc)
+            self._request_teardown, args=args, trigger='date', run_date=run_date, timezone=pytz.utc)
 
     def start(self):
         self._running.set()
@@ -80,7 +87,7 @@ class RequestScheduler:
         self._running.clear()
 
     def _request_setup(self) -> None:
-        completed,request = self._generator.compose_request()
+        completed, request, request_type = self._generator.compose_request()
         if completed:
             LOGGER.info('Generation Done!')
             #self._scheduler.shutdown()
@@ -101,7 +108,7 @@ class RequestScheduler:
             dst_endpoint_uuid = request['service_endpoint_ids'][1]['endpoint_uuid']['uuid']
             LOGGER.info('Setup Service: uuid=%s src=%s:%s dst=%s:%s',
                 service_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)
-            self._create_update(service=request)
+            self._create_update(request_type, service=request)
 
         elif 'slice_id' in request:
             slice_uuid = request['slice_id']['slice_uuid']['uuid']
@@ -111,12 +118,12 @@ class RequestScheduler:
             dst_endpoint_uuid = request['slice_endpoint_ids'][1]['endpoint_uuid']['uuid']
             LOGGER.info('Setup Slice: uuid=%s src=%s:%s dst=%s:%s',
                 slice_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)
-            self._create_update(slice_=request)
+            self._create_update(request_type, slice_=request)
 
         if self._parameters.do_teardown:
-            self._schedule_request_teardown(request)
+            self._schedule_request_teardown(request, request_type)
 
-    def _request_teardown(self, request : Dict) -> None:
+    def _request_teardown(self, request : Dict, request_type : str) -> None:
         if 'service_id' in request:
             service_uuid = request['service_id']['service_uuid']['uuid']
             src_device_uuid = request['service_endpoint_ids'][0]['device_id']['device_uuid']['uuid']
@@ -125,7 +132,7 @@ class RequestScheduler:
             dst_endpoint_uuid = request['service_endpoint_ids'][1]['endpoint_uuid']['uuid']
             LOGGER.info('Teardown Service: uuid=%s src=%s:%s dst=%s:%s',
                 service_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)
-            self._delete(service_id=ServiceId(**(request['service_id'])))
+            self._delete(request_type, service_id=ServiceId(**(request['service_id'])))
 
         elif 'slice_id' in request:
             slice_uuid = request['slice_id']['slice_uuid']['uuid']
@@ -135,33 +142,64 @@ class RequestScheduler:
             dst_endpoint_uuid = request['slice_endpoint_ids'][1]['endpoint_uuid']['uuid']
             LOGGER.info('Teardown Slice: uuid=%s src=%s:%s dst=%s:%s',
                 slice_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)
-            self._delete(slice_id=SliceId(**(request['slice_id'])))
+            self._delete(request_type, slice_id=SliceId(**(request['slice_id'])))
 
         self._generator.release_request(request)
 
-    def _create_update(self, service : Optional[Dict] = None, slice_ : Optional[Dict] = None) -> None:
+    def _create_update(
+        self, request_type : str, service : Optional[Dict] = None, slice_ : Optional[Dict] = None
+    ) -> None:
         if self._parameters.dry_mode: return
 
+        metrics = METRICS_POOL.get_metrics('setup', labels={'request_type': request_type})
+        histogram_duration, counter_started, counter_completed, counter_failed = metrics
+
         service_id = None
         if service is not None:
+            service_client = ServiceClient()
+
             service_add = copy.deepcopy(service)
             service_add['service_endpoint_ids'] = []
             service_add['service_constraints'] = []
             service_add['service_config'] = {'config_rules': []}
 
+            service_add = Service(**service_add)
+            service = Service(**service)
+
+            with histogram_duration.time():
+                try:
+                    counter_started.inc()
+                    service_id = service_client.CreateService(service_add)
+                    service_id = service_client.UpdateService(service)
+                    counter_completed.inc()
+                except: # pylint: disable=bare-except
+                    counter_failed.inc()
+                    MSG = 'Exception Setting Up Service {:s}'
+                    LOGGER.exception(MSG.format(grpc_message_to_json_string(service)))
+
-            service_client = ServiceClient()
-            service_id = service_client.CreateService(Service(**service_add))
             service_client.close()
 
         slice_id = None
         if slice_ is not None:
+            slice_client = SliceClient()
+
             slice_add = copy.deepcopy(slice_)
             slice_add['slice_endpoint_ids'] = []
             slice_add['slice_constraints'] = []
             slice_add['slice_config'] = {'config_rules': []}
 
+            slice_add = Slice(**slice_add)
+            slice_ = Slice(**slice_)
+
+            with histogram_duration.time():
+                try:
+                    counter_started.inc()
+                    slice_id = slice_client.CreateSlice(slice_add)
+                    slice_id = slice_client.UpdateSlice(slice_)
+                    counter_completed.inc()
+                except: # pylint: disable=bare-except
+                    counter_failed.inc()
+                    MSG = 'Exception Setting Up Slice {:s}'
+                    LOGGER.exception(MSG.format(grpc_message_to_json_string(slice_)))
+
-            slice_client = SliceClient()
-            slice_id = slice_client.CreateSlice(Slice(**slice_add))
             slice_client.close()
 
         if self._parameters.record_to_dlt:
@@ -171,41 +209,47 @@ class RequestScheduler:
                 slices_to_record=slices_to_record, services_to_record=services_to_record,
                 devices_to_record=devices_to_record, delete=False)
 
-        service_id = None
-        if service is not None:
-            service_client = ServiceClient()
-            service_id = service_client.UpdateService(Service(**service))
-            service_client.close()
+    def _delete(
+        self, request_type : str, service_id : Optional[ServiceId] = None, slice_id : Optional[SliceId] = None
+    ) -> None:
+        if self._parameters.dry_mode: return
 
-        slice_id = None
-        if slice_ is not None:
-            slice_client = SliceClient()
-            slice_id = slice_client.UpdateSlice(Slice(**slice_))
-            slice_client.close()
+        metrics = METRICS_POOL.get_metrics('teardown', labels={'request_type': request_type})
+        histogram_duration, counter_started, counter_completed, counter_failed = metrics
 
-        if self._parameters.record_to_dlt:
-            entities_to_record = explore_entities_to_record(slice_id=slice_id, service_id=service_id)
-            slices_to_record, services_to_record, devices_to_record = entities_to_record
-            record_entities(
-                slices_to_record=slices_to_record, services_to_record=services_to_record,
-                devices_to_record=devices_to_record, delete=False)
 
-    def _delete(self, service_id : Optional[ServiceId] = None, slice_id : Optional[SliceId] = None) -> None:
-        if self._parameters.dry_mode: return
+        if service_id is not None:
+            service_client = ServiceClient()
+            if self._parameters.record_to_dlt:
+                entities_to_record = explore_entities_to_record(slice_id=slice_id, service_id=service_id)
+                slices_to_record, services_to_record, devices_to_record = entities_to_record
+            with histogram_duration.time():
+                try:
+                    counter_started.inc()
+                    service_client.DeleteService(service_id)
+                    counter_completed.inc()
+                except: # pylint: disable=bare-except
+                    counter_failed.inc()
+                    MSG = 'Exception Tearing Down Service {:s}'
+                    LOGGER.exception(MSG.format(grpc_message_to_json_string(service_id)))
+            service_client.close()
 
         if slice_id is not None:
             slice_client = SliceClient()
-            slice_client.DeleteSlice(slice_id)
-            slice_client.close()
-
-        if service_id is not None:
-            service_client = ServiceClient()
-            service_client.DeleteService(service_id)
-            service_client.close()
+            with histogram_duration.time():
+                try:
+                    counter_started.inc()
+                    slice_client.DeleteSlice(slice_id)
+                    counter_completed.inc()
+                except: # pylint: disable=bare-except
+                    counter_failed.inc()
+                    MSG = 'Exception Tearing Down Slice {:s}'
+                    LOGGER.exception(MSG.format(grpc_message_to_json_string(slice_id)))
+            slice_client.close()
 
         if self._parameters.record_to_dlt:
             record_entities(
...
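The instrumentation pattern above is the same for setup and teardown: a Histogram times each gRPC call, and started/completed/failed Counters classify the outcome, all labeled by request_type. A self-contained sketch of that pattern with plain prometheus_client objects (metric names and the simulated call are hypothetical; the actual names come from the MetricsPool('LoadGen', 'Requests', ...) declaration):

import random, time
from prometheus_client import Counter, Histogram

# Hypothetical metric names; MetricsPool derives the real ones internally.
DURATION  = Histogram('loadgen_setup_duration_seconds', 'Setup duration', ['request_type'])
STARTED   = Counter('loadgen_setup_started_total',   'Setups started',   ['request_type'])
COMPLETED = Counter('loadgen_setup_completed_total', 'Setups completed', ['request_type'])
FAILED    = Counter('loadgen_setup_failed_total',    'Setups failed',    ['request_type'])

def instrumented_setup(request_type : str) -> None:
    # Same structure as _create_update()/_delete(): time the call, count the outcome.
    with DURATION.labels(request_type=request_type).time():
        try:
            STARTED.labels(request_type=request_type).inc()
            time.sleep(random.uniform(0.01, 0.05))  # stand-in for the gRPC call
            COMPLETED.labels(request_type=request_type).inc()
        except Exception:
            FAILED.labels(request_type=request_type).inc()

instrumented_setup('SERVICE_L2NM')

As in the diff's bare except blocks, failures are swallowed so a failed request does not kill the scheduler thread; the failure still surfaces in the failed counter and in the histogram's timing.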
@@ -13,14 +13,15 @@
 # limitations under the License.
 
 import logging, signal, sys, threading
+from prometheus_client import start_http_server
 from common.Constants import ServiceNameEnum
 from common.Settings import (
-    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level,
+    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
     wait_for_environment_variables)
 from .LoadGeneratorService import LoadGeneratorService
 
-log_level = get_log_level()
-logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
+LOG_LEVEL = get_log_level()
+logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
 LOGGER = logging.getLogger(__name__)
 
 terminate = threading.Event()
@@ -39,10 +40,14 @@ def main():
         get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
     ])
 
-    LOGGER.info('Starting...')
-
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
+    LOGGER.info('Starting...')
+
+    # Start metrics server
+    metrics_port = get_metrics_port()
+    start_http_server(metrics_port)
+
+    # Starting load generator service
     grpc_service = LoadGeneratorService()
...
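Once deployed, the exporter can be checked before wiring Prometheus by fetching the endpoint directly, for example after a kubectl port-forward to the pod; the localhost URL below assumes such a forward to port 9192:

from urllib.request import urlopen

# Assumes the metrics port is reachable locally (e.g., via kubectl port-forward).
with urlopen('http://localhost:9192/metrics') as response:
    print(response.read().decode('utf-8'))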