Commit 99c57f55 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Cleanup ECOC'22 demo experiment

parent 4c5f8d57
Loading
Loading
Loading
Loading
+2 −14
Original line number Diff line number Diff line
@@ -147,7 +147,6 @@ class TasksScheduler:

            if isinstance(item, Service):
                str_item_key = grpc_message_to_json_string(item.service_id)
                LOGGER.info('Exploring Service: {:s}'.format(str_item_key))
                if str_item_key in explored_items: continue

                include_service(item.service_id)
@@ -155,14 +154,12 @@ class TasksScheduler:
                connections = self._context_client.ListConnections(item.service_id)
                for connection in connections.connections:
                    self._add_connection_to_executor_cache(connection)
                    LOGGER.info('  PUT connection {:s}'.format(grpc_message_to_json_string(connection.connection_id)))
                    pending_items_to_explore.put(connection)

                explored_items.add(str_item_key)

            elif isinstance(item, ServiceId):
                str_item_key = grpc_message_to_json_string(item)
                LOGGER.info('Exploring ServiceId: {:s}'.format(str_item_key))
                if str_item_key in explored_items: continue

                include_service(item)
@@ -170,30 +167,24 @@ class TasksScheduler:
                connections = self._context_client.ListConnections(item)
                for connection in connections.connections:
                    self._add_connection_to_executor_cache(connection)
                    LOGGER.info('  PUT connection {:s}'.format(grpc_message_to_json_string(connection.connection_id)))
                    pending_items_to_explore.put(connection)

                explored_items.add(str_item_key)

            elif isinstance(item, Connection):
                str_item_key = grpc_message_to_json_string(item.connection_id)
                LOGGER.info('Exploring Connection: {:s}'.format(str_item_key))
                if str_item_key in explored_items: continue

                connection_key = include_connection(item.connection_id, item.service_id)
                self._add_connection_to_executor_cache(connection)

                #_,service_key_done = include_service(item.service_id)
                self._executor.get_service(item.service_id)
                #self._dag.add(service_key_done, connection_key)
                LOGGER.info('  PUT service_id {:s}'.format(grpc_message_to_json_string(item.service_id)))
                pending_items_to_explore.put(item.service_id)

                for sub_service_id in item.sub_service_ids:
                    _,service_key_done = include_service(sub_service_id)
                    self._executor.get_service(sub_service_id)
                    self._dag.add(service_key_done, connection_key)
                    LOGGER.info('  PUT sub_service_id {:s}'.format(grpc_message_to_json_string(sub_service_id)))
                    pending_items_to_explore.put(sub_service_id)

                explored_items.add(str_item_key)
@@ -203,20 +194,17 @@ class TasksScheduler:
                raise Exception(MSG.format(type(item).__name__, grpc_message_to_json_string(item)))

        t1 = time.time()
        LOGGER.info('explored_items={:s}'.format(str(explored_items)))
        LOGGER.info('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0))

    def execute_all(self, dry_run : bool = False) -> None:
        ordered_task_keys = list(self._dag.static_order())
        LOGGER.info('ordered_task_keys={:s}'.format(str(ordered_task_keys)))
        LOGGER.info('[execute_all] ordered_task_keys={:s}'.format(str(ordered_task_keys)))

        results = []
        for task_key in ordered_task_keys:
            LOGGER.info('Task {:s} - begin'.format(str(task_key)))
            task = self._tasks.get(task_key)
            succeeded = True if dry_run else task.execute()
            LOGGER.info('Task {:s} - succeeded={:s}'.format(str(task_key), str(succeeded)))
            results.append(succeeded)

        LOGGER.info('execute_all results={:s}'.format(str(results)))
        LOGGER.info('[execute_all] results={:s}'.format(str(results)))
        return zip(ordered_task_keys, results)
+0 −5
Original line number Diff line number Diff line
@@ -16,7 +16,6 @@ from common.proto.context_pb2 import ConnectionId
from common.rpc_method_wrapper.ServiceExceptions import OperationFailedException
from common.tools.grpc.Tools import grpc_message_to_json_string
from service.service.service_handler_api.Tools import check_errors_setendpoint
from service.service.task_scheduler.ConnectionExpander import ConnectionExpander
from service.service.task_scheduler.TaskExecutor import TaskExecutor
from service.service.tools.EndpointIdFormatters import endpointids_to_raw
from service.service.tools.ObjectKeys import get_connection_key
@@ -47,11 +46,7 @@ class Task_ConnectionConfigure(_Task):
        service_handler_settings = {}
        service_handler = self._task_executor.get_service_handler(connection, service, **service_handler_settings)

        #connection_expander = ConnectionExpander()
        #traversed_endpoint_ids = connection_expander.get_endpoints_traversed(connection)
        #endpointids_to_set = endpointids_to_raw(traversed_endpoint_ids)
        endpointids_to_set = endpointids_to_raw(connection.path_hops_endpoint_ids)

        connection_uuid = connection.connection_id.connection_uuid.uuid
        results_setendpoint = service_handler.SetEndpoint(endpointids_to_set, connection_uuid=connection_uuid)
        errors = check_errors_setendpoint(endpointids_to_set, results_setendpoint)
+0 −5
Original line number Diff line number Diff line
@@ -16,7 +16,6 @@ from common.proto.context_pb2 import ConnectionId
from common.rpc_method_wrapper.ServiceExceptions import OperationFailedException
from common.tools.grpc.Tools import grpc_message_to_json_string
from service.service.service_handler_api.Tools import check_errors_deleteendpoint
from service.service.task_scheduler.ConnectionExpander import ConnectionExpander
from service.service.task_scheduler.TaskExecutor import TaskExecutor
from service.service.tools.EndpointIdFormatters import endpointids_to_raw
from service.service.tools.ObjectKeys import get_connection_key
@@ -47,11 +46,7 @@ class Task_ConnectionDeconfigure(_Task):
        service_handler_settings = {}
        service_handler = self._task_executor.get_service_handler(connection, service, **service_handler_settings)

        #connection_expander = ConnectionExpander()
        #traversed_endpoint_ids = connection_expander.get_endpoints_traversed(connection)
        #endpointids_to_delete = endpointids_to_raw(traversed_endpoint_ids)
        endpointids_to_delete = endpointids_to_raw(connection.path_hops_endpoint_ids)

        connection_uuid = connection.connection_id.connection_uuid.uuid
        results_deleteendpoint = service_handler.DeleteEndpoint(endpointids_to_delete, connection_uuid=connection_uuid)
        errors = check_errors_deleteendpoint(endpointids_to_delete, results_deleteendpoint)