Loading src/device/service/DeviceServiceServicerImpl.py +12 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver from .monitoring.MonitoringLoops import MonitoringLoops from .Tools import ( check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules, deconfigure_rules, populate_config_rules, populate_endpoints, populate_initial_config_rules, subscribe_kpi, unsubscribe_kpi) populate_config_rules, populate_endpoint_monitoring_resources, populate_endpoints, populate_initial_config_rules, subscribe_kpi, unsubscribe_kpi) LOGGER = logging.getLogger(__name__) Loading Loading @@ -86,6 +86,17 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): raise OperationFailedException('AddDevice', extra_details=errors) device_id = context_client.SetDevice(device) # Update endpoint monitoring resources with UUIDs device_with_uuids = context_client.GetDevice(device_id) populate_endpoint_monitoring_resources(device_with_uuids, self.monitoring_loops) # TODO: remove next temporary workaround when issue with Automation component is resolved device_with_uuids_2 = Device() device_with_uuids_2.CopyFrom(device_with_uuids) device_with_uuids_2.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED device_id = context_client.SetDevice(device_with_uuids_2) return device_id finally: self.mutex_queues.signal_done(device_uuid) Loading src/device/service/Tools.py +25 −6 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json import json, logging from typing import Any, Dict, List, Tuple, Union from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.method_wrappers.ServiceExceptions import InvalidArgumentException Loading @@ -21,12 +21,15 @@ from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.tools.grpc.ConfigRules import update_config_rule_custom from common.tools.grpc.Tools import grpc_message_to_json from context.client.ContextClient import ContextClient from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS from .monitoring.MonitoringLoops import MonitoringLoops from .Errors import ( ERROR_BAD_ENDPOINT, ERROR_DELETE, ERROR_GET, ERROR_GET_INIT, ERROR_MISSING_KPI, ERROR_SAMPLETYPE, ERROR_SET, ERROR_SUBSCRIBE, ERROR_UNSUBSCRIBE) LOGGER = logging.getLogger(__name__) def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]: connection_config_rules = dict() unexpected_config_rules = list() Loading Loading @@ -107,6 +110,21 @@ def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : Mon return errors def populate_endpoint_monitoring_resources(device_with_uuids : Device, monitoring_loops : MonitoringLoops) -> None: device_uuid = device_with_uuids.device_id.device_uuid.uuid for endpoint in device_with_uuids.device_endpoints: endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_name = endpoint.name kpi_sample_types = endpoint.kpi_sample_types for kpi_sample_type in kpi_sample_types: monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) if monitor_resource_key is not None: continue monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_name, kpi_sample_type) if monitor_resource_key is None: continue monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key) def _raw_config_rules_to_grpc( device_uuid : str, device_config : DeviceConfig, error_template : str, config_action : ConfigActionEnum, raw_config_rules : List[Tuple[str, Union[Any, Exception, None]]] Loading Loading @@ -202,11 +220,12 @@ def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loo resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) if resource_key is None: kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') return [ ERROR_SAMPLETYPE.format( MSG = ERROR_SAMPLETYPE.format( str(device_uuid), str(endpoint_uuid), str(kpi_sample_type), str(kpi_sample_type_name) ) ] LOGGER.warning('{:s} Supported Device-Endpoint-KpiSampleType items: {:s}'.format( MSG, str(monitoring_loops.get_all_resource_keys()))) return [MSG] sampling_duration = request.sampling_duration_s # seconds sampling_interval = request.sampling_interval_s # seconds Loading src/device/service/monitoring/MonitoringLoops.py +5 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, queue, threading import copy, logging, queue, threading from typing import Dict, Optional, Tuple, Union from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import Kpi Loading Loading @@ -93,6 +93,10 @@ class MonitoringLoops: key = (device_uuid, endpoint_uuid, kpi_sample_type) return self._device_endpoint_sampletype__to__resource_key.get(key) def get_all_resource_keys(self) -> Dict[Tuple[str, str, int], str]: with self._lock_device_endpoint: return copy.deepcopy(self._device_endpoint_sampletype__to__resource_key) def remove_resource_key( self, device_uuid : str, endpoint_uuid : str, kpi_sample_type : KpiSampleType ) -> None: Loading Loading
src/device/service/DeviceServiceServicerImpl.py +12 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver from .monitoring.MonitoringLoops import MonitoringLoops from .Tools import ( check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules, deconfigure_rules, populate_config_rules, populate_endpoints, populate_initial_config_rules, subscribe_kpi, unsubscribe_kpi) populate_config_rules, populate_endpoint_monitoring_resources, populate_endpoints, populate_initial_config_rules, subscribe_kpi, unsubscribe_kpi) LOGGER = logging.getLogger(__name__) Loading Loading @@ -86,6 +86,17 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): raise OperationFailedException('AddDevice', extra_details=errors) device_id = context_client.SetDevice(device) # Update endpoint monitoring resources with UUIDs device_with_uuids = context_client.GetDevice(device_id) populate_endpoint_monitoring_resources(device_with_uuids, self.monitoring_loops) # TODO: remove next temporary workaround when issue with Automation component is resolved device_with_uuids_2 = Device() device_with_uuids_2.CopyFrom(device_with_uuids) device_with_uuids_2.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED device_id = context_client.SetDevice(device_with_uuids_2) return device_id finally: self.mutex_queues.signal_done(device_uuid) Loading
src/device/service/Tools.py +25 −6 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json import json, logging from typing import Any, Dict, List, Tuple, Union from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.method_wrappers.ServiceExceptions import InvalidArgumentException Loading @@ -21,12 +21,15 @@ from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.tools.grpc.ConfigRules import update_config_rule_custom from common.tools.grpc.Tools import grpc_message_to_json from context.client.ContextClient import ContextClient from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS from .monitoring.MonitoringLoops import MonitoringLoops from .Errors import ( ERROR_BAD_ENDPOINT, ERROR_DELETE, ERROR_GET, ERROR_GET_INIT, ERROR_MISSING_KPI, ERROR_SAMPLETYPE, ERROR_SET, ERROR_SUBSCRIBE, ERROR_UNSUBSCRIBE) LOGGER = logging.getLogger(__name__) def check_connect_rules(device_config : DeviceConfig) -> Dict[str, Any]: connection_config_rules = dict() unexpected_config_rules = list() Loading Loading @@ -107,6 +110,21 @@ def populate_endpoints(device : Device, driver : _Driver, monitoring_loops : Mon return errors def populate_endpoint_monitoring_resources(device_with_uuids : Device, monitoring_loops : MonitoringLoops) -> None: device_uuid = device_with_uuids.device_id.device_uuid.uuid for endpoint in device_with_uuids.device_endpoints: endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid endpoint_name = endpoint.name kpi_sample_types = endpoint.kpi_sample_types for kpi_sample_type in kpi_sample_types: monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) if monitor_resource_key is not None: continue monitor_resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_name, kpi_sample_type) if monitor_resource_key is None: continue monitoring_loops.add_resource_key(device_uuid, endpoint_uuid, kpi_sample_type, monitor_resource_key) def _raw_config_rules_to_grpc( device_uuid : str, device_config : DeviceConfig, error_template : str, config_action : ConfigActionEnum, raw_config_rules : List[Tuple[str, Union[Any, Exception, None]]] Loading Loading @@ -202,11 +220,12 @@ def subscribe_kpi(request : MonitoringSettings, driver : _Driver, monitoring_loo resource_key = monitoring_loops.get_resource_key(device_uuid, endpoint_uuid, kpi_sample_type) if resource_key is None: kpi_sample_type_name = KpiSampleType.Name(kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') return [ ERROR_SAMPLETYPE.format( MSG = ERROR_SAMPLETYPE.format( str(device_uuid), str(endpoint_uuid), str(kpi_sample_type), str(kpi_sample_type_name) ) ] LOGGER.warning('{:s} Supported Device-Endpoint-KpiSampleType items: {:s}'.format( MSG, str(monitoring_loops.get_all_resource_keys()))) return [MSG] sampling_duration = request.sampling_duration_s # seconds sampling_interval = request.sampling_interval_s # seconds Loading
src/device/service/monitoring/MonitoringLoops.py +5 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging, queue, threading import copy, logging, queue, threading from typing import Dict, Optional, Tuple, Union from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.monitoring_pb2 import Kpi Loading Loading @@ -93,6 +93,10 @@ class MonitoringLoops: key = (device_uuid, endpoint_uuid, kpi_sample_type) return self._device_endpoint_sampletype__to__resource_key.get(key) def get_all_resource_keys(self) -> Dict[Tuple[str, str, int], str]: with self._lock_device_endpoint: return copy.deepcopy(self._device_endpoint_sampletype__to__resource_key) def remove_resource_key( self, device_uuid : str, endpoint_uuid : str, kpi_sample_type : KpiSampleType ) -> None: Loading