From 3f9c2b70da8642ab6d18dc40c9644ba281639a70 Mon Sep 17 00:00:00 2001
From: gifrerenom <lluis.gifre@cttc.es>
Date: Sun, 16 Mar 2025 21:07:41 +0000
Subject: [PATCH] E2E Orchestrator component:

- Updated Subscriptions Framework
- Corrected Controller Discovery mechanism
- Extended framework to support multiple Dispatchers
- Implemented Recommendations Dispatcher (being tested)
---
 src/e2e_orchestrator/service/__main__.py      |  62 +++++----
 ...vererThread.py => ControllerDiscoverer.py} |  16 +--
 .../RecommendationsClientNamespace.py         |  40 ------
 .../service/subscriptions/Subscription.py     |  30 ++---
 .../service/subscriptions/Subscriptions.py    |  17 +--
 .../subscriptions/dispatchers/Dispatchers.py  |  33 +++++
 .../subscriptions/dispatchers/_Dispatcher.py  |  46 +++++++
 .../subscriptions/dispatchers/__init__.py     |  13 ++
 .../recommendation/ClientNamespace.py         |  63 +++++++++
 .../dispatchers/recommendation/Constants.py   |  15 +++
 .../dispatchers/recommendation/Dispatcher.py  |  66 +++++++++
 .../recommendation/Recommendation.py          |  27 ++++
 .../dispatchers/recommendation/Tools.py       | 127 ++++++++++++++++++
 .../dispatchers/recommendation/__init__.py    |  13 ++
 14 files changed, 456 insertions(+), 112 deletions(-)
 rename src/e2e_orchestrator/service/subscriptions/{ControllerDiscovererThread.py => ControllerDiscoverer.py} (89%)
 delete mode 100644 src/e2e_orchestrator/service/subscriptions/RecommendationsClientNamespace.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/Dispatchers.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/_Dispatcher.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/__init__.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Constants.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Recommendation.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py
 create mode 100644 src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/__init__.py

diff --git a/src/e2e_orchestrator/service/__main__.py b/src/e2e_orchestrator/service/__main__.py
index d984add76..aa4def383 100644
--- a/src/e2e_orchestrator/service/__main__.py
+++ b/src/e2e_orchestrator/service/__main__.py
@@ -12,64 +12,68 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-import signal
-import sys
-import threading
-
+import logging, signal, sys, threading
 from prometheus_client import start_http_server
-
 from common.Constants import ServiceNameEnum
-from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST,
-                             ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
-                             get_log_level, get_metrics_port,
-                             wait_for_environment_variables)
-from e2e_orchestrator.service.subscriptions.ControllerDiscovererThread import ControllerDiscoverer
-
+from common.Settings import (
+    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
+    get_log_level, get_metrics_port, wait_for_environment_variables
+)
+from .subscriptions.ControllerDiscoverer import ControllerDiscoverer
+from .subscriptions.Subscriptions import Subscriptions
+from .subscriptions.dispatchers.Dispatchers import Dispatchers
+from .subscriptions.dispatchers.recommendation.Dispatcher import RecommendationDispatcher
 from .E2EOrchestratorService import E2EOrchestratorService
 
-terminate = threading.Event()
+TERMINATE = threading.Event()
 
 LOG_LEVEL = get_log_level()
 logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
 LOGGER = logging.getLogger(__name__)
 
 
-def signal_handler(signal, frame):  # pylint: disable=redefined-outer-name
-    LOGGER.warning("Terminate signal received")
-    terminate.set()
+def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
+    LOGGER.warning('Terminate signal received')
+    TERMINATE.set()
 
 
 def main():
-    signal.signal(signal.SIGINT, signal_handler)
+    wait_for_environment_variables([
+        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     ),
+        get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
+    ])
+
+    signal.signal(signal.SIGINT,  signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
-    LOGGER.info("Starting...")
+    LOGGER.info('Starting...')
 
     # Start metrics server
     metrics_port = get_metrics_port()
     start_http_server(metrics_port)
 
+    # Starting service
     grpc_service = E2EOrchestratorService()
     grpc_service.start()
 
-    controller_discoverer = ControllerDiscoverer(
-        terminate=terminate
-    )
-    controller_discoverer.start()
 
-    LOGGER.info("Running...")
+    dispatchers   = Dispatchers(TERMINATE)
+    dispatchers.add_dispatcher(RecommendationDispatcher)
+    subscriptions = Subscriptions(dispatchers, TERMINATE)
+    discoverer    = ControllerDiscoverer(subscriptions, TERMINATE)
+    discoverer.start()
+
+    LOGGER.info('Running...')
     # Wait for Ctrl+C or termination signal
-    while not terminate.wait(timeout=1):
-        pass
+    while not TERMINATE.wait(timeout=1.0): pass
 
-    LOGGER.info("Terminating...")
-    controller_discoverer.stop()
+    LOGGER.info('Terminating...')
+    discoverer.stop()
     grpc_service.stop()
 
-    LOGGER.info("Bye")
+    LOGGER.info('Bye')
     return 0
 
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     sys.exit(main())
diff --git a/src/e2e_orchestrator/service/subscriptions/ControllerDiscovererThread.py b/src/e2e_orchestrator/service/subscriptions/ControllerDiscoverer.py
similarity index 89%
rename from src/e2e_orchestrator/service/subscriptions/ControllerDiscovererThread.py
rename to src/e2e_orchestrator/service/subscriptions/ControllerDiscoverer.py
index e12917f7d..5d9efd531 100644
--- a/src/e2e_orchestrator/service/subscriptions/ControllerDiscovererThread.py
+++ b/src/e2e_orchestrator/service/subscriptions/ControllerDiscoverer.py
@@ -68,24 +68,16 @@ class EventDispatcher(BaseEventDispatcher):
 
 class ControllerDiscoverer:
     def __init__(
-        self, terminate : Optional[threading.Event] = None
+        self, subscriptions : Subscriptions, terminate : threading.Event
     ) -> None:
         self._context_client = ContextClient()
 
-        self._event_collector = BaseEventCollector(
-            terminate=terminate
-        )
+        self._event_collector = BaseEventCollector(terminate=terminate)
         self._event_collector.install_collector(
-            self._context_client.GetDeviceEvents,
-            Empty(), log_events_received=True
+            self._context_client.GetDeviceEvents, Empty(), log_events_received=True
         )
-
-        self._subscriptions = Subscriptions()
-
         self._event_dispatcher = EventDispatcher(
-            self._event_collector.get_events_queue(),
-            self._context_client,
-            self._subscriptions,
+            self._event_collector.get_events_queue(), self._context_client, subscriptions,
             terminate=terminate
         )
 
diff --git a/src/e2e_orchestrator/service/subscriptions/RecommendationsClientNamespace.py b/src/e2e_orchestrator/service/subscriptions/RecommendationsClientNamespace.py
deleted file mode 100644
index 590524eea..000000000
--- a/src/e2e_orchestrator/service/subscriptions/RecommendationsClientNamespace.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging, queue, socketio
-
-LOGGER = logging.getLogger(__name__)
-
-class RecommendationsClientNamespace(socketio.ClientNamespace):
-    def __init__(self, request_queue : queue.Queue, reply_queue : queue.Queue):
-        self._request_queue = request_queue
-        self._reply_queue   = reply_queue
-        super().__init__(namespace='/recommendations')
-
-    def on_connect(self):
-        LOGGER.info('[on_connect] Connected')
-
-    def on_disconnect(self, reason):
-        MSG = '[on_disconnect] Disconnected!, reason: {:s}'
-        LOGGER.info(MSG.format(str(reason)))
-
-    def on_recommendation(self, data):
-        MSG = '[on_recommendation] data={:s}'
-        LOGGER.info(MSG.format(str(data)))
-
-        #MSG = '[on_recommendation] Recommendation: {:s}'
-        #LOGGER.info(MSG.format(str(recommendation)))
-
-        #request = (self._device_uuid, *sample)
-        #self._request_queue.put_nowait(request)
diff --git a/src/e2e_orchestrator/service/subscriptions/Subscription.py b/src/e2e_orchestrator/service/subscriptions/Subscription.py
index 8e1866bac..a8b986858 100644
--- a/src/e2e_orchestrator/service/subscriptions/Subscription.py
+++ b/src/e2e_orchestrator/service/subscriptions/Subscription.py
@@ -13,53 +13,41 @@
 # limitations under the License.
 
 
-import queue, socketio, threading
+import socketio, threading
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_baseurl_http
-from .RecommendationsClientNamespace import RecommendationsClientNamespace
+from .dispatchers.Dispatchers import Dispatchers
 from .TFSControllerSettings import TFSControllerSettings
 
 
 NBI_SERVICE_PREFIX_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or ''
-CHILD_SOCKETIO_URL = 'http://{:s}:{:s}@{:s}:{:d}{:s}'
+CHILD_SOCKETIO_URL = 'http://{:s}:{:s}@{:s}:{:d}' + NBI_SERVICE_PREFIX_URL
 
 
 class Subscription(threading.Thread):
     def __init__(
-        self, tfs_ctrl_settings : TFSControllerSettings,
+        self, tfs_ctrl_settings : TFSControllerSettings, dispatchers : Dispatchers,
         terminate : threading.Event
     ) -> None:
         super().__init__(daemon=True)
-        self._settings      = tfs_ctrl_settings
-        self._terminate     = terminate
-        self._request_queue = queue.Queue()
-        self._reply_queue   = queue.Queue()
-        self._is_running    = threading.Event()
+        self._settings    = tfs_ctrl_settings
+        self._dispatchers = dispatchers
+        self._terminate   = terminate
+        self._is_running  = threading.Event()
 
     @property
     def is_running(self): return self._is_running.is_set()
 
-    @property
-    def request_queue(self): return self._request_queue
-
-    @property
-    def reply_queue(self): return self._reply_queue
-
     def run(self) -> None:
         child_socketio_url = CHILD_SOCKETIO_URL.format(
             self._settings.nbi_username,
             self._settings.nbi_password,
             self._settings.nbi_address,
             self._settings.nbi_port,
-            NBI_SERVICE_PREFIX_URL
-        )
-
-        namespace = RecommendationsClientNamespace(
-            self._request_queue, self._reply_queue
         )
 
         sio = socketio.Client(logger=True, engineio_logger=True)
-        sio.register_namespace(namespace)
+        self._dispatchers.register(sio)
         sio.connect(child_socketio_url)
 
         while not self._terminate.is_set():
diff --git a/src/e2e_orchestrator/service/subscriptions/Subscriptions.py b/src/e2e_orchestrator/service/subscriptions/Subscriptions.py
index f4676ff8d..683aead3b 100644
--- a/src/e2e_orchestrator/service/subscriptions/Subscriptions.py
+++ b/src/e2e_orchestrator/service/subscriptions/Subscriptions.py
@@ -12,17 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging, queue, threading
+import logging, threading
 from typing import Dict
+from .dispatchers.Dispatchers import Dispatchers
 from .Subscription import Subscription
 from .TFSControllerSettings import TFSControllerSettings
 
 LOGGER = logging.getLogger(__name__)
 
 class Subscriptions:
-    def __init__(self) -> None:
-        self._terminate = threading.Event()
-        self._lock      = threading.Lock()
+    def __init__(self, dispatchers : Dispatchers, terminate : threading.Event) -> None:
+        self._dispatchers = dispatchers
+        self._terminate   = terminate
+        self._lock        = threading.Lock()
         self._subscriptions : Dict[str, Subscription] = dict()
 
     def add_subscription(self, tfs_ctrl_settings : TFSControllerSettings) -> None:
@@ -30,7 +32,7 @@ class Subscriptions:
         with self._lock:
             subscription = self._subscriptions.get(device_uuid)
             if (subscription is not None) and subscription.is_running: return
-            subscription = Subscription(tfs_ctrl_settings, self._terminate)
+            subscription = Subscription(tfs_ctrl_settings, self._dispatchers, self._terminate)
             self._subscriptions[device_uuid] = subscription
             subscription.start()
 
@@ -40,8 +42,3 @@ class Subscriptions:
             if subscription is None: return
             if subscription.is_running: subscription.stop()
             self._subscriptions.pop(device_uuid, None)
-
-    def stop(self):
-        self._terminate.set()
-        for device_uuid in self._subscriptions:
-            self.remove_subscription(device_uuid)
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/Dispatchers.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/Dispatchers.py
new file mode 100644
index 000000000..88345e32a
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/Dispatchers.py
@@ -0,0 +1,33 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, socketio, threading
+from typing import List, Type
+from ._Dispatcher import _Dispatcher
+
+LOGGER = logging.getLogger(__name__)
+
+class Dispatchers:
+    def __init__(self, terminate : threading.Event) -> None:
+        self._terminate = terminate
+        self._dispatchers : List[_Dispatcher] = list()
+
+    def add_dispatcher(self, dispatcher_class : Type[_Dispatcher]) -> None:
+        dispatcher = dispatcher_class(self._terminate)
+        self._dispatchers.append(dispatcher)
+        dispatcher.start()
+
+    def register(self, sio_client : socketio.Client) -> None:
+        for dispatcher in self._dispatchers:
+            dispatcher.register(sio_client)
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/_Dispatcher.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/_Dispatcher.py
new file mode 100644
index 000000000..d2cd40bbd
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/_Dispatcher.py
@@ -0,0 +1,46 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import queue, socketio, threading
+from concurrent.futures import Future
+from typing import Any, Tuple
+
+class _Dispatcher(threading.Thread):
+    def __init__(self, terminate : threading.Event):
+        super().__init__(daemon=True)
+        self._dispatcher_queue = queue.Queue[Tuple[Any, Future]]()
+        self._terminate = terminate
+
+    @property
+    def dispatcher_queue(self): return self._dispatcher_queue
+
+    def register(self, sio_client : socketio.Client) -> None:
+        raise NotImplementedError('To be implemented in subclass')
+
+    def run(self):
+        while not self._terminate.is_set():
+            try:
+                request,future = self._dispatcher_queue.get(block=True, timeout=1.0)
+            except queue.Empty:
+                continue
+
+            try:
+                result = self.process_request(request)
+            except Exception as e:
+                future.set_exception(e)
+            else:
+                future.set_result(result)
+
+    def process_request(self, request : Any) -> Any:
+        raise NotImplementedError('To be implemented in subclass')
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/__init__.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/__init__.py
new file mode 100644
index 000000000..023830645
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py
new file mode 100644
index 000000000..ab702acf6
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/ClientNamespace.py
@@ -0,0 +1,63 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json, logging, queue, socketio
+from concurrent.futures import Future
+from .Constants import SIO_NAMESPACE
+from .Recommendation import Recommendation, RecommendationAction
+
+LOGGER = logging.getLogger(__name__)
+
+class ClientNamespace(socketio.ClientNamespace):
+    def __init__(self, dispatcher_queue : queue.Queue[Recommendation]):
+        self._dispatcher_queue = dispatcher_queue
+        super().__init__(namespace=SIO_NAMESPACE)
+
+    def on_connect(self):
+        LOGGER.info('[on_connect] Connected')
+
+    def on_disconnect(self, reason):
+        MSG = '[on_disconnect] Disconnected!, reason: {:s}'
+        LOGGER.info(MSG.format(str(reason)))
+
+    def on_recommendation(self, data):
+        MSG = '[on_recommendation] begin data={:s}'
+        LOGGER.info(MSG.format(str(data)))
+
+        json_data = json.loads(data)
+        recommendation = Recommendation(
+            action = RecommendationAction._value2member_map_[json_data['action']],
+            data   = json.loads(json_data['data']),
+        )
+        result = Future()
+
+        MSG = '[on_recommendation] Recommendation: {:s}'
+        LOGGER.info(MSG.format(str(recommendation)))
+
+        LOGGER.debug('[on_recommendation] Queuing recommendation...')
+        self._dispatcher_queue.put_nowait((recommendation, result))
+        LOGGER.debug('[on_recommendation] Recommendation processed...')
+        
+        reply = dict()
+        try:
+            reply['result'] = result.result()
+            event = reply['result']['event']
+        except Exception as e:
+            reply['error'] = str(e)
+            #reply['stacktrace'] = str(e)
+            event = 'error'
+
+        LOGGER.debug('[on_recommendation] Replying...')
+        self.emit(event, json.dumps(reply))
+        LOGGER.debug('[on_recommendation] end')
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Constants.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Constants.py
new file mode 100644
index 000000000..da3af24fd
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Constants.py
@@ -0,0 +1,15 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+SIO_NAMESPACE = '/recommendations'
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py
new file mode 100644
index 000000000..72e79e6b5
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Dispatcher.py
@@ -0,0 +1,66 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy, logging, socketio
+from typing import Dict
+from common.Constants import DEFAULT_CONTEXT_NAME
+from common.proto.context_pb2 import Service, ServiceId
+from common.tools.object_factory.Context import json_context_id
+from common.tools.object_factory.Service import json_service_id
+from service.client.ServiceClient import ServiceClient
+from .._Dispatcher import _Dispatcher
+from .ClientNamespace import ClientNamespace
+from .Recommendation import Recommendation, RecommendationAction
+from .Tools import compose_optical_service
+
+LOGGER = logging.getLogger(__name__)
+
+class RecommendationDispatcher(_Dispatcher):
+
+    def register(self, sio_client : socketio.Client) -> None:
+        sio_client.register_namespace(ClientNamespace(self.dispatcher_queue))
+
+    def process_request(self, request : Recommendation) -> Dict:
+        LOGGER.info('[process_request] request={:s}'.format(str(request)))
+
+        if request.action == RecommendationAction.VLINK_CREATE:
+            vlink_optical_service = compose_optical_service(request.data)
+            vlink_optical_service_add = copy.deepcopy(vlink_optical_service)
+            vlink_optical_service_add.pop('service_endpoint_ids', None)
+            vlink_optical_service_add.pop('service_constraints',  None)
+            vlink_optical_service_add.pop('service_config',       None)
+
+            service_client = ServiceClient()
+            service_id = service_client.CreateService(Service(**vlink_optical_service_add))
+            vlink_optical_service['service_id']['service_uuid']['uuid'] = service_id.service_uuid.uuid
+            service_id = service_client.UpdateService(Service(**vlink_optical_service))
+
+            result = {'event': 'vlink-created'}
+        elif request.action == RecommendationAction.VLINK_REMOVE:
+            vlink_service_uuid = request.data['link_id']['link_uuid']['uuid']
+            context_id = json_context_id(DEFAULT_CONTEXT_NAME)
+            vlink_optical_service_id = json_service_id(vlink_service_uuid, context_id=context_id)
+
+            service_client = ServiceClient()
+            service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id))
+
+            if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1':
+                service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id))
+
+            result = {'event': 'vlink-removed'}
+        else:
+            MSG = 'RecommendationAction not supported in Recommendation({:s})'
+            raise NotImplementedError(MSG.format(str(request)))
+
+        return result
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Recommendation.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Recommendation.py
new file mode 100644
index 000000000..ca03b193f
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Recommendation.py
@@ -0,0 +1,27 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Dict
+
+class RecommendationAction(Enum):
+    VLINK_CREATE = 'vlink-create'
+    VLINK_REMOVE = 'vlink-remove'
+
+@dataclass
+class Recommendation:
+    action : RecommendationAction
+    data   : Dict   = field(default_factory=dict)
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py
new file mode 100644
index 000000000..48720767c
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/Tools.py
@@ -0,0 +1,127 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, networkx
+from dataclasses import dataclass, field
+from typing import Dict, List
+from common.proto.context_pb2 import ServiceTypeEnum
+from common.tools.context_queries.Topology import get_topology_details
+from common.tools.object_factory.Constraint import json_constraint_custom
+from common.tools.object_factory.Context import json_context
+from common.tools.object_factory.Device import json_device_id
+from common.tools.object_factory.EndPoint import json_endpoint_id
+from common.tools.object_factory.Service import json_service
+from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
+from common.DeviceTypes import DeviceTypeEnum
+from context.client.ContextClient import ContextClient
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+@dataclass
+class GraphAndMapping:
+    graph                   : networkx.Graph            = field(default_factory=networkx.Graph)
+    device_to_type          : Dict[str, str]            = field(default_factory=dict)
+    device_name_to_uuid     : Dict[str, str]            = field(default_factory=dict)
+    endpoint_name_to_uuid   : Dict[Dict[str, str], str] = field(default_factory=dict)
+    endpoint_to_device_uuid : Dict[str, str]            = field(default_factory=dict)
+
+
+def compose_graph_from_topology() -> GraphAndMapping:
+    context_client = ContextClient()
+    topology_details = get_topology_details(
+        context_client, DEFAULT_TOPOLOGY_NAME,
+        context_uuid=DEFAULT_CONTEXT_NAME, rw_copy=False
+    )
+
+    graph_and_mapping = GraphAndMapping()
+
+    for device in topology_details.devices:
+        device_uuid = device.device_id.device_uuid.uuid
+        graph_and_mapping.device_name_to_uuid.setdefault(device.name, device_uuid)
+        graph_and_mapping.device_name_to_uuid.setdefault(device_uuid, device_uuid)
+        graph_and_mapping.device_to_type.setdefault(device_uuid, device.device_type)
+
+        endpoint_uuids = list()
+        for endpoint in device.device_endpoints:
+            endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
+            endpoint_uuids.append(endpoint_uuid)
+            graph_and_mapping.graph.add_node(endpoint_uuid)
+
+            graph_and_mapping.endpoint_name_to_uuid.setdefault((device_uuid, endpoint.name), endpoint_uuid)
+            graph_and_mapping.endpoint_name_to_uuid.setdefault((device_uuid, endpoint_uuid), endpoint_uuid)
+            graph_and_mapping.endpoint_to_device_uuid.setdefault(endpoint_uuid, device_uuid)
+
+        for endpoint_uuid_i in endpoint_uuids:
+            for endpoint_uuid_j in endpoint_uuids:
+                if endpoint_uuid_i == endpoint_uuid_j: continue
+                graph_and_mapping.graph.add_edge(endpoint_uuid_i, endpoint_uuid_j)
+
+    for link in topology_details.links:
+        graph_and_mapping.graph.add_edge(
+            link.link_endpoint_ids[ 0].endpoint_uuid.uuid,
+            link.link_endpoint_ids[-1].endpoint_uuid.uuid,
+        )
+
+    return graph_and_mapping
+
+def compose_optical_service(vlink_request : Dict) -> Dict:
+    graph_and_mapping = compose_graph_from_topology()
+
+    vlink_endpoint_id_a = vlink_request['link_endpoint_ids'][ 0]
+    vlink_endpoint_id_b = vlink_request['link_endpoint_ids'][-1]
+
+    device_uuid_or_name_a   = vlink_endpoint_id_a['device_id']['device_uuid']['uuid']
+    device_uuid_or_name_b   = vlink_endpoint_id_b['device_id']['device_uuid']['uuid']
+    endpoint_uuid_or_name_a = vlink_endpoint_id_a['endpoint_uuid']['uuid']
+    endpoint_uuid_or_name_b = vlink_endpoint_id_b['endpoint_uuid']['uuid']
+
+    device_uuid_a = graph_and_mapping.device_name_to_uuid[device_uuid_or_name_a]
+    device_uuid_b = graph_and_mapping.device_name_to_uuid[device_uuid_or_name_b]
+
+    endpoint_uuid_a = graph_and_mapping.endpoint_name_to_uuid[(device_uuid_a, endpoint_uuid_or_name_a)]
+    endpoint_uuid_b = graph_and_mapping.endpoint_name_to_uuid[(device_uuid_b, endpoint_uuid_or_name_b)]
+
+    path_hops = networkx.shortest_path(
+        graph_and_mapping.graph, endpoint_uuid_a, endpoint_uuid_b
+    )
+
+    optical_border_endpoint_ids : List[str] = list()
+    for endpoint_uuid in path_hops:
+        device_uuid = graph_and_mapping.endpoint_to_device_uuid[endpoint_uuid]
+        device_type = graph_and_mapping.device_to_type[device_uuid]
+        if device_type != DeviceTypeEnum.EMULATED_OPTICAL_TRANSPONDER.value: continue
+        device_id = json_device_id(device_uuid)
+        endpoint_id = json_endpoint_id(device_id, endpoint_uuid)
+        optical_border_endpoint_ids.append(endpoint_id)
+
+    constraints = [
+        json_constraint_custom('bandwidth[gbps]',  str(vlink_request['attributes']['total_capacity_gbps'])),
+        json_constraint_custom('bidirectionality', '1'),
+    ]
+
+    vlink_service_uuid = vlink_request['link_id']['link_uuid']['uuid']
+
+    if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1':
+        constraints.append(json_constraint_custom('optical-band-width[GHz]', '300'))
+
+    vlink_optical_service = json_service(
+        vlink_service_uuid,
+        ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY,
+        context_id=json_context(DEFAULT_CONTEXT_NAME),
+        endpoint_ids=optical_border_endpoint_ids,
+        constraints=constraints,
+    )
+    return vlink_optical_service
diff --git a/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/__init__.py b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/__init__.py
new file mode 100644
index 000000000..023830645
--- /dev/null
+++ b/src/e2e_orchestrator/service/subscriptions/dispatchers/recommendation/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
-- 
GitLab