From 2c03f374768f3fdd8b6f9e2f6f2edcc68dc76153 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Fri, 19 May 2023 10:02:12 +0000 Subject: [PATCH] Service component: - Updated retrieval of device controller in TaskExecutor - Improved logging of RecomputeConnections RPC method - Added random selection over best candidate paths - Extended unitary test for RecomputeConnections RPC method to check subsecuent path changes - Fixed TaskExecutor error message formatting --- .../service/ServiceServiceServicerImpl.py | 20 ++++++++----- .../service/task_scheduler/TaskExecutor.py | 28 ++++++++++--------- .../tests/test_unitary_recompute_conns.py | 19 +++++++++++++ 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index 84559e4a4..6d23fd4ce 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, json, logging, uuid +import grpc, json, logging, random, uuid from typing import Optional from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.ServiceExceptions import ( @@ -289,21 +289,27 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection))) - new_connection = None + candidate_new_connections = list() for candidate_new_connection in pathcomp_reply.connections: str_candidate_new_connection = connection_to_string(candidate_new_connection) - if str_candidate_new_connection != str_old_connection: - new_connection = candidate_new_connection - break - - if new_connection is None: + if str_candidate_new_connection == str_old_connection: continue + candidate_new_connections.append(candidate_new_connection) + + if len(candidate_new_connections) == 0: MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}' str_pathcomp_request = grpc_message_to_json_string(pathcomp_request) str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply) str_old_connection = grpc_message_to_json_string(old_connection) extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection) raise OperationFailedException('no-new-path-found', extra_details=extra_details) + + str_candidate_new_connections = [ + grpc_message_to_json_string(candidate_new_connection) + for candidate_new_connection in candidate_new_connections + ] + LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections))) + new_connection = random.choice(candidate_new_connections) LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection))) # Change UUID of new connection to prevent collisions diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py index acda45ce8..ae0f1be7d 100644 --- a/src/service/service/task_scheduler/TaskExecutor.py +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json, logging +import logging #, json from enum import Enum from typing import TYPE_CHECKING, Any, Dict, Optional, Union from common.method_wrappers.ServiceExceptions import NotFoundException @@ -20,7 +20,7 @@ from common.proto.context_pb2 import Connection, ConnectionId, Device, DeviceDri from common.tools.context_queries.Connection import get_connection_by_id from common.tools.context_queries.Device import get_device from common.tools.context_queries.Service import get_service_by_id -from common.tools.grpc.Tools import grpc_message_list_to_json_string +from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.object_factory.Device import json_device_id from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient @@ -113,16 +113,18 @@ class TaskExecutor: self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) def get_device_controller(self, device : Device) -> Optional[Device]: - json_controller = None - for config_rule in device.device_config.config_rules: - if config_rule.WhichOneof('config_rule') != 'custom': continue - if config_rule.custom.resource_key != '_controller': continue - json_controller = json.loads(config_rule.custom.resource_value) - break - - if json_controller is None: return None - - controller_uuid = json_controller['uuid'] + #json_controller = None + #for config_rule in device.device_config.config_rules: + # if config_rule.WhichOneof('config_rule') != 'custom': continue + # if config_rule.custom.resource_key != '_controller': continue + # json_controller = json.loads(config_rule.custom.resource_value) + # break + + #if json_controller is None: return None + + #controller_uuid = json_controller['uuid'] + controller_uuid = device.controller_id.device_uuid.uuid + if len(controller_uuid) == 0: return None controller = self.get_device(DeviceId(**json_device_id(controller_uuid))) controller_uuid = controller.device_id.device_uuid.uuid if controller is None: raise Exception('Device({:s}) not found'.format(str(controller_uuid))) @@ -188,7 +190,7 @@ class TaskExecutor: } LOGGER.exception( 'Unable to select service handler. service={:s} connection={:s} connection_devices={:s}'.format( - grpc_message_list_to_json_string(service), grpc_message_list_to_json_string(connection), + grpc_message_to_json_string(service), grpc_message_to_json_string(connection), str(dict_connection_devices) ) ) diff --git a/src/service/tests/test_unitary_recompute_conns.py b/src/service/tests/test_unitary_recompute_conns.py index 3c87eae9e..717e3af73 100644 --- a/src/service/tests/test_unitary_recompute_conns.py +++ b/src/service/tests/test_unitary_recompute_conns.py @@ -78,6 +78,25 @@ def test_service_recompute_connection( assert len(response.connections) == 1 # 1 connection per service str_old_connections = grpc_message_to_json_string(response) + # Change path first time + request = Service() + request.CopyFrom(service) + del request.service_endpoint_ids[:] # pylint: disable=no-member + del request.service_constraints[:] # pylint: disable=no-member + del request.service_config.config_rules[:] # pylint: disable=no-member + service_client.RecomputeConnections(request) + + response = context_client.ListConnections(service_id) + LOGGER.info(' ServiceId[{:s}] => Connections[{:d}] = {:s}'.format( + grpc_message_to_json_string(service_id), len(response.connections), grpc_message_to_json_string(response))) + assert len(response.connections) == 1 # 1 connection per service + str_new_connections = grpc_message_to_json_string(response) + + assert str_old_connections != str_new_connections + + str_old_connections = str_new_connections + + # Change path second time request = Service() request.CopyFrom(service) del request.service_endpoint_ids[:] # pylint: disable=no-member -- GitLab