diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index 84559e4a4eea33ed3748cacaf640d24f1c08ef6f..6d23fd4cee53d1639c9eefbd943d45dab497b253 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, json, logging, uuid +import grpc, json, logging, random, uuid from typing import Optional from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.ServiceExceptions import ( @@ -289,21 +289,27 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection))) - new_connection = None + candidate_new_connections = list() for candidate_new_connection in pathcomp_reply.connections: str_candidate_new_connection = connection_to_string(candidate_new_connection) - if str_candidate_new_connection != str_old_connection: - new_connection = candidate_new_connection - break - - if new_connection is None: + if str_candidate_new_connection == str_old_connection: continue + candidate_new_connections.append(candidate_new_connection) + + if len(candidate_new_connections) == 0: MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}' str_pathcomp_request = grpc_message_to_json_string(pathcomp_request) str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply) str_old_connection = grpc_message_to_json_string(old_connection) extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection) raise OperationFailedException('no-new-path-found', extra_details=extra_details) + + str_candidate_new_connections = [ + grpc_message_to_json_string(candidate_new_connection) + for candidate_new_connection in candidate_new_connections + ] + LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections))) + new_connection = random.choice(candidate_new_connections) LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection))) # Change UUID of new connection to prevent collisions diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py index acda45ce80a62a4a3723744546968e3195799b59..ae0f1be7da291a5dc025641cb606f7a7706059ca 100644 --- a/src/service/service/task_scheduler/TaskExecutor.py +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json, logging +import logging #, json from enum import Enum from typing import TYPE_CHECKING, Any, Dict, Optional, Union from common.method_wrappers.ServiceExceptions import NotFoundException @@ -20,7 +20,7 @@ from common.proto.context_pb2 import Connection, ConnectionId, Device, DeviceDri from common.tools.context_queries.Connection import get_connection_by_id from common.tools.context_queries.Device import get_device from common.tools.context_queries.Service import get_service_by_id -from common.tools.grpc.Tools import grpc_message_list_to_json_string +from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Device import json_device_id from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient @@ -113,16 +113,18 @@ class TaskExecutor: self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) def get_device_controller(self, device : Device) -> Optional[Device]: - json_controller = None - for config_rule in device.device_config.config_rules: - if config_rule.WhichOneof('config_rule') != 'custom': continue - if config_rule.custom.resource_key != '_controller': continue - json_controller = json.loads(config_rule.custom.resource_value) - break - - if json_controller is None: return None - - controller_uuid = json_controller['uuid'] + #json_controller = None + #for config_rule in device.device_config.config_rules: + # if config_rule.WhichOneof('config_rule') != 'custom': continue + # if config_rule.custom.resource_key != '_controller': continue + # json_controller = json.loads(config_rule.custom.resource_value) + # break + + #if json_controller is None: return None + + #controller_uuid = json_controller['uuid'] + controller_uuid = device.controller_id.device_uuid.uuid + if len(controller_uuid) == 0: return None controller = self.get_device(DeviceId(**json_device_id(controller_uuid))) controller_uuid = controller.device_id.device_uuid.uuid if controller is None: raise Exception('Device({:s}) not found'.format(str(controller_uuid))) @@ -188,7 +190,7 @@ class TaskExecutor: } LOGGER.exception( 'Unable to select service handler. service={:s} connection={:s} connection_devices={:s}'.format( - grpc_message_list_to_json_string(service), grpc_message_list_to_json_string(connection), + grpc_message_to_json_string(service), grpc_message_to_json_string(connection), str(dict_connection_devices) ) ) diff --git a/src/service/tests/test_unitary_recompute_conns.py b/src/service/tests/test_unitary_recompute_conns.py index 3c87eae9ef0e83372064544aeaa0e1721d7fda7d..717e3af73b0d21d1dfeeab1e388c5df663417337 100644 --- a/src/service/tests/test_unitary_recompute_conns.py +++ b/src/service/tests/test_unitary_recompute_conns.py @@ -78,6 +78,25 @@ def test_service_recompute_connection( assert len(response.connections) == 1 # 1 connection per service str_old_connections = grpc_message_to_json_string(response) + # Change path first time + request = Service() + request.CopyFrom(service) + del request.service_endpoint_ids[:] # pylint: disable=no-member + del request.service_constraints[:] # pylint: disable=no-member + del request.service_config.config_rules[:] # pylint: disable=no-member + service_client.RecomputeConnections(request) + + response = context_client.ListConnections(service_id) + LOGGER.info(' ServiceId[{:s}] => Connections[{:d}] = {:s}'.format( + grpc_message_to_json_string(service_id), len(response.connections), grpc_message_to_json_string(response))) + assert len(response.connections) == 1 # 1 connection per service + str_new_connections = grpc_message_to_json_string(response) + + assert str_old_connections != str_new_connections + + str_old_connections = str_new_connections + + # Change path second time request = Service() request.CopyFrom(service) del request.service_endpoint_ids[:] # pylint: disable=no-member