Commit 65a8011b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Tools - load-gen:

- added support to record load-gen operations in DLT to take performance evaluation metrics
- improved mixed request support
parent c56e0def
Loading
Loading
Loading
Loading
+13 −5
Original line number Diff line number Diff line
@@ -12,8 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

REQUEST_TYPE_SERVICE_L2NM = 'svc-l2nm'
REQUEST_TYPE_SERVICE_L3NM = 'svc-l3nm'
REQUEST_TYPE_SERVICE_TAPI = 'svc-tapi'
REQUEST_TYPE_SLICE_L2NM   = 'slc-l2nm'
REQUEST_TYPE_SLICE_L3NM   = 'slc-l3nm'
from enum import Enum

class RequestType(Enum):
    SERVICE_L2NM = 'svc-l2nm'
    SERVICE_L3NM = 'svc-l3nm'
    SERVICE_TAPI = 'svc-tapi'
    SLICE_L2NM   = 'slc-l2nm'
    SLICE_L3NM   = 'slc-l3nm'

ENDPOINT_COMPATIBILITY = {
    'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:INPUT': 'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:OUTPUT',
    'PHOTONIC_MEDIA:DWDM:G_50GHZ:INPUT'  : 'PHOTONIC_MEDIA:DWDM:G_50GHZ:OUTPUT',
}
+123 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, queue
from typing import Optional, Set, Tuple
from common.proto.context_pb2 import DeviceId, LinkId, ServiceId, SliceId, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient

def explore_entities_to_record(
    slice_id : Optional[SliceId] = None, service_id : Optional[ServiceId] = None
) -> Tuple[Set[str], Set[str], Set[str]]:

    context_client = ContextClient()

    slices_to_record   : Set[str] = set()
    services_to_record : Set[str] = set()
    devices_to_record  : Set[str] = set()

    slices_to_explore = queue.Queue()
    services_to_explore = queue.Queue()
    if slice_id is not None: slices_to_explore.put(slice_id)
    if service_id is not None: services_to_explore.put(service_id)

    while not slices_to_explore.empty():
        slice_id = slices_to_explore.get()
        slices_to_record.add(grpc_message_to_json_string(slice_id))

        slice_ = context_client.GetSlice(slice_id)

        for endpoint_id in slice_.slice_endpoint_ids:
            devices_to_record.add(grpc_message_to_json_string(endpoint_id.device_id))
        for subslice_id in slice_.slice_subslice_ids:
            slices_to_explore.put(subslice_id)
        for service_id in slice_.slice_service_ids:
            services_to_explore.put(service_id)

    while not services_to_explore.empty():
        service_id = services_to_explore.get()
        services_to_record.add(grpc_message_to_json_string(service_id))

        service = context_client.GetService(service_id)

        for endpoint_id in service.service_endpoint_ids:
            devices_to_record.add(grpc_message_to_json_string(endpoint_id.device_id))

        connections = context_client.ListConnections(service_id)
        for connection in connections.connections:
            for endpoint_id in connection.path_hops_endpoint_ids:
                devices_to_record.add(grpc_message_to_json_string(endpoint_id.device_id))
            for service_id in connection.sub_service_ids:
                services_to_explore.put(service_id)

    return slices_to_record, services_to_record, devices_to_record

def record_device_to_dlt(
    dlt_connector_client : DltConnectorClient, domain_id : TopologyId, device_id : DeviceId, delete : bool = False
) -> None:
    dlt_device_id = DltDeviceId()
    dlt_device_id.topology_id.CopyFrom(domain_id)       # pylint: disable=no-member
    dlt_device_id.device_id.CopyFrom(device_id)         # pylint: disable=no-member
    dlt_device_id.delete = delete
    dlt_connector_client.RecordDevice(dlt_device_id)

def record_link_to_dlt(
    dlt_connector_client : DltConnectorClient, domain_id : TopologyId, link_id : LinkId, delete : bool = False
) -> None:
    dlt_link_id = DltLinkId()
    dlt_link_id.topology_id.CopyFrom(domain_id)         # pylint: disable=no-member
    dlt_link_id.link_id.CopyFrom(link_id)               # pylint: disable=no-member
    dlt_link_id.delete = delete
    dlt_connector_client.RecordLink(dlt_link_id)

