diff --git a/src/device/service/drivers/gnmi/gNMI_driver.py b/src/device/service/drivers/openconfig/GnmiTelemetry.py similarity index 96% rename from src/device/service/drivers/gnmi/gNMI_driver.py rename to src/device/service/drivers/openconfig/GnmiTelemetry.py index 0a39bf222232068c11ccaadf321d566a4eb75d32..44b15d519d934fc5b464cf048145eebdcf8437d7 100644 --- a/src/device/service/drivers/gnmi/gNMI_driver.py +++ b/src/device/service/drivers/openconfig/GnmiTelemetry.py @@ -1,4 +1,4 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Ref: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md +# Ref: https://github.com/openconfig/gnmi/blob/master/proto/gnmi/gnmi.proto + import anytree, logging, queue, threading from typing import Any, Iterator, List, Optional, Tuple, Union from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type diff --git a/src/device/service/drivers/openconfig/NetConfTelemetry.py b/src/device/service/drivers/openconfig/NetConfTelemetry.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/device/service/drivers/openconfig/NetConfTelemetry.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py index ef3d0728d5ed02ea4a15ba0c3ccd6f1428cab7df..a541a23bf61cc3b263bd7fc519d2d5aafa930e61 100644 --- a/src/device/service/drivers/openconfig/OpenConfigDriver.py +++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py @@ -30,6 +30,7 @@ from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_su #from .Tools import xml_pretty_print, xml_to_dict, xml_to_file from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse from .RetryDecorator import retry +from .TelemetryProtocolEnum import DEFAULT_TELEMETRY_PROTOCOL, TelemetryProtocolEnum, parse_telemetry_protocol DEBUG_MODE = False logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING) @@ -121,60 +122,6 @@ class NetconfSessionHandler: def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) -def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): - if previous_sample is None: return None - if previous_timestamp is None: return None - if current_sample is None: return None - if current_timestamp is None: return None - delay = current_timestamp - previous_timestamp - field_keys = set(previous_sample.keys()).union(current_sample.keys()) - field_keys.discard('name') - delta_sample = {'name': previous_sample['name']} - for field_key in field_keys: - previous_sample_value = previous_sample[field_key] - if not isinstance(previous_sample_value, (int, float)): continue - current_sample_value = current_sample[field_key] - if not isinstance(current_sample_value, (int, float)): continue - delta_value = current_sample_value - previous_sample_value - if delta_value < 0: continue - delta_sample[field_key] = delta_value / delay - return delta_sample - -class SamplesCache: - def __init__(self, netconf_handler : NetconfSessionHandler) -> None: - self.__netconf_handler = netconf_handler - self.__lock = threading.Lock() - self.__timestamp = None - self.__absolute_samples = {} - self.__delta_samples = {} - - def _refresh_samples(self) -> None: - with self.__lock: - try: - now = datetime.timestamp(datetime.utcnow()) - if self.__timestamp is not None and (now - self.__timestamp) < SAMPLE_EVICTION_SECONDS: return - str_filter = get_filter(SAMPLE_RESOURCE_KEY) - xml_data = self.__netconf_handler.get(filter=str_filter).data_ele - interface_samples = parse(SAMPLE_RESOURCE_KEY, xml_data) - for interface,samples in interface_samples: - match = RE_GET_ENDPOINT_FROM_INTERFACE_KEY.match(interface) - if match is None: continue - interface = match.group(1) - delta_sample = compute_delta_sample( - self.__absolute_samples.get(interface), self.__timestamp, samples, now) - if delta_sample is not None: self.__delta_samples[interface] = delta_sample - self.__absolute_samples[interface] = samples - self.__timestamp = now - except: # pylint: disable=bare-except - LOGGER.exception('Error collecting samples') - - def get(self, resource_key : str) -> Tuple[float, Dict]: - self._refresh_samples() - match = RE_GET_ENDPOINT_FROM_INTERFACE_XPATH.match(resource_key) - with self.__lock: - if match is None: return self.__timestamp, {} - interface = match.group(1) - return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {})) def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None: try: @@ -249,6 +196,13 @@ class OpenConfigDriver(_Driver): self.__subscriptions = TreeNode('.') self.__started = threading.Event() self.__terminate = threading.Event() + + self.__telemetry_protocol = parse_telemetry_protocol(settings.get('telemetry_protocol')) + if self.__telemetry_protocol == TelemetryProtocolEnum.GNMI: + self.__telemetry = GnmiTelemetry() + else: + self.__telemetry = NetConfTelemetry() + self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events self.__scheduler.configure( jobstores = {'default': MemoryJobStore()}, diff --git a/src/device/service/drivers/openconfig/SamplesCache.py b/src/device/service/drivers/openconfig/SamplesCache.py new file mode 100644 index 0000000000000000000000000000000000000000..c2fa6bcecdff39ad74cff26676e718cec66a22f7 --- /dev/null +++ b/src/device/service/drivers/openconfig/SamplesCache.py @@ -0,0 +1,68 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): + if previous_sample is None: return None + if previous_timestamp is None: return None + if current_sample is None: return None + if current_timestamp is None: return None + delay = current_timestamp - previous_timestamp + field_keys = set(previous_sample.keys()).union(current_sample.keys()) + field_keys.discard('name') + delta_sample = {'name': previous_sample['name']} + for field_key in field_keys: + previous_sample_value = previous_sample[field_key] + if not isinstance(previous_sample_value, (int, float)): continue + current_sample_value = current_sample[field_key] + if not isinstance(current_sample_value, (int, float)): continue + delta_value = current_sample_value - previous_sample_value + if delta_value < 0: continue + delta_sample[field_key] = delta_value / delay + return delta_sample + +class SamplesCache: + def __init__(self, netconf_handler : NetconfSessionHandler) -> None: + self.__netconf_handler = netconf_handler + self.__lock = threading.Lock() + self.__timestamp = None + self.__absolute_samples = {} + self.__delta_samples = {} + + def _refresh_samples(self) -> None: + with self.__lock: + try: + now = datetime.timestamp(datetime.utcnow()) + if self.__timestamp is not None and (now - self.__timestamp) < SAMPLE_EVICTION_SECONDS: return + str_filter = get_filter(SAMPLE_RESOURCE_KEY) + xml_data = self.__netconf_handler.get(filter=str_filter).data_ele + interface_samples = parse(SAMPLE_RESOURCE_KEY, xml_data) + for interface,samples in interface_samples: + match = RE_GET_ENDPOINT_FROM_INTERFACE_KEY.match(interface) + if match is None: continue + interface = match.group(1) + delta_sample = compute_delta_sample( + self.__absolute_samples.get(interface), self.__timestamp, samples, now) + if delta_sample is not None: self.__delta_samples[interface] = delta_sample + self.__absolute_samples[interface] = samples + self.__timestamp = now + except: # pylint: disable=bare-except + LOGGER.exception('Error collecting samples') + + def get(self, resource_key : str) -> Tuple[float, Dict]: + self._refresh_samples() + match = RE_GET_ENDPOINT_FROM_INTERFACE_XPATH.match(resource_key) + with self.__lock: + if match is None: return self.__timestamp, {} + interface = match.group(1) + return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {})) diff --git a/src/device/service/drivers/openconfig/TelemetryProtocolEnum.py b/src/device/service/drivers/openconfig/TelemetryProtocolEnum.py new file mode 100644 index 0000000000000000000000000000000000000000..e5927848fee942cc37ab883a8c9b29b0efbe33ad --- /dev/null +++ b/src/device/service/drivers/openconfig/TelemetryProtocolEnum.py @@ -0,0 +1,27 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum +from typing import Optional + +class TelemetryProtocolEnum(enum.Enum): + GNMI = 'gnmi' + NETCONF = 'netconf' + +DEFAULT_TELEMETRY_PROTOCOL = TelemetryProtocolEnum.NETCONF + +def parse_telemetry_protocol(telemetry_protocol : Optional[str] = None) -> TelemetryProtocolEnum: + if telemetry_protocol is None: return DEFAULT_TELEMETRY_PROTOCOL + # pylint: disable=no-member + return TelemetryProtocolEnum._member_map_.get(telemetry_protocol, DEFAULT_TELEMETRY_PROTOCOL) diff --git a/src/device/service/drivers/gnmi/__init__.py b/src/device/service/drivers/openconfig/_Telemetry.py similarity index 54% rename from src/device/service/drivers/gnmi/__init__.py rename to src/device/service/drivers/openconfig/_Telemetry.py index 925746998061f4e05c468133dfacaaa0414551c8..efd05993b2bbcac8441f3022b659e40a8c54323e 100644 --- a/src/device/service/drivers/gnmi/__init__.py +++ b/src/device/service/drivers/openconfig/_Telemetry.py @@ -1,4 +1,4 @@ -# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,16 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES -ALL_RESOURCE_KEYS = [ - RESOURCE_ENDPOINTS, - RESOURCE_INTERFACES, - RESOURCE_NETWORK_INSTANCES, -] +class _Telemetry: + def __init__(self) -> None: + pass -RESOURCE_KEY_MAPPINGS = { - RESOURCE_ENDPOINTS : 'component', - RESOURCE_INTERFACES : 'interface', - RESOURCE_NETWORK_INSTANCES: 'network_instance', -} + def subscribe(self) -> None: + pass + + def unsubscribe(self) -> None: + pass + + def get_samples_queue(self) -> None: + pass