diff --git a/src/automation/src/main/java/eu/teraflow/automation/AutomationServiceImpl.java b/src/automation/src/main/java/eu/teraflow/automation/AutomationServiceImpl.java index 773c99de6d94b5f8806a8a354b2371c0a6748f9f..54255cf78481bbaa1ecdd81a097a335292d81ed5 100644 --- a/src/automation/src/main/java/eu/teraflow/automation/AutomationServiceImpl.java +++ b/src/automation/src/main/java/eu/teraflow/automation/AutomationServiceImpl.java @@ -50,33 +50,34 @@ public class AutomationServiceImpl implements AutomationService { device -> { final var id = deviceId; - if (!device.isEnabled()) { - LOGGER.infof(MESSAGE, device); - - final var initialConfiguration = - deviceService.getInitialConfiguration(device.getDeviceId()); - - device.enableDevice(); - LOGGER.infof("Enabled device [%s]", id); - - initialConfiguration - .subscribe() - .with( - deviceConfig -> { - device.setDeviceConfiguration(deviceConfig); - final var configuredDeviceIdUni = deviceService.configureDevice(device); - - configuredDeviceIdUni - .subscribe() - .with( - configuredDeviceId -> - LOGGER.infof( - "Device [%s] has been enabled and configured successfully with %s.\n", - id, deviceConfig)); - }); - } else { - LOGGER.infof("%s has been already enabled. Ignoring...", device); + if (device.isEnabled()) { + LOGGER.warnf("%s has already been enabled. Ignoring...", device); + return; } + + LOGGER.infof(MESSAGE, device); + + final var initialConfiguration = + deviceService.getInitialConfiguration(device.getDeviceId()); + + device.enableDevice(); + LOGGER.infof("Enabled device [%s]", id); + + initialConfiguration + .subscribe() + .with( + deviceConfig -> { + device.setDeviceConfiguration(deviceConfig); + final var configuredDeviceIdUni = deviceService.configureDevice(device); + + configuredDeviceIdUni + .subscribe() + .with( + configuredDeviceId -> + LOGGER.infof( + "Device [%s] has been successfully enabled and configured with %s.\n", + id, deviceConfig)); + }); }); return deserializedDeviceUni; @@ -92,13 +93,23 @@ public class AutomationServiceImpl implements AutomationService { device -> { final var id = deviceId; + if (device.isDisabled()) { + LOGGER.warnf("%s has already been disabled. Ignoring...", device); + return; + } + + device.disableDevice(); + LOGGER.infof("Disabled device [%s]", id); + LOGGER.infof(MESSAGE, device); final var empty = deviceService.deleteDevice(device.getDeviceId()); empty .subscribe() - .with(emptyMessage -> LOGGER.infof("Device [%s] has been deleted.\n", id)); + .with( + emptyMessage -> + LOGGER.infof("Device [%s] has been successfully deleted.\n", id)); }); return deserializedDeviceUni; @@ -114,6 +125,11 @@ public class AutomationServiceImpl implements AutomationService { device -> { final var id = deviceId; + if (!device.isEnabled()) { + LOGGER.warnf("Cannot update disabled device %s. Ignoring...", device); + return; + } + LOGGER.infof(MESSAGE, device); device.setDeviceConfiguration(deviceConfig); final var updatedDeviceIdUni = deviceService.configureDevice(device); @@ -123,7 +139,7 @@ public class AutomationServiceImpl implements AutomationService { .with( configuredDeviceId -> LOGGER.infof( - "Device [%s] has been updated successfully with %s.\n", + "Device [%s] has been successfully updated with %s.\n", id, deviceConfig)); }); diff --git a/src/automation/src/main/java/eu/teraflow/automation/ContextSubscriber.java b/src/automation/src/main/java/eu/teraflow/automation/ContextSubscriber.java index c4d636b6b4dca7241808ade421f32a77861e4d3f..2fc3a3356456b3c1bc55137f686a7e82570a3171 100644 --- a/src/automation/src/main/java/eu/teraflow/automation/ContextSubscriber.java +++ b/src/automation/src/main/java/eu/teraflow/automation/ContextSubscriber.java @@ -78,9 +78,11 @@ public class ContextSubscriber { automationService.deleteDevice(deviceEvent.getDeviceId()); break; case UPDATE: - LOGGER.infof("Received %s for device [%s]", event, deviceId); - automationService.updateDevice( - deviceEvent.getDeviceId(), deviceEvent.getDeviceConfig().orElse(null)); + LOGGER.warnf( + "Received %s for device [%s]. " + + "No automation action on an already updated device", + event, deviceId); + break; case UNDEFINED: logWarningMessage(event, deviceId, eventType); break; diff --git a/src/automation/src/main/java/eu/teraflow/automation/context/model/Device.java b/src/automation/src/main/java/eu/teraflow/automation/context/model/Device.java index 77bd3ca5c861713b43faf178c6450e35e6032b3c..1e5563917625a9679feb9e9491990885cc4a3c22 100644 --- a/src/automation/src/main/java/eu/teraflow/automation/context/model/Device.java +++ b/src/automation/src/main/java/eu/teraflow/automation/context/model/Device.java @@ -61,10 +61,18 @@ public class Device { return deviceOperationalStatus == DeviceOperationalStatus.ENABLED; } + public boolean isDisabled() { + return deviceOperationalStatus == DeviceOperationalStatus.DISABLED; + } + public void enableDevice() { this.deviceOperationalStatus = DeviceOperationalStatus.ENABLED; } + public void disableDevice() { + this.deviceOperationalStatus = DeviceOperationalStatus.DISABLED; + } + public String getDeviceId() { return deviceId; } diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index bc71168f621afc9f0a9ed93d51844542beed813c..71fe14f53395e2ac57884911fe846c9c1b2c2834 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -22,7 +22,7 @@ from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from context.client.ContextClient import ContextClient from pathcomp.frontend.client.PathCompClient import PathCompClient -from service.service.tools.ContextGetters import get_service +from .tools.ContextGetters import get_service from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from .task_scheduler.TaskScheduler import TasksScheduler diff --git a/src/service/service/service_handler_api/ServiceHandlerFactory.py b/src/service/service/service_handler_api/ServiceHandlerFactory.py index 09a56775d4f391d71fe5ac30f9be74430120e306..00f9535bfaa9f152101ab14a87e413991619ba76 100644 --- a/src/service/service/service_handler_api/ServiceHandlerFactory.py +++ b/src/service/service/service_handler_api/ServiceHandlerFactory.py @@ -14,21 +14,23 @@ import logging, operator from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple from common.proto.context_pb2 import Device, Service from common.tools.grpc.Tools import grpc_message_to_json_string -from service.service.service_handler_api._ServiceHandler import _ServiceHandler from .Exceptions import ( UnsatisfiedFilterException, UnsupportedServiceHandlerClassException, UnsupportedFilterFieldException, UnsupportedFilterFieldValueException) from .FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum +if TYPE_CHECKING: + from service.service.service_handler_api._ServiceHandler import _ServiceHandler + LOGGER = logging.getLogger(__name__) class ServiceHandlerFactory: def __init__(self, service_handlers : List[Tuple[type, List[Dict[FilterFieldEnum, Any]]]]) -> None: # Dict{field_name => Dict{field_value => Set{ServiceHandler}}} - self.__indices : Dict[str, Dict[str, Set[_ServiceHandler]]] = {} + self.__indices : Dict[str, Dict[str, Set['_ServiceHandler']]] = {} for service_handler_class,filter_field_sets in service_handlers: for filter_fields in filter_field_sets: @@ -36,6 +38,7 @@ class ServiceHandlerFactory: self.register_service_handler_class(service_handler_class, **filter_fields) def register_service_handler_class(self, service_handler_class, **filter_fields): + from service.service.service_handler_api._ServiceHandler import _ServiceHandler if not issubclass(service_handler_class, _ServiceHandler): raise UnsupportedServiceHandlerClassException(str(service_handler_class)) @@ -59,12 +62,12 @@ class ServiceHandlerFactory: field_indice_service_handlers = field_indice.setdefault(field_value, set()) field_indice_service_handlers.add(service_handler_class) - def get_service_handler_class(self, **filter_fields) -> _ServiceHandler: + def get_service_handler_class(self, **filter_fields) -> '_ServiceHandler': supported_filter_fields = set(FILTER_FIELD_ALLOWED_VALUES.keys()) unsupported_filter_fields = set(filter_fields.keys()).difference(supported_filter_fields) if len(unsupported_filter_fields) > 0: raise UnsupportedFilterFieldException(unsupported_filter_fields) - candidate_service_handler_classes : Dict[_ServiceHandler, int] = None # num. filter hits per service_handler + candidate_service_handler_classes : Dict['_ServiceHandler', int] = None # num. filter hits per service_handler for field_name, field_values in filter_fields.items(): field_indice = self.__indices.get(field_name) if field_indice is None: continue @@ -109,7 +112,7 @@ def get_common_device_drivers(drivers_per_device : List[Set[int]]) -> Set[int]: def get_service_handler_class( service_handler_factory : ServiceHandlerFactory, service : Service, connection_devices : Dict[str, Device] -) -> Optional[_ServiceHandler]: +) -> Optional['_ServiceHandler']: str_service_key = grpc_message_to_json_string(service.service_id) diff --git a/src/service/service/service_handler_api/_ServiceHandler.py b/src/service/service/service_handler_api/_ServiceHandler.py index c642afe75342309f607ed722cf78544bcfdb1ebd..a5042a504d1ade0a357ecb298a340707fe8b167e 100644 --- a/src/service/service/service_handler_api/_ServiceHandler.py +++ b/src/service/service/service_handler_api/_ServiceHandler.py @@ -14,7 +14,7 @@ from typing import Any, List, Optional, Tuple, Union from common.proto.context_pb2 import Service -from service.task_scheduler.TaskExecutor import TaskExecutor +from service.service.task_scheduler.TaskExecutor import TaskExecutor class _ServiceHandler: def __init__(self, diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py index 416e1698f2432e22ae5cfe8e437570fc7d3c8880..757a660590dde1b3fb2eee7090b2329cd45ec8cb 100644 --- a/src/service/service/task_scheduler/TaskExecutor.py +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -13,16 +13,18 @@ # limitations under the License. from enum import Enum -from typing import Any, Dict, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Optional, Union from common.proto.context_pb2 import Connection, ConnectionId, Device, DeviceId, Service, ServiceId from common.rpc_method_wrapper.ServiceExceptions import NotFoundException from context.client.ContextClient import ContextClient from device.client.DeviceClient import DeviceClient -from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory, get_service_handler_class from service.service.tools.ContextGetters import get_connection, get_device, get_service from service.service.tools.ObjectKeys import get_connection_key, get_device_key, get_service_key +if TYPE_CHECKING: + from service.service.service_handler_api._ServiceHandler import _ServiceHandler + CacheableObject = Union[Connection, Device, Service] class CacheableObjectType(Enum): @@ -136,7 +138,7 @@ class TaskExecutor: def get_service_handler( self, connection : Connection, service : Service, **service_handler_settings - ) -> _ServiceHandler: + ) -> '_ServiceHandler': connection_devices = self.get_devices_from_connection(connection) service_handler_class = get_service_handler_class(self._service_handler_factory, service, connection_devices) return service_handler_class(service, self, **service_handler_settings) diff --git a/src/service/service/task_scheduler/TaskScheduler.py b/src/service/service/task_scheduler/TaskScheduler.py index de7e9eb7a70e683051e9d2fd906252713dcdba54..6f2bdba3e14a799e361b2543b4160b69b230d66a 100644 --- a/src/service/service/task_scheduler/TaskScheduler.py +++ b/src/service/service/task_scheduler/TaskScheduler.py @@ -13,12 +13,11 @@ # limitations under the License. import graphlib, logging, queue, time -from typing import Dict, Tuple +from typing import TYPE_CHECKING, Dict, Tuple from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId, ServiceStatusEnum from common.proto.pathcomp_pb2 import PathCompReply from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient -from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from service.service.tools.ObjectKeys import get_connection_key, get_service_key from .tasks._Task import _Task from .tasks.Task_ConnectionConfigure import Task_ConnectionConfigure @@ -27,10 +26,13 @@ from .tasks.Task_ServiceDelete import Task_ServiceDelete from .tasks.Task_ServiceSetStatus import Task_ServiceSetStatus from .TaskExecutor import CacheableObjectType, TaskExecutor +if TYPE_CHECKING: + from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory + LOGGER = logging.getLogger(__name__) class TasksScheduler: - def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None: + def __init__(self, service_handler_factory : 'ServiceHandlerFactory') -> None: self._dag = graphlib.TopologicalSorter() self._executor = TaskExecutor(service_handler_factory) self._tasks : Dict[str, _Task] = dict()