def record_service_to_dlt(
    dlt_connector_client : DltConnectorClient, domain_id : TopologyId, service_id : ServiceId, delete : bool = False
) -> None:
    dlt_service_id = DltServiceId()
    dlt_service_id.topology_id.CopyFrom(domain_id)      # pylint: disable=no-member
    dlt_service_id.service_id.CopyFrom(service_id)      # pylint: disable=no-member
    dlt_service_id.delete = delete
    dlt_connector_client.RecordService(dlt_service_id)

def record_slice_to_dlt(
    dlt_connector_client : DltConnectorClient, domain_id : TopologyId, slice_id : SliceId, delete : bool = False
) -> None:
    dlt_slice_id = DltSliceId()
    dlt_slice_id.topology_id.CopyFrom(domain_id)        # pylint: disable=no-member
    dlt_slice_id.slice_id.CopyFrom(slice_id)            # pylint: disable=no-member
    dlt_slice_id.delete = delete
    dlt_connector_client.RecordSlice(dlt_slice_id)

def record_entities(
    slices_to_record : Set[str] = set(), services_to_record : Set[str] = set(), devices_to_record : Set[str] = set(),
    delete : bool = False
) -> None:
    dlt_connector_client = DltConnectorClient()
    dlt_domain_id = TopologyId(**json_topology_id('dlt-perf-eval'))

    for str_device_id in devices_to_record:
        device_id = DeviceId(**(json.loads(str_device_id)))
        record_device_to_dlt(dlt_connector_client, dlt_domain_id, device_id, delete=delete)

    for str_service_id in services_to_record:
        service_id = ServiceId(**(json.loads(str_service_id)))
        record_service_to_dlt(dlt_connector_client, dlt_domain_id, service_id, delete=delete)

    for str_slice_id in slices_to_record:
        slice_id = SliceId(**(json.loads(str_slice_id)))
        record_slice_to_dlt(dlt_connector_client, dlt_domain_id, slice_id, delete=delete)
+16 −1
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ class Parameters:
    def __init__(
        self, num_requests : int, request_types : List[str], offered_load : Optional[float] = None,
        inter_arrival_time : Optional[float] = None, holding_time : Optional[float] = None,
        dry_mode : bool = False
        dry_mode : bool = False, record_to_dlt : bool = False, dlt_domain_id : Optional[str] = None
    ) -> None:
        self._num_requests = num_requests
        self._request_types = request_types
@@ -26,6 +26,8 @@ class Parameters:
        self._inter_arrival_time = inter_arrival_time
        self._holding_time = holding_time
        self._dry_mode = dry_mode
        self._record_to_dlt = record_to_dlt
        self._dlt_domain_id = dlt_domain_id

        if self._offered_load is None and self._holding_time is not None and self._inter_arrival_time is not None:
            self._offered_load = self._holding_time / self._inter_arrival_time
@@ -33,6 +35,13 @@ class Parameters:
            self._inter_arrival_time = self._holding_time / self._offered_load
        elif self._offered_load is not None and self._holding_time is None and self._inter_arrival_time is not None:
            self._holding_time = self._offered_load * self._inter_arrival_time
        else:
            MSG = 'Exactly two of offered_load({:s}), inter_arrival_time({:s}), holding_time({:s}) must be specified.'
            raise Exception(MSG.format(str(self._offered_load), str(self._inter_arrival_time), str(self._holding_time)))

        if self._record_to_dlt and self._dlt_domain_id is None:
            MSG = 'Parameter dlt_domain_id({:s}) must be specified with record_to_dlt({:s}).'
            raise Exception(MSG.format(str(self._dlt_domain_id), str(self._record_to_dlt)))

    @property
    def num_requests(self): return self._num_requests
@@ -51,3 +60,9 @@ class Parameters:

    @property
    def dry_mode(self): return self._dry_mode

    @property
    def record_to_dlt(self): return self._record_to_dlt

    @property
    def dlt_domain_id(self): return self._dlt_domain_id
+26 −20
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@

import logging, json, random, threading
from typing import Dict, Optional, Set, Tuple
from common.proto.context_pb2 import Empty
from common.proto.context_pb2 import Empty, TopologyId
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.ConfigRule import json_config_rule_set
from common.tools.object_factory.Device import json_device_id
@@ -22,19 +22,15 @@ from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import (
    json_service_l2nm_planned, json_service_l3nm_planned, json_service_tapi_planned)
