Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, json, random, threading
from typing import Dict, Optional, Set, Tuple
from common.proto.context_pb2 import Empty
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.ConfigRule import json_config_rule_set
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import (
json_service_l2nm_planned, json_service_l3nm_planned, json_service_tapi_planned)
from context.client.ContextClient import ContextClient
from .Constants import SERVICE_TYPE_L2NM, SERVICE_TYPE_L3NM, SERVICE_TYPE_TAPI
from .Parameters import Parameters
# Endpoint type of a source endpoint => endpoint type its destination must have.
# Every supported photonic media kind pairs its INPUT side with its OUTPUT side.
ENDPOINT_COMPATIBILITY = {
    media_kind + ':INPUT': media_kind + ':OUTPUT'
    for media_kind in (
        'PHOTONIC_MEDIA:FLEX:G_6_25GHZ',
        'PHOTONIC_MEDIA:DWDM:G_50GHZ',
    )
}
def __init__(self, parameters : Parameters) -> None:
    """Keep the generation parameters and start with empty endpoint bookkeeping.

    Args:
        parameters: settings consulted when composing services.
    """
    self._parameters = parameters
    # incremented on every compose_service() call; also used to name the services
    self._num_services = 0
    # guards the counter and the endpoint pools below
    self._lock = threading.Lock()

    # device_uuid => endpoint_uuids not reserved by any service
    self._available_device_endpoints : Dict[str, Set[str]] = {}
    # device_uuid => {endpoint_uuid => service_uuid that reserved it}
    self._used_device_endpoints : Dict[str, Dict[str, str]] = {}
    # (device_uuid, endpoint_uuid) => endpoint type
    self._endpoint_ids_to_types : Dict[Tuple[str, str], str] = {}
    # endpoint type => {(device_uuid, endpoint_uuid), ...}
    self._endpoint_types_to_ids : Dict[str, Set[Tuple[str, str]]] = {}
def initialize(self) -> None:
    """Rebuild the endpoint pools from the devices and links reported by Context.

    Every endpoint of every device is first registered as available and indexed by
    its type. Endpoints that appear in a link are then removed again from both the
    available pool and the type indexes, so only endpoints not attached to a link
    remain selectable.
    """
    with self._lock:
        self._available_device_endpoints.clear()
        self._used_device_endpoints.clear()

        context_client = ContextClient()

        devices = context_client.ListDevices(Empty())
        for device in devices.devices:
            device_uuid = device.device_id.device_uuid.uuid
            _endpoints = self._available_device_endpoints.setdefault(device_uuid, set())
            for endpoint in device.device_endpoints:
                endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
                # NOTE(review): assumes the endpoint message exposes `endpoint_type`;
                # confirm against the Context protobuf definition.
                endpoint_type = endpoint.endpoint_type
                endpoint_key = (device_uuid, endpoint_uuid)
                # register the endpoint as selectable; without this the pool stays empty
                _endpoints.add(endpoint_uuid)
                self._endpoint_ids_to_types.setdefault(endpoint_key, endpoint_type)
                self._endpoint_types_to_ids.setdefault(endpoint_type, set()).add(endpoint_key)

        links = context_client.ListLinks(Empty())
        for link in links.links:
            for endpoint_id in link.link_endpoint_ids:
                device_uuid = endpoint_id.device_id.device_uuid.uuid
                endpoint_uuid = endpoint_id.endpoint_uuid.uuid
                endpoint_key = (device_uuid, endpoint_uuid)

                # endpoints used by a link cannot be offered to generated services
                _endpoints = self._available_device_endpoints.get(device_uuid, set())
                _endpoints.discard(endpoint_uuid)
                if len(_endpoints) == 0: self._available_device_endpoints.pop(device_uuid, None)

                # keep the type indexes in sync with the available pool
                endpoint_type = self._endpoint_ids_to_types.pop(endpoint_key, None)
                if endpoint_type is None: continue
                self._endpoint_types_to_ids.get(endpoint_type, set()).discard(endpoint_key)
