diff --git a/src/load_generator/client/LoadGeneratorClient.py b/src/load_generator/client/LoadGeneratorClient.py index 99626bbbb59671af41c11054d34338194f42a6af..2bed40dfdfe13d2920166bcb56237fe84bff8789 100644 --- a/src/load_generator/client/LoadGeneratorClient.py +++ b/src/load_generator/client/LoadGeneratorClient.py @@ -16,6 +16,7 @@ import grpc, logging from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc from common.proto.context_pb2 import Empty +from common.proto.load_generator_pb2 import Parameters, Status from common.proto.load_generator_pb2_grpc import LoadGeneratorServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string @@ -46,12 +47,19 @@ class LoadGeneratorClient: self.stub = None @RETRY_DECORATOR - def Start(self, request : Empty) -> Empty: + def Start(self, request : Parameters) -> Empty: LOGGER.debug('Start request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.Start(request) LOGGER.debug('Start result: {:s}'.format(grpc_message_to_json_string(response))) return response + @RETRY_DECORATOR + def GetStatus(self, request : Empty) -> Status: + LOGGER.debug('GetStatus request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetStatus(request) + LOGGER.debug('GetStatus result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def Stop(self, request : Empty) -> Empty: LOGGER.debug('Stop request: {:s}'.format(grpc_message_to_json_string(request))) diff --git a/src/load_generator/load_gen/RequestScheduler.py b/src/load_generator/load_gen/RequestScheduler.py index 775da1580a2a6521dbdc8fe32236c1f2adb4b3a7..e1003376a4cffa85b52cf997a470db06617a1020 100644 --- a/src/load_generator/load_gen/RequestScheduler.py +++ b/src/load_generator/load_gen/RequestScheduler.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy, logging, pytz, random +import copy, logging, pytz, random, threading from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.blocking import BlockingScheduler @@ -46,13 +46,22 @@ class RequestScheduler: timezone=pytz.utc) self._parameters = parameters self._generator = generator + self._running = threading.Event() + + @property + def num_generated(self): return max(0, self._generator.num_requests_generated - 1) # first increases, then checks + + @property + def infinite_loop(self): return self._parameters.num_requests == 0 + + @property + def running(self): return self._running.is_set() def _schedule_request_setup(self) -> None: - infinite_loop = self._parameters.num_requests == 0 - num_requests_generated = self._generator.num_requests_generated - 1 # because it first increases, then checks - if not infinite_loop and (num_requests_generated >= self._parameters.num_requests): + if not self.infinite_loop and (self.num_generated >= self._parameters.num_requests): LOGGER.info('Generation Done!') #self._scheduler.shutdown() + self._running.clear() return iat = random.expovariate(1.0 / self._parameters.inter_arrival_time) run_date = datetime.utcnow() + timedelta(seconds=iat) @@ -66,11 +75,13 @@ class RequestScheduler: self._request_teardown, args=(request,), trigger='date', run_date=run_date, timezone=pytz.utc) def start(self): + self._running.set() self._schedule_request_setup() self._scheduler.start() def stop(self): self._scheduler.shutdown() + self._running.clear() def _request_setup(self) -> None: self._schedule_request_setup() diff --git a/src/load_generator/service/Constants.py b/src/load_generator/service/Constants.py new file mode 100644 index 0000000000000000000000000000000000000000..6c339877c70363e874df278d6b5d29cc47a3be0f --- /dev/null +++ b/src/load_generator/service/Constants.py @@ -0,0 +1,27 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.proto.load_generator_pb2 import RequestTypeEnum +from load_generator.load_gen.Constants import RequestType + +REQUEST_TYPE_MAP = { + RequestTypeEnum.REQUESTTYPE_SERVICE_L2NM : RequestType.SERVICE_L2NM, + RequestTypeEnum.REQUESTTYPE_SERVICE_L3NM : RequestType.SERVICE_L3NM, + RequestTypeEnum.REQUESTTYPE_SERVICE_MW : RequestType.SERVICE_MW, + RequestTypeEnum.REQUESTTYPE_SERVICE_TAPI : RequestType.SERVICE_TAPI, + RequestTypeEnum.REQUESTTYPE_SLICE_L2NM : RequestType.SLICE_L2NM, + RequestTypeEnum.REQUESTTYPE_SLICE_L3NM : RequestType.SLICE_L3NM, +} + +REQUEST_TYPE_REVERSE_MAP = {v:k for k,v in REQUEST_TYPE_MAP.items()} diff --git a/src/load_generator/service/LoadGeneratorServiceServicerImpl.py b/src/load_generator/service/LoadGeneratorServiceServicerImpl.py index c280581ddfab488249ff249e60118ec3030e0447..d66b0b2c10c5228e0c3d15759fc46b2c0770154d 100644 --- a/src/load_generator/service/LoadGeneratorServiceServicerImpl.py +++ b/src/load_generator/service/LoadGeneratorServiceServicerImpl.py @@ -12,43 +12,39 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional import grpc, logging +from typing import Optional from apscheduler.schedulers.background import BackgroundScheduler from common.proto.context_pb2 import Empty +from common.proto.load_generator_pb2 import Parameters, Status from common.proto.load_generator_pb2_grpc import LoadGeneratorServiceServicer -from load_generator.load_gen.Constants import RequestType -from load_generator.load_gen.Parameters import Parameters +from load_generator.load_gen.Parameters import Parameters as LoadGen_Parameters from load_generator.load_gen.RequestGenerator import RequestGenerator from load_generator.load_gen.RequestScheduler import RequestScheduler +from .Constants import REQUEST_TYPE_MAP, REQUEST_TYPE_REVERSE_MAP LOGGER = logging.getLogger(__name__) class LoadGeneratorServiceServicerImpl(LoadGeneratorServiceServicer): def __init__(self): LOGGER.debug('Creating Servicer...') - self._parameters = Parameters( - num_requests = 100, - request_types = [ - RequestType.SERVICE_L2NM, - RequestType.SERVICE_L3NM, - #RequestType.SERVICE_MW, - #RequestType.SERVICE_TAPI, - RequestType.SLICE_L2NM, - RequestType.SLICE_L3NM, - ], - offered_load = 50, - holding_time = 10, - do_teardown = True, - dry_mode = False, # in dry mode, no request is sent to TeraFlowSDN - record_to_dlt = False, # if record_to_dlt, changes in device/link/service/slice are uploaded to DLT - dlt_domain_id = 'dlt-perf-eval', # domain used to uploaded entities, ignored when record_to_dlt = False - ) self._generator : Optional[RequestGenerator] = None self._scheduler : Optional[RequestScheduler] = None LOGGER.debug('Servicer Created') - def Start(self, request : Empty, context : grpc.ServicerContext) -> Empty: + def Start(self, request : Parameters, context : grpc.ServicerContext) -> Empty: + self._parameters = LoadGen_Parameters( + num_requests = request.num_requests, + request_types = [REQUEST_TYPE_MAP[rt] for rt in request.request_types], + offered_load = request.offered_load if request.offered_load > 1.e-12 else None, + holding_time = request.holding_time if request.holding_time > 1.e-12 else None, + inter_arrival_time = request.inter_arrival_time if request.inter_arrival_time > 1.e-12 else None, + do_teardown = request.do_teardown, # if set, schedule tear down of requests + dry_mode = request.dry_mode, # in dry mode, no request is sent to TeraFlowSDN + record_to_dlt = request.record_to_dlt, # if set, upload changes to DLT + dlt_domain_id = request.dlt_domain_id, # domain used to uploaded entities (when record_to_dlt = True) + ) + LOGGER.info('Initializing Generator...') self._generator = RequestGenerator(self._parameters) self._generator.initialize() @@ -58,6 +54,33 @@ class LoadGeneratorServiceServicerImpl(LoadGeneratorServiceServicer): self._scheduler.start() return Empty() + def GetStatus(self, request : Empty, context : grpc.ServicerContext) -> Status: + if self._scheduler is None: + # not started + status = Status() + status.num_generated = 0 + status.infinite_loop = False + status.running = False + return status + + params = self._scheduler._parameters + request_types = [REQUEST_TYPE_REVERSE_MAP[rt] for rt in params.request_types] + + status = Status() + status.num_generated = self._scheduler.num_generated + status.infinite_loop = self._scheduler.infinite_loop + status.running = self._scheduler.running + status.parameters.num_requests = params.num_requests # pylint: disable=no-member + status.parameters.offered_load = params.offered_load # pylint: disable=no-member + status.parameters.holding_time = params.holding_time # pylint: disable=no-member + status.parameters.inter_arrival_time = params.inter_arrival_time # pylint: disable=no-member + status.parameters.do_teardown = params.do_teardown # pylint: disable=no-member + status.parameters.dry_mode = params.dry_mode # pylint: disable=no-member + status.parameters.record_to_dlt = params.record_to_dlt # pylint: disable=no-member + status.parameters.dlt_domain_id = params.dlt_domain_id # pylint: disable=no-member + status.parameters.request_types.extend(request_types) # pylint: disable=no-member + return status + def Stop(self, request : Empty, context : grpc.ServicerContext) -> Empty: if self._scheduler is not None: self._scheduler.stop()