# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Ref: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md
# Ref: https://github.com/openconfig/gnmi/blob/master/proto/gnmi/gnmi.proto
from __future__ import annotations
import grpc, logging, queue, re, threading
from collections.abc import Iterator
from datetime import datetime, timezone
from typing import Dict
from common.tools.grpc.Tools import grpc_message_to_json_string
from .gnmi.gnmi_pb2 import ( # pylint: disable=no-name-in-module
QOSMarking, SubscribeRequest, Subscription, SubscriptionList, SubscriptionMode
)
from .gnmi.gnmi_pb2_grpc import gNMIStub
from .tools.Path import path_from_string, path_to_string
from .DeltaSampleCache import DeltaSampleCache

LOGGER = logging.getLogger(__name__)
# SubscriptionList Mode: Mode of the subscription.
# STREAM = 0: Values streamed by the target. gNMI Specification Section 3.5.1.5.2
# ONCE = 1: Values sent once-off by the target. gNMI Specification Section 3.5.1.5.1
# POLL = 2: Values sent in response to a poll request. gNMI Specification Section 3.5.1.5.3
GNMI_SUBSCRIPTION_LIST_MODE = SubscriptionList.Mode.STREAM
# Path Prefix: Prefix used for paths.
GNMI_PATH_PREFIX = None
# QoS Marking: DSCP marking to be used.
GNMI_QOS_MARKING = None
# Allow Aggregation: Whether elements of the schema that are marked as eligible for aggregation
# should be aggregated or not.
GNMI_ALLOW_AGGREGATION = False
# Encoding: The encoding that the target should use within the Notifications generated
# corresponding to the SubscriptionList.
GNMI_ENCODING = 'JSON'
# Subscription Mode: The mode of the subscription, specifying how the target must return values
# in a subscription. gNMI Specification Section 3.5.1.3
# TARGET_DEFINED = 0: The target selects the relevant mode for each element.
# ON_CHANGE = 1: The target sends an update on element value change.
# SAMPLE = 2: The target samples values according to the interval.
GNMI_SUBSCRIPTION_MODE = SubscriptionMode.SAMPLE
# Suppress Redundant: Indicates whether values that have not changed should be sent in a SAMPLE
# subscription. gNMI Specification Section 3.5.1.3
GNMI_SUPPRESS_REDUNDANT = False
# Heartbeat Interval: Specifies the maximum allowable silent period in nanoseconds when
# suppress_redundant is in use. The target should send a value at least once in the period
# specified. gNMI Specification Section 3.5.1.3
GNMI_HEARTBEAT_INTERVAL = 10 # seconds
GNMI_SUBSCRIPTION_TIMEOUT = None
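# Illustrative sketch (comment only): with the defaults above and a single subscription
# (example path and 1-second sampling interval assumed, not taken from this module),
# generate_requests() below yields a SubscribeRequest equivalent to:
#   SubscribeRequest(subscribe=SubscriptionList(
#       mode=SubscriptionList.Mode.STREAM, encoding='JSON', allow_aggregation=False,
#       subscription=[Subscription(
#           path=path_from_string('/interfaces/interface[name=eth0]/state/counters/in-octets'),
#           mode=SubscriptionMode.SAMPLE, suppress_redundant=False,
#           sample_interval=1_000_000_000, heartbeat_interval=10_000_000_000)]))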

class MonitoringThread(threading.Thread):
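    '''
    Thread maintaining a gNMI Subscribe() bidirectional stream towards the device.
    Subscription requests are consumed from the in_subscriptions queue as tuples
    (operation, resource_key, sampling_duration, sampling_interval); telemetry samples
    are delivered through the out_samples queue as tuples (timestamp, resource_path, value).
    '''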
def __init__(
self, stub : gNMIStub, logger : logging.Logger, settings : Dict,
in_subscriptions : queue.Queue, out_samples : queue.Queue
) -> None:
super().__init__(daemon=True)
self._terminate = threading.Event()
self._stub = stub
self._logger = logger
self._username = settings.get('username')
self._password = settings.get('password')
self._in_subscriptions = in_subscriptions
self._out_samples = out_samples
self._response_iterator = None
self._delta_sample_cache = DeltaSampleCache()

    def stop(self) -> None:
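        # Signal generate_requests() to terminate and cancel the server stream; the
        # cancellation surfaces in run() as grpc.StatusCode.CANCELLED and is ignored there.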
self._terminate.set()
if self._response_iterator is not None:
self._response_iterator.cancel()

    def generate_requests(self) -> Iterator[SubscribeRequest]:
subscriptions = []
while not self._terminate.is_set():
try:
                # Some devices cannot process multiple SubscriptionList
                # requests over the same bidirectional channel. Block for up
                # to 5 seconds so that all pending subscriptions can be
                # collected from the queue and sent as a single bulk request.
subscription = self._in_subscriptions.get(block=True, timeout=5.0)
operation, resource_key, sampling_duration, sampling_interval = subscription # pylint: disable=unused-variable
                if operation != 'subscribe': continue # Unsubscribe is not supported by gNMI; it requires cancelling the entire connection
# options.timeout = int(sampling_duration)
#_path = parse_xpath(resource_key)
path = path_from_string(resource_key)
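                # gNMI expects sample_interval and heartbeat_interval in nanoseconds;
                # the driver handles intervals in seconds, hence the 1e9 conversions below.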
subscription = Subscription(
path=path, mode=GNMI_SUBSCRIPTION_MODE, suppress_redundant=GNMI_SUPPRESS_REDUNDANT,
sample_interval=int(sampling_interval * 1000000000),
heartbeat_interval=int(GNMI_HEARTBEAT_INTERVAL * 1000000000))
subscriptions.append(subscription)
except queue.Empty:
if len(subscriptions) == 0: continue
self._logger.debug('[generate_requests] process')
prefix = path_from_string(GNMI_PATH_PREFIX) if GNMI_PATH_PREFIX is not None else None
qos = QOSMarking(marking=GNMI_QOS_MARKING) if GNMI_QOS_MARKING is not None else None
subscriptions_list = SubscriptionList(
prefix=prefix, mode=GNMI_SUBSCRIPTION_LIST_MODE, allow_aggregation=GNMI_ALLOW_AGGREGATION,
encoding=GNMI_ENCODING, subscription=subscriptions, qos=qos)
subscribe_request = SubscribeRequest(subscribe=subscriptions_list)
str_subscribe_request = grpc_message_to_json_string(subscribe_request)
self._logger.debug('[generate_requests] subscribe_request={:s}'.format(str_subscribe_request))
yield subscribe_request
subscriptions = []
except: # pylint: disable=bare-except
self._logger.exception('[generate_requests] Unhandled Exception')

    def run(self) -> None:
        # Optionally add a dummy subscription to be used as a keep-alive
        # (usable only with SR Linux native data models):
        #subscription = ('subscribe', '/system/name/host-name', None, 1)
        #self._in_subscriptions.put_nowait(subscription)
try:
request_iterator = self.generate_requests()
metadata = [('username', self._username), ('password', self._password)]
timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration)
self._response_iterator = self._stub.Subscribe(request_iterator, metadata=metadata, timeout=timeout)
for subscribe_response in self._response_iterator:
str_subscribe_response = grpc_message_to_json_string(subscribe_response)
self._logger.debug('[run] subscribe_response={:s}'.format(str_subscribe_response))
update = subscribe_response.update
timestamp_device = float(update.timestamp) / 1.e9
                timestamp_local = datetime.now(timezone.utc).timestamp()
                # if the difference between the device and local timestamps is less than 1 second
if abs(timestamp_device - timestamp_local) <= 1:
# assume clocks are synchronized, use timestamp from device
timestamp = timestamp_device
else:
                    # clocks might not be synchronized; use the local timestamp
timestamp = timestamp_local
str_prefix = path_to_string(update.prefix) if len(update.prefix.elem) > 0 else ''
for update_entry in update.update:
str_path = path_to_string(update_entry.path)
if len(str_prefix) > 0:
str_path = '{:s}/{:s}'.format(str_prefix, str_path)
str_path = str_path.replace('//', '/')
if str_path.startswith('/interfaces/'):
# Add namespace, if missing
str_path_parts = str_path.split('/')
str_path_parts[1] = 'openconfig-interfaces:interfaces'
str_path = '/'.join(str_path_parts)
#if str_path != '/system/name/host-name': continue
#counter_name = update_entry.path[-1].name
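                    # TypedValue is a oneof; read whichever field is populated. With JSON
                    # encoding, counters may arrive as decimal strings, so coerce numeric
                    # strings into int/float values.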
value_type = update_entry.val.WhichOneof('value')
value = getattr(update_entry.val, value_type)
if isinstance(value, str):
if re.match(r'^[0-9]+$', value) is not None:
value = int(value)
elif re.match(r'^[0-9]*\.[0-9]*$', value) is not None:
value = float(value)
else:
value = str(value)
delta_sample = self._delta_sample_cache.get_delta(str_path, timestamp, value)
if delta_sample is None:
sample = (timestamp, str_path, value)
else:
sample = (delta_sample[0], str_path, delta_sample[1])
self._logger.debug('[run] sample={:s}'.format(str(sample)))
                    # Skip samples with no value (e.g., not-a-number deltas such as division by zero)
                    if sample[2] is not None:
                        self._out_samples.put_nowait(sample)
except grpc.RpcError as e:
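            # A locally cancelled stream (triggered by stop()) is a clean shutdown; re-raise any other RPC error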
if e.code() != grpc.StatusCode.CANCELLED: raise # pylint: disable=no-member
if e.details() != 'Locally cancelled by application!': raise # pylint: disable=no-member
except: # pylint: disable=bare-except
self._logger.exception('Unhandled Exception')
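# Illustrative usage sketch (kept as a comment, not executed). The target address, credentials,
# path and intervals below are assumptions for the example, not values taken from this module:
#   channel  = grpc.insecure_channel('192.0.2.1:57400')
#   stub     = gNMIStub(channel)
#   settings = {'username': 'admin', 'password': 'admin'}
#   in_subscriptions, out_samples = queue.Queue(), queue.Queue()
#   thread = MonitoringThread(stub, LOGGER, settings, in_subscriptions, out_samples)
#   thread.start()
#   # request SAMPLE-mode streaming of a counter every 5 seconds, for 120 seconds
#   in_subscriptions.put_nowait((
#       'subscribe', '/interfaces/interface[name=eth0]/state/counters/in-octets', 120.0, 5.0))
#   timestamp, resource_path, value = out_samples.get(block=True)
#   thread.stop()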