Commit c8315657 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/dlt-perf-eval' into 'develop'

DLT performance assessment tool

See merge request !112
parents 80d89934 eb15386f
Loading
Loading
Loading
Loading
+1001 −0

File added.

Preview size limit exceeded, changes collapsed.

+19 −1
Original line number Diff line number Diff line
@@ -12,27 +12,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Optional
import grpc, logging, queue, threading, time
from common.proto.dlt_gateway_pb2 import DltRecordSubscription
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.client.DltGatewayClient import DltGatewayClient

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# This class accepts an event_handler method as attribute that can be used to pre-process and
# filter events before they reach the events_queue. Depending on the handler, the supported
# behaviors are:
# - If the handler is not set, the events are transparently added to the events_queue.
# - If returns None for an event, the event is not stored in the events_queue.
# - If returns a DltRecordEvent object for an event, the returned event is stored in the events_queue.
# - Other combinations are not supported.

class DltEventsCollector(threading.Thread):
    def __init__(
        self, dltgateway_client : DltGatewayClient,
        log_events_received     : bool = False,
        event_handler           : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None,
    ) -> None:
        super().__init__(name='DltEventsCollector', daemon=True)
        self._dltgateway_client = dltgateway_client
        self._log_events_received = log_events_received
        self._event_handler = event_handler
        self._events_queue = queue.Queue()
        self._terminate = threading.Event()
        self._dltgateway_stream = None

    def run(self) -> None:
        event_handler = self._event_handler
        if event_handler is None: event_handler = lambda e: e
        self._terminate.clear()
        while not self._terminate.is_set():
            try:
@@ -41,6 +54,11 @@ class DltEventsCollector(threading.Thread):
                for event in self._dltgateway_stream:
                    if self._log_events_received:
                        LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event)))
                    event = event_handler(event)
                    if event is None: continue
                    if not isinstance(event, DltRecordEvent):
                        # pylint: disable=broad-exception-raised
                        raise Exception('Unsupported return type: {:s}'.format(str(event)))
                    self._events_queue.put_nowait(event)
            except grpc.RpcError as e:
                if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member
+3 −3
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@

# pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1
# PYTHONPATH=./src python
# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.main_test
# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.tests.basic

import logging, sys, time
from common.proto.dlt_gateway_pb2 import (
@@ -23,8 +23,8 @@ from common.proto.dlt_gateway_pb2 import (
from common.tools.object_factory.Device import json_device
from common.tools.grpc.Tools import grpc_message_to_json_string
from src.common.proto.context_pb2 import DEVICEOPERATIONALSTATUS_ENABLED, Device
from .client.DltGatewayClient import DltGatewayClient
from .client.DltEventsCollector import DltEventsCollector
from ..client.DltGatewayClient import DltGatewayClient
from ..client.DltEventsCollector import DltEventsCollector

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
+42 −0
Original line number Diff line number Diff line
# DLT Gateway+Blockchain Performance Assessment Test

This test assesses the performance of the DLT component's Gateway + HyperLedger Fabric Blockchain.

To carry on that, it first creates a number of random devices, links, services, and slices.
Then, it performs some random creations, retrievals, updates, and deletes of random devices, links,
services, and slices.

For each operation and record type, the size of the entities in bytes, the number of endpoints,
constraints, config rules, subservices, and subslices is recorded.
Besides, it is recorded also the time to store/retrieve the records in the blockchain, and the delay
between the change and the reception of the asynchronous notification event.

## Scenario prepararion:
Create a docker virtual network:

```(bash)
docker network rm tfs-br
docker network create -d bridge --subnet=172.254.254.0/24 --gateway=172.254.254.1 --ip-range=172.254.254.0/24 tfs-br
```

Build the DLT Gateway component's Docker image:
```(bash)
docker build -t dlt-gateway:test -f ./src/dlt/gateway/Dockerfile .
```

Start the DLT Gateway component:
```(bash)
docker run --name dlt-gateway -d -p 50051:50051 --network=tfs-br dlt-gateway:test
```

Install possibly missing requirements:
```(bash)
pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1
```

Start the performance assessment:
```(bash)
PYTHONPATH=./src python -m dlt.performance
```

The test produces a CSV file with the results per operation.
+13 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading