diff --git a/src/service/service/__main__.py b/src/service/service/__main__.py index 2c042fe0e9f4496f1dc4c34f901e099b3434969b..04cf00b06bff809f837833964a9e093f18888ac2 100644 --- a/src/service/service/__main__.py +++ b/src/service/service/__main__.py @@ -33,7 +33,7 @@ def main(): global LOGGER # pylint: disable=global-statement log_level = get_log_level() - logging.basicConfig(level=log_level) + logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ diff --git a/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py b/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py index 5b0bd030443a96f9a7277bf3deeaa566488e54be..5d1e0126e3b36b7b5c687fc25c96af46721da69b 100644 --- a/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py +++ b/src/service/service/service_handlers/l2nm_emulated/L2NMEmulatedServiceHandler.py @@ -85,7 +85,7 @@ class L2NMEmulatedServiceHandler(_ServiceHandler): chk_type('endpoints', endpoints, list) if len(endpoints) == 0: return [] - service_uuid = self.__service.service_uuid.uuid + service_uuid = self.__service.service_id.service_uuid.uuid settings : TreeNode = get_subnode(self.__resolver, self.__config, '/settings', None) results = [] diff --git a/src/service/service/task_scheduler/TaskScheduler.py b/src/service/service/task_scheduler/TaskScheduler.py index e5656bd0d8e9d426b29b2341774d7ffd52d2cf21..9f102f558121e4eb8f7fa9c89dd1b637c92ac8a4 100644 --- a/src/service/service/task_scheduler/TaskScheduler.py +++ b/src/service/service/task_scheduler/TaskScheduler.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import graphlib, logging, queue +import graphlib, logging, queue, time from typing import Dict, Tuple from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId, ServiceStatusEnum from common.proto.pathcomp_pb2 import PathCompReply @@ -110,6 +110,7 @@ class TasksScheduler: return connection_deconfigure_key def compose_from_pathcompreply(self, pathcomp_reply : PathCompReply, is_delete : bool = False) -> None: + t0 = time.time() include_service = self._service_remove if is_delete else self._service_create include_connection = self._connection_deconfigure if is_delete else self._connection_configure @@ -126,34 +127,55 @@ class TasksScheduler: self._executor.get_service(sub_service_id) self._dag.add(connection_key, service_key_done) + t1 = time.time() + LOGGER.info('[compose_from_pathcompreply] elapsed_time: {:f} sec'.format(t1-t0)) + def compose_from_service(self, service : Service, is_delete : bool = False) -> None: + t0 = time.time() include_service = self._service_remove if is_delete else self._service_create include_connection = self._connection_deconfigure if is_delete else self._connection_configure + explored_items = set() pending_items_to_explore = queue.Queue() pending_items_to_explore.put(service) while not pending_items_to_explore.empty(): - item = pending_items_to_explore.get() + try: + item = pending_items_to_explore.get(block=False) + except queue.Empty: + break if isinstance(item, Service): + str_item_key = grpc_message_to_json_string(item.service_id) + if str_item_key in explored_items: continue + include_service(item.service_id) self._add_service_to_executor_cache(item) connections = self._context_client.ListConnections(item.service_id) - for connection in connections: + for connection in connections.connections: self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) + explored_items.add(str_item_key) + elif isinstance(item, ServiceId): + str_item_key = grpc_message_to_json_string(item) + if str_item_key in explored_items: continue + include_service(item) self._executor.get_service(item) connections = self._context_client.ListConnections(item) - for connection in connections: + for connection in connections.connections: self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) + explored_items.add(str_item_key) + elif isinstance(item, Connection): - connection_key = include_connection(item) + str_item_key = grpc_message_to_json_string(item.connection_id) + if str_item_key in explored_items: continue + + connection_key = include_connection(item.connection_id, item.service_id) self._add_connection_to_executor_cache(connection) self._executor.get_service(item.service_id) pending_items_to_explore.put(item.service_id) @@ -162,10 +184,16 @@ class TasksScheduler: self._executor.get_service(sub_service_id) self._dag.add(connection_key, service_key_done) + explored_items.add(str_item_key) + else: MSG = 'Unsupported item {:s}({:s})' raise Exception(MSG.format(type(item).__name__, grpc_message_to_json_string(item))) + t1 = time.time() + LOGGER.info('explored_items={:s}'.format(str(explored_items))) + LOGGER.info('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) + def execute_all(self, dry_run : bool = False) -> None: ordered_task_keys = list(self._dag.static_order()) LOGGER.info('ordered_task_keys={:s}'.format(str(ordered_task_keys))) @@ -176,4 +204,5 @@ class TasksScheduler: succeeded = True if dry_run else task.execute() results.append(succeeded) + LOGGER.info('execute_all results={:s}'.format(str(results))) return zip(ordered_task_keys, results)