diff --git a/src/device/service/drivers/gnmi_openconfig/DeltaSampleCache.py b/src/device/service/drivers/gnmi_openconfig/DeltaSampleCache.py new file mode 100644 index 0000000000000000000000000000000000000000..5083082fe5694a95e95d95cd8ed72563d77dc098 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/DeltaSampleCache.py @@ -0,0 +1,35 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +from typing import Any, Dict, Tuple, Union + +class DeltaSampleCache: + def __init__(self) -> None: + self._previous_samples : Dict[str, Tuple[float, Union[int, float]]] = dict() + + def get_delta(self, path : str, current_timestamp : float, current_value : Any) -> None: + previous_sample = copy.deepcopy(self._previous_samples.get(path)) + self._previous_samples[path] = current_timestamp, current_value + + if not isinstance(current_value, (int, float)): return None + if previous_sample is None: return current_timestamp, 0 + previous_timestamp, previous_value = previous_sample + if not isinstance(previous_value, (int, float)): return None + + delta_value = max(0, current_value - previous_value) + delay = current_timestamp - previous_timestamp + delta_sample = current_timestamp, delta_value / delay + + return delta_sample diff --git a/src/device/service/drivers/gnmi_openconfig/MonitoringThread.py b/src/device/service/drivers/gnmi_openconfig/MonitoringThread.py index 5c40b13b995c35aec6f6b618678a6dcba65b8846..7cbd0da87d15b6fac0ea7f4a5de3c02259a07dc8 100644 --- a/src/device/service/drivers/gnmi_openconfig/MonitoringThread.py +++ b/src/device/service/drivers/gnmi_openconfig/MonitoringThread.py @@ -16,7 +16,7 @@ # Ref: https://github.com/openconfig/gnmi/blob/master/proto/gnmi/gnmi.proto from __future__ import annotations -import grpc, logging, queue, threading +import grpc, logging, queue, re, threading from collections.abc import Iterator from datetime import datetime from typing import Dict @@ -26,6 +26,7 @@ from .gnmi.gnmi_pb2 import ( # pylint: disable=no-name-in-module ) from .gnmi.gnmi_pb2_grpc import gNMIStub from .tools.Path import path_from_string, path_to_string +from .DeltaSampleCache import DeltaSampleCache LOGGER = logging.getLogger(__name__) @@ -82,6 +83,7 @@ class MonitoringThread(threading.Thread): self._in_subscriptions = in_subscriptions self._out_samples = out_samples self._response_iterator = None + self._delta_sample_cache = DeltaSampleCache() def stop(self) -> None: self._terminate.set() @@ -131,16 +133,35 @@ class MonitoringThread(threading.Thread): timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration) self._response_iterator = self._stub.Subscribe(request_iterator, metadata=metadata, timeout=timeout) for subscribe_response in self._response_iterator: - timestamp = datetime.timestamp(datetime.utcnow()) str_subscribe_response = grpc_message_to_json_string(subscribe_response) self._logger.warning('[run] subscribe_response={:s}'.format(str_subscribe_response)) - for update in subscribe_response.update.update: - str_path = path_to_string(update.path) - if str_path != '/system/name/host-name': continue - #counter_name = update.path[-1].name - value_type = update.val.WhichOneof('value') - value = getattr(update.val, value_type) - sample = (timestamp, str_path, value) + update = subscribe_response.update + timestamp_device = float(update.timestamp) / 1.e9 + timestamp_local = datetime.timestamp(datetime.utcnow()) + # if difference between timestamp from device and local is lower than 1 second + if abs(timestamp_device - timestamp_local) <= 1: + # assume clocks are synchronized, use timestamp from device + timestamp = timestamp_device + else: + # might be clocks are not synchronized, use local timestamp + timestamp = timestamp_local + for update_entry in update.update: + str_path = path_to_string(update_entry.path) + #if str_path != '/system/name/host-name': continue + #counter_name = update_entry.path[-1].name + value_type = update_entry.val.WhichOneof('value') + value = getattr(update_entry.val, value_type) + if re.match(r'^[0-9]+$', value) is not None: + value = int(value) + elif re.match(r'^[0-9]*\.[0-9]*$', value) is not None: + value = float(value) + else: + value = str(value) + delta_sample = self._delta_sample_cache.get_delta(str_path, timestamp, value) + if delta_sample is None: + sample = (timestamp, str_path, value) + else: + sample = (delta_sample[0], str_path, delta_sample[1]) self._logger.warning('[run] sample={:s}'.format(str(sample))) self._out_samples.put_nowait(sample) except grpc.RpcError as e: diff --git a/src/device/service/drivers/gnmi_openconfig/SamplesCache.py b/src/device/service/drivers/gnmi_openconfig/SamplesCache.py deleted file mode 100644 index 28be2d661600cffc170cfa04c235207f41151ac0..0000000000000000000000000000000000000000 --- a/src/device/service/drivers/gnmi_openconfig/SamplesCache.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Collection of samples through NetConf is very slow and each request collects all the data. -# Populate a cache periodically (when first interface is interrogated). -# Evict data after some seconds, when data is considered as outdated - -import copy, queue, logging, re, threading -from datetime import datetime -from typing import Dict, Tuple -from .templates_old import get_filter, parse -from .GnmiSessionHandler import GnmiSessionHandler - -RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*') -RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*") - -SAMPLE_EVICTION_SECONDS = 30.0 # seconds -SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters' - -def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): - if previous_sample is None: return None - if previous_timestamp is None: return None - if current_sample is None: return None - if current_timestamp is None: return None - delay = current_timestamp - previous_timestamp - field_keys = set(previous_sample.keys()).union(current_sample.keys()) - field_keys.discard('name') - delta_sample = {'name': previous_sample['name']} - for field_key in field_keys: - previous_sample_value = previous_sample[field_key] - if not isinstance(previous_sample_value, (int, float)): continue - current_sample_value = current_sample[field_key] - if not isinstance(current_sample_value, (int, float)): continue - delta_value = current_sample_value - previous_sample_value - if delta_value < 0: continue - delta_sample[field_key] = delta_value / delay - return delta_sample - -class SamplesCache: - def __init__(self, handler : GnmiSessionHandler, logger : logging.Logger) -> None: - self.__handler = handler - self.__logger = logger - self.__lock = threading.Lock() - self.__timestamp = None - self.__absolute_samples = {} - self.__delta_samples = {} - - def _refresh_samples(self) -> None: - with self.__lock: - try: - now = datetime.timestamp(datetime.utcnow()) - if self.__timestamp is not None and (now - self.__timestamp) < SAMPLE_EVICTION_SECONDS: return - str_filter = get_filter(SAMPLE_RESOURCE_KEY) - xml_data = self.__handler.get(filter=str_filter).data_ele - interface_samples = parse(SAMPLE_RESOURCE_KEY, xml_data) - for interface,samples in interface_samples: - match = RE_GET_ENDPOINT_FROM_INTERFACE_KEY.match(interface) - if match is None: continue - interface = match.group(1) - delta_sample = compute_delta_sample( - self.__absolute_samples.get(interface), self.__timestamp, samples, now) - if delta_sample is not None: self.__delta_samples[interface] = delta_sample - self.__absolute_samples[interface] = samples - self.__timestamp = now - except: # pylint: disable=bare-except - self.__logger.exception('Error collecting samples') - - def get(self, resource_key : str) -> Tuple[float, Dict]: - self._refresh_samples() - match = RE_GET_ENDPOINT_FROM_INTERFACE_XPATH.match(resource_key) - with self.__lock: - if match is None: return self.__timestamp, {} - interface = match.group(1) - return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {})) - -def do_sampling( - samples_cache : SamplesCache, logger : logging.Logger, resource_key : str, out_samples : queue.Queue -) -> None: - try: - timestamp, samples = samples_cache.get(resource_key) - counter_name = resource_key.split('/')[-1].split(':')[-1] - value = samples.get(counter_name) - if value is None: - logger.warning('[do_sampling] value not found for {:s}'.format(resource_key)) - return - # resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s} - sample = (timestamp, resource_key, value) - out_samples.put_nowait(sample) - except: # pylint: disable=bare-except - logger.exception('Error retrieving samples')