diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py index cd20faad23a06678be39dbacc476a0ea25d4d540..08373230c55af5d92b64b6f4947d22f240181d7e 100644 --- a/src/service/service/task_scheduler/TaskExecutor.py +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -14,7 +14,8 @@ import json, logging from enum import Enum -from typing import TYPE_CHECKING, Any, Dict, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union +from common.DeviceTypes import DeviceTypeEnum from common.method_wrappers.ServiceExceptions import NotFoundException from common.proto.context_pb2 import ( Connection, ConnectionId, Device, DeviceDriverEnum, DeviceId, Service, ServiceId, @@ -158,8 +159,8 @@ class TaskExecutor: def get_devices_from_connection( self, connection : Connection, exclude_managed_by_controller : bool = False - ) -> Dict[str, Device]: - devices = dict() + ) -> Dict[DeviceTypeEnum, Dict[str, Device]]: + devices : Dict[DeviceTypeEnum, Dict[str, Device]] = dict() for endpoint_id in connection.path_hops_endpoint_ids: device = self.get_device(endpoint_id.device_id) device_uuid = endpoint_id.device_id.device_uuid.uuid @@ -167,11 +168,14 @@ class TaskExecutor: controller = self.get_device_controller(device) if controller is None: - devices[device_uuid] = device + device_type = DeviceTypeEnum._value2member_map_[device.device_type] + devices.setdefault(device_type, dict())[device_uuid] = device else: if not exclude_managed_by_controller: - devices[device_uuid] = device - devices[controller.device_id.device_uuid.uuid] = controller + device_type = DeviceTypeEnum._value2member_map_[device.device_type] + devices.setdefault(device_type, dict())[device_uuid] = device + device_type = DeviceTypeEnum._value2member_map_[controller.device_type] + devices.setdefault(device_type, dict())[controller.device_id.device_uuid.uuid] = controller return devices # ----- Service-related methods ------------------------------------------------------------------------------------ @@ -198,25 +202,33 @@ class TaskExecutor: # ----- Service Handler Factory ------------------------------------------------------------------------------------ - def get_service_handler( + def get_service_handlers( self, connection : Connection, service : Service, **service_handler_settings - ) -> '_ServiceHandler': - connection_devices = self.get_devices_from_connection(connection, exclude_managed_by_controller=True) - try: - service_handler_class = get_service_handler_class( - self._service_handler_factory, service, connection_devices) - return service_handler_class(service, self, **service_handler_settings) - except (UnsatisfiedFilterException, UnsupportedFilterFieldException, UnsupportedFilterFieldValueException): - dict_connection_devices = { - cd_data.name : (cd_uuid, cd_data.name, { - (device_driver, DeviceDriverEnum.Name(device_driver)) - for device_driver in cd_data.device_drivers - }) - for cd_uuid,cd_data in connection_devices.items() - } - LOGGER.exception( - 'Unable to select service handler. service={:s} connection={:s} connection_devices={:s}'.format( + ) -> Dict[DeviceTypeEnum, Tuple['_ServiceHandler', Dict[str, Device]]]: + connection_device_types : Dict[DeviceTypeEnum, Dict[str, Device]] = self.get_devices_from_connection( + connection, exclude_managed_by_controller=True + ) + service_handlers : Dict[DeviceTypeEnum, Tuple['_ServiceHandler', Dict[str, Device]]] = dict() + for device_type, connection_devices in connection_device_types.items(): + try: + service_handler_class = get_service_handler_class( + self._service_handler_factory, service, connection_devices) + service_handler = service_handler_class(service, self, **service_handler_settings) + service_handlers[device_type] = (service_handler, connection_devices) + except ( + UnsatisfiedFilterException, UnsupportedFilterFieldException, + UnsupportedFilterFieldValueException + ): + dict_connection_devices = { + cd_data.name : (cd_uuid, cd_data.name, { + (device_driver, DeviceDriverEnum.Name(device_driver)) + for device_driver in cd_data.device_drivers + }) + for cd_uuid,cd_data in connection_devices.items() + } + MSG = 'Unable to select service handler. service={:s} connection={:s} connection_devices={:s}' + LOGGER.exception(MSG.format( grpc_message_to_json_string(service), grpc_message_to_json_string(connection), str(dict_connection_devices) - ) - ) + )) + return service_handlers diff --git a/src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py b/src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py index f6c543c1ccb947eb01c3d5f5fb93c0504a77ca95..3f52f337ae163a6e8c78d873a58a291ecff4bd1a 100644 --- a/src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py +++ b/src/service/service/task_scheduler/tasks/Task_ConnectionConfigure.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING, Dict, Tuple +from common.DeviceTypes import DeviceTypeEnum from common.method_wrappers.ServiceExceptions import OperationFailedException -from common.proto.context_pb2 import ConnectionId +from common.proto.context_pb2 import ConnectionId, Device from common.tools.grpc.Tools import grpc_message_to_json_string from service.service.service_handler_api.Tools import check_errors_setendpoint from service.service.task_scheduler.TaskExecutor import TaskExecutor @@ -21,6 +23,9 @@ from service.service.tools.EndpointIdFormatters import endpointids_to_raw from service.service.tools.ObjectKeys import get_connection_key from ._Task import _Task +if TYPE_CHECKING: + from service.service.service_handler_api._ServiceHandler import _ServiceHandler + KEY_TEMPLATE = 'connection({connection_id:s}):configure' class Task_ConnectionConfigure(_Task): @@ -44,12 +49,24 @@ class Task_ConnectionConfigure(_Task): service = self._task_executor.get_service(connection.service_id) service_handler_settings = {} - service_handler = self._task_executor.get_service_handler(connection, service, **service_handler_settings) + service_handlers : Dict[DeviceTypeEnum, Tuple['_ServiceHandler', Dict[str, Device]]] = \ + self._task_executor.get_service_handlers(connection, service, **service_handler_settings) - endpointids_to_set = endpointids_to_raw(connection.path_hops_endpoint_ids) connection_uuid = connection.connection_id.connection_uuid.uuid - results_setendpoint = service_handler.SetEndpoint(endpointids_to_set, connection_uuid=connection_uuid) - errors = check_errors_setendpoint(endpointids_to_set, results_setendpoint) + endpointids_to_set = endpointids_to_raw(connection.path_hops_endpoint_ids) + + errors = list() + for _, (service_handler, connection_devices) in service_handlers.items(): + _endpointids_to_set = [ + (device_uuid, endpoint_uuid, topology_uuid) + for device_uuid, endpoint_uuid, topology_uuid in endpointids_to_set + if device_uuid in connection_devices + ] + results_setendpoint = service_handler.SetEndpoint( + _endpointids_to_set, connection_uuid=connection_uuid + ) + errors.extend(check_errors_setendpoint(endpointids_to_set, results_setendpoint)) + if len(errors) > 0: MSG = 'SetEndpoint for Connection({:s}) from Service({:s})' str_connection = grpc_message_to_json_string(connection) diff --git a/src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py b/src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py index 7b6b7951befbd6abd4d052ce5eec39d3398aa6e7..4ce774d208d3ea71e55482ea0653521cb7f1083a 100644 --- a/src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py +++ b/src/service/service/task_scheduler/tasks/Task_ConnectionDeconfigure.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING, Dict, Tuple +from common.DeviceTypes import DeviceTypeEnum from common.method_wrappers.ServiceExceptions import OperationFailedException -from common.proto.context_pb2 import ConnectionId +from common.proto.context_pb2 import ConnectionId, Device from common.tools.grpc.Tools import grpc_message_to_json_string from service.service.service_handler_api.Tools import check_errors_deleteendpoint from service.service.task_scheduler.TaskExecutor import TaskExecutor @@ -21,6 +23,9 @@ from service.service.tools.EndpointIdFormatters import endpointids_to_raw from service.service.tools.ObjectKeys import get_connection_key from ._Task import _Task +if TYPE_CHECKING: + from service.service.service_handler_api._ServiceHandler import _ServiceHandler + KEY_TEMPLATE = 'connection({connection_id:s}):deconfigure' class Task_ConnectionDeconfigure(_Task): @@ -44,12 +49,24 @@ class Task_ConnectionDeconfigure(_Task): service = self._task_executor.get_service(connection.service_id) service_handler_settings = {} - service_handler = self._task_executor.get_service_handler(connection, service, **service_handler_settings) + service_handlers : Dict[DeviceTypeEnum, Tuple['_ServiceHandler', Dict[str, Device]]] = \ + self._task_executor.get_service_handlers(connection, service, **service_handler_settings) - endpointids_to_delete = endpointids_to_raw(connection.path_hops_endpoint_ids) connection_uuid = connection.connection_id.connection_uuid.uuid - results_deleteendpoint = service_handler.DeleteEndpoint(endpointids_to_delete, connection_uuid=connection_uuid) - errors = check_errors_deleteendpoint(endpointids_to_delete, results_deleteendpoint) + endpointids_to_delete = endpointids_to_raw(connection.path_hops_endpoint_ids) + + errors = list() + for _, (service_handler, connection_devices) in service_handlers.items(): + _endpointids_to_delete = [ + (device_uuid, endpoint_uuid, topology_uuid) + for device_uuid, endpoint_uuid, topology_uuid in endpointids_to_delete + if device_uuid in connection_devices + ] + results_deleteendpoint = service_handler.DeleteEndpoint( + _endpointids_to_delete, connection_uuid=connection_uuid + ) + errors.extend(check_errors_deleteendpoint(endpointids_to_delete, results_deleteendpoint)) + if len(errors) > 0: MSG = 'DeleteEndpoint for Connection({:s}) from Service({:s})' str_connection = grpc_message_to_json_string(connection)