Commit 2c74053b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component:

- added timestamp to log lines
- resolved infinite loop in TaskScheduler
- improved logging in TaskScheduler
- minor bug-fixing in dumy L2NMEmulatedServiceHandler
parent c6979af4
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level)
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
+1 −1
Original line number Diff line number Diff line
@@ -85,7 +85,7 @@ class L2NMEmulatedServiceHandler(_ServiceHandler):
        chk_type('endpoints', endpoints, list)
        if len(endpoints) == 0: return []

        service_uuid = self.__service.service_uuid.uuid
        service_uuid = self.__service.service_id.service_uuid.uuid
        settings : TreeNode = get_subnode(self.__resolver, self.__config, '/settings', None)

        results = []
+34 −5
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import graphlib, logging, queue
import graphlib, logging, queue, time
from typing import Dict, Tuple
from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId, ServiceStatusEnum
from common.proto.pathcomp_pb2 import PathCompReply
@@ -110,6 +110,7 @@ class TasksScheduler:
        return connection_deconfigure_key

    def compose_from_pathcompreply(self, pathcomp_reply : PathCompReply, is_delete : bool = False) -> None:
        t0 = time.time()
        include_service = self._service_remove if is_delete else self._service_create
        include_connection = self._connection_deconfigure if is_delete else self._connection_configure

@@ -126,34 +127,55 @@ class TasksScheduler:
                self._executor.get_service(sub_service_id)
                self._dag.add(connection_key, service_key_done)

        t1 = time.time()
        LOGGER.info('[compose_from_pathcompreply] elapsed_time: {:f} sec'.format(t1-t0))

    def compose_from_service(self, service : Service, is_delete : bool = False) -> None:
        t0 = time.time()
        include_service = self._service_remove if is_delete else self._service_create
        include_connection = self._connection_deconfigure if is_delete else self._connection_configure

        explored_items = set()
        pending_items_to_explore = queue.Queue()
        pending_items_to_explore.put(service)

        while not pending_items_to_explore.empty():
            item = pending_items_to_explore.get()
            try:
                item = pending_items_to_explore.get(block=False)
            except queue.Empty:
                break

            if isinstance(item, Service):
                str_item_key = grpc_message_to_json_string(item.service_id)
                if str_item_key in explored_items: continue

                include_service(item.service_id)
                self._add_service_to_executor_cache(item)
                connections = self._context_client.ListConnections(item.service_id)
                for connection in connections:
                for connection in connections.connections:
                    self._add_connection_to_executor_cache(connection)
                    pending_items_to_explore.put(connection)

                explored_items.add(str_item_key)

            elif isinstance(item, ServiceId):
                str_item_key = grpc_message_to_json_string(item)
                if str_item_key in explored_items: continue

                include_service(item)
                self._executor.get_service(item)
                connections = self._context_client.ListConnections(item)
                for connection in connections:
                for connection in connections.connections:
                    self._add_connection_to_executor_cache(connection)
                    pending_items_to_explore.put(connection)

                explored_items.add(str_item_key)

            elif isinstance(item, Connection):
                connection_key = include_connection(item)
                str_item_key = grpc_message_to_json_string(item.connection_id)
                if str_item_key in explored_items: continue

                connection_key = include_connection(item.connection_id, item.service_id)
                self._add_connection_to_executor_cache(connection)
                self._executor.get_service(item.service_id)
                pending_items_to_explore.put(item.service_id)
@@ -162,10 +184,16 @@ class TasksScheduler:
                    self._executor.get_service(sub_service_id)
                    self._dag.add(connection_key, service_key_done)

                explored_items.add(str_item_key)

            else:
                MSG = 'Unsupported item {:s}({:s})'
                raise Exception(MSG.format(type(item).__name__, grpc_message_to_json_string(item)))

        t1 = time.time()
        LOGGER.info('explored_items={:s}'.format(str(explored_items)))
        LOGGER.info('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0))

    def execute_all(self, dry_run : bool = False) -> None:
        ordered_task_keys = list(self._dag.static_order())
        LOGGER.info('ordered_task_keys={:s}'.format(str(ordered_task_keys)))
@@ -176,4 +204,5 @@ class TasksScheduler:
            succeeded = True if dry_run else task.execute()
            results.append(succeeded)

        LOGGER.info('execute_all results={:s}'.format(str(results)))
        return zip(ordered_task_keys, results)