Commit 73efb256 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Load Generator component:

- Updated client class methods according to proto
- RequestScheduler: added methods to report status
- Extended servicer class to support parametrization
parent d78a15f1
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@ import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty
from common.proto.load_generator_pb2 import Parameters, Status
from common.proto.load_generator_pb2_grpc import LoadGeneratorServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
@@ -46,12 +47,19 @@ class LoadGeneratorClient:
        self.stub = None

    @RETRY_DECORATOR
    def Start(self, request : Empty) -> Empty:
    def Start(self, request : Parameters) -> Empty:
        LOGGER.debug('Start request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.Start(request)
        LOGGER.debug('Start result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetStatus(self, request : Empty) -> Status:
        LOGGER.debug('GetStatus request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetStatus(request)
        LOGGER.debug('GetStatus result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def Stop(self, request : Empty) -> Empty:
        LOGGER.debug('Stop request: {:s}'.format(grpc_message_to_json_string(request)))
+15 −4
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, logging, pytz, random
import copy, logging, pytz, random, threading
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
@@ -46,13 +46,22 @@ class RequestScheduler:
            timezone=pytz.utc)
        self._parameters = parameters
        self._generator = generator
        self._running = threading.Event()

    @property
    def num_generated(self): return max(0, self._generator.num_requests_generated - 1) # first increases, then checks

    @property
    def infinite_loop(self): return self._parameters.num_requests == 0

    @property
    def running(self): return self._running.is_set()

    def _schedule_request_setup(self) -> None:
        infinite_loop = self._parameters.num_requests == 0
        num_requests_generated = self._generator.num_requests_generated - 1 # because it first increases, then checks
        if not infinite_loop and (num_requests_generated >= self._parameters.num_requests):
        if not self.infinite_loop and (self.num_generated >= self._parameters.num_requests):
            LOGGER.info('Generation Done!')
            #self._scheduler.shutdown()
            self._running.clear()
            return
        iat = random.expovariate(1.0 / self._parameters.inter_arrival_time)
        run_date = datetime.utcnow() + timedelta(seconds=iat)
@@ -66,11 +75,13 @@ class RequestScheduler:
            self._request_teardown, args=(request,), trigger='date', run_date=run_date, timezone=pytz.utc)

    def start(self):
        self._running.set()
        self._schedule_request_setup()
        self._scheduler.start()

    def stop(self):
        self._scheduler.shutdown()
        self._running.clear()

    def _request_setup(self) -> None:
        self._schedule_request_setup()
+27 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from common.proto.load_generator_pb2 import RequestTypeEnum
from load_generator.load_gen.Constants import RequestType

REQUEST_TYPE_MAP = {
    RequestTypeEnum.REQUESTTYPE_SERVICE_L2NM : RequestType.SERVICE_L2NM,
    RequestTypeEnum.REQUESTTYPE_SERVICE_L3NM : RequestType.SERVICE_L3NM,
    RequestTypeEnum.REQUESTTYPE_SERVICE_MW   : RequestType.SERVICE_MW,
    RequestTypeEnum.REQUESTTYPE_SERVICE_TAPI : RequestType.SERVICE_TAPI,
    RequestTypeEnum.REQUESTTYPE_SLICE_L2NM   : RequestType.SLICE_L2NM,
    RequestTypeEnum.REQUESTTYPE_SLICE_L3NM   : RequestType.SLICE_L3NM,
}

REQUEST_TYPE_REVERSE_MAP = {v:k for k,v in REQUEST_TYPE_MAP.items()}
+44 −21
Original line number Diff line number Diff line
@@ -12,43 +12,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional
import grpc, logging
from typing import Optional
from apscheduler.schedulers.background import BackgroundScheduler
from common.proto.context_pb2 import Empty
from common.proto.load_generator_pb2 import Parameters, Status
from common.proto.load_generator_pb2_grpc import LoadGeneratorServiceServicer
from load_generator.load_gen.Constants import RequestType
from load_generator.load_gen.Parameters import Parameters
from load_generator.load_gen.Parameters import Parameters as LoadGen_Parameters
from load_generator.load_gen.RequestGenerator import RequestGenerator
from load_generator.load_gen.RequestScheduler import RequestScheduler
from .Constants import REQUEST_TYPE_MAP, REQUEST_TYPE_REVERSE_MAP

LOGGER = logging.getLogger(__name__)

class LoadGeneratorServiceServicerImpl(LoadGeneratorServiceServicer):
    def __init__(self):
        LOGGER.debug('Creating Servicer...')
        self._parameters = Parameters(
            num_requests = 100,
            request_types = [
                RequestType.SERVICE_L2NM,
                RequestType.SERVICE_L3NM,
                #RequestType.SERVICE_MW,
                #RequestType.SERVICE_TAPI,
                RequestType.SLICE_L2NM,
                RequestType.SLICE_L3NM,
            ],
            offered_load  = 50,
            holding_time  = 10,
            do_teardown   = True,
            dry_mode      = False,           # in dry mode, no request is sent to TeraFlowSDN
            record_to_dlt = False,           # if record_to_dlt, changes in device/link/service/slice are uploaded to DLT
            dlt_domain_id = 'dlt-perf-eval', # domain used to uploaded entities, ignored when record_to_dlt = False
        )
        self._generator : Optional[RequestGenerator] = None
        self._scheduler : Optional[RequestScheduler] = None
        LOGGER.debug('Servicer Created')

    def Start(self, request : Empty, context : grpc.ServicerContext) -> Empty:
    def Start(self, request : Parameters, context : grpc.ServicerContext) -> Empty:
        self._parameters = LoadGen_Parameters(
            num_requests       = request.num_requests,
            request_types      = [REQUEST_TYPE_MAP[rt] for rt in request.request_types],
            offered_load       = request.offered_load if request.offered_load > 1.e-12 else None,
            holding_time       = request.holding_time if request.holding_time > 1.e-12 else None,
            inter_arrival_time = request.inter_arrival_time if request.inter_arrival_time > 1.e-12 else None,
            do_teardown        = request.do_teardown,   # if set, schedule tear down of requests
            dry_mode           = request.dry_mode,      # in dry mode, no request is sent to TeraFlowSDN
            record_to_dlt      = request.record_to_dlt, # if set, upload changes to DLT
            dlt_domain_id      = request.dlt_domain_id, # domain used to uploaded entities (when record_to_dlt = True)
        )

        LOGGER.info('Initializing Generator...')
        self._generator = RequestGenerator(self._parameters)
        self._generator.initialize()
@@ -58,6 +54,33 @@ class LoadGeneratorServiceServicerImpl(LoadGeneratorServiceServicer):
        self._scheduler.start()
        return Empty()

    def GetStatus(self, request : Empty, context : grpc.ServicerContext) -> Status:
        if self._scheduler is None:
            # not started
            status = Status()
            status.num_generated = 0
            status.infinite_loop = False
            status.running       = False
            return status

        params = self._scheduler._parameters
        request_types = [REQUEST_TYPE_REVERSE_MAP[rt] for rt in params.request_types]

        status = Status()
        status.num_generated = self._scheduler.num_generated
        status.infinite_loop = self._scheduler.infinite_loop
        status.running       = self._scheduler.running
        status.parameters.num_requests       = params.num_requests          # pylint: disable=no-member
        status.parameters.offered_load       = params.offered_load          # pylint: disable=no-member
        status.parameters.holding_time       = params.holding_time          # pylint: disable=no-member
        status.parameters.inter_arrival_time = params.inter_arrival_time    # pylint: disable=no-member
        status.parameters.do_teardown        = params.do_teardown           # pylint: disable=no-member
        status.parameters.dry_mode           = params.dry_mode              # pylint: disable=no-member
        status.parameters.record_to_dlt      = params.record_to_dlt         # pylint: disable=no-member
        status.parameters.dlt_domain_id      = params.dlt_domain_id         # pylint: disable=no-member
        status.parameters.request_types.extend(request_types)               # pylint: disable=no-member
        return status

    def Stop(self, request : Empty, context : grpc.ServicerContext) -> Empty:
        if self._scheduler is not None:
            self._scheduler.stop()