Commit b996d411 authored by cajadiazj

gNMI support added

parent dcde117f
4 merge requests: !142 Release TeraFlowSDN 2.1, !132 NetSoft Hackfest extensions, gNMI Driver, gNMI L3NM Service Handler, multiple fixes, !113 Draft: NetSoft Hackfest extensions, !75 Device component - OpenConfig gNMI Driver
......@@ -36,7 +36,7 @@ spec:
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
value: "DEBUG"
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:2020"]
......
......@@ -36,7 +36,7 @@ spec:
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
value: "DEBUG"
envFrom:
- secretRef:
name: qdb-data
......
......@@ -57,7 +57,7 @@ export CRDB_DATABASE="tfs"
export CRDB_DEPLOY_MODE="single"
# Disable flag for dropping database, if exists.
export CRDB_DROP_DATABASE_IF_EXISTS=""
export CRDB_DROP_DATABASE_IF_EXISTS="YES"
# Disable flag for re-deploying CockroachDB from scratch.
export CRDB_REDEPLOY=""
......@@ -87,7 +87,7 @@ export QDB_PASSWORD="quest"
export QDB_TABLE="tfs_monitoring"
## If not already set, disable flag for dropping table if exists.
#export QDB_DROP_TABLE_IF_EXISTS=""
export QDB_DROP_TABLE_IF_EXISTS=""
# If not already set, disable flag for re-deploying QuestDB from scratch.
export QDB_REDEPLOY=""
export QDB_REDEPLOY="YES"
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import anytree, copy, logging, pytz, queue, re, threading
import anytree, copy, logging, pytz, queue, re, threading, json, os, sys
#import lxml.etree as ET
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
......@@ -31,6 +31,15 @@ from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_su
from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse
from .RetryDecorator import retry
import grpc
from google.protobuf.json_format import MessageToJson
gnmi_path__ = os.path.dirname(os.path.abspath(__file__))
sys.path.append(gnmi_path__)
import gnmi_pb2_grpc
import gnmi_pb2
DEBUG_MODE = False
logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING)
logging.getLogger('ncclient.transport.ssh').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING)
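The gnmi_pb2 and gnmi_pb2_grpc modules imported above appear to be pre-generated protobuf/gRPC stubs placed next to this driver, hence the sys.path.append. A minimal sketch of how such stubs are typically regenerated from the public openconfig gnmi.proto, assuming grpcio-tools is installed and gnmi.proto is in the current directory (not part of the commit):
# Sketch only: regenerate gnmi_pb2.py and gnmi_pb2_grpc.py from gnmi.proto.
from grpc_tools import protoc
protoc.main([
    'protoc',               # dummy argv[0] expected by protoc.main
    '-I.',                  # proto include path
    '--python_out=.',       # emits gnmi_pb2.py
    '--grpc_python_out=.',  # emits gnmi_pb2_grpc.py
    'gnmi.proto',
])
# Note: the real gnmi.proto also imports gnmi_ext.proto and Google protobuf types,
# which must be reachable through the include path as well.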
......@@ -56,6 +65,7 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION,
class NetconfSessionHandler:
def __init__(self, address : str, port : int, **settings) -> None:
mensaje = f"__init__: address={address}, port={port}, settings={settings}"
self.__lock = threading.RLock()
self.__connected = threading.Event()
self.__address = address
......@@ -121,6 +131,182 @@ class NetconfSessionHandler:
def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None):
return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id)
class gNMISessionHandler:
def __init__(self, address : str, **settings) -> None:
self.__lock = threading.RLock()
self.__connected = threading.Event()
self.__address = address
self.__port = settings.get('gnmi_port')
self.__username = settings.get('username')
self.__password = settings.get('password')
self.__vendor = settings.get('vendor')
self.__key_filename = settings.get('key_filename')
self.__hostkey_verify = settings.get('hostkey_verify', True)
self.__look_for_keys = settings.get('look_for_keys', True)
self.__allow_agent = settings.get('allow_agent', True)
self.__force_running = settings.get('force_running', False)
self.__commit_per_delete = settings.get('delete_rule', False)
self.__device_params = settings.get('device_params', {})
self.__manager_params = settings.get('manager_params', {})
self.__nc_params = settings.get('nc_params', {})
self.__stub = None
self.__candidate_supported = False
self.__channel = None
self.__supportedEncodings = None
self.__options = Options()
def connect(self):
with self.__lock:
self.__channel = grpc.insecure_channel(str(self.__address)+':'+self.__port)
self.__stub = gnmi_pb2_grpc.gNMIStub(self.__channel)
metadata = [('username',self.__username ), ('password', self.__password)]
req = gnmi_pb2.CapabilityRequest()
response = self.__stub.Capabilities(req, metadata=metadata)
data = json.loads(MessageToJson(response))
self.__supportedEncodings = data['supportedEncodings']
# TODO: self.__candidate_supported =
self.__connected.set()
def disconnect(self):
if not self.__connected.is_set(): return
with self.__lock:
self.__channel.close()
def subscribeStreaming(self, subscription : Tuple[str, float, float], out_samples : queue.Queue) -> None:
resource_key, sampling_duration, sampling_interval = subscription
options = copy.deepcopy(self.__options)
options.xpaths = [parse_xpath(resource_key)]
options.timeout = int(sampling_duration)
options.interval = int(sampling_interval)
req_iterator = gen_request(options)
metadata = [('username',self.__username), ('password', self.__password)]
responses = self.__stub.Subscribe(req_iterator, self.__options.timeout, metadata=metadata)
previous_sample = None
delta = 0.0
previous_timestamp = datetime.timestamp(datetime.utcnow())
for response in responses:
data = json.loads(MessageToJson(response))
if data.get("update") is not None and data.get("update").get("update") is not None:
now = datetime.timestamp(datetime.utcnow())
for element in data['update']['update']:
counter_name = split_resource_key(dict_to_xpath(element['path']))
if counter_name == split_resource_key(resource_key):
value = int(element['val']['uintVal'])
delay = now - previous_timestamp
if previous_sample is not None: delta = (value - previous_sample)/delay
previous_sample = int(value)
previous_timestamp = now
sample = (now, resource_key, delta)
out_samples.put_nowait(sample)
@property
def use_candidate(self): return self.__candidate_supported and not self.__force_running
@property
def commit_per_rule(self): return self.__commit_per_delete
@property
def vendor(self): return self.__vendor
@RETRY_DECORATOR
def get(self): # pylint: disable=redefined-builtin
return False
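# Note: the edit_config/locked/commit methods below still delegate to a NETCONF-style
# self.__manager, which this gNMI handler never creates; configuration through gNMI Set
# appears to be left for future work.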
@RETRY_DECORATOR
def edit_config(
self, config, target='running', default_operation=None, test_option=None,
error_option=None, format='xml' # pylint: disable=redefined-builtin
):
if config == EMPTY_CONFIG: return
with self.__lock:
self.__manager.edit_config(
config, target=target, default_operation=default_operation, test_option=test_option,
error_option=error_option, format=format)
def locked(self, target):
return self.__manager.locked(target=target)
def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None):
return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id)
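For illustration, a standalone usage sketch of the handler defined above; the address, port, credentials and the exact resource-key string are placeholder assumptions, not values taken from this commit:
import queue, threading

out_samples = queue.Queue()
handler = gNMISessionHandler('192.0.2.10', gnmi_port='57400', username='admin', password='admin')
handler.connect()  # opens an insecure gRPC channel and issues a CapabilityRequest

# subscribeStreaming() consumes (resource_key, sampling_duration, sampling_interval)
subscription = ('/interface[name=eth0]/state/counters/in-octets', 120.0, 5.0)
worker = threading.Thread(
    target=handler.subscribeStreaming, args=(subscription, out_samples), daemon=True)
worker.start()

timestamp, resource_key, delta = out_samples.get()  # per-interval rate sample (units/second)
handler.disconnect()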
def path_from_string(path='/'):
if path:
if path[0]=='/':
if path[-1]=='/':
path_list = re.split('''/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)''', path)[1:-1]
else:
path_list = re.split('''/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)''', path)[1:]
else:
if path[-1]=='/':
path_list = re.split('''/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)''', path)[:-1]
else:
path_list = re.split('''/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)''', path)
else:
return gnmi_pb2.Path(elem=[])
mypath = []
for e in path_list:
eName = e.split("[", 1)[0]
eKeys = re.findall('\[(.*?)\]', e)
dKeys = dict(x.split('=', 1) for x in eKeys)
mypath.append(gnmi_pb2.PathElem(name=eName, key=dKeys))
return gnmi_pb2.Path(elem=mypath)
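For reference, a small example of what the helper above builds for a keyed path (assuming the generated gnmi_pb2 stubs are importable):
p = path_from_string('/interfaces/interface[name=eth0]/state/counters')
# p.elem holds: PathElem(name='interfaces'),
#               PathElem(name='interface', key={'name': 'eth0'}),
#               PathElem(name='state'), PathElem(name='counters')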
def gen_request(options):
mysubs = []
path = options.xpaths[0]
mypath = path_from_string(path)
mysub = gnmi_pb2.Subscription(path=mypath, mode=options.submode, suppress_redundant=options.suppress, sample_interval=options.interval*1000000000, heartbeat_interval=options.heartbeat)
mysubs.append(mysub)
if options.prefix:
myprefix = path_from_string(options.prefix)
else:
myprefix = None
if options.qos:
myqos = gnmi_pb2.QOSMarking(marking=options.qos)
else:
myqos = None
mysblist = gnmi_pb2.SubscriptionList(prefix=myprefix, mode=options.mode, allow_aggregation=options.aggregate, encoding=options.encoding, subscription=mysubs, qos=myqos)
mysubreq = gnmi_pb2.SubscribeRequest( subscribe=mysblist )
yield mysubreq
def parse_xpath(xpath):
xpath = xpath.replace("//", "/")
xpath = xpath.replace("oci:interface[", "interface[")
xpath = xpath.replace("/oci", "/openconfig-interfaces")
xpath = re.sub(r"\[oci:name='(.*?)'\]", r"[name=\1]", xpath)
# Remove the trailing counter name from the path
xpath = "/".join(xpath.split("/")[:-1]) + "/"
return xpath
def split_resource_key(path):
pattern = r"/state/counters/(.*)"
match = re.search(pattern, path)
if match:
return match.group(1)
else:
return None
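Illustrative behaviour of the two helpers above on a plain, non-prefixed counter key (the key format is an assumption for the example):
key = '/interface[name=eth0]/state/counters/in-octets'
parse_xpath(key)         # -> '/interface[name=eth0]/state/counters/' (counter name stripped)
split_resource_key(key)  # -> 'in-octets'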
def dict_to_xpath(d: dict) -> str:
xpath = '/'
for item in d['elem']:
name = item.get('name')
if name == 'interface':
key = item.get('key')
interface_name = key.get('name')
xpath += f"/oci:interface[oci:name='{interface_name}']"
else:
xpath += f"/{name}"
xpath = xpath.replace('openconfig-interfaces', 'oci')
return xpath
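And the inverse direction, used to match streamed updates back to the subscribed key, sketched with a hand-built path dictionary whose field names follow MessageToJson output:
update_path = {'elem': [
    {'name': 'interfaces'},
    {'name': 'interface', 'key': {'name': 'eth0'}},
    {'name': 'state'}, {'name': 'counters'}, {'name': 'in-octets'},
]}
dict_to_xpath(update_path)
# -> "//interfaces/oci:interface[oci:name='eth0']/state/counters/in-octets"
# (the initial '/' plus the per-element '/' yields the double leading slash)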
def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp):
if previous_sample is None: return None
if previous_timestamp is None: return None
......@@ -141,12 +327,13 @@ def compute_delta_sample(previous_sample, previous_timestamp, current_sample, cu
return delta_sample
class SamplesCache:
def __init__(self, netconf_handler : NetconfSessionHandler) -> None:
def __init__(self, netconf_handler : NetconfSessionHandler, gNMI_handler : gNMISessionHandler) -> None:
self.__netconf_handler = netconf_handler
self.__lock = threading.Lock()
self.__timestamp = None
self.__absolute_samples = {}
self.__delta_samples = {}
self.__gNMI_handler = gNMI_handler
def _refresh_samples(self) -> None:
with self.__lock:
......@@ -189,6 +376,24 @@ def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples :
except: # pylint: disable=bare-except
LOGGER.exception('Error retrieving samples')
class Options:
def __init__(self, xpaths=None, prefix=None, mode=0, submode=0, suppress=False, interval=0,
encoding='JSON', heartbeat=0, qos=None, aggregate=False, server=None, username='admin', password='admin', timeout=None):
self.xpaths = xpaths
self.prefix = prefix
self.mode = mode
self.submode = submode
self.suppress = suppress
self.interval = interval
self.encoding = encoding
self.heartbeat = heartbeat
self.qos = qos
self.aggregate = aggregate
self.server = server
self.username = username
self.password = password
self.timeout = timeout
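For context, the defaults above map onto the public gnmi.proto enums: mode=0 is a STREAM SubscriptionList and submode=0 a TARGET_DEFINED Subscription; interval is multiplied by 1e9 in gen_request() because sample_interval is expressed in nanoseconds. A hedged sketch of the request that subscribeStreaming() effectively builds (values are illustrative only):
opts = Options(xpaths=['/interface[name=eth0]/state/counters/'],
               interval=5, timeout=120,
               encoding=0)  # 0 == JSON in gnmi.proto's Encoding enum, passed as an int
request = next(gen_request(opts))  # a single gnmi_pb2.SubscribeRequest
# request.subscribe.subscription[0].sample_interval == 5 * 1_000_000_000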
def edit_config(
netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False,
target='running', default_operation='merge', test_option=None, error_option=None,
......@@ -249,6 +454,7 @@ class OpenConfigDriver(_Driver):
self.__subscriptions = TreeNode('.')
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__gnmi_monitoring = settings.get('monitoring_protocol') == 'gnmi'
self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events
self.__scheduler.configure(
jobstores = {'default': MemoryJobStore()},
......@@ -257,12 +463,16 @@ class OpenConfigDriver(_Driver):
timezone=pytz.utc)
self.__out_samples = queue.Queue()
self.__netconf_handler : NetconfSessionHandler = NetconfSessionHandler(address, port, **settings)
self.__samples_cache = SamplesCache(self.__netconf_handler)
self.__gNMI_handler : gNMISessionHandler = gNMISessionHandler(address, **settings)
self.__samples_cache = SamplesCache(self.__netconf_handler, self.__gNMI_handler)
def Connect(self) -> bool:
with self.__lock:
if self.__started.is_set(): return True
self.__netconf_handler.connect()
if self.__gnmi_monitoring:
self.__gNMI_handler.connect()
LOGGER.debug('Using gNMI as monitoring protocol')
# Connect triggers activation of sampling events that will be scheduled based on subscriptions
self.__scheduler.start()
self.__started.set()
......@@ -276,7 +486,7 @@ class OpenConfigDriver(_Driver):
if not self.__started.is_set(): return True
# Disconnect triggers deactivation of sampling events
self.__scheduler.shutdown()
self.__netconf_handler.disconnect()
if self.__gnmi_monitoring: self.__netconf_handler.disconnect()
return True
@metered_subclass_method(METRICS_POOL)
......@@ -373,7 +583,13 @@ class OpenConfigDriver(_Driver):
end_date = start_date + timedelta(seconds=sampling_duration)
job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval)
job = self.__scheduler.add_job(
if self.__gnmi_monitoring:
LOGGER.debug('Processing gNMI subscription: '+ str(subscription))
job = threading.Thread(target=self.__gNMI_handler.subscribeStreaming, args=(subscription, self.__out_samples))
job.start()
else:
job = self.__scheduler.add_job(
do_sampling, args=(self.__samples_cache, resource_key, self.__out_samples),
kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval,
start_date=start_date, end_date=end_date, timezone=pytz.utc)
......