Commit 3e9b528c authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

DLT component - Connector:

- Added event handler to DltEventsCollector
- Implemented DLT performance assessment tool
- Reorganized tests
parent 80d89934
Loading
Loading
Loading
Loading
+19 −1
Original line number Diff line number Diff line
@@ -12,27 +12,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Optional
import grpc, logging, queue, threading, time
from common.proto.dlt_gateway_pb2 import DltRecordSubscription
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordSubscription
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.client.DltGatewayClient import DltGatewayClient

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# This class accepts an event_handler method as attribute that can be used to pre-process and
# filter events before they reach the events_queue. Depending on the handler, the supported
# behaviors are:
# - If the handler is not set, the events are transparently added to the events_queue.
# - If returns None for an event, the event is not stored in the events_queue.
# - If returns a DltRecordEvent object for an event, the returned event is stored in the events_queue.
# - Other combinations are not supported.

class DltEventsCollector(threading.Thread):
    def __init__(
        self, dltgateway_client : DltGatewayClient,
        log_events_received     : bool = False,
        event_handler           : Optional[Callable[[DltRecordEvent], Optional[DltRecordEvent]]] = None,
    ) -> None:
        super().__init__(name='DltEventsCollector', daemon=True)
        self._dltgateway_client = dltgateway_client
        self._log_events_received = log_events_received
        self._event_handler = event_handler
        self._events_queue = queue.Queue()
        self._terminate = threading.Event()
        self._dltgateway_stream = None

    def run(self) -> None:
        event_handler = self._event_handler
        if event_handler is None: event_handler = lambda e: e
        self._terminate.clear()
        while not self._terminate.is_set():
            try:
@@ -41,6 +54,11 @@ class DltEventsCollector(threading.Thread):
                for event in self._dltgateway_stream:
                    if self._log_events_received:
                        LOGGER.info('[_collect] event: {:s}'.format(grpc_message_to_json_string(event)))
                    event = event_handler(event)
                    if event is None: continue
                    if not isinstance(event, DltRecordEvent):
                        # pylint: disable=broad-exception-raised
                        raise Exception('Unsupported return type: {:s}'.format(str(event)))
                    self._events_queue.put_nowait(event)
            except grpc.RpcError as e:
                if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member
+3 −3
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@

# pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1
# PYTHONPATH=./src python
# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.main_test
# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.tests.basic