@property
def num_services_generated(self) -> int:
    """Current value of the internal service counter."""
    return self._num_services
def dump_state(self) -> None:
    """Log, at INFO level, the JSON form of the available and the used endpoint pools."""
    with self._lock:
        # sets cannot be JSON-encoded: snapshot every endpoint set as a list
        available = {}
        for device_uuid, endpoint_uuids in self._available_device_endpoints.items():
            available[device_uuid] = list(endpoint_uuids)
        msg_available = '[dump_state] available_device_endpoints = {:s}'
        LOGGER.info(msg_available.format(json.dumps(available)))

        msg_used = '[dump_state] used_device_endpoints = {:s}'
        LOGGER.info(msg_used.format(json.dumps(self._used_device_endpoints)))
def _use_device_endpoint(
    self, service_uuid : str, endpoint_types : Optional[Set[str]] = None,
    exclude_device_uuids : Optional[Set[str]] = None
) -> Optional[Tuple[str, str]]:
    """Reserve a randomly chosen available endpoint for a service.

    Args:
        service_uuid: identifier of the service the endpoint is reserved for.
        endpoint_types: when not None, only endpoints whose type belongs to this set
            are eligible; None accepts endpoints of any type.
        exclude_device_uuids: devices whose endpoints must not be chosen; None (the
            default) excludes no device.

    Returns:
        The (device_uuid, endpoint_uuid) of the reserved endpoint, or None when no
        available endpoint satisfies the restrictions.
    """
    # None replaces the former shared mutable `set()` default argument
    if exclude_device_uuids is None: exclude_device_uuids = set()

    with self._lock:
        compatible_endpoints : Set[Tuple[str, str]] = set()
        eligible_device_endpoints : Dict[str, Set[str]] = {}

        if endpoint_types is None:
            # allow all
            eligible_device_endpoints = {
                device_uuid:device_endpoint_uuids
                for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items()
                if device_uuid not in exclude_device_uuids and len(device_endpoint_uuids) > 0
            }
        else:
            # allow only compatible endpoints
            for endpoint_type in endpoint_types:
                if endpoint_type not in self._endpoint_types_to_ids: continue
                compatible_endpoints.update(self._endpoint_types_to_ids[endpoint_type])

            for device_uuid,device_endpoint_uuids in self._available_device_endpoints.items():
                if device_uuid in exclude_device_uuids or len(device_endpoint_uuids) == 0: continue
                for endpoint_uuid in device_endpoint_uuids:
                    endpoint_key = (device_uuid,endpoint_uuid)
                    if endpoint_key not in compatible_endpoints: continue
                    eligible_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)

        if len(eligible_device_endpoints) == 0:
            LOGGER.warning(' '.join([
                '>> No endpoint is available:',
                'endpoint_types={:s}'.format(str(endpoint_types)),
                'exclude_device_uuids={:s}'.format(str(exclude_device_uuids)),
                'self._endpoint_types_to_ids={:s}'.format(str(self._endpoint_types_to_ids)),
                'self._available_device_endpoints={:s}'.format(str(self._available_device_endpoints)),
                'compatible_endpoints={:s}'.format(str(compatible_endpoints)),
            ]))
            return None

        # pick a device first and then one of its eligible endpoints
        device_uuid = random.choice(list(eligible_device_endpoints.keys()))
        device_endpoint_uuids = eligible_device_endpoints.get(device_uuid)
        endpoint_uuid = random.choice(list(device_endpoint_uuids))

        # move the endpoint from the available pool to the used pool
        self._available_device_endpoints.setdefault(device_uuid, set()).discard(endpoint_uuid)
        self._used_device_endpoints.setdefault(device_uuid, dict())[endpoint_uuid] = service_uuid
        return device_uuid, endpoint_uuid
