diff --git a/src/dlt/connector/client/DltEventsCollector.py b/src/dlt/connector/client/DltEventsCollector.py index 9f929d7ccf4a8f54c6f87304ec05bb9086f8522c..f35d5c7cca91c89cc8fdb0f254a1927e7d76486c 100644 --- a/src/dlt/connector/client/DltEventsCollector.py +++ b/src/dlt/connector/client/DltEventsCollector.py @@ -12,27 +12,40 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Callable, Optional import grpc, logging, queue, threading, time -from common.proto.dlt_gateway_pb2 import DltRecordSubscription +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription from common.tools.grpc.Tools import grpc_message_to_json_string from dlt.connector.client.DltGatewayClient import DltGatewayClient LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) +# This class accepts an event_handler method as attribute that can be used to pre-process and +# filter events before they reach the events_queue. Depending on the handler, the supported +# behaviors are: +# - If the handler is not set, the events are transparently added to the events_queue. +# - If returns None for an event, the event is not stored in the events_queue. +# - If returns a DltRecordEvent object for an event, the returned event is stored in the events_queue. +# - Other combinations are not supported. + class DltEventsCollector(threading.Thread): def __init__( self, dltgateway_client : DltGatewayClient, log_events_received : bool = False, + event_handler : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None, ) -> None: super().__init__(name='DltEventsCollector', daemon=True) self._dltgateway_client = dltgateway_client self._log_events_received = log_events_received + self._event_handler = event_handler self._events_queue = queue.Queue() self._terminate = threading.Event() self._dltgateway_stream = None def run(self) -> None: + event_handler = self._event_handler + if event_handler is None: event_handler = lambda e: e self._terminate.clear() while not self._terminate.is_set(): try: @@ -41,6 +54,11 @@ class DltEventsCollector(threading.Thread): for event in self._dltgateway_stream: if self._log_events_received: LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event))) + event = event_handler(event) + if event is None: continue + if not isinstance(event, DltRecordEvent): + # pylint: disable=broad-exception-raised + raise Exception('Unsupported return type: {:s}'.format(str(event))) self._events_queue.put_nowait(event) except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member diff --git a/src/dlt/connector/main_test.py b/src/dlt/connector/tests/basic.py similarity index 94% rename from src/dlt/connector/main_test.py rename to src/dlt/connector/tests/basic.py index 679be72870cf84b790553e2d43ed9961a1d66379..cc832cdc88475b862c841a6037d229b7675f283a 100644 --- a/src/dlt/connector/main_test.py +++ b/src/dlt/connector/tests/basic.py @@ -14,7 +14,7 @@ # pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1 # PYTHONPATH=./src python -# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.main_test +# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.tests.basic import logging, sys, time from common.proto.dlt_gateway_pb2 import ( @@ -23,8 +23,8 @@ from common.proto.dlt_gateway_pb2 import ( from common.tools.object_factory.Device import json_device from common.tools.grpc.Tools import grpc_message_to_json_string from src.common.proto.context_pb2 import DEVICEOPERATIONALSTATUS_ENABLED, Device -from .client.DltGatewayClient import DltGatewayClient -from .client.DltEventsCollector import DltEventsCollector +from ..client.DltGatewayClient import DltGatewayClient +from ..client.DltEventsCollector import DltEventsCollector logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) diff --git a/src/dlt/connector/tests/performance/__init__.py b/src/dlt/connector/tests/performance/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/dlt/connector/tests/performance/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/dlt/connector/tests/performance/__main__.py b/src/dlt/connector/tests/performance/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..91625ea2bb4e01de91c1e86334486bc63a1983d7 --- /dev/null +++ b/src/dlt/connector/tests/performance/__main__.py @@ -0,0 +1,96 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vary operation: create, update, delete +# vary record type (device, link, service, slice) +# for devices, services, slices, vary size: num endpoints, num constraints, num config rules +# measure load/store time +# measure event notification time + +# docker build -t dlt-gateway:test -f ./src/dlt/gateway/Dockerfile . +# docker run --name dlt-gateway -d -p 50051:50051 --network=tfs-br dlt-gateway:test +# pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1 +# PYTHONPATH=./src python +# PYTHONPATH=./src python -m dlt.connector.tests.performance + +import functools, logging, sys, time +from common.proto.dlt_gateway_pb2 import DltRecordEvent +from dlt.connector.client.DltGatewayClient import DltGatewayClient +from dlt.connector.client.DltEventsCollector import DltEventsCollector +from .play_ground.Enums import CONTEXT_EVENT_TYPE_TO_ACTION, RECORD_TYPE_TO_ENUM +from .play_ground import PlayGround + +DLT_GATEWAY_HOST = '172.254.254.2' +DLT_GATEWAY_PORT = 50051 + +NUM_ACTIONS = 100 +DOMAIN_UUID = 'perf-test-fake-domain' + +CSV_FILEPATH = 'data/perf/scenario_2/dlt/2023-05May-30/response_time' + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + +def _event_handler(play_ground : PlayGround, event : DltRecordEvent) -> None: + # Filter undesired/unsupported/wrong domain_uuids, actions, and record types + # Update notification time in PlayGround.PerfPoints + # Return None to prevent storing the events in the DLT Events' Collector internal queue + + domain_uuid = event.record_id.domain_uuid.uuid + if domain_uuid != DOMAIN_UUID: return None + + action = CONTEXT_EVENT_TYPE_TO_ACTION.get(event.event.event_type) + if action is None: return None + + record_type = RECORD_TYPE_TO_ENUM.get(event.record_id.type) + if record_type is None: return None + + #event_time = event.event.timestamp.timestamp + record_uuid = event.record_id.record_uuid.uuid + play_ground.perf_data.get(action, record_type, record_uuid).set_time_notified(time.time()) + return None + +def main() -> None: + dltgateway_client = DltGatewayClient(host=DLT_GATEWAY_HOST, port=DLT_GATEWAY_PORT) + + play_ground = PlayGround(dltgateway_client, DOMAIN_UUID) + event_handler = functools.partial(_event_handler, play_ground) + + dltgateway_collector = DltEventsCollector( + dltgateway_client, log_events_received=False, event_handler=event_handler) + dltgateway_collector.start() + + time.sleep(3) + + LOGGER.info('Starting {:d} actions...'.format(NUM_ACTIONS)) + + num_action = 0 + while num_action < NUM_ACTIONS: + if num_action > 0 and num_action % 10 == 0: + str_stats = play_ground.perf_data.stats_to_str() + MSG = 'Running action {:d}/{:d}...\n{:s}' + LOGGER.info(MSG.format(num_action, NUM_ACTIONS, str_stats)) + completed = play_ground.run_random_operation() + if not completed: continue + num_action += 1 + + LOGGER.info('Completed {:d} actions!'.format(NUM_ACTIONS)) + + dltgateway_collector.stop() + + play_ground.perf_data.to_csv(CSV_FILEPATH) + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/dlt/connector/tests/performance/play_ground/Dlt.py b/src/dlt/connector/tests/performance/play_ground/Dlt.py new file mode 100644 index 0000000000000000000000000000000000000000..d554daae6eff3aa8c8a9de352f023a25ffdc7960 --- /dev/null +++ b/src/dlt/connector/tests/performance/play_ground/Dlt.py @@ -0,0 +1,92 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, time +from typing import Dict, Union +from common.proto.context_pb2 import Device, Link, Service, Slice +from common.proto.dlt_gateway_pb2 import ( + DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordStatus, DltRecordTypeEnum) +from common.tools.grpc.Tools import grpc_message_to_json_string +from dlt.connector.client.DltGatewayClient import DltGatewayClient +from .PerfPoint import PerfPoint + +DLT_OPERATION_CREATE = DltRecordOperationEnum.DLTRECORDOPERATION_ADD +DLT_OPERATION_UPDATE = DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE +DLT_OPERATION_DELETE = DltRecordOperationEnum.DLTRECORDOPERATION_DELETE + +DLT_RECORD_TYPE_DEVICE = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE +DLT_RECORD_TYPE_LINK = DltRecordTypeEnum.DLTRECORDTYPE_LINK +DLT_RECORD_TYPE_SERVICE = DltRecordTypeEnum.DLTRECORDTYPE_SERVICE +DLT_RECORD_TYPE_SLICE = DltRecordTypeEnum.DLTRECORDTYPE_SLICE + +def dlt_record_set( + dltgateway_client : DltGatewayClient, perf_point : PerfPoint, + operation : DltRecordOperationEnum, domain_uuid : str, objekt : Union[Device, Link, Service, Slice] +) -> DltRecordStatus: + if isinstance(objekt, Device): + record_type = DLT_RECORD_TYPE_DEVICE + record_uuid = objekt.device_id.device_uuid.uuid + elif isinstance(objekt, Link): + record_type = DLT_RECORD_TYPE_LINK + record_uuid = objekt.link_id.link_uuid.uuid + elif isinstance(objekt, Service): + record_type = DLT_RECORD_TYPE_SERVICE + record_uuid = objekt.service_id.service_uuid.uuid + elif isinstance(objekt, Slice): + record_type = DLT_RECORD_TYPE_SLICE + record_uuid = objekt.slice_id.slice_uuid.uuid + else: + raise NotImplementedError('Object({:s}) not supported'.format(str(type(objekt)))) + + dlt_req = DltRecord() + dlt_req.record_id.domain_uuid.uuid = domain_uuid # pylint: disable=no-member + dlt_req.record_id.type = record_type # pylint: disable=no-member + dlt_req.record_id.record_uuid.uuid = record_uuid # pylint: disable=no-member + dlt_req.operation = operation + dlt_req.data_json = grpc_message_to_json_string(objekt) + + perf_point.set_time_requested(time.time()) + reply = dltgateway_client.RecordToDlt(dlt_req) + perf_point.set_time_replied(time.time()) + + return reply + +def dlt_record_found(record : DltRecord) -> bool: + return all([ + len(record.record_id.domain_uuid.uuid) > 0, + record.record_id.type != DltRecordTypeEnum.DLTRECORDTYPE_UNDEFINED, + len(record.record_id.record_uuid.uuid) > 0, + #record.operation != DltRecordOperationEnum.DLTRECORDOPERATION_UNDEFINED, + len(record.data_json) > 0, + ]) + +def dlt_record_get( + dltgateway_client : DltGatewayClient, perf_point : PerfPoint, + domain_uuid : str, record_type : DltRecordTypeEnum, record_uuid : str +) -> Dict: + dlt_rec_id = DltRecordId() + dlt_rec_id.domain_uuid.uuid = domain_uuid # pylint: disable=no-member + dlt_rec_id.type = record_type + dlt_rec_id.record_uuid.uuid = record_uuid # pylint: disable=no-member + + perf_point.set_time_requested(time.time()) + reply = dltgateway_client.GetFromDlt(dlt_rec_id) + perf_point.set_time_replied(time.time()) + + if dlt_record_found(reply): return json.loads(reply.data_json) + + MSG = 'DltRecord({:s}/{:s}/{:s}) not found' + str_record_type = DltRecordTypeEnum.Name(record_type) + msg = MSG.format(str(domain_uuid), str_record_type, str(record_uuid)) + raise Exception(msg) # pylint: disable=broad-exception-raised diff --git a/src/dlt/connector/tests/performance/play_ground/Enums.py b/src/dlt/connector/tests/performance/play_ground/Enums.py new file mode 100644 index 0000000000000000000000000000000000000000..1af5bd38dd58c5ab2b260788bfb3c63dc538766d --- /dev/null +++ b/src/dlt/connector/tests/performance/play_ground/Enums.py @@ -0,0 +1,42 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum +from common.proto.context_pb2 import EventTypeEnum +from common.proto.dlt_gateway_pb2 import DltRecordTypeEnum + +class ActionEnum(enum.Enum): + CREATE = 'create' + GET = 'get' + UPDATE = 'update' + DELETE = 'delete' + +class RecordTypeEnum(enum.Enum): + DEVICE = 'device' + LINK = 'link' + SERVICE = 'service' + SLICE = 'slice' + +CONTEXT_EVENT_TYPE_TO_ACTION = { + EventTypeEnum.EVENTTYPE_CREATE : ActionEnum.CREATE, + EventTypeEnum.EVENTTYPE_UPDATE : ActionEnum.UPDATE, + EventTypeEnum.EVENTTYPE_REMOVE : ActionEnum.DELETE, +} + +RECORD_TYPE_TO_ENUM = { + DltRecordTypeEnum.DLTRECORDTYPE_DEVICE : RecordTypeEnum.DEVICE, + DltRecordTypeEnum.DLTRECORDTYPE_LINK : RecordTypeEnum.LINK, + DltRecordTypeEnum.DLTRECORDTYPE_SERVICE : RecordTypeEnum.SERVICE, + DltRecordTypeEnum.DLTRECORDTYPE_SLICE : RecordTypeEnum.SLICE, +} diff --git a/src/dlt/connector/tests/performance/play_ground/PerfData.py b/src/dlt/connector/tests/performance/play_ground/PerfData.py new file mode 100644 index 0000000000000000000000000000000000000000..357c9b33f95c6a03ba5896f88d6f0a03a45247e2 --- /dev/null +++ b/src/dlt/connector/tests/performance/play_ground/PerfData.py @@ -0,0 +1,50 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import csv, prettytable +from typing import Dict, Tuple +from .Enums import ActionEnum, RecordTypeEnum +from .PerfPoint import PerfPoint + +class PerfData: + def __init__(self) -> None: + self._type_count : Dict[Tuple[ActionEnum, RecordTypeEnum], int] = dict() + self._points : Dict[Tuple[ActionEnum, RecordTypeEnum, str], PerfPoint] = dict() + + def get(self, action : ActionEnum, record_type : RecordTypeEnum, record_uuid : str) -> PerfPoint: + point_key = (action, record_type, record_uuid) + point = self._points.get(point_key) + if point is None: + point = PerfPoint(action, record_type, record_uuid) + self._points[point_key] = point + type_key = (action, record_type) + self._type_count[type_key] = self._type_count.get(type_key, 0) + 1 + return point + + def to_csv(self, filepath : str) -> None: + point : PerfPoint = self._points.values()[0] + keys = point.to_dict().keys() + with open(filepath, 'w', newline='', encoding='UTF-8') as csv_file: + dict_writer = csv.DictWriter(csv_file, keys) + dict_writer.writeheader() + dict_writer.writerows(self._points) + + def stats_to_str(self) -> str: + field_names = ['action', 'record_type', 'count'] + pt_stats = prettytable.PrettyTable(field_names=field_names, sortby='count', reversesort=True) + for f in field_names[0:2]: pt_stats.align[f] = 'l' + for f in field_names[2:3]: pt_stats.align[f] = 'r' + for (action,record_type),count in self._type_count.items(): + pt_stats.add_row([action.value, record_type.value, count]) + return pt_stats.get_string() diff --git a/src/dlt/connector/tests/performance/play_ground/PerfPoint.py b/src/dlt/connector/tests/performance/play_ground/PerfPoint.py new file mode 100644 index 0000000000000000000000000000000000000000..720bbf8bd7725a0b622e16f2afe16c4c94a2601c --- /dev/null +++ b/src/dlt/connector/tests/performance/play_ground/PerfPoint.py @@ -0,0 +1,58 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, Optional +from .Enums import ActionEnum, RecordTypeEnum + +class PerfPoint: + def __init__(self, action : ActionEnum, record_type : RecordTypeEnum, record_uuid : str) -> None: + self.action : ActionEnum = action + self.record_type : RecordTypeEnum = record_type + self.record_uuid : str = record_uuid + self.size_bytes : Optional[int ] = None + self.num_endpoints : Optional[int ] = None + self.num_config_rules : Optional[int ] = None + self.num_constraints : Optional[int ] = None + self.num_sub_services : Optional[int ] = None + self.num_sub_slices : Optional[int ] = None + self.time_requested : Optional[float] = None + self.time_replied : Optional[float] = None + self.time_notified : Optional[float] = None + + def set_size_bytes (self, size_bytes : int) -> None: self.size_bytes = size_bytes + def set_num_endpoints (self, num_endpoints : int) -> None: self.num_endpoints = num_endpoints + def set_num_config_rules(self, num_config_rules : int) -> None: self.num_config_rules = num_config_rules + def set_num_constraints (self, num_constraints : int) -> None: self.num_constraints = num_constraints + def set_num_sub_services(self, num_sub_services : int) -> None: self.num_sub_services = num_sub_services + def set_num_sub_slices (self, num_sub_slices : int) -> None: self.num_sub_slices = num_sub_slices + + def set_time_requested(self, timestamp : float) -> None: self.time_requested = timestamp + def set_time_replied (self, timestamp : float) -> None: self.time_replied = timestamp + def set_time_notified (self, timestamp : float) -> None: self.time_notified = timestamp + + def to_dict(self) -> Dict: + return { + 'action' : self.action.value, + 'record_type' : self.record_type.value, + 'record_uuid' : self.record_uuid, + 'size_bytes' : self.size_bytes, + 'num_endpoints' : self.num_endpoints, + 'num_config_rules': self.num_config_rules, + 'num_constraints' : self.num_constraints, + 'num_sub_services': self.num_sub_services, + 'num_sub_slices' : self.num_sub_slices, + 'time_requested' : self.time_requested, + 'time_replied' : self.time_replied, + 'time_notified' : self.time_notified, + } diff --git a/src/dlt/connector/tests/performance/play_ground/Random.py b/src/dlt/connector/tests/performance/play_ground/Random.py new file mode 100644 index 0000000000000000000000000000000000000000..2c4c1e739f0ced1ad1988fae73bc90b5dd2b1ec6 --- /dev/null +++ b/src/dlt/connector/tests/performance/play_ground/Random.py @@ -0,0 +1,182 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random, uuid +from typing import Dict, List, Optional, Set +from common.proto.context_pb2 import ( + Device, EndPoint, Link, Service, ServiceStatusEnum, ServiceTypeEnum, Slice, SliceStatusEnum +) +from common.tools.grpc.Tools import grpc_message_to_json +from common.tools.object_factory.ConfigRule import json_config_rule_set +from common.tools.object_factory.Constraint import json_constraint_custom +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Device import ( + DEVICE_DISABLED, DEVICE_EMU_DRIVERS, DEVICE_EMUPR_TYPE, json_device, json_device_id +) +from common.tools.object_factory.EndPoint import json_endpoint, json_endpoint_id +from common.tools.object_factory.Link import json_link +from common.tools.object_factory.Service import json_service +from common.tools.object_factory.Slice import json_slice +from common.tools.object_factory.Topology import json_topology_id +from .Settings import ( + CONFIG_RULES_MIN_ENTRIES, CONFIG_RULES_MAX_ENTRIES, CONSTRAINTS_MIN_ENTRIES, CONSTRAINTS_MAX_ENTRIES, + DEVICE_MAX_ENDPOINTS, DEVICE_MIN_ENDPOINTS, DICT_MAX_ENTRIES, DICT_MIN_ENTRIES, PATH_MAX_PARTS, PATH_MIN_PARTS, + SLICE_MAX_SUBSERVICES, SLICE_MAX_SUBSLICES, SLICE_MIN_SUBSERVICES, SLICE_MIN_SUBSLICES, + WORD_ALPHABET, WORD_MAX_LENGTH, WORD_MIN_LENGTH, +) + +def random_word(length : Optional[int] = None, alphabet : Optional[str] = None) -> str: + if alphabet is None: alphabet = WORD_ALPHABET + if length is None: length = int(random.uniform(WORD_MIN_LENGTH, WORD_MAX_LENGTH)) + word = [] + remaining_length = length + while remaining_length > 0: + part_length = min(len(alphabet), remaining_length) + word.extend(random.sample(alphabet, part_length)) + remaining_length = length - len(word) + return ''.join(word) + +def random_path( + num_parts : Optional[int] = None, part_length : Optional[int] = None, alphabet : Optional[str] = None, + separator : str = '/' +) -> str: + if num_parts is None: num_parts = int(random.uniform(PATH_MIN_PARTS, PATH_MAX_PARTS)) + return separator.join([random_word(length=part_length, alphabet=alphabet) for _ in range(num_parts)]) + +def random_dict( + num_entries : Optional[int] = None, key_length : Optional[str] = None, value_length : Optional[str] = None +) -> Dict: + if num_entries is None: num_entries = int(random.uniform(DICT_MIN_ENTRIES, DICT_MAX_ENTRIES)) + return {random_word(length=key_length) : random_word(length=value_length) for _ in range(num_entries)} + +def random_config_rule() -> Dict: + resource_key = random_path() + resource_value = random_dict() + return json_config_rule_set(resource_key, resource_value) + +def random_config_rules(count : Optional[int] = None) -> List: + if count is None: count = int(random.uniform(CONFIG_RULES_MIN_ENTRIES, CONFIG_RULES_MAX_ENTRIES)) + return [random_config_rule() for _ in range(count)] + +def random_constraint() -> Dict: + constraint_type = random_path() + constraint_value = random_dict() + return json_constraint_custom(constraint_type, constraint_value) + +def random_constraints(count : Optional[int] = None) -> List: + if count is None: count = int(random.uniform(CONSTRAINTS_MIN_ENTRIES, CONSTRAINTS_MAX_ENTRIES)) + return [random_constraint() for _ in range(count)] + +def random_endpoint(device_uuid : str) -> Dict: + topology_id = json_topology_id(str(uuid.uuid4()), context_id=json_context_id(str(uuid.uuid4()))) + return json_endpoint(json_device_id(device_uuid), str(uuid.uuid4()), 'internal/copper', topology_id=topology_id) + +def random_endpoints(device_uuid : str, count : Optional[int] = None) -> List: + if count is None: count = int(random.uniform(DEVICE_MIN_ENDPOINTS, DEVICE_MAX_ENDPOINTS)) + return [random_endpoint(device_uuid) for _ in range(count)] + +def random_endpoint_id(device_uuid : str) -> Dict: + topology_id = json_topology_id(str(uuid.uuid4()), context_id=json_context_id(str(uuid.uuid4()))) + return json_endpoint_id(json_device_id(device_uuid), str(uuid.uuid4()), topology_id=topology_id) + +def random_endpoint_ids(device_uuid : str, count : Optional[int] = None) -> List: + if count is None: count = int(random.uniform(DEVICE_MIN_ENDPOINTS, DEVICE_MAX_ENDPOINTS)) + return [random_endpoint(device_uuid) for _ in range(count)] + +def get_random_endpoint( + devices : Dict[str, Device], excluded_device_uuids : Set[str] = set() +) -> EndPoint: + device_uuids = set(devices.keys()) + device_uuids = list(device_uuids.difference(excluded_device_uuids)) + device_uuid = random.choice(device_uuids) + device = devices[device_uuid] + return random.choice(device.device_endpoints) + +def get_random_endpoints( + devices : Dict[str, Device], count : Optional[int] = None +) -> Optional[List[EndPoint]]: + if len(devices) < count: return None # Too few devices + endpoints = list() + used_device_uuids = set() + for _ in range(count): + endpoint = get_random_endpoint(devices, used_device_uuids) + used_device_uuids.add(endpoint.endpoint_id.device_id.device_uuid.uuid) + endpoints.append(endpoint) + return endpoints + +def random_device(device_uuid : Optional[str] = None) -> Device: + if device_uuid is None: device_uuid = str(uuid.uuid4()) + device_type = DEVICE_EMUPR_TYPE + device_op_stat = DEVICE_DISABLED + drivers = DEVICE_EMU_DRIVERS + endpoints = random_endpoints(device_uuid) + config_rules = random_config_rules() + return Device(**json_device( + device_uuid, device_type, device_op_stat, endpoints=endpoints, config_rules=config_rules, drivers=drivers + )) + +def random_link( + devices : Dict[str, Device], link_uuid : Optional[str] = None +) -> Optional[Link]: + if link_uuid is None: link_uuid = str(uuid.uuid4()) + endpoints = get_random_endpoints(devices, count=2) + if endpoints is None: return None + endpoint_ids = [grpc_message_to_json(endpoint.endpoint_id) for endpoint in endpoints] + return Link(**json_link(link_uuid, endpoint_ids)) + +def random_service( + devices : Dict[str, Device], service_uuid : Optional[str] = None +) -> Optional[Service]: + if service_uuid is None: service_uuid = str(uuid.uuid4()) + context_id = json_context_id(str(uuid.uuid4())) + service_type = ServiceTypeEnum.SERVICETYPE_L2NM + service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + endpoints = get_random_endpoints(devices, count=2) + if endpoints is None: return None + endpoint_ids = [grpc_message_to_json(endpoint.endpoint_id) for endpoint in endpoints] + constraints = random_constraints(count=3) + config_rules = random_config_rules(count=5) + return Service(**json_service( + service_uuid, service_type, context_id=context_id, status=service_status, + endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)) + +def random_slice( + devices : Dict[str, Device], services : Dict[str, Service], slices : Dict[str, Service], + slice_uuid : Optional[str] = None +) -> Optional[Slice]: + if slice_uuid is None: slice_uuid = str(uuid.uuid4()) + context_id = json_context_id(str(uuid.uuid4())) + slice_status = SliceStatusEnum.SLICESTATUS_PLANNED + endpoints = get_random_endpoints(devices, count=2) + if endpoints is None: return None + endpoint_ids = [grpc_message_to_json(endpoint.endpoint_id) for endpoint in endpoints] + constraints = random_constraints(count=3) + config_rules = random_config_rules(count=5) + + num_sub_services = int(random.uniform(SLICE_MIN_SUBSERVICES, SLICE_MAX_SUBSERVICES)) + num_sub_services = min(num_sub_services, len(services)) + sub_services = random.sample(list(services.values()), num_sub_services) if num_sub_services > 0 else [] + sub_service_ids = [grpc_message_to_json(sub_service.service_id) for sub_service in sub_services] + + num_sub_slices = int(random.uniform(SLICE_MIN_SUBSLICES, SLICE_MAX_SUBSLICES)) + num_sub_slices = min(num_sub_slices, len(slices)) + sub_slices = random.sample(list(slices.values()), num_sub_slices) if num_sub_slices > 0 else [] + sub_slice_ids = [grpc_message_to_json(sub_slice.slice_id) for sub_slice in sub_slices] + + owner = {'owner_uuid': {'uuid': str(uuid.uuid4())}, 'owner_string': random_word()} + + return Slice(**json_slice( + slice_uuid, context_id=context_id, status=slice_status, + endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules, + service_ids=sub_service_ids, subslice_ids=sub_slice_ids, owner=owner)) diff --git a/src/dlt/connector/tests/performance/play_ground/Settings.py b/src/dlt/connector/tests/performance/play_ground/Settings.py new file mode 100644 index 0000000000000000000000000000000000000000..c04f6139d84fc280d33d580fd03e2598788a945f --- /dev/null +++ b/src/dlt/connector/tests/performance/play_ground/Settings.py @@ -0,0 +1,38 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +WORD_ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' +WORD_MIN_LENGTH = 5 +WORD_MAX_LENGTH = 100 + +PATH_MIN_PARTS = 2 +PATH_MAX_PARTS = 15 + +DICT_MIN_ENTRIES = 1 +DICT_MAX_ENTRIES = 10 + +DEVICE_MIN_ENDPOINTS = 2 +DEVICE_MAX_ENDPOINTS = 48 + +CONFIG_RULES_MIN_ENTRIES = 3 +CONFIG_RULES_MAX_ENTRIES = 100 + +CONSTRAINTS_MIN_ENTRIES = 3 +CONSTRAINTS_MAX_ENTRIES = 10 + +SLICE_MIN_SUBSERVICES = 0 +SLICE_MAX_SUBSERVICES = 3 + +SLICE_MIN_SUBSLICES = 0 +SLICE_MAX_SUBSLICES = 3 diff --git a/src/dlt/connector/tests/performance/play_ground/__init__.py b/src/dlt/connector/tests/performance/play_ground/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1ebdf232128ebe75dcdf3dd037dc52d07274b3f5 --- /dev/null +++ b/src/dlt/connector/tests/performance/play_ground/__init__.py @@ -0,0 +1,198 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, random +from typing import Dict +from common.proto.context_pb2 import Device, Link, Service, Slice +from dlt.connector.client.DltGatewayClient import DltGatewayClient +from .Enums import ActionEnum, RecordTypeEnum +from .Dlt import ( + DLT_OPERATION_CREATE, DLT_OPERATION_DELETE, DLT_OPERATION_UPDATE, + DLT_RECORD_TYPE_DEVICE, DLT_RECORD_TYPE_LINK, DLT_RECORD_TYPE_SERVICE, DLT_RECORD_TYPE_SLICE, + dlt_record_get, dlt_record_set) +from .PerfData import PerfData +from .Random import random_device, random_link, random_service, random_slice + +LOGGER = logging.getLogger(__name__) + +class PlayGround: + def __init__(self, dltgateway_client : DltGatewayClient, domain_uuid : str) -> None: + self._dltgateway_client = dltgateway_client + self._domain_uuid = domain_uuid + self._perf_data = PerfData() + self._devices : Dict[str, Device ] = dict() + self._links : Dict[str, Link ] = dict() + self._services : Dict[str, Service] = dict() + self._slices : Dict[str, Slice ] = dict() + + @property + def perf_data(self): return self._perf_data + + def run_random_operation(self) -> bool: + method = None + while method is None: + action = random.choice(list(ActionEnum.__members__.values())) + record_type = random.choice(list(RecordTypeEnum.__members__.values())) + method_name = '{:s}_{:s}'.format(action.value, record_type.value) + method = getattr(self, method_name, None) + LOGGER.info('method_name: {:s}'.format(str(method_name))) + return method() + + def create_device(self) -> bool: + device = random_device() + if device is None: return False + device_uuid = device.device_id.device_uuid.uuid # pylint: disable=no-member + self._devices[device_uuid] = device + perf_point = self._perf_data.get(ActionEnum.CREATE, RecordTypeEnum.DEVICE, device_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_CREATE, self._domain_uuid, device) + return True + + def get_device(self) -> bool: + if len(self._devices) == 0: return False + device_uuid = random.choice(list(self._devices.keys())) + perf_point = self._perf_data.get(ActionEnum.GET, RecordTypeEnum.DEVICE, device_uuid) + data = dlt_record_get( + self._dltgateway_client, perf_point, self._domain_uuid, DLT_RECORD_TYPE_DEVICE, device_uuid) + self._devices[device_uuid] = Device(**data) + return True + + def update_device(self) -> bool: + if len(self._devices) == 0: return False + device_uuid = random.choice(list(self._devices.keys())) + device = random_device(device_uuid=device_uuid) + if device is None: return False + perf_point = self._perf_data.get(ActionEnum.UPDATE, RecordTypeEnum.DEVICE, device_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_UPDATE, self._domain_uuid, device) + self._devices[device_uuid] = device + return True + + def delete_device(self) -> bool: + if len(self._devices) == 0: return False + device_uuid = random.choice(list(self._devices.keys())) + device = self._devices.pop(device_uuid, None) + if device is None: return False + perf_point = self._perf_data.get(ActionEnum.DELETE, RecordTypeEnum.DEVICE, device_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_DELETE, self._domain_uuid, device) + return True + + def create_link(self) -> bool: + link = random_link(self._devices) + if link is None: return False + link_uuid = link.link_id.link_uuid.uuid # pylint: disable=no-member + perf_point = self._perf_data.get(ActionEnum.CREATE, RecordTypeEnum.LINK, link_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_CREATE, self._domain_uuid, link) + self._links[link_uuid] = link + return True + + def get_link(self) -> bool: + if len(self._links) == 0: return False + link_uuid = random.choice(list(self._links.keys())) + perf_point = self._perf_data.get(ActionEnum.GET, RecordTypeEnum.LINK, link_uuid) + data = dlt_record_get( + self._dltgateway_client, perf_point, self._domain_uuid, DLT_RECORD_TYPE_LINK, link_uuid) + self._links[link_uuid] = Link(**data) + return True + + def update_link(self) -> bool: + if len(self._links) == 0: return False + link_uuid = random.choice(list(self._links.keys())) + link = random_link(self._devices, link_uuid=link_uuid) + if link is None: return False + perf_point = self._perf_data.get(ActionEnum.UPDATE, RecordTypeEnum.LINK, link_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_UPDATE, self._domain_uuid, link) + self._links[link_uuid] = link + return True + + def delete_link(self) -> bool: + if len(self._links) == 0: return False + link_uuid = random.choice(list(self._links.keys())) + link = self._links.pop(link_uuid, None) + if link is None: return False + perf_point = self._perf_data.get(ActionEnum.DELETE, RecordTypeEnum.LINK, link_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_DELETE, self._domain_uuid, link) + return True + + def create_service(self) -> bool: + service = random_service(self._devices) + if service is None: return False + service_uuid = service.service_id.service_uuid.uuid # pylint: disable=no-member + perf_point = self._perf_data.get(ActionEnum.CREATE, RecordTypeEnum.SERVICE, service_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_CREATE, self._domain_uuid, service) + self._services[service_uuid] = service + return True + + def get_service(self) -> bool: + if len(self._services) == 0: return False + service_uuid = random.choice(list(self._services.keys())) + perf_point = self._perf_data.get(ActionEnum.GET, RecordTypeEnum.SERVICE, service_uuid) + data = dlt_record_get( + self._dltgateway_client, perf_point, self._domain_uuid, DLT_RECORD_TYPE_SERVICE, service_uuid) + self._services[service_uuid] = Service(**data) + return True + + def update_service(self) -> bool: + if len(self._services) == 0: return False + service_uuid = random.choice(list(self._services.keys())) + service = random_service(self._devices, service_uuid=service_uuid) + if service is None: return False + perf_point = self._perf_data.get(ActionEnum.UPDATE, RecordTypeEnum.SERVICE, service_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_UPDATE, self._domain_uuid, service) + self._services[service_uuid] = service + return True + + def delete_service(self) -> bool: + if len(self._services) == 0: return False + service_uuid = random.choice(list(self._services.keys())) + service = self._services.pop(service_uuid, None) + if service is None: return False + perf_point = self._perf_data.get(ActionEnum.DELETE, RecordTypeEnum.SERVICE, service_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_DELETE, self._domain_uuid, service) + return True + + def create_slice(self) -> bool: + slice_ = random_slice(self._devices, self._services, self._slices) + if slice_ is None: return False + slice_uuid = slice_.slice_id.slice_uuid.uuid # pylint: disable=no-member + perf_point = self._perf_data.get(ActionEnum.CREATE, RecordTypeEnum.SLICE, slice_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_CREATE, self._domain_uuid, slice_) + self._slices[slice_uuid] = slice_ + return True + + def get_slice(self) -> bool: + if len(self._slices) == 0: return False + slice_uuid = random.choice(list(self._slices.keys())) + perf_point = self._perf_data.get(ActionEnum.GET, RecordTypeEnum.SLICE, slice_uuid) + data = dlt_record_get( + self._dltgateway_client, perf_point, self._domain_uuid, DLT_RECORD_TYPE_SLICE, slice_uuid) + self._slices[slice_uuid] = Slice(**data) + return True + + def update_slice(self) -> bool: + if len(self._slices) == 0: return False + slice_uuid = random.choice(list(self._slices.keys())) + slice_ = random_slice(self._devices, self._services, self._slices, slice_uuid=slice_uuid) + if slice_ is None: return False + perf_point = self._perf_data.get(ActionEnum.UPDATE, RecordTypeEnum.SLICE, slice_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_UPDATE, self._domain_uuid, slice_) + self._slices[slice_uuid] = slice_ + return True + + def delete_slice(self) -> bool: + if len(self._slices) == 0: return False + slice_uuid = random.choice(list(self._slices.keys())) + slice_ = self._slices.pop(slice_uuid, None) + if slice_ is None: return False + perf_point = self._perf_data.get(ActionEnum.DELETE, RecordTypeEnum.SLICE, slice_uuid) + dlt_record_set(self._dltgateway_client, perf_point, DLT_OPERATION_DELETE, self._domain_uuid, slice_) + return True