Commit 5a259475 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component:

- Corrected DAG dependencies in TaskScheduler compose_service_connection_update() method
- Corrected validation and update of new connection in RecomputeConnections RPC method
parent 9a4d9785
Loading
Loading
Loading
Loading
+22 −3
Original line number Diff line number Diff line
@@ -12,12 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging
import grpc, json, logging, uuid
from typing import Optional
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import (
    AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException)
from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum
    AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException,
    OperationFailedException)
from common.proto.context_pb2 import Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.context_queries.Service import get_service_by_id
@@ -286,6 +287,8 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
        # compute a string representing the old connection
        str_old_connection = connection_to_string(old_connection)

        LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))

        new_connection = None
        for candidate_new_connection in pathcomp_reply.connections:
            str_candidate_new_connection = connection_to_string(candidate_new_connection)
@@ -293,6 +296,22 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
                new_connection = candidate_new_connection
                break
        
        if new_connection is None:
            MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
            str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_old_connection = grpc_message_to_json_string(old_connection)
            extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection)
            raise OperationFailedException('no-new-path-found', extra_details=extra_details)

        LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))

        # Change UUID of new connection to prevent collisions
        tmp_connection = Connection()
        tmp_connection.CopyFrom(new_connection)
        tmp_connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())
        new_connection = tmp_connection

        # Feed TaskScheduler with the service to update, the old connection to
        # deconfigure and the new connection to configure. It will produce a
        # schedule of tasks (an ordered list of tasks to be executed) to
+4 −1
Original line number Diff line number Diff line
@@ -232,7 +232,7 @@ class TasksScheduler:
        self._dag.add(service_active_key, service_updating_key)

        # re-activating the service depends on the new connection having been configured
        self._dag.add(service_active_key, service_updating_key)
        self._dag.add(service_active_key, new_connection_configure_key)

        t1 = time.time()
        LOGGER.debug('[compose_service_connection_update] elapsed_time: {:f} sec'.format(t1-t0))
@@ -243,9 +243,12 @@ class TasksScheduler:

        results = []
        for task_key in ordered_task_keys:
            str_task_name = ('DRY ' if dry_run else '') + str(task_key)
            LOGGER.debug('[execute_all] starting task {:s}'.format(str_task_name))
            task = self._tasks.get(task_key)
            succeeded = True if dry_run else task.execute()
            results.append(succeeded)
            LOGGER.debug('[execute_all] finished task {:s} ; succeeded={:s}'.format(str_task_name, str(succeeded)))

        LOGGER.debug('[execute_all] results={:s}'.format(str(results)))
        return zip(ordered_task_keys, results)