diff --git a/src/load_generator/load_gen/RequestGenerator.py b/src/load_generator/load_gen/RequestGenerator.py index 906c26e98a75fe3c8f15d628f863faac4ba2ea16..a6d14307eee9bbc531e09495d4b650e361aa3d26 100644 --- a/src/load_generator/load_gen/RequestGenerator.py +++ b/src/load_generator/load_gen/RequestGenerator.py @@ -14,9 +14,11 @@ import logging, json, random, threading from typing import Dict, Optional, Set, Tuple -from common.proto.context_pb2 import Empty, TopologyId +from common.proto.context_pb2 import Empty, IsolationLevelEnum, TopologyId from common.tools.grpc.Tools import grpc_message_to_json -from common.tools.object_factory.Constraint import json_constraint_custom +from common.tools.object_factory.Constraint import ( + json_constraint_sla_availability, json_constraint_sla_capacity, json_constraint_sla_isolation, + json_constraint_sla_latency) from common.tools.object_factory.ConfigRule import json_config_rule_set from common.tools.object_factory.Device import json_device_id from common.tools.object_factory.EndPoint import json_endpoint_id @@ -36,7 +38,7 @@ class RequestGenerator: def __init__(self, parameters : Parameters) -> None: self._parameters = parameters self._lock = threading.Lock() - self._num_requests = 0 + self._num_generated = 0 self._available_device_endpoints : Dict[str, Set[str]] = dict() self._used_device_endpoints : Dict[str, Dict[str, str]] = dict() self._endpoint_ids_to_types : Dict[Tuple[str, str], str] = dict() @@ -45,6 +47,12 @@ class RequestGenerator: self._device_data : Dict[str, Dict] = dict() self._device_endpoint_data : Dict[str, Dict[str, Dict]] = dict() + @property + def num_generated(self): return self._num_generated + + @property + def infinite_loop(self): return self._parameters.num_requests == 0 + def initialize(self) -> None: with self._lock: self._available_device_endpoints.clear() @@ -96,17 +104,14 @@ class RequestGenerator: if self._parameters.record_to_dlt: record_link_to_dlt(dlt_connector_client, dlt_domain_id, link.link_id) - @property - def num_requests_generated(self): return self._num_requests - def dump_state(self) -> None: with self._lock: _endpoints = { device_uuid:[endpoint_uuid for endpoint_uuid in endpoint_uuids] for device_uuid,endpoint_uuids in self._available_device_endpoints.items() } - LOGGER.info('[dump_state] available_device_endpoints = {:s}'.format(json.dumps(_endpoints))) - LOGGER.info('[dump_state] used_device_endpoints = {:s}'.format(json.dumps(self._used_device_endpoints))) + LOGGER.debug('[dump_state] available_device_endpoints = {:s}'.format(json.dumps(_endpoints))) + LOGGER.debug('[dump_state] used_device_endpoints = {:s}'.format(json.dumps(self._used_device_endpoints))) def _use_device_endpoint( self, service_uuid : str, request_type : RequestType, endpoint_types : Optional[Set[str]] = None, @@ -167,10 +172,13 @@ class RequestGenerator: self._used_device_endpoints.setdefault(device_uuid, dict()).pop(endpoint_uuid, None) self._available_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid) - def compose_request(self) -> Optional[Dict]: + def compose_request(self) -> Tuple[bool, Optional[Dict]]: # completed, request with self._lock: - self._num_requests += 1 - num_request = self._num_requests + if not self.infinite_loop and (self._num_generated >= self._parameters.num_requests): + LOGGER.info('Generation Done!') + return True, None # completed + self._num_generated += 1 + num_request = self._num_generated #request_uuid = str(uuid.uuid4()) request_uuid = 'svc_{:d}'.format(num_request) @@ -181,9 +189,9 @@ class RequestGenerator: if request_type in { RequestType.SERVICE_L2NM, RequestType.SERVICE_L3NM, RequestType.SERVICE_TAPI, RequestType.SERVICE_MW }: - return self._compose_service(num_request, request_uuid, request_type) + return False, self._compose_service(num_request, request_uuid, request_type) elif request_type in {RequestType.SLICE_L2NM, RequestType.SLICE_L3NM}: - return self._compose_slice(num_request, request_uuid, request_type) + return False, self._compose_slice(num_request, request_uuid, request_type) def _compose_service(self, num_request : int, request_uuid : str, request_type : str) -> Optional[Dict]: # choose source endpoint @@ -222,10 +230,17 @@ class RequestGenerator: ] if request_type == RequestType.SERVICE_L2NM: + availability = int(random.uniform(00.0, 99.99) * 100.0) / 100.0 + capacity_gbps = int(random.uniform(0.1, 100.0) * 100.0) / 100.0 + e2e_latency_ms = int(random.uniform(5.0, 100.0) * 100.0) / 100.0 + constraints = [ - json_constraint_custom('bandwidth[gbps]', '10.0'), - json_constraint_custom('latency[ms]', '20.0'), + json_constraint_sla_availability(1, True, availability), + json_constraint_sla_capacity(capacity_gbps), + json_constraint_sla_isolation([IsolationLevelEnum.NO_ISOLATION]), + json_constraint_sla_latency(e2e_latency_ms), ] + vlan_id = num_request % 1000 circuit_id = '{:03d}'.format(vlan_id) @@ -260,10 +275,17 @@ class RequestGenerator: request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules) elif request_type == RequestType.SERVICE_L3NM: + availability = int(random.uniform(00.0, 99.99) * 100.0) / 100.0 + capacity_gbps = int(random.uniform(0.1, 100.0) * 100.0) / 100.0 + e2e_latency_ms = int(random.uniform(5.0, 100.0) * 100.0) / 100.0 + constraints = [ - json_constraint_custom('bandwidth[gbps]', '10.0'), - json_constraint_custom('latency[ms]', '20.0'), + json_constraint_sla_availability(1, True, availability), + json_constraint_sla_capacity(capacity_gbps), + json_constraint_sla_isolation([IsolationLevelEnum.NO_ISOLATION]), + json_constraint_sla_latency(e2e_latency_ms), ] + vlan_id = num_request % 1000 bgp_as = 60000 + (num_request % 10000) bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333) @@ -357,9 +379,15 @@ class RequestGenerator: json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid), json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid), ] + + availability = int(random.uniform(00.0, 99.99) * 100.0) / 100.0 + capacity_gbps = int(random.uniform(0.1, 100.0) * 100.0) / 100.0 + e2e_latency_ms = int(random.uniform(5.0, 100.0) * 100.0) / 100.0 constraints = [ - json_constraint_custom('bandwidth[gbps]', '10.0'), - json_constraint_custom('latency[ms]', '20.0'), + json_constraint_sla_availability(1, True, availability), + json_constraint_sla_capacity(capacity_gbps), + json_constraint_sla_isolation([IsolationLevelEnum.NO_ISOLATION]), + json_constraint_sla_latency(e2e_latency_ms), ] if request_type == RequestType.SLICE_L2NM: diff --git a/src/load_generator/load_gen/RequestScheduler.py b/src/load_generator/load_gen/RequestScheduler.py index 775da1580a2a6521dbdc8fe32236c1f2adb4b3a7..13ae70deb2dfeff74ef19c8eb0e7d2657a72c6fb 100644 --- a/src/load_generator/load_gen/RequestScheduler.py +++ b/src/load_generator/load_gen/RequestScheduler.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy, logging, pytz, random +import copy, logging, pytz, random, threading from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.schedulers.blocking import BlockingScheduler @@ -46,14 +46,18 @@ class RequestScheduler: timezone=pytz.utc) self._parameters = parameters self._generator = generator + self._running = threading.Event() + + @property + def num_generated(self): return max(self._generator.num_generated, self._parameters.num_requests) + + @property + def infinite_loop(self): return self._generator.infinite_loop + + @property + def running(self): return self._running.is_set() def _schedule_request_setup(self) -> None: - infinite_loop = self._parameters.num_requests == 0 - num_requests_generated = self._generator.num_requests_generated - 1 # because it first increases, then checks - if not infinite_loop and (num_requests_generated >= self._parameters.num_requests): - LOGGER.info('Generation Done!') - #self._scheduler.shutdown() - return iat = random.expovariate(1.0 / self._parameters.inter_arrival_time) run_date = datetime.utcnow() + timedelta(seconds=iat) self._scheduler.add_job( @@ -66,16 +70,24 @@ class RequestScheduler: self._request_teardown, args=(request,), trigger='date', run_date=run_date, timezone=pytz.utc) def start(self): + self._running.set() self._schedule_request_setup() self._scheduler.start() def stop(self): self._scheduler.shutdown() + self._running.clear() def _request_setup(self) -> None: - self._schedule_request_setup() + completed,request = self._generator.compose_request() + if completed: + LOGGER.info('Generation Done!') + #self._scheduler.shutdown() + self._running.clear() + return + else: + self._schedule_request_setup() - request = self._generator.compose_request() if request is None: LOGGER.warning('No resources available to compose new request') return