from common.tools.object_factory.Slice import json_slice
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from .Constants import (
    REQUEST_TYPE_SERVICE_L2NM, REQUEST_TYPE_SERVICE_L3NM, REQUEST_TYPE_SERVICE_TAPI,
    REQUEST_TYPE_SLICE_L2NM, REQUEST_TYPE_SLICE_L3NM)
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from tests.tools.load_gen.DltTools import record_device_to_dlt, record_link_to_dlt
from .Constants import ENDPOINT_COMPATIBILITY, RequestType
from .Parameters import Parameters

LOGGER = logging.getLogger(__name__)

ENDPOINT_COMPATIBILITY = {
    'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:INPUT': 'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:OUTPUT',
    'PHOTONIC_MEDIA:DWDM:G_50GHZ:INPUT'  : 'PHOTONIC_MEDIA:DWDM:G_50GHZ:OUTPUT',
}

class RequestGenerator:
    def __init__(self, parameters : Parameters) -> None:
        self._parameters = parameters
@@ -51,6 +47,10 @@ class RequestGenerator:
            self._used_device_endpoints.clear()

            context_client = ContextClient()
            dlt_connector_client = DltConnectorClient()

            if self._parameters.record_to_dlt:
                dlt_domain_id = TopologyId(**json_topology_id('dlt-perf-eval'))

            devices = context_client.ListDevices(Empty())
            for device in devices.devices:
@@ -63,6 +63,9 @@ class RequestGenerator:
                    self._endpoint_ids_to_types.setdefault((device_uuid, endpoint_uuid), endpoint_type)
                    self._endpoint_types_to_ids.setdefault(endpoint_type, set()).add((device_uuid, endpoint_uuid))
                
                if self._parameters.record_to_dlt:
                    record_device_to_dlt(dlt_connector_client, dlt_domain_id, device.device_id)

            links = context_client.ListLinks(Empty())
            for link in links.links:
                for endpoint_id in link.link_endpoint_ids:
@@ -81,6 +84,9 @@ class RequestGenerator:
                    if endpoint_key not in endpoints_for_type: continue
                    endpoints_for_type.discard(endpoint_key)
            
                    if self._parameters.record_to_dlt:
                        record_link_to_dlt(dlt_connector_client, dlt_domain_id, link.link_id)

    @property
    def num_requests_generated(self): return self._num_requests

@@ -153,14 +159,14 @@ class RequestGenerator:
        # choose request type
        request_type = random.choice(self._parameters.request_types)

        if request_type in {REQUEST_TYPE_SERVICE_L2NM, REQUEST_TYPE_SERVICE_L3NM, REQUEST_TYPE_SERVICE_TAPI}:
        if request_type in {RequestType.SERVICE_L2NM, RequestType.SERVICE_L3NM, RequestType.SERVICE_TAPI}:
            return self._compose_service(num_request, request_uuid, request_type)
        elif request_type in {REQUEST_TYPE_SLICE_L2NM, REQUEST_TYPE_SLICE_L3NM}:
        elif request_type in {RequestType.SLICE_L2NM, RequestType.SLICE_L3NM}:
            return self._compose_slice(num_request, request_uuid, request_type)

    def _compose_service(self, num_request : int, request_uuid : str, request_type : str) -> Optional[Dict]:
        # choose source endpoint
        src_endpoint_types = set(ENDPOINT_COMPATIBILITY.keys()) if request_type in {REQUEST_TYPE_SERVICE_TAPI} else None
        src_endpoint_types = set(ENDPOINT_COMPATIBILITY.keys()) if request_type in {RequestType.SERVICE_TAPI} else None
        src = self._use_device_endpoint(request_uuid, endpoint_types=src_endpoint_types)
        if src is None:
            LOGGER.warning('>> No source endpoint is available')
