Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, json, random, threading
from typing import Dict, Optional, Set, Tuple
from common.proto.context_pb2 import Empty, TopologyId
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.ConfigRule import json_config_rule_set
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import (
json_service_l2nm_planned, json_service_l3nm_planned, json_service_tapi_planned)
from common.tools.object_factory.Slice import json_slice
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltConnectorClient import DltConnectorClient
from .Constants import ENDPOINT_COMPATIBILITY, RequestType
from .DltTools import record_device_to_dlt, record_link_to_dlt
def __init__(self, parameters : Parameters) -> None:
self._parameters = parameters
self._available_device_endpoints : Dict[str, Set[str]] = dict()
self._used_device_endpoints : Dict[str, Dict[str, str]] = dict()
self._endpoint_ids_to_types : Dict[Tuple[str, str], str] = dict()
self._endpoint_types_to_ids : Dict[str, Set[Tuple[str, str]]] = dict()
def initialize(self) -> None:
with self._lock:
self._available_device_endpoints.clear()
self._used_device_endpoints.clear()
context_client = ContextClient()
dlt_connector_client = DltConnectorClient()
if self._parameters.record_to_dlt:
dlt_domain_id = TopologyId(**json_topology_id('dlt-perf-eval'))
devices = context_client.ListDevices(Empty())
for device in devices.devices:
device_uuid = device.device_id.device_uuid.uuid
_endpoints = self._available_device_endpoints.setdefault(device_uuid, set())
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
self._endpoint_ids_to_types.setdefault((device_uuid, endpoint_uuid), endpoint_type)
self._endpoint_types_to_ids.setdefault(endpoint_type, set()).add((device_uuid, endpoint_uuid))
if self._parameters.record_to_dlt:
record_device_to_dlt(dlt_connector_client, dlt_domain_id, device.device_id)
links = context_client.ListLinks(Empty())
for link in links.links:
for endpoint_id in link.link_endpoint_ids:
device_uuid = endpoint_id.device_id.device_uuid.uuid
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
_endpoints = self._available_device_endpoints.get(device_uuid, set())
_endpoints.discard(endpoint_uuid)
if len(_endpoints) == 0: self._available_device_endpoints.pop(device_uuid, None)
endpoint_type = self._endpoint_ids_to_types.pop((device_uuid, endpoint_uuid), None)
if endpoint_type is None: continue
if endpoint_type not in self._endpoint_types_to_ids: continue
endpoints_for_type = self._endpoint_types_to_ids[endpoint_type]
endpoint_key = (device_uuid, endpoint_uuid)
if endpoint_key not in endpoints_for_type: continue
endpoints_for_type.discard(endpoint_key)
if self._parameters.record_to_dlt:
record_link_to_dlt(dlt_connector_client, dlt_domain_id, link.link_id)
def num_requests_generated(self): return self._num_requests
def dump_state(self) -> None:
with self._lock:
_endpoints = {
device_uuid:[endpoint_uuid for endpoint_uuid in endpoint_uuids]
for device_uuid,endpoint_uuids in self._available_device_endpoints.items()
}
LOGGER.info('[dump_state] available_device_endpoints = {:s}'.format(json.dumps(_endpoints)))
LOGGER.info('[dump_state] used_device_endpoints = {:s}'.format(json.dumps(self._used_device_endpoints)))
def _use_device_endpoint(
self, service_uuid : str, request_type : RequestType, endpoint_types : Optional[Set[str]] = None,
exclude_device_uuids : Set[str] = set(), exclude_endpoint_uuids : Set[Tuple[str, str]] = set(),
) -> Optional[Tuple[str, str]]:
with self._lock:
compatible_endpoints : Set[Tuple[str, str]] = set()
elegible_device_endpoints : Dict[str, Set[str]] = {}
if endpoint_types is None:
# allow all
elegible_device_endpoints : Dict[str, Set[str]] = {
device_uuid:[
endpoint_uuid for endpoint_uuid in device_endpoint_uuids
if (len(exclude_endpoint_uuids) == 0) or \
((device_uuid,endpoint_uuid) not in exclude_endpoint_uuids)
]
for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items()
if (device_uuid not in exclude_device_uuids) and \
(len(device_endpoint_uuids) > 0)
}
else:
# allow only compatible endpoints
for endpoint_type in endpoint_types:
if endpoint_type not in self._endpoint_types_to_ids: continue
compatible_endpoints.update(self._endpoint_types_to_ids[endpoint_type])
for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items():
if device_uuid in exclude_device_uuids or len(device_endpoint_uuids) == 0: continue
for endpoint_uuid in device_endpoint_uuids:
endpoint_key = (device_uuid,endpoint_uuid)
if endpoint_key in exclude_endpoint_uuids: continue
if endpoint_key not in compatible_endpoints: continue
elegible_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
if len(elegible_device_endpoints) == 0:
LOGGER.warning(' '.join([
'>> No endpoint is available:',
'endpoint_types={:s}'.format(str(endpoint_types)),
'exclude_device_uuids={:s}'.format(str(exclude_device_uuids)),
'self._endpoint_types_to_ids={:s}'.format(str(self._endpoint_types_to_ids)),
'self._available_device_endpoints={:s}'.format(str(self._available_device_endpoints)),
'compatible_endpoints={:s}'.format(str(compatible_endpoints)),
]))
return None
device_uuid = random.choice(list(elegible_device_endpoints.keys()))
device_endpoint_uuids = elegible_device_endpoints.get(device_uuid)
endpoint_uuid = random.choice(list(device_endpoint_uuids))
if request_type not in {RequestType.SERVICE_MW}:
# reserve the resources
self._available_device_endpoints.setdefault(device_uuid, set()).discard(endpoint_uuid)
self._used_device_endpoints.setdefault(device_uuid, dict())[endpoint_uuid] = service_uuid
return device_uuid, endpoint_uuid
def _release_device_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None:
with self._lock:
self._used_device_endpoints.setdefault(device_uuid, dict()).pop(endpoint_uuid, None)
self._available_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
self._num_requests += 1
num_request = self._num_requests
#request_uuid = str(uuid.uuid4())
request_uuid = 'svc_{:d}'.format(num_request)
# choose request type
request_type = random.choice(self._parameters.request_types)
if request_type in {
RequestType.SERVICE_L2NM, RequestType.SERVICE_L3NM, RequestType.SERVICE_TAPI, RequestType.SERVICE_MW
}:
return self._compose_service(num_request, request_uuid, request_type)
elif request_type in {RequestType.SLICE_L2NM, RequestType.SLICE_L3NM}:
return self._compose_slice(num_request, request_uuid, request_type)
def _compose_service(self, num_request : int, request_uuid : str, request_type : str) -> Optional[Dict]:
src_endpoint_types = set(ENDPOINT_COMPATIBILITY.keys()) if request_type in {RequestType.SERVICE_TAPI} else None
src = self._use_device_endpoint(request_uuid, request_type, endpoint_types=src_endpoint_types)
if src is None:
LOGGER.warning('>> No source endpoint is available')
return None
# identify compatible destination endpoint types
src_endpoint_type = self._endpoint_ids_to_types.get((src_device_uuid,src_endpoint_uuid))
dst_endpoint_type = ENDPOINT_COMPATIBILITY.get(src_endpoint_type)
dst_endpoint_types = {dst_endpoint_type} if request_type in {RequestType.SERVICE_TAPI} else None
exclude_device_uuids = {} if request_type in {RequestType.SERVICE_TAPI, RequestType.SERVICE_MW} else {src_device_uuid}
# choose feasible destination endpoint
dst = self._use_device_endpoint(
request_uuid, request_type, endpoint_types=dst_endpoint_types, exclude_device_uuids=exclude_device_uuids,
exclude_endpoint_uuids={src})
# if destination endpoint not found, release source, and terminate current service generation
LOGGER.warning('>> No destination endpoint is available')
self._release_device_endpoint(src_device_uuid, src_endpoint_uuid)
return None
dst_device_uuid,dst_endpoint_uuid = dst
endpoint_ids = [
json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
]
constraints = [
json_constraint_custom('bandwidth[gbps]', '10.0'),
json_constraint_custom('latency[ms]', '20.0'),
]
circuit_id = '{:03d}'.format(vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
config_rules = [
json_config_rule_set('/settings', {
'mtu': 1512
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id': src_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': dst_router_id,
'circuit_id': circuit_id,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id': dst_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': src_router_id,
'circuit_id': circuit_id,
}),
]
return json_service_l2nm_planned(
request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
constraints = [
json_constraint_custom('bandwidth[gbps]', '10.0'),
json_constraint_custom('latency[ms]', '20.0'),
]
vlan_id = num_request % 1000
bgp_as = 60000 + (num_request % 10000)
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333)
route_distinguisher = '{:5d}:{:03d}'.format(bgp_as, vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
src_address_ip = '.'.join([src_device_uuid.replace('R', ''), '0'] + src_endpoint_uuid.split('/'))
dst_address_ip = '.'.join([dst_device_uuid.replace('R', ''), '0'] + dst_endpoint_uuid.split('/'))
config_rules = [
json_config_rule_set('/settings', {
'mtu' : 1512,
'bgp_as' : bgp_as,
'bgp_route_target': bgp_route_target,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id' : src_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : src_address_ip,
'address_prefix' : 16,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id' : dst_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : dst_address_ip,
'address_prefix' : 16,
}),
]
return json_service_l3nm_planned(
request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
config_rules = [
json_config_rule_set('/settings', {
'capacity_value' : 50.0,
'capacity_unit' : 'GHz',
'layer_proto_name': 'PHOTONIC_MEDIA',
'layer_proto_qual': 'tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC',
'direction' : 'UNIDIRECTIONAL',
}),
]
return json_service_tapi_planned(
request_uuid, endpoint_ids=endpoint_ids, constraints=[], config_rules=config_rules)
elif request_type == RequestType.SERVICE_MW:
vlan_id = 1000 + num_request % 1000
config_rules = [
json_config_rule_set('/settings', {
'vlan_id': vlan_id,
}),
]
return json_service_l2nm_planned(
request_uuid, endpoint_ids=endpoint_ids, constraints=[], config_rules=config_rules)
def _compose_slice(self, num_request : int, request_uuid : str, request_type : str) -> Optional[Dict]:
# choose source endpoint
src = self._use_device_endpoint(request_uuid, request_type)
if src is None:
LOGGER.warning('>> No source endpoint is available')
return None
src_device_uuid,src_endpoint_uuid = src
# identify excluded destination devices
exclude_device_uuids = {} if request_type in {RequestType.SERVICE_TAPI, RequestType.SERVICE_MW} else {src_device_uuid}
dst = self._use_device_endpoint(request_uuid, request_type, exclude_device_uuids=exclude_device_uuids)
# if destination endpoint not found, release source, and terminate current service generation
if dst is None:
LOGGER.warning('>> No destination endpoint is available')
self._release_device_endpoint(src_device_uuid, src_endpoint_uuid)
return None
# compose endpoints
dst_device_uuid,dst_endpoint_uuid = dst
endpoint_ids = [
json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
]
constraints = [
json_constraint_custom('bandwidth[gbps]', '10.0'),
json_constraint_custom('latency[ms]', '20.0'),
]
vlan_id = num_request % 1000
circuit_id = '{:03d}'.format(vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
config_rules = [
json_config_rule_set('/settings', {
'mtu': 1512
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id': src_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': dst_router_id,
'circuit_id': circuit_id,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id': dst_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': src_router_id,
'circuit_id': circuit_id,
}),
]
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
vlan_id = num_request % 1000
bgp_as = 60000 + (num_request % 10000)
bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333)
route_distinguisher = '{:5d}:{:03d}'.format(bgp_as, vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
src_address_ip = '.'.join([src_device_uuid.replace('R', ''), '0'] + src_endpoint_uuid.split('/'))
dst_address_ip = '.'.join([dst_device_uuid.replace('R', ''), '0'] + dst_endpoint_uuid.split('/'))
config_rules = [
json_config_rule_set('/settings', {
'mtu' : 1512,
'bgp_as' : bgp_as,
'bgp_route_target': bgp_route_target,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id' : src_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : src_address_ip,
'address_prefix' : 16,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id' : dst_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : dst_address_ip,
'address_prefix' : 16,
}),
]
return json_slice(
request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
def release_request(self, json_request : Dict) -> None:
if 'service_id' in json_request:
for endpoint_id in json_request['service_endpoint_ids']:
device_uuid = endpoint_id['device_id']['device_uuid']['uuid']
endpoint_uuid = endpoint_id['endpoint_uuid']['uuid']
self._release_device_endpoint(device_uuid, endpoint_uuid)
elif 'slice_id' in json_request:
for endpoint_id in json_request['slice_endpoint_ids']:
device_uuid = endpoint_id['device_id']['device_uuid']['uuid']
endpoint_uuid = endpoint_id['endpoint_uuid']['uuid']
self._release_device_endpoint(device_uuid, endpoint_uuid)