Commit 2c03f374 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component:

- Updated retrieval of device controller in TaskExecutor
- Improved logging of RecomputeConnections RPC method
- Added random selection over best candidate paths
- Extended unitary test for RecomputeConnections RPC method to check subsecuent path changes
- Fixed TaskExecutor error message formatting
parent 84b53aad
Loading
Loading
Loading
Loading
+13 −7
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, json, logging, uuid
import grpc, json, logging, random, uuid
from typing import Optional
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import (
@@ -289,14 +289,13 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):

        LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))

        new_connection = None
        candidate_new_connections = list()
        for candidate_new_connection in pathcomp_reply.connections:
            str_candidate_new_connection = connection_to_string(candidate_new_connection)
            if str_candidate_new_connection != str_old_connection:
                new_connection = candidate_new_connection
                break
            if str_candidate_new_connection == str_old_connection: continue
            candidate_new_connections.append(candidate_new_connection)

        if new_connection is None:
        if len(candidate_new_connections) == 0:
            MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
            str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
@@ -304,6 +303,13 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
            extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection)
            raise OperationFailedException('no-new-path-found', extra_details=extra_details)
        
        str_candidate_new_connections = [
            grpc_message_to_json_string(candidate_new_connection)
            for candidate_new_connection in candidate_new_connections
        ]
        LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections)))

        new_connection = random.choice(candidate_new_connections)
        LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))

        # Change UUID of new connection to prevent collisions
+15 −13
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging
import logging #, json
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from common.method_wrappers.ServiceExceptions import NotFoundException
@@ -20,7 +20,7 @@ from common.proto.context_pb2 import Connection, ConnectionId, Device, DeviceDri
from common.tools.context_queries.Connection import get_connection_by_id
from common.tools.context_queries.Device import get_device
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_list_to_json_string
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Device import json_device_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
@@ -113,16 +113,18 @@ class TaskExecutor:
        self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device)

    def get_device_controller(self, device : Device) -> Optional[Device]:
        json_controller = None
        for config_rule in device.device_config.config_rules:
            if config_rule.WhichOneof('config_rule') != 'custom': continue
            if config_rule.custom.resource_key != '_controller': continue
            json_controller = json.loads(config_rule.custom.resource_value)
            break

        if json_controller is None: return None

        controller_uuid = json_controller['uuid']
        #json_controller = None
        #for config_rule in device.device_config.config_rules:
        #    if config_rule.WhichOneof('config_rule') != 'custom': continue
        #    if config_rule.custom.resource_key != '_controller': continue
        #    json_controller = json.loads(config_rule.custom.resource_value)
        #    break

        #if json_controller is None: return None

        #controller_uuid = json_controller['uuid']
        controller_uuid = device.controller_id.device_uuid.uuid
        if len(controller_uuid) == 0: return None
        controller = self.get_device(DeviceId(**json_device_id(controller_uuid)))
        controller_uuid = controller.device_id.device_uuid.uuid
        if controller is None: raise Exception('Device({:s}) not found'.format(str(controller_uuid)))
@@ -188,7 +190,7 @@ class TaskExecutor:
            }
            LOGGER.exception(
                'Unable to select service handler. service={:s} connection={:s} connection_devices={:s}'.format(
                    grpc_message_list_to_json_string(service), grpc_message_list_to_json_string(connection),
                    grpc_message_to_json_string(service), grpc_message_to_json_string(connection),
                    str(dict_connection_devices)
                )
            )
+19 −0
Original line number Diff line number Diff line
@@ -78,6 +78,25 @@ def test_service_recompute_connection(
    assert len(response.connections) == 1 # 1 connection per service
    str_old_connections = grpc_message_to_json_string(response)

    # Change path first time
    request = Service()
    request.CopyFrom(service)
    del request.service_endpoint_ids[:]         # pylint: disable=no-member
    del request.service_constraints[:]          # pylint: disable=no-member
    del request.service_config.config_rules[:]  # pylint: disable=no-member
    service_client.RecomputeConnections(request)

    response = context_client.ListConnections(service_id)
    LOGGER.info('  ServiceId[{:s}] => Connections[{:d}] = {:s}'.format(
        grpc_message_to_json_string(service_id), len(response.connections), grpc_message_to_json_string(response)))
    assert len(response.connections) == 1 # 1 connection per service
    str_new_connections = grpc_message_to_json_string(response)

    assert str_old_connections != str_new_connections

    str_old_connections = str_new_connections

    # Change path second time
    request = Service()
    request.CopyFrom(service)
    del request.service_endpoint_ids[:]         # pylint: disable=no-member