Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from apscheduler.schedulers.background import BackgroundScheduler
from common.proto.context_pb2 import Empty
from common.proto.load_generator_pb2 import Parameters, Status
from common.proto.load_generator_pb2_grpc import LoadGeneratorServiceServicer
from load_generator.load_gen.Parameters import Parameters as LoadGen_Parameters
from load_generator.load_gen.RequestGenerator import RequestGenerator
from load_generator.load_gen.RequestScheduler import RequestScheduler
from load_generator.tools.ListScalarRange import grpc__to__list_scalar_range, list_scalar_range__to__grpc
from .Constants import REQUEST_TYPE_MAP, REQUEST_TYPE_REVERSE_MAP
LOGGER = logging.getLogger(__name__)
class LoadGeneratorServiceServicerImpl(LoadGeneratorServiceServicer):
def __init__(self):
LOGGER.debug('Creating Servicer...')
self._generator : Optional[RequestGenerator] = None
self._scheduler : Optional[RequestScheduler] = None
LOGGER.debug('Servicer Created')
def Start(self, request : Parameters, context : grpc.ServicerContext) -> Empty:
self._parameters = LoadGen_Parameters(
num_requests = request.num_requests,
request_types = [REQUEST_TYPE_MAP[rt] for rt in request.request_types],
offered_load = request.offered_load if request.offered_load > 1.e-12 else None,
holding_time = request.holding_time if request.holding_time > 1.e-12 else None,
inter_arrival_time = request.inter_arrival_time if request.inter_arrival_time > 1.e-12 else None,
availability_ranges = grpc__to__list_scalar_range(request.availability ),
capacity_gbps_ranges = grpc__to__list_scalar_range(request.capacity_gbps ),
e2e_latency_ms_ranges = grpc__to__list_scalar_range(request.e2e_latency_ms),
do_teardown = request.do_teardown, # if set, schedule tear down of requests
dry_mode = request.dry_mode, # in dry mode, no request is sent to TeraFlowSDN
record_to_dlt = request.record_to_dlt, # if set, upload changes to DLT
dlt_domain_id = request.dlt_domain_id, # domain used to uploaded entities (when record_to_dlt = True)
LOGGER.info('Initializing Generator...')
self._generator = RequestGenerator(self._parameters)
self._generator.initialize()
LOGGER.info('Running Schedule...')
self._scheduler = RequestScheduler(self._parameters, self._generator, scheduler_class=BackgroundScheduler)
self._scheduler.start()
return Empty()
def GetStatus(self, request : Empty, context : grpc.ServicerContext) -> Status:
if self._scheduler is None:
# not started
status = Status()
status.num_generated = 0
status.infinite_loop = False
status.running = False
return status
params = self._scheduler._parameters
request_types = [REQUEST_TYPE_REVERSE_MAP[rt] for rt in params.request_types]
status = Status()
status.num_generated = self._scheduler.num_generated
status.infinite_loop = self._scheduler.infinite_loop
status.running = self._scheduler.running
status.parameters.num_requests = params.num_requests # pylint: disable=no-member
status.parameters.offered_load = params.offered_load # pylint: disable=no-member
status.parameters.holding_time = params.holding_time # pylint: disable=no-member
status.parameters.inter_arrival_time = params.inter_arrival_time # pylint: disable=no-member
status.parameters.do_teardown = params.do_teardown # pylint: disable=no-member
status.parameters.dry_mode = params.dry_mode # pylint: disable=no-member
status.parameters.record_to_dlt = params.record_to_dlt # pylint: disable=no-member
status.parameters.dlt_domain_id = params.dlt_domain_id # pylint: disable=no-member
status.parameters.request_types.extend(request_types) # pylint: disable=no-member
list_scalar_range__to__grpc(params.availability_ranges, status.availability ) # pylint: disable=no-member
list_scalar_range__to__grpc(params.capacity_gbps_ranges, status.capacity_gbps ) # pylint: disable=no-member
list_scalar_range__to__grpc(params.e2e_latency_ms_ranges, status.e2e_latency_ms) # pylint: disable=no-member
def Stop(self, request : Empty, context : grpc.ServicerContext) -> Empty:
if self._scheduler is not None:
self._scheduler.stop()
return Empty()