@@ -170,10 +176,10 @@ class RequestGenerator:
        # identify compatible destination endpoint types
        src_endpoint_type = self._endpoint_ids_to_types.get((src_device_uuid,src_endpoint_uuid))
        dst_endpoint_type = ENDPOINT_COMPATIBILITY.get(src_endpoint_type)
        dst_endpoint_types = {dst_endpoint_type} if request_type in {REQUEST_TYPE_SERVICE_TAPI} else None
        dst_endpoint_types = {dst_endpoint_type} if request_type in {RequestType.SERVICE_TAPI} else None

        # identify excluded destination devices
        exclude_device_uuids = {} if request_type in {REQUEST_TYPE_SERVICE_TAPI} else {src_device_uuid}
        exclude_device_uuids = {} if request_type in {RequestType.SERVICE_TAPI} else {src_device_uuid}

        # choose feasible destination endpoint
        dst = self._use_device_endpoint(
@@ -192,7 +198,7 @@ class RequestGenerator:
            json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
        ]

        if request_type == REQUEST_TYPE_SERVICE_L2NM:
        if request_type == RequestType.SERVICE_L2NM:
            constraints = [
                json_constraint_custom('bandwidth[gbps]', '10.0'),
                json_constraint_custom('latency[ms]',     '20.0'),
@@ -223,7 +229,7 @@ class RequestGenerator:
            return json_service_l2nm_planned(
                request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)

        elif request_type == REQUEST_TYPE_SERVICE_L3NM:
        elif request_type == RequestType.SERVICE_L3NM:
            constraints = [
                json_constraint_custom('bandwidth[gbps]', '10.0'),
                json_constraint_custom('latency[ms]',     '20.0'),
@@ -262,7 +268,7 @@ class RequestGenerator:
            return json_service_l3nm_planned(
                request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)

        elif request_type == REQUEST_TYPE_SERVICE_TAPI:
        elif request_type == RequestType.SERVICE_TAPI:
            config_rules = [
                json_config_rule_set('/settings', {
                    'capacity_value'  : 50.0,
@@ -284,7 +290,7 @@ class RequestGenerator:
        src_device_uuid,src_endpoint_uuid = src

        # identify excluded destination devices
        exclude_device_uuids = {} if request_type in {REQUEST_TYPE_SERVICE_TAPI} else {src_device_uuid}
        exclude_device_uuids = {} if request_type in {RequestType.SERVICE_TAPI} else {src_device_uuid}

        # choose feasible destination endpoint
        dst = self._use_device_endpoint(request_uuid, exclude_device_uuids=exclude_device_uuids)
@@ -306,7 +312,7 @@ class RequestGenerator:
            json_constraint_custom('latency[ms]',     '20.0'),
        ]

        if request_type == REQUEST_TYPE_SLICE_L2NM:
        if request_type == RequestType.SLICE_L2NM:
            vlan_id = num_request % 1000
            circuit_id = '{:03d}'.format(vlan_id)
            src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
@@ -331,7 +337,7 @@ class RequestGenerator:
                }),
            ]

        elif request_type == REQUEST_TYPE_SLICE_L3NM:
        elif request_type == RequestType.SLICE_L3NM:
            vlan_id = num_request % 1000
            bgp_as = 60000 + (num_request % 10000)
            bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333)
+80 −32
Original line number Diff line number Diff line
@@ -13,14 +13,15 @@
# limitations under the License.

import copy, logging, pytz, random
from datetime import datetime, timedelta
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import Dict
from datetime import datetime, timedelta
from typing import Dict, Optional
from common.proto.context_pb2 import Service, ServiceId, Slice, SliceId
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from .DltTools import explore_entities_to_record, record_entities
from .Parameters import Parameters
from .RequestGenerator import RequestGenerator

@@ -80,16 +81,7 @@ class RequestScheduler:
            dst_endpoint_uuid = request['service_endpoint_ids'][1]['endpoint_uuid']['uuid']
            LOGGER.info('Setup Service: uuid=%s src=%s:%s dst=%s:%s',
                service_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)

            if not self._parameters.dry_mode:
                request_add = copy.deepcopy(request)
                request_add['service_endpoint_ids'] = []
                request_add['service_constraints'] = []
                request_add['service_config'] = {'config_rules': []}
                service_client = ServiceClient()    # create instances per request to load balance between pods
                service_client.CreateService(Service(**request_add))
                service_client.UpdateService(Service(**request))
                service_client.close()
            self._create_update(service=request)

        elif 'slice_id' in request:
            slice_uuid = request['slice_id']['slice_uuid']['uuid']
@@ -99,16 +91,7 @@ class RequestScheduler:
            dst_endpoint_uuid = request['slice_endpoint_ids'][1]['endpoint_uuid']['uuid']
            LOGGER.info('Setup Slice: uuid=%s src=%s:%s dst=%s:%s',
                slice_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)

            if not self._parameters.dry_mode:
                request_add = copy.deepcopy(request)
                request_add['slice_endpoint_ids'] = []
                request_add['slice_constraints'] = []
                request_add['slice_config'] = {'config_rules': []}
                slice_client = SliceClient()    # create instances per request to load balance between pods
                slice_client.CreateSlice(Slice(**request_add))
                slice_client.UpdateSlice(Slice(**request))
                slice_client.close()
            self._create_update(slice_=request)

        self._schedule_request_teardown(request)