def _release_device_endpoint(self, device_uuid : str, endpoint_uuid : str) -> None:
    """Return an endpoint to the available pool and drop its reservation, if any.

    Args:
        device_uuid: device owning the endpoint.
        endpoint_uuid: endpoint to release.
    """
    with self._lock:
        # The used pool maps device => {endpoint => service}, so the fallback must be
        # a dict: set.pop() takes no arguments and would raise TypeError here.
        self._used_device_endpoints.setdefault(device_uuid, dict()).pop(endpoint_uuid, None)
        self._available_device_endpoints.setdefault(device_uuid, set()).add(endpoint_uuid)
def compose_service(self) -> Optional[Dict]:
    """Compose a planned service request between two randomly reserved endpoints.

    A service type is drawn from the configured service types, a source and a
    destination endpoint are reserved, and the matching `json_service_*_planned`
    factory is called with type-specific constraints and config rules.

    Returns:
        The service descriptor built by the factory, or None when no source or
        destination endpoint is available or the service type is not supported.
        No endpoint stays reserved when None is returned.
    """
    # Only the counter is updated under the lock: the endpoint helpers called below
    # acquire the same (non-reentrant) lock themselves.
    with self._lock:
        self._num_services += 1
        num_service = self._num_services
    service_uuid = 'svc_{:d}'.format(num_service)

    # choose service type
    service_type = random.choice(self._parameters.service_types)

    # choose source endpoint; TAPI services must start on an endpoint type that has
    # a known compatible counterpart
    src_endpoint_types = set(ENDPOINT_COMPATIBILITY.keys()) if service_type in {SERVICE_TYPE_TAPI} else None
    src = self._use_device_endpoint(service_uuid, endpoint_types=src_endpoint_types)
    if src is None:
        LOGGER.warning('>> No source endpoint is available')
        return None
    src_device_uuid,src_endpoint_uuid = src

    # identify compatible destination endpoint types; a source type without a
    # compatibility entry puts no restriction on the destination type
    src_endpoint_type = self._endpoint_ids_to_types.get((src_device_uuid,src_endpoint_uuid))
    dst_endpoint_type = ENDPOINT_COMPATIBILITY.get(src_endpoint_type)
    dst_endpoint_types = None if dst_endpoint_type is None else {dst_endpoint_type}

    # identify excluded destination devices (TAPI may reuse the source device)
    exclude_device_uuids = set() if service_type in {SERVICE_TYPE_TAPI} else {src_device_uuid}

    # choose feasible destination endpoint
    dst = self._use_device_endpoint(
        service_uuid, endpoint_types=dst_endpoint_types, exclude_device_uuids=exclude_device_uuids)

    # if destination endpoint not found, release source, and terminate current service generation
    if dst is None:
        LOGGER.warning('>> No destination endpoint is available')
        self._release_device_endpoint(src_device_uuid, src_endpoint_uuid)
        return None
    dst_device_uuid,dst_endpoint_uuid = dst

    endpoint_ids = [
        json_endpoint_id(json_device_id(src_device_uuid), src_endpoint_uuid),
        json_endpoint_id(json_device_id(dst_device_uuid), dst_endpoint_uuid),
    ]

    src_settings_key = '/device[{:s}]/endpoint[{:s}]/settings'.format(src_device_uuid, src_endpoint_uuid)
    dst_settings_key = '/device[{:s}]/endpoint[{:s}]/settings'.format(dst_device_uuid, dst_endpoint_uuid)

    if service_type == SERVICE_TYPE_L2NM:
        constraints = [
            json_constraint_custom('bandwidth[gbps]', '10.0'),
            json_constraint_custom('latency[ms]', '20.0'),
        ]

        vlan_id = num_service % 1000
        circuit_id = '{:03d}'.format(vlan_id)
        # router ids are derived from the numeric part of device UUIDs shaped like 'R<number>'
        src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
        dst_router_id = '10.0.0.{:d}'.format(int(dst_device_uuid.replace('R', '')))

        config_rules = [
            json_config_rule_set('/settings', {
                'mtu': 1512
            }),
            json_config_rule_set(src_settings_key, {
                'router_id': src_router_id,
                'sub_interface_index': vlan_id,
                'vlan_id': vlan_id,
                'remote_router': dst_router_id,
                'circuit_id': circuit_id,
            }),
            json_config_rule_set(dst_settings_key, {
                'router_id': dst_router_id,
                'sub_interface_index': vlan_id,
                'vlan_id': vlan_id,
                'remote_router': src_router_id,
                'circuit_id': circuit_id,
            }),
        ]
        return json_service_l2nm_planned(
            service_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)

    if service_type == SERVICE_TYPE_L3NM:
        constraints = [
            json_constraint_custom('bandwidth[gbps]', '10.0'),
            json_constraint_custom('latency[ms]', '20.0'),
        ]

        vlan_id = num_service % 1000
        bgp_as = 60000 + (num_service % 10000)
        bgp_route_target = '{:5d}:{:03d}'.format(bgp_as, 333)
        route_distinguisher = '{:5d}:{:03d}'.format(bgp_as, vlan_id)
        # router ids are derived from the numeric part of device UUIDs shaped like 'R<number>'
        src_router_id = '10.0.0.{:d}'.format(int(src_device_uuid.replace('R', '')))
        dst_router_id = '10.0.0.{:d}'.format(int(dst_device_uuid.replace('R', '')))
        # address = <device number>.0.<endpoint UUID fields split on '/'>
        src_address_ip = '.'.join([src_device_uuid.replace('R', ''), '0'] + src_endpoint_uuid.split('/'))
        dst_address_ip = '.'.join([dst_device_uuid.replace('R', ''), '0'] + dst_endpoint_uuid.split('/'))

        config_rules = [
            json_config_rule_set('/settings', {
                'mtu': 1512,
                'bgp_as': bgp_as,
                'bgp_route_target': bgp_route_target,
            }),
            json_config_rule_set(src_settings_key, {
                'router_id': src_router_id,
                'route_distinguisher': route_distinguisher,
                'sub_interface_index': vlan_id,
                'vlan_id': vlan_id,
                'address_ip': src_address_ip,
                'address_prefix': 16,
            }),
            json_config_rule_set(dst_settings_key, {
                'router_id': dst_router_id,
                'route_distinguisher': route_distinguisher,
                'sub_interface_index': vlan_id,
                'vlan_id': vlan_id,
                'address_ip': dst_address_ip,
                'address_prefix': 16,
            }),
        ]
        return json_service_l3nm_planned(
            service_uuid, endpoint_ids=endpoint_ids, constraints=constraints, config_rules=config_rules)

    if service_type == SERVICE_TYPE_TAPI:
        config_rules = [
            json_config_rule_set('/settings', {
                'capacity_value': 50.0,
                'capacity_unit': 'GHz',
                'layer_proto_name': 'PHOTONIC_MEDIA',
                'layer_proto_qual': 'tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC',
                'direction': 'UNIDIRECTIONAL',
            }),
        ]
        return json_service_tapi_planned(
            service_uuid, endpoint_ids=endpoint_ids, constraints=[], config_rules=config_rules)

    # unsupported service type: hand both endpoints back instead of leaking them
    LOGGER.warning('>> Unsupported service type: {:s}'.format(str(service_type)))
    self._release_device_endpoint(src_device_uuid, src_endpoint_uuid)
    self._release_device_endpoint(dst_device_uuid, dst_endpoint_uuid)
    return None
def release_service(self, json_service : Dict) -> None:
    """Release every endpoint listed in a service descriptor.

    Args:
        json_service: service descriptor whose 'service_endpoint_ids' entries carry
            the device UUID and endpoint UUID of each reserved endpoint.
    """
    reserved = (
        (entry['device_id']['device_uuid']['uuid'], entry['endpoint_uuid']['uuid'])
        for entry in json_service['service_endpoint_ids']
    )
    for device_uuid, endpoint_uuid in reserved:
        self._release_device_endpoint(device_uuid, endpoint_uuid)