import logging, sys, time
from common.proto.dlt_gateway_pb2 import (
@@ -23,8 +23,8 @@ from common.proto.dlt_gateway_pb2 import (
from common.tools.object_factory.Device import json_device
from common.tools.grpc.Tools import grpc_message_to_json_string
from src.common.proto.context_pb2 import DEVICEOPERATIONALSTATUS_ENABLED, Device
from .client.DltGatewayClient import DltGatewayClient
from .client.DltEventsCollector import DltEventsCollector
from ..client.DltGatewayClient import DltGatewayClient
from ..client.DltEventsCollector import DltEventsCollector

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
+13 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+96 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# vary operation: create, update, delete
# vary record type (device, link, service, slice)
# for devices, services, slices, vary size: num endpoints, num constraints, num config rules
# measure load/store time
# measure event notification time

# docker build -t dlt-gateway:test -f ./src/dlt/gateway/Dockerfile .
# docker run --name dlt-gateway -d -p 50051:50051 --network=tfs-br dlt-gateway:test
# pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1
# PYTHONPATH=./src python
# PYTHONPATH=./src python -m dlt.connector.tests.performance

import functools, logging, sys, time
from common.proto.dlt_gateway_pb2 import DltRecordEvent
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from dlt.connector.client.DltEventsCollector import DltEventsCollector
from .play_ground.Enums import CONTEXT_EVENT_TYPE_TO_ACTION, RECORD_TYPE_TO_ENUM
from .play_ground import PlayGround

DLT_GATEWAY_HOST = '172.254.254.2'
DLT_GATEWAY_PORT = 50051

NUM_ACTIONS = 100
DOMAIN_UUID = 'perf-test-fake-domain'

CSV_FILEPATH = 'data/perf/scenario_2/dlt/2023-05May-30/response_time'

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

def _event_handler(play_ground : PlayGround, event : DltRecordEvent) -> None:
    # Filter undesired/unsupported/wrong domain_uuids, actions, and record types
    # Update notification time in PlayGround.PerfPoints
    # Return None to prevent storing the events in the DLT Events' Collector internal queue

    domain_uuid = event.record_id.domain_uuid.uuid
    if domain_uuid != DOMAIN_UUID: return None

    action = CONTEXT_EVENT_TYPE_TO_ACTION.get(event.event.event_type)
    if action is None: return None

    record_type = RECORD_TYPE_TO_ENUM.get(event.record_id.type)
    if record_type is None: return None

    #event_time = event.event.timestamp.timestamp
    record_uuid = event.record_id.record_uuid.uuid
    play_ground.perf_data.get(action, record_type, record_uuid).set_time_notified(time.time())
    return None

def main() -> None:
    dltgateway_client = DltGatewayClient(host=DLT_GATEWAY_HOST, port=DLT_GATEWAY_PORT)

    play_ground = PlayGround(dltgateway_client, DOMAIN_UUID)
    event_handler = functools.partial(_event_handler, play_ground)

    dltgateway_collector = DltEventsCollector(
        dltgateway_client, log_events_received=False, event_handler=event_handler)
    dltgateway_collector.start()

    time.sleep(3)

    LOGGER.info('Starting {:d} actions...'.format(NUM_ACTIONS))

    num_action = 0
    while num_action < NUM_ACTIONS:
        if num_action > 0 and num_action % 10 == 0:
            str_stats = play_ground.perf_data.stats_to_str()
            MSG = 'Running action {:d}/{:d}...\n{:s}'
            LOGGER.info(MSG.format(num_action, NUM_ACTIONS, str_stats))
        completed = play_ground.run_random_operation()
        if not completed: continue
        num_action += 1

    LOGGER.info('Completed {:d} actions!'.format(NUM_ACTIONS))

    dltgateway_collector.stop()

    play_ground.perf_data.to_csv(CSV_FILEPATH)
    return 0

if __name__ == '__main__':
    sys.exit(main())
+92 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, time
from typing import Dict, Union
from common.proto.context_pb2 import Device, Link, Service, Slice
from common.proto.dlt_gateway_pb2 import (
    DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordStatus, DltRecordTypeEnum)
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from .PerfPoint import PerfPoint

DLT_OPERATION_CREATE = DltRecordOperationEnum.DLTRECORDOPERATION_ADD
DLT_OPERATION_UPDATE = DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE
DLT_OPERATION_DELETE = DltRecordOperationEnum.DLTRECORDOPERATION_DELETE

DLT_RECORD_TYPE_DEVICE  = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE
DLT_RECORD_TYPE_LINK    = DltRecordTypeEnum.DLTRECORDTYPE_LINK
DLT_RECORD_TYPE_SERVICE = DltRecordTypeEnum.DLTRECORDTYPE_SERVICE
DLT_RECORD_TYPE_SLICE   = DltRecordTypeEnum.DLTRECORDTYPE_SLICE

def dlt_record_set(
    dltgateway_client : DltGatewayClient, perf_point : PerfPoint,
    operation : DltRecordOperationEnum, domain_uuid : str, objekt : Union[Device, Link, Service, Slice]
) -> DltRecordStatus:
    if isinstance(objekt, Device):
        record_type = DLT_RECORD_TYPE_DEVICE
        record_uuid = objekt.device_id.device_uuid.uuid
    elif isinstance(objekt, Link):
        record_type = DLT_RECORD_TYPE_LINK
        record_uuid = objekt.link_id.link_uuid.uuid
    elif isinstance(objekt, Service):
        record_type = DLT_RECORD_TYPE_SERVICE
        record_uuid = objekt.service_id.service_uuid.uuid
    elif isinstance(objekt, Slice):
        record_type = DLT_RECORD_TYPE_SLICE
        record_uuid = objekt.slice_id.slice_uuid.uuid
    else:
        raise NotImplementedError('Object({:s}) not supported'.format(str(type(objekt))))

    dlt_req = DltRecord()
    dlt_req.record_id.domain_uuid.uuid = domain_uuid  # pylint: disable=no-member
    dlt_req.record_id.type             = record_type  # pylint: disable=no-member
    dlt_req.record_id.record_uuid.uuid = record_uuid  # pylint: disable=no-member
    dlt_req.operation                  = operation
    dlt_req.data_json                  = grpc_message_to_json_string(objekt)

    perf_point.set_time_requested(time.time())
    reply = dltgateway_client.RecordToDlt(dlt_req)
    perf_point.set_time_replied(time.time())

    return reply

def dlt_record_found(record : DltRecord) -> bool:
    return all([
        len(record.record_id.domain_uuid.uuid) > 0,
        record.record_id.type != DltRecordTypeEnum.DLTRECORDTYPE_UNDEFINED,
        len(record.record_id.record_uuid.uuid) > 0,
        #record.operation != DltRecordOperationEnum.DLTRECORDOPERATION_UNDEFINED,
        len(record.data_json) > 0,
    ])

def dlt_record_get(
    dltgateway_client : DltGatewayClient, perf_point : PerfPoint,
    domain_uuid : str, record_type : DltRecordTypeEnum, record_uuid : str
) -> Dict:
    dlt_rec_id = DltRecordId()
    dlt_rec_id.domain_uuid.uuid = domain_uuid   # pylint: disable=no-member
    dlt_rec_id.type             = record_type
    dlt_rec_id.record_uuid.uuid = record_uuid   # pylint: disable=no-member

    perf_point.set_time_requested(time.time())
    reply = dltgateway_client.GetFromDlt(dlt_rec_id)
    perf_point.set_time_replied(time.time())

    if dlt_record_found(reply): return json.loads(reply.data_json)

    MSG = 'DltRecord({:s}/{:s}/{:s}) not found'
    str_record_type = DltRecordTypeEnum.Name(record_type)
    msg = MSG.format(str(domain_uuid), str_record_type, str(record_uuid))
    raise Exception(msg) # pylint: disable=broad-exception-raised
Loading