@@ -121,11 +104,7 @@ class RequestScheduler:
            dst_endpoint_uuid = request['service_endpoint_ids'][1]['endpoint_uuid']['uuid']
            LOGGER.info('Teardown Service: uuid=%s src=%s:%s dst=%s:%s',
                service_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)

            if not self._parameters.dry_mode:
                service_client = ServiceClient()    # create instances per request to load balance between pods
                service_client.DeleteService(ServiceId(**(request['service_id'])))
                service_client.close()
            self._delete(service_id=ServiceId(**(request['service_id'])))

        elif 'slice_id' in request:
            slice_uuid = request['slice_id']['slice_uuid']['uuid']
@@ -135,10 +114,79 @@ class RequestScheduler:
            dst_endpoint_uuid = request['slice_endpoint_ids'][1]['endpoint_uuid']['uuid']
            LOGGER.info('Teardown Slice: uuid=%s src=%s:%s dst=%s:%s',
                slice_uuid, src_device_uuid, src_endpoint_uuid, dst_device_uuid, dst_endpoint_uuid)
            self._delete(slice_id=SliceId(**(request['slice_id'])))

        self._generator.release_request(request)

    def _create_update(self, service : Optional[Dict] = None, slice_ : Optional[Dict] = None) -> None:
        if self._parameters.dry_mode: return

        service_id = None
        if service is not None:
            service_add = copy.deepcopy(service)
            service_add['service_endpoint_ids'] = []
            service_add['service_constraints'] = []
            service_add['service_config'] = {'config_rules': []}

            service_client = ServiceClient()
            service_id = service_client.CreateService(Service(**service_add))
            service_client.close()

            if not self._parameters.dry_mode:
                slice_client = SliceClient()    # create instances per request to load balance between pods
                slice_client.DeleteSlice(SliceId(**(request['slice_id'])))
        slice_id = None
        if slice_ is not None:
            slice_add = copy.deepcopy(slice_)
            slice_add['slice_endpoint_ids'] = []
            slice_add['slice_constraints'] = []
            slice_add['slice_config'] = {'config_rules': []}

            slice_client = SliceClient()
            slice_id = slice_client.CreateSlice(Slice(**slice_add))
            slice_client.close()

        self._generator.release_request(request)
        if self._parameters.record_to_dlt:
            entities_to_record = explore_entities_to_record(slice_id=slice_id, service_id=service_id)
            slices_to_record, services_to_record, devices_to_record = entities_to_record
            record_entities(
                slices_to_record=slices_to_record, services_to_record=services_to_record,
                devices_to_record=devices_to_record, delete=False)

        service_id = None
        if service is not None:
            service_client = ServiceClient()
            service_id = service_client.UpdateService(Service(**service))
            service_client.close()

        slice_id = None
        if slice_ is not None:
            slice_client = SliceClient()
            slice_id = slice_client.UpdateSlice(Slice(**slice_))
            slice_client.close()

        if self._parameters.record_to_dlt:
            entities_to_record = explore_entities_to_record(slice_id=slice_id, service_id=service_id)
            slices_to_record, services_to_record, devices_to_record = entities_to_record
            record_entities(
                slices_to_record=slices_to_record, services_to_record=services_to_record,
                devices_to_record=devices_to_record, delete=False)

    def _delete(self, service_id : Optional[ServiceId] = None, slice_id : Optional[SliceId] = None) -> None:
        if self._parameters.dry_mode: return

        if self._parameters.record_to_dlt:
            entities_to_record = explore_entities_to_record(slice_id=slice_id, service_id=service_id)
            slices_to_record, services_to_record, devices_to_record = entities_to_record

        if slice_id is not None:
            slice_client = SliceClient()
            slice_client.DeleteSlice(slice_id)
            slice_client.close()

        if service_id is not None:
            service_client = ServiceClient()
            service_client.DeleteService(service_id)
            service_client.close()

        if self._parameters.record_to_dlt:
            record_entities(
                slices_to_record=slices_to_record, services_to_record=services_to_record,
                devices_to_record=devices_to_record, delete=True)
Loading