Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, json, random, threading
from typing import Dict, Optional, Set, Tuple
from common.proto.context_pb2 import Empty
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.ConfigRule import json_config_rule_set
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import (
json_service_l2nm_planned, json_service_l3nm_planned, json_service_tapi_planned)
from common.tools.object_factory.Slice import json_slice
from context.client.ContextClient import ContextClient
from .Constants import (
REQUEST_TYPE_SERVICE_L2NM, REQUEST_TYPE_SERVICE_L3NM, REQUEST_TYPE_SERVICE_TAPI,
REQUEST_TYPE_SLICE_L2NM, REQUEST_TYPE_SLICE_L3NM)
ENDPOINT_COMPATIBILITY = {
'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:INPUT': 'PHOTONIC_MEDIA:FLEX:G_6_25GHZ:OUTPUT',
'PHOTONIC_MEDIA:DWDM:G_50GHZ:INPUT' : 'PHOTONIC_MEDIA:DWDM:G_50GHZ:OUTPUT',
}
def __init__(self, parameters : Parameters) -> None:
self._parameters = parameters
self._available_device_endpoints : Dict[str, Set[str]] = dict()
self._used_device_endpoints : Dict[str, Dict[str, str]] = dict()
self._endpoint_ids_to_types : Dict[Tuple[str, str], str] = dict()
self._endpoint_types_to_ids : Dict[str, Set[Tuple[str, str]]] = dict()
def initialize(self) -> None:
with self._lock:
self._available_device_endpoints.clear()
self._used_device_endpoints.clear()
context_client = ContextClient()
devices = context_client.ListDevices(Empty())
for device in devices.devices:
device_uuid = device.device_id.device_uuid.uuid
_endpoints = self._available_device_endpoints.setdefault(device_uuid, set())
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
self._endpoint_ids_to_types.setdefault((device_uuid, endpoint_uuid), endpoint_type)
self._endpoint_types_to_ids.setdefault(endpoint_type, set()).add((device_uuid, endpoint_uuid))
links = context_client.ListLinks(Empty())
for link in links.links:
for endpoint_id in link.link_endpoint_ids:
device_uuid = endpoint_id.device_id.device_uuid.uuid
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
_endpoints = self._available_device_endpoints.get(device_uuid, set())
_endpoints.discard(endpoint_uuid)
if len(_endpoints) == 0: self._available_device_endpoints.pop(device_uuid, None)
endpoint_type = self._endpoint_ids_to_types.pop((device_uuid, endpoint_uuid), None)
if endpoint_type is None: continue
if endpoint_type not in self._endpoint_types_to_ids: continue
endpoints_for_type = self._endpoint_types_to_ids[endpoint_type]
endpoint_key = (device_uuid, endpoint_uuid)
if endpoint_key not in endpoints_for_type: continue
endpoints_for_type.discard(endpoint_key)
def num_requests_generated(self): return self._num_requests
def dump_state(self) -> None:
with self._lock:
_endpoints = {
device_uuid:[endpoint_uuid for endpoint_uuid in endpoint_uuids]
for device_uuid,endpoint_uuids in self._available_device_endpoints.items()
}
LOGGER.info('[dump_state] available_device_endpoints = {:s}'.format(json.dumps(_endpoints)))
LOGGER.info('[dump_state] used_device_endpoints = {:s}'.format(json.dumps(self._used_device_endpoints)))
def _use_device_endpoint(
self, service_uuid : str, endpoint_types : Optional[Set[str]] = None, exclude_device_uuids : Set[str] = set()
) -> Optional[Tuple[str, str]]:
with self._lock:
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
compatible_endpoints : Set[Tuple[str, str]] = set()
elegible_device_endpoints : Dict[str, Set[str]] = {}
if endpoint_types is None:
# allow all
elegible_device_endpoints : Dict[str, Set[str]] = {
device_uuid:device_endpoint_uuids
for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items()
if device_uuid not in exclude_device_uuids and len(device_endpoint_uuids) > 0
}
else:
# allow only compatible endpoints
for endpoint_type in endpoint_types:
if endpoint_type not in self._endpoint_types_to_ids: continue
compatible_endpoints.update(self._endpoint_types_to_ids[endpoint_type])
for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items():
if device_uuid in exclude_device_uuids or len(device_endpoint_uuids) == 0: continue
for endpoint_uuid in device_endpoint_uuids:
endpoint_key = (device_uuid,endpoint_uuid)
if endpoint_key not in compatible_endpoints: continue
elegible_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
if len(elegible_device_endpoints) == 0:
LOGGER.warning(' '.join([
'>> No endpoint is available:',
'endpoint_types={:s}'.format(str(endpoint_types)),
'exclude_device_uuids={:s}'.format(str(exclude_device_uuids)),
'self._endpoint_types_to_ids={:s}'.format(str(self._endpoint_types_to_ids)),
'self._available_device_endpoints={:s}'.format(str(self._available_device_endpoints)),
'compatible_endpoints={:s}'.format(str(compatible_endpoints)),
]))
return None
device_uuid = random.choice(list(elegible_device_endpoints.keys()))
device_endpoint_uuids = elegible_device_endpoints.get(device_uuid)
endpoint_uuid = random.choice(list(device_endpoint_uuids))
self._available_device_endpoints.setdefault(device_uuid, set()).discard(endpoint_uuid)
self._used_device_endpoints.setdefault(device_uuid, dict())[endpoint_uuid] = service_uuid
return device_uuid, endpoint_uuid
def _release_device_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None:
with self._lock:
self._used_device_endpoints.setdefault(device_uuid, set()).pop(endpoint_uuid, None)
self._available_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
self._num_requests += 1
num_request = self._num_requests
#request_uuid = str(uuid.uuid4())
request_uuid = 'svc_{:d}'.format(num_request)
# choose request type
request_type = random.choice(self._parameters.request_types)
if request_type in {REQUEST_TYPE_SERVICE_L2NM, REQUEST_TYPE_SERVICE_L3NM, REQUEST_TYPE_SERVICE_TAPI}:
return self._compose_service(num_request, request_uuid, request_type)
elif request_type in {REQUEST_TYPE_SLICE_L2NM, REQUEST_TYPE_SLICE_L3NM}:
return self._compose_slice(num_request, request_uuid, request_type)
def _compose_service(self, num_request : int, request_uuid : str, request_type : str) -> Optional[Dict]:
src_endpoint_types = set(ENDPOINT_COMPATIBILITY.keys()) if request_type in {REQUEST_TYPE_SERVICE_TAPI} else None
src = self._use_device_endpoint(request_uuid, endpoint_types=src_endpoint_types)
if src is None:
LOGGER.warning('>> No source endpoint is available')
return None
# identify compatible destination endpoint types
src_endpoint_type = self._endpoint_ids_to_types.get((src_device_uuid,src_endpoint_uuid))
dst_endpoint_type = ENDPOINT_COMPATIBILITY.get(src_endpoint_type)
dst_endpoint_types = {dst_endpoint_type} if request_type in {REQUEST_TYPE_SERVICE_TAPI} else None
# identify excluded destination devices
exclude_device_uuids = {} if request_type in {REQUEST_TYPE_SERVICE_TAPI} else {src_device_uuid}
# choose feasible destination endpoint
dst = self._use_device_endpoint(
request_uuid, endpoint_types=dst_endpoint_types, exclude_device_uuids=exclude_device_uuids)
# if destination endpoint not found, release source, and terminate current service generation
LOGGER.warning('>> No destination endpoint is available')
self._release_device_endpoint(src_device_uuid, src_endpoint_uuid)
return None
dst_device_uuid,dst_endpoint_uuid = dst
endpoint_ids = [
json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
]
constraints = [
json_constraint_custom('bandwidth[gbps]', '10.0'),
json_constraint_custom('latency[ms]', '20.0'),
]
circuit_id = '{:03d}'.format(vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
config_rules = [
json_config_rule_set('/settings', {
'mtu': 1512
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id': src_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': dst_router_id,
'circuit_id': circuit_id,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id': dst_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': src_router_id,
'circuit_id': circuit_id,
}),
]
return json_service_l2nm_planned(
request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
constraints = [
json_constraint_custom('bandwidth[gbps]', '10.0'),
json_constraint_custom('latency[ms]', '20.0'),
]
vlan_id = num_request % 1000
bgp_as = 60000 + (num_request % 10000)
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333)
route_distinguisher = '{:5d}:{:03d}'.format(bgp_as, vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
src_address_ip = '.'.join([src_device_uuid.replace('R', ''), '0'] + src_endpoint_uuid.split('/'))
dst_address_ip = '.'.join([dst_device_uuid.replace('R', ''), '0'] + dst_endpoint_uuid.split('/'))
config_rules = [
json_config_rule_set('/settings', {
'mtu' : 1512,
'bgp_as' : bgp_as,
'bgp_route_target': bgp_route_target,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id' : src_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : src_address_ip,
'address_prefix' : 16,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id' : dst_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : dst_address_ip,
'address_prefix' : 16,
}),
]
return json_service_l3nm_planned(
request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
config_rules = [
json_config_rule_set('/settings', {
'capacity_value' : 50.0,
'capacity_unit' : 'GHz',
'layer_proto_name': 'PHOTONIC_MEDIA',
'layer_proto_qual': 'tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC',
'direction' : 'UNIDIRECTIONAL',
}),
]
return json_service_tapi_planned(
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
request_uuid, endpoint_ids=endpoint_ids, constraints=[], config_rules=config_rules)
def _compose_slice(self, num_request : int, request_uuid : str, request_type : str) -> Optional[Dict]:
# choose source endpoint
src = self._use_device_endpoint(request_uuid)
if src is None:
LOGGER.warning('>> No source endpoint is available')
return None
src_device_uuid,src_endpoint_uuid = src
# identify excluded destination devices
exclude_device_uuids = {} if request_type in {REQUEST_TYPE_SERVICE_TAPI} else {src_device_uuid}
# choose feasible destination endpoint
dst = self._use_device_endpoint(request_uuid, exclude_device_uuids=exclude_device_uuids)
# if destination endpoint not found, release source, and terminate current service generation
if dst is None:
LOGGER.warning('>> No destination endpoint is available')
self._release_device_endpoint(src_device_uuid, src_endpoint_uuid)
return None
# compose endpoints
dst_device_uuid,dst_endpoint_uuid = dst
endpoint_ids = [
json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
]
constraints = [
json_constraint_custom('bandwidth[gbps]', '10.0'),
json_constraint_custom('latency[ms]', '20.0'),
]
if request_type == REQUEST_TYPE_SLICE_L2NM:
vlan_id = num_request % 1000
circuit_id = '{:03d}'.format(vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
config_rules = [
json_config_rule_set('/settings', {
'mtu': 1512
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id': src_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': dst_router_id,
'circuit_id': circuit_id,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id': dst_router_id,
'sub_interface_index': vlan_id,
'vlan_id': vlan_id,
'remote_router': src_router_id,
'circuit_id': circuit_id,
}),
]
elif request_type == REQUEST_TYPE_SLICE_L3NM:
vlan_id = num_request % 1000
bgp_as = 60000 + (num_request % 10000)
bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333)
route_distinguisher = '{:5d}:{:03d}'.format(bgp_as, vlan_id)
src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
dst_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
src_address_ip = '.'.join([src_device_uuid.replace('R', ''), '0'] + src_endpoint_uuid.split('/'))
dst_address_ip = '.'.join([dst_device_uuid.replace('R', ''), '0'] + dst_endpoint_uuid.split('/'))
config_rules = [
json_config_rule_set('/settings', {
'mtu' : 1512,
'bgp_as' : bgp_as,
'bgp_route_target': bgp_route_target,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid), {
'router_id' : src_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : src_address_ip,
'address_prefix' : 16,
}),
json_config_rule_set('/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid), {
'router_id' : dst_router_id,
'route_distinguisher': route_distinguisher,
'sub_interface_index': vlan_id,
'vlan_id' : vlan_id,
'address_ip' : dst_address_ip,
'address_prefix' : 16,
}),
]
return json_slice(
request_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)
def release_request(self, json_request : Dict) -> None:
if 'service_id' in json_request:
for endpoint_id in json_request['service_endpoint_ids']:
device_uuid = endpoint_id['device_id']['device_uuid']['uuid']
endpoint_uuid = endpoint_id['endpoint_uuid']['uuid']
self._release_device_endpoint(device_uuid, endpoint_uuid)
elif 'slice_id' in json_request:
for endpoint_id in json_request['slice_endpoint_ids']:
device_uuid = endpoint_id['device_id']['device_uuid']['uuid']
endpoint_uuid = endpoint_id['endpoint_uuid']['uuid']
self._release_device_endpoint(device_uuid, endpoint_uuid)