Skip to content
Snippets Groups Projects
Commit aec5fbd2 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Device - gNMI OpenConfig Driver:

- Corrected parsing of prefixes in Monitoring Thread
- Corrected subscription timeout to pack all subscriptions in a single request
- Enhanced driver to skip non-deletable elements
parent 4881a30f
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!172Resolve "(CTTC) Extend gNMI-OpenConfig SBI driver"
...@@ -168,6 +168,7 @@ class GnmiSessionHandler: ...@@ -168,6 +168,7 @@ class GnmiSessionHandler:
set_request = SetRequest() set_request = SetRequest()
#for resource_key in resource_keys: #for resource_key in resource_keys:
resources_requested = list()
for resource_key, resource_value in resources: for resource_key, resource_value in resources:
#self._logger.info('---1') #self._logger.info('---1')
#self._logger.info(str(resource_key)) #self._logger.info(str(resource_key))
...@@ -177,6 +178,7 @@ class GnmiSessionHandler: ...@@ -177,6 +178,7 @@ class GnmiSessionHandler:
#_, value, exists, operation_done = resource_tuple #_, value, exists, operation_done = resource_tuple
if isinstance(resource_value, str): resource_value = json.loads(resource_value) if isinstance(resource_value, str): resource_value = json.loads(resource_value)
str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=False) str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=False)
if str_path is None: continue # nothing to set
#self._logger.info('---3') #self._logger.info('---3')
#self._logger.info(str(str_path)) #self._logger.info(str(str_path))
#self._logger.info(str(str_data)) #self._logger.info(str(str_data))
...@@ -184,6 +186,7 @@ class GnmiSessionHandler: ...@@ -184,6 +186,7 @@ class GnmiSessionHandler:
set_request_entry = set_request_list.add() set_request_entry = set_request_list.add()
set_request_entry.path.CopyFrom(path_from_string(str_path)) set_request_entry.path.CopyFrom(path_from_string(str_path))
set_request_entry.val.json_val = str_data.encode('UTF-8') set_request_entry.val.json_val = str_data.encode('UTF-8')
resources_requested.append((resource_key, resource_value))
self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request))) self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request)))
metadata = [('username', self._username), ('password', self._password)] metadata = [('username', self._username), ('password', self._password)]
...@@ -192,7 +195,7 @@ class GnmiSessionHandler: ...@@ -192,7 +195,7 @@ class GnmiSessionHandler:
self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply))) self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply)))
results = [] results = []
for (resource_key, resource_value), update_result in zip(resources, set_reply.response): for (resource_key, resource_value), update_result in zip(resources_requested, set_reply.response):
operation = update_result.op operation = update_result.op
if operation == UpdateResult.UPDATE: if operation == UpdateResult.UPDATE:
results.append((resource_key, True)) results.append((resource_key, True))
...@@ -237,6 +240,7 @@ class GnmiSessionHandler: ...@@ -237,6 +240,7 @@ class GnmiSessionHandler:
set_request = SetRequest() set_request = SetRequest()
#for resource_key in resource_keys: #for resource_key in resource_keys:
resources_requested = list()
for resource_key, resource_value in resources: for resource_key, resource_value in resources:
#self._logger.info('---1') #self._logger.info('---1')
#self._logger.info(str(resource_key)) #self._logger.info(str(resource_key))
...@@ -248,11 +252,13 @@ class GnmiSessionHandler: ...@@ -248,11 +252,13 @@ class GnmiSessionHandler:
if isinstance(resource_value, str): resource_value = json.loads(resource_value) if isinstance(resource_value, str): resource_value = json.loads(resource_value)
# pylint: disable=unused-variable # pylint: disable=unused-variable
str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=True) str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=True)
if str_path is None: continue # nothing to do with this resource_key
#self._logger.info('---3') #self._logger.info('---3')
#self._logger.info(str(str_path)) #self._logger.info(str(str_path))
#self._logger.info(str(str_data)) #self._logger.info(str(str_data))
set_request_entry = set_request.delete.add() set_request_entry = set_request.delete.add()
set_request_entry.CopyFrom(path_from_string(str_path)) set_request_entry.CopyFrom(path_from_string(str_path))
resources_requested.append((resource_key, resource_value))
self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request))) self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request)))
metadata = [('username', self._username), ('password', self._password)] metadata = [('username', self._username), ('password', self._password)]
...@@ -261,7 +267,7 @@ class GnmiSessionHandler: ...@@ -261,7 +267,7 @@ class GnmiSessionHandler:
self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply))) self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply)))
results = [] results = []
for (resource_key, resource_value), update_result in zip(resources, set_reply.response): for (resource_key, resource_value), update_result in zip(resources_requested, set_reply.response):
operation = update_result.op operation = update_result.op
if operation == UpdateResult.DELETE: if operation == UpdateResult.DELETE:
results.append((resource_key, True)) results.append((resource_key, True))
......
...@@ -94,9 +94,14 @@ class MonitoringThread(threading.Thread): ...@@ -94,9 +94,14 @@ class MonitoringThread(threading.Thread):
subscriptions = [] subscriptions = []
while not self._terminate.is_set(): while not self._terminate.is_set():
try: try:
subscription = self._in_subscriptions.get(block=True, timeout=0.1) # Some devices do not support to process multiple
# SubscriptionList requests in a bidirectional channel.
# Increased timeout to 5 seconds assuming it should
# bring enough time to receive all the subscriptions in
# the queue and process them in bulk.
subscription = self._in_subscriptions.get(block=True, timeout=5.0)
operation, resource_key, sampling_duration, sampling_interval = subscription # pylint: disable=unused-variable operation, resource_key, sampling_duration, sampling_interval = subscription # pylint: disable=unused-variable
if operation != 'subscribe': continue # Unsubscribe not supported by gNM, needs to cancel entire connection if operation != 'subscribe': continue # Unsubscribe not supported by gNMI, needs to cancel entire connection
# options.timeout = int(sampling_duration) # options.timeout = int(sampling_duration)
#_path = parse_xpath(resource_key) #_path = parse_xpath(resource_key)
path = path_from_string(resource_key) path = path_from_string(resource_key)
...@@ -145,8 +150,12 @@ class MonitoringThread(threading.Thread): ...@@ -145,8 +150,12 @@ class MonitoringThread(threading.Thread):
else: else:
# might be clocks are not synchronized, use local timestamp # might be clocks are not synchronized, use local timestamp
timestamp = timestamp_local timestamp = timestamp_local
str_prefix = path_to_string(update.prefix) if len(update.prefix.elem) > 0 else ''
for update_entry in update.update: for update_entry in update.update:
str_path = path_to_string(update_entry.path) str_path = path_to_string(update_entry.path)
if len(str_prefix) > 0:
str_path = '{:s}/{:s}'.format(str_prefix, str_path)
str_path = str_path.replace('//', '/')
if str_path.startswith('/interfaces/'): if str_path.startswith('/interfaces/'):
# Add namespace, if missing # Add namespace, if missing
str_path_parts = str_path.split('/') str_path_parts = str_path.split('/')
......
...@@ -37,8 +37,9 @@ class NetworkInstanceInterfaceHandler(_Handler): ...@@ -37,8 +37,9 @@ class NetworkInstanceInterfaceHandler(_Handler):
if IS_CEOS: ni_if_id = if_name if IS_CEOS: ni_if_id = if_name
if delete: if delete:
PATH_TMPL = '/network-instances/network-instance[name={:s}]/interfaces/interface[id={:s}]' #PATH_TMPL = '/network-instances/network-instance[name={:s}]/interfaces/interface[id={:s}]'
str_path = PATH_TMPL.format(ni_name, ni_if_id) #str_path = PATH_TMPL.format(ni_name, ni_if_id)
str_path = None # nothing to delete
str_data = json.dumps({}) str_data = json.dumps({})
return str_path, str_data return str_path, str_data
......
...@@ -38,6 +38,7 @@ class NetworkInstanceProtocolHandler(_Handler): ...@@ -38,6 +38,7 @@ class NetworkInstanceProtocolHandler(_Handler):
str_path = PATH_TMPL.format(ni_name, identifier, proto_name) str_path = PATH_TMPL.format(ni_name, identifier, proto_name)
if delete: if delete:
str_path = None # nothing to delete
str_data = json.dumps({}) str_data = json.dumps({})
return str_path, str_data return str_path, str_data
......
...@@ -37,9 +37,10 @@ class NetworkInstanceStaticRouteHandler(_Handler): ...@@ -37,9 +37,10 @@ class NetworkInstanceStaticRouteHandler(_Handler):
identifier = 'openconfig-policy-types:{:s}'.format(identifier) identifier = 'openconfig-policy-types:{:s}'.format(identifier)
if delete: if delete:
PATH_TMPL = '/network-instances/network-instance[name={:s}]/protocols' #PATH_TMPL = '/network-instances/network-instance[name={:s}]/protocols'
PATH_TMPL += '/protocol[identifier={:s}][name={:s}]/static-routes/static[prefix={:s}]' #PATH_TMPL += '/protocol[identifier={:s}][name={:s}]/static-routes/static[prefix={:s}]'
str_path = PATH_TMPL.format(ni_name, identifier, proto_name, prefix) #str_path = PATH_TMPL.format(ni_name, identifier, proto_name, prefix)
str_path = None # nothing to delete
str_data = json.dumps({}) str_data = json.dumps({})
return str_path, str_data return str_path, str_data
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment