diff --git a/src/service/experimental/test_concurrent_task_executor.py b/src/service/experimental/test_concurrent_task_executor.py new file mode 100644 index 0000000000000000000000000000000000000000..97b49b0d98696f1ca04640b6864d7984ad6eee62 --- /dev/null +++ b/src/service/experimental/test_concurrent_task_executor.py @@ -0,0 +1,90 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import concurrent.futures, graphlib, random, threading, time +from typing import Any, Dict, Optional + +class _Task: + def __init__(self, name : str) -> None: + self._name = name + + @property + def name(self): return self._name + + def execute(self) -> None: + delay = 1 # random.uniform(1, 5) + print(time.time(), 'task', self._name, 'waiting', delay) + time.sleep(delay) + +class ConcurrentTaskExecutor: + def __init__(self, max_workers: Optional[int] = None, thread_name_prefix: str = '') -> None: + self._max_workers = max_workers + self._thread_name_prefix = thread_name_prefix + self._tasks : Dict[str, _Task] = dict() + self._dag = graphlib.TopologicalSorter() + self._lock = threading.Lock() + self._changed = threading.Event() + + def add_task(self, task: _Task, *predecessors : str) -> None: + self._tasks[task.name] = task + if len(predecessors) > 0: self.add_predecessors(task.name, *predecessors) + + def add_predecessors(self, task_name : str, *predecessors : str) -> None: + self._dag.add(task_name, *predecessors) + + def run_task(self, task : _Task) -> None: + print(time.time(), 'task', task.name, 'started') + task.execute() + print(time.time(), 'task', task.name, 'completed') + with self._lock: + self._dag.done(task.name) + self._changed.set() + + def execute(self) -> None: + self._dag.prepare() + self._changed.set() + + tpe_settings = dict(max_workers=self._max_workers, thread_name_prefix=self._thread_name_prefix) + with concurrent.futures.ThreadPoolExecutor(**tpe_settings) as executor: + while self._changed.wait(): + with self._lock: + if not self._dag.is_active(): break + self._changed.clear() + with self._lock: + tasks = self._dag.get_ready() + print(time.time(), 'triggering tasks', tasks) + for task in tasks: + executor.submit(self.run_task, task) + + +cte = ConcurrentTaskExecutor() + +cte.add_task(_Task('svc:pkt1:planned'), 'svc:root:planned') +cte.add_task(_Task('svc:pkt2:planned'), 'svc:root:planned') +cte.add_task(_Task('svc:opt1:planned'), 'svc:pkt1:planned') +cte.add_task(_Task('svc:opt2:planned'), 'svc:pkt2:planned') +cte.add_task(_Task('con:opt1:config'), 'svc:opt1:planned') +cte.add_task(_Task('con:opt2:config'), 'svc:opt2:planned') +cte.add_task(_Task('svc:opt1:active'), 'svc:opt1:planned', 'con:opt1:config') +cte.add_task(_Task('svc:opt2:active'), 'svc:opt2:planned', 'con:opt2:config') +cte.add_task(_Task('con:pkt1:config'), 'svc:pkt1:planned', 'svc:opt1:active') +cte.add_task(_Task('con:pkt2:config'), 'svc:pkt2:planned', 'svc:opt2:active') +cte.add_task(_Task('svc:pkt1:active'), 'svc:pkt1:planned', 'con:pkt1:config') +cte.add_task(_Task('svc:pkt2:active'), 'svc:pkt2:planned', 'con:pkt2:config') +cte.add_task(_Task('svc:root:active'), 'svc:pkt1:active', 'svc:pkt2:active') + +print(time.time(), 'started') +cte.execute() +print(time.time(), 'completed')