Skip to content
Snippets Groups Projects
Commit 3f9c2b70 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

E2E Orchestrator component:

- Updated Subscriptions Framework
- Corrected Controller Discovery mechanism
- Extended framework to support multiple Dispatchers
- Implemented Recommendations Dispatcher (being tested)
parent 66284d48
No related branches found
No related tags found
3 merge requests!359Release TeraFlowSDN 5.0,!328Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components",!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
Showing
with 456 additions and 72 deletions
...@@ -12,64 +12,68 @@ ...@@ -12,64 +12,68 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging, signal, sys, threading
import signal
import sys
import threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, from common.Settings import (
ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
get_log_level, get_metrics_port, get_log_level, get_metrics_port, wait_for_environment_variables
wait_for_environment_variables) )
from e2e_orchestrator.service.subscriptions.ControllerDiscovererThread import ControllerDiscoverer from .subscriptions.ControllerDiscoverer import ControllerDiscoverer
from .subscriptions.Subscriptions import Subscriptions
from .subscriptions.dispatchers.Dispatchers import Dispatchers
from .subscriptions.dispatchers.recommendation.Dispatcher import RecommendationDispatcher
from .E2EOrchestratorService import E2EOrchestratorService from .E2EOrchestratorService import E2EOrchestratorService
terminate = threading.Event() TERMINATE = threading.Event()
LOG_LEVEL = get_log_level() LOG_LEVEL = get_log_level()
logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning("Terminate signal received") LOGGER.warning('Terminate signal received')
terminate.set() TERMINATE.set()
def main(): def main():
signal.signal(signal.SIGINT, signal_handler) wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info("Starting...") LOGGER.info('Starting...')
# Start metrics server # Start metrics server
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
# Starting service
grpc_service = E2EOrchestratorService() grpc_service = E2EOrchestratorService()
grpc_service.start() grpc_service.start()
controller_discoverer = ControllerDiscoverer(
terminate=terminate
)
controller_discoverer.start()
LOGGER.info("Running...") dispatchers = Dispatchers(TERMINATE)
dispatchers.add_dispatcher(RecommendationDispatcher)
subscriptions = Subscriptions(dispatchers, TERMINATE)
discoverer = ControllerDiscoverer(subscriptions, TERMINATE)
discoverer.start()
LOGGER.info('Running...')
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1): while not TERMINATE.wait(timeout=1.0): pass
pass
LOGGER.info("Terminating...") LOGGER.info('Terminating...')
controller_discoverer.stop() discoverer.stop()
grpc_service.stop() grpc_service.stop()
LOGGER.info("Bye") LOGGER.info('Bye')
return 0 return 0
if __name__ == "__main__": if __name__ == '__main__':
sys.exit(main()) sys.exit(main())
...@@ -68,24 +68,16 @@ class EventDispatcher(BaseEventDispatcher): ...@@ -68,24 +68,16 @@ class EventDispatcher(BaseEventDispatcher):
class ControllerDiscoverer: class ControllerDiscoverer:
def __init__( def __init__(
self, terminate : Optional[threading.Event] = None self, subscriptions : Subscriptions, terminate : threading.Event
) -> None: ) -> None:
self._context_client = ContextClient() self._context_client = ContextClient()
self._event_collector = BaseEventCollector( self._event_collector = BaseEventCollector(terminate=terminate)
terminate=terminate
)
self._event_collector.install_collector( self._event_collector.install_collector(
self._context_client.GetDeviceEvents, self._context_client.GetDeviceEvents, Empty(), log_events_received=True
Empty(), log_events_received=True
) )
self._subscriptions = Subscriptions()
self._event_dispatcher = EventDispatcher( self._event_dispatcher = EventDispatcher(
self._event_collector.get_events_queue(), self._event_collector.get_events_queue(), self._context_client, subscriptions,
self._context_client,
self._subscriptions,
terminate=terminate terminate=terminate
) )
......
...@@ -13,53 +13,41 @@ ...@@ -13,53 +13,41 @@
# limitations under the License. # limitations under the License.
import queue, socketio, threading import socketio, threading
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http from common.Settings import get_service_baseurl_http
from .RecommendationsClientNamespace import RecommendationsClientNamespace from .dispatchers.Dispatchers import Dispatchers
from .TFSControllerSettings import TFSControllerSettings from .TFSControllerSettings import TFSControllerSettings
NBI_SERVICE_PREFIX_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or '' NBI_SERVICE_PREFIX_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or ''
CHILD_SOCKETIO_URL = 'http://{:s}:{:s}@{:s}:{:d}{:s}' CHILD_SOCKETIO_URL = 'http://{:s}:{:s}@{:s}:{:d}' + NBI_SERVICE_PREFIX_URL
class Subscription(threading.Thread): class Subscription(threading.Thread):
def __init__( def __init__(
self, tfs_ctrl_settings : TFSControllerSettings, self, tfs_ctrl_settings : TFSControllerSettings, dispatchers : Dispatchers,
terminate : threading.Event terminate : threading.Event
) -> None: ) -> None:
super().__init__(daemon=True) super().__init__(daemon=True)
self._settings = tfs_ctrl_settings self._settings = tfs_ctrl_settings
self._terminate = terminate self._dispatchers = dispatchers
self._request_queue = queue.Queue() self._terminate = terminate
self._reply_queue = queue.Queue() self._is_running = threading.Event()
self._is_running = threading.Event()
@property @property
def is_running(self): return self._is_running.is_set() def is_running(self): return self._is_running.is_set()
@property
def request_queue(self): return self._request_queue
@property
def reply_queue(self): return self._reply_queue
def run(self) -> None: def run(self) -> None:
child_socketio_url = CHILD_SOCKETIO_URL.format( child_socketio_url = CHILD_SOCKETIO_URL.format(
self._settings.nbi_username, self._settings.nbi_username,
self._settings.nbi_password, self._settings.nbi_password,
self._settings.nbi_address, self._settings.nbi_address,
self._settings.nbi_port, self._settings.nbi_port,
NBI_SERVICE_PREFIX_URL
)
namespace = RecommendationsClientNamespace(
self._request_queue, self._reply_queue
) )
sio = socketio.Client(logger=True, engineio_logger=True) sio = socketio.Client(logger=True, engineio_logger=True)
sio.register_namespace(namespace) self._dispatchers.register(sio)
sio.connect(child_socketio_url) sio.connect(child_socketio_url)
while not self._terminate.is_set(): while not self._terminate.is_set():
......
...@@ -12,17 +12,19 @@ ...@@ -12,17 +12,19 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging, queue, threading import logging, threading
from typing import Dict from typing import Dict
from .dispatchers.Dispatchers import Dispatchers
from .Subscription import Subscription from .Subscription import Subscription
from .TFSControllerSettings import TFSControllerSettings from .TFSControllerSettings import TFSControllerSettings
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class Subscriptions: class Subscriptions:
def __init__(self) -> None: def __init__(self, dispatchers : Dispatchers, terminate : threading.Event) -> None:
self._terminate = threading.Event() self._dispatchers = dispatchers
self._lock = threading.Lock() self._terminate = terminate
self._lock = threading.Lock()
self._subscriptions : Dict[str, Subscription] = dict() self._subscriptions : Dict[str, Subscription] = dict()
def add_subscription(self, tfs_ctrl_settings : TFSControllerSettings) -> None: def add_subscription(self, tfs_ctrl_settings : TFSControllerSettings) -> None:
...@@ -30,7 +32,7 @@ class Subscriptions: ...@@ -30,7 +32,7 @@ class Subscriptions:
with self._lock: with self._lock:
subscription = self._subscriptions.get(device_uuid) subscription = self._subscriptions.get(device_uuid)
if (subscription is not None) and subscription.is_running: return if (subscription is not None) and subscription.is_running: return
subscription = Subscription(tfs_ctrl_settings, self._terminate) subscription = Subscription(tfs_ctrl_settings, self._dispatchers, self._terminate)
self._subscriptions[device_uuid] = subscription self._subscriptions[device_uuid] = subscription
subscription.start() subscription.start()
...@@ -40,8 +42,3 @@ class Subscriptions: ...@@ -40,8 +42,3 @@ class Subscriptions:
if subscription is None: return if subscription is None: return
if subscription.is_running: subscription.stop() if subscription.is_running: subscription.stop()
self._subscriptions.pop(device_uuid, None) self._subscriptions.pop(device_uuid, None)
def stop(self):
self._terminate.set()
for device_uuid in self._subscriptions:
self.remove_subscription(device_uuid)
...@@ -12,29 +12,22 @@ ...@@ -12,29 +12,22 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging, queue, socketio import logging, socketio, threading
from typing import List, Type
from ._Dispatcher import _Dispatcher
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class RecommendationsClientNamespace(socketio.ClientNamespace): class Dispatchers:
def __init__(self, request_queue : queue.Queue, reply_queue : queue.Queue): def __init__(self, terminate : threading.Event) -> None:
self._request_queue = request_queue self._terminate = terminate
self._reply_queue = reply_queue self._dispatchers : List[_Dispatcher] = list()
super().__init__(namespace='/recommendations')
def on_connect(self): def add_dispatcher(self, dispatcher_class : Type[_Dispatcher]) -> None:
LOGGER.info('[on_connect] Connected') dispatcher = dispatcher_class(self._terminate)
self._dispatchers.append(dispatcher)
dispatcher.start()
def on_disconnect(self, reason): def register(self, sio_client : socketio.Client) -> None:
MSG = '[on_disconnect] Disconnected!, reason: {:s}' for dispatcher in self._dispatchers:
LOGGER.info(MSG.format(str(reason))) dispatcher.register(sio_client)
def on_recommendation(self, data):
MSG = '[on_recommendation] data={:s}'
LOGGER.info(MSG.format(str(data)))
#MSG = '[on_recommendation] Recommendation: {:s}'
#LOGGER.info(MSG.format(str(recommendation)))
#request = (self._device_uuid, *sample)
#self._request_queue.put_nowait(request)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import queue, socketio, threading
from concurrent.futures import Future
from typing import Any, Tuple
class _Dispatcher(threading.Thread):
def __init__(self, terminate : threading.Event):
super().__init__(daemon=True)
self._dispatcher_queue = queue.Queue[Tuple[Any, Future]]()
self._terminate = terminate
@property
def dispatcher_queue(self): return self._dispatcher_queue
def register(self, sio_client : socketio.Client) -> None:
raise NotImplementedError('To be implemented in subclass')
def run(self):
while not self._terminate.is_set():
try:
request,future = self._dispatcher_queue.get(block=True, timeout=1.0)
except queue.Empty:
continue
try:
result = self.process_request(request)
except Exception as e:
future.set_exception(e)
else:
future.set_result(result)
def process_request(self, request : Any) -> Any:
raise NotImplementedError('To be implemented in subclass')
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, queue, socketio
from concurrent.futures import Future
from .Constants import SIO_NAMESPACE
from .Recommendation import Recommendation, RecommendationAction
LOGGER = logging.getLogger(__name__)
class ClientNamespace(socketio.ClientNamespace):
def __init__(self, dispatcher_queue : queue.Queue[Recommendation]):
self._dispatcher_queue = dispatcher_queue
super().__init__(namespace=SIO_NAMESPACE)
def on_connect(self):
LOGGER.info('[on_connect] Connected')
def on_disconnect(self, reason):
MSG = '[on_disconnect] Disconnected!, reason: {:s}'
LOGGER.info(MSG.format(str(reason)))
def on_recommendation(self, data):
MSG = '[on_recommendation] begin data={:s}'
LOGGER.info(MSG.format(str(data)))
json_data = json.loads(data)
recommendation = Recommendation(
action = RecommendationAction._value2member_map_[json_data['action']],
data = json.loads(json_data['data']),
)
result = Future()
MSG = '[on_recommendation] Recommendation: {:s}'
LOGGER.info(MSG.format(str(recommendation)))
LOGGER.debug('[on_recommendation] Queuing recommendation...')
self._dispatcher_queue.put_nowait((recommendation, result))
LOGGER.debug('[on_recommendation] Recommendation processed...')
reply = dict()
try:
reply['result'] = result.result()
event = reply['result']['event']
except Exception as e:
reply['error'] = str(e)
#reply['stacktrace'] = str(e)
event = 'error'
LOGGER.debug('[on_recommendation] Replying...')
self.emit(event, json.dumps(reply))
LOGGER.debug('[on_recommendation] end')
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SIO_NAMESPACE = '/recommendations'
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, logging, socketio
from typing import Dict
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import Service, ServiceId
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Service import json_service_id
from service.client.ServiceClient import ServiceClient
from .._Dispatcher import _Dispatcher
from .ClientNamespace import ClientNamespace
from .Recommendation import Recommendation, RecommendationAction
from .Tools import compose_optical_service
LOGGER = logging.getLogger(__name__)
class RecommendationDispatcher(_Dispatcher):
def register(self, sio_client : socketio.Client) -> None:
sio_client.register_namespace(ClientNamespace(self.dispatcher_queue))
def process_request(self, request : Recommendation) -> Dict:
LOGGER.info('[process_request] request={:s}'.format(str(request)))
if request.action == RecommendationAction.VLINK_CREATE:
vlink_optical_service = compose_optical_service(request.data)
vlink_optical_service_add = copy.deepcopy(vlink_optical_service)
vlink_optical_service_add.pop('service_endpoint_ids', None)
vlink_optical_service_add.pop('service_constraints', None)
vlink_optical_service_add.pop('service_config', None)
service_client = ServiceClient()
service_id = service_client.CreateService(Service(**vlink_optical_service_add))
vlink_optical_service['service_id']['service_uuid']['uuid'] = service_id.service_uuid.uuid
service_id = service_client.UpdateService(Service(**vlink_optical_service))
result = {'event': 'vlink-created'}
elif request.action == RecommendationAction.VLINK_REMOVE:
vlink_service_uuid = request.data['link_id']['link_uuid']['uuid']
context_id = json_context_id(DEFAULT_CONTEXT_NAME)
vlink_optical_service_id = json_service_id(vlink_service_uuid, context_id=context_id)
service_client = ServiceClient()
service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id))
if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1':
service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id))
result = {'event': 'vlink-removed'}
else:
MSG = 'RecommendationAction not supported in Recommendation({:s})'
raise NotImplementedError(MSG.format(str(request)))
return result
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict
class RecommendationAction(Enum):
VLINK_CREATE = 'vlink-create'
VLINK_REMOVE = 'vlink-remove'
@dataclass
class Recommendation:
action : RecommendationAction
data : Dict = field(default_factory=dict)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, networkx
from dataclasses import dataclass, field
from typing import Dict, List
from common.proto.context_pb2 import ServiceTypeEnum
from common.tools.context_queries.Topology import get_topology_details
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.Context import json_context
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import json_service
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.DeviceTypes import DeviceTypeEnum
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
@dataclass
class GraphAndMapping:
graph : networkx.Graph = field(default_factory=networkx.Graph)
device_to_type : Dict[str, str] = field(default_factory=dict)
device_name_to_uuid : Dict[str, str] = field(default_factory=dict)
endpoint_name_to_uuid : Dict[Dict[str, str], str] = field(default_factory=dict)
endpoint_to_device_uuid : Dict[str, str] = field(default_factory=dict)
def compose_graph_from_topology() -> GraphAndMapping:
context_client = ContextClient()
topology_details = get_topology_details(
context_client, DEFAULT_TOPOLOGY_NAME,
context_uuid=DEFAULT_CONTEXT_NAME, rw_copy=False
)
graph_and_mapping = GraphAndMapping()
for device in topology_details.devices:
device_uuid = device.device_id.device_uuid.uuid
graph_and_mapping.device_name_to_uuid.setdefault(device.name, device_uuid)
graph_and_mapping.device_name_to_uuid.setdefault(device_uuid, device_uuid)
graph_and_mapping.device_to_type.setdefault(device_uuid, device.device_type)
endpoint_uuids = list()
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_uuids.append(endpoint_uuid)
graph_and_mapping.graph.add_node(endpoint_uuid)
graph_and_mapping.endpoint_name_to_uuid.setdefault((device_uuid, endpoint.name), endpoint_uuid)
graph_and_mapping.endpoint_name_to_uuid.setdefault((device_uuid, endpoint_uuid), endpoint_uuid)
graph_and_mapping.endpoint_to_device_uuid.setdefault(endpoint_uuid, device_uuid)
for endpoint_uuid_i in endpoint_uuids:
for endpoint_uuid_j in endpoint_uuids:
if endpoint_uuid_i == endpoint_uuid_j: continue
graph_and_mapping.graph.add_edge(endpoint_uuid_i, endpoint_uuid_j)
for link in topology_details.links:
graph_and_mapping.graph.add_edge(
link.link_endpoint_ids[ 0].endpoint_uuid.uuid,
link.link_endpoint_ids[-1].endpoint_uuid.uuid,
)
return graph_and_mapping
def compose_optical_service(vlink_request : Dict) -> Dict:
graph_and_mapping = compose_graph_from_topology()
vlink_endpoint_id_a = vlink_request['link_endpoint_ids'][ 0]
vlink_endpoint_id_b = vlink_request['link_endpoint_ids'][-1]
device_uuid_or_name_a = vlink_endpoint_id_a['device_id']['device_uuid']['uuid']
device_uuid_or_name_b = vlink_endpoint_id_b['device_id']['device_uuid']['uuid']
endpoint_uuid_or_name_a = vlink_endpoint_id_a['endpoint_uuid']['uuid']
endpoint_uuid_or_name_b = vlink_endpoint_id_b['endpoint_uuid']['uuid']
device_uuid_a = graph_and_mapping.device_name_to_uuid[device_uuid_or_name_a]
device_uuid_b = graph_and_mapping.device_name_to_uuid[device_uuid_or_name_b]
endpoint_uuid_a = graph_and_mapping.endpoint_name_to_uuid[(device_uuid_a, endpoint_uuid_or_name_a)]
endpoint_uuid_b = graph_and_mapping.endpoint_name_to_uuid[(device_uuid_b, endpoint_uuid_or_name_b)]
path_hops = networkx.shortest_path(
graph_and_mapping.graph, endpoint_uuid_a, endpoint_uuid_b
)
optical_border_endpoint_ids : List[str] = list()
for endpoint_uuid in path_hops:
device_uuid = graph_and_mapping.endpoint_to_device_uuid[endpoint_uuid]
device_type = graph_and_mapping.device_to_type[device_uuid]
if device_type != DeviceTypeEnum.EMULATED_OPTICAL_TRANSPONDER.value: continue
device_id = json_device_id(device_uuid)
endpoint_id = json_endpoint_id(device_id, endpoint_uuid)
optical_border_endpoint_ids.append(endpoint_id)
constraints = [
json_constraint_custom('bandwidth[gbps]', str(vlink_request['attributes']['total_capacity_gbps'])),
json_constraint_custom('bidirectionality', '1'),
]
vlink_service_uuid = vlink_request['link_id']['link_uuid']['uuid']
if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1':
constraints.append(json_constraint_custom('optical-band-width[GHz]', '300'))
vlink_optical_service = json_service(
vlink_service_uuid,
ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY,
context_id=json_context(DEFAULT_CONTEXT_NAME),
endpoint_ids=optical_border_endpoint_ids,
constraints=constraints,
)
return vlink_optical_service
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment