Commit 48c2413c authored by Lluis Gifre Renom

Device component - gNMI/OpenConfig Driver:

WORK IN PROGRESS
- Added unitary tests and scripts
- Enhanced reporting of capabilities
- Migrated Component and Interface code to libyang
- Migrating NetworkInstance code to libyang
- Disabled unneeded log messages
- Temporarily disabled telemetry
- Added LibYang-based YANG handler
- Added helper methods
parent 22082f10
2 merge requests: !294 Release TeraFlowSDN 4.0, !172 Resolve "(CTTC) Extend gNMI-OpenConfig SBI driver"
Showing with 1149 additions and 314 deletions
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary_gnmi_openconfig.py
@@ -19,12 +19,13 @@ from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk
from .gnmi.gnmi_pb2_grpc import gNMIStub from .gnmi.gnmi_pb2_grpc import gNMIStub
from .gnmi.gnmi_pb2 import Encoding, GetRequest, SetRequest, UpdateResult # pylint: disable=no-name-in-module from .gnmi.gnmi_pb2 import Encoding, GetRequest, SetRequest, UpdateResult # pylint: disable=no-name-in-module
from .handlers import ALL_RESOURCE_KEYS, compose, get_path, parse from .handlers import ALL_RESOURCE_KEYS, compose, get_path, parse
from .tools.Capabilities import get_supported_encodings from .handlers.YangHandler import YangHandler
from .tools.Capabilities import check_capabilities
from .tools.Channel import get_grpc_channel from .tools.Channel import get_grpc_channel
from .tools.Path import path_from_string, path_to_string #, compose_path from .tools.Path import path_from_string, path_to_string #, compose_path
from .tools.Subscriptions import Subscriptions from .tools.Subscriptions import Subscriptions
from .tools.Value import decode_value #, value_exists from .tools.Value import decode_value #, value_exists
from .MonitoringThread import MonitoringThread #from .MonitoringThread import MonitoringThread
class GnmiSessionHandler: class GnmiSessionHandler:
def __init__(self, address : str, port : int, settings : Dict, logger : logging.Logger) -> None: def __init__(self, address : str, port : int, settings : Dict, logger : logging.Logger) -> None:
@@ -39,12 +40,20 @@ class GnmiSessionHandler:
self._use_tls = settings.get('use_tls', False) self._use_tls = settings.get('use_tls', False)
self._channel : Optional[grpc.Channel] = None self._channel : Optional[grpc.Channel] = None
self._stub : Optional[gNMIStub] = None self._stub : Optional[gNMIStub] = None
self._monit_thread = None self._yang_handler = YangHandler()
self._supported_encodings = None #self._monit_thread = None
self._subscriptions = Subscriptions() self._subscriptions = Subscriptions()
self._in_subscriptions = queue.Queue() self._in_subscriptions = queue.Queue()
self._out_samples = queue.Queue() self._out_samples = queue.Queue()
def __del__(self) -> None:
self._logger.warning('Destroying YangValidator...')
self._logger.warning('yang_validator.data:')
for path, dnode in self._yang_handler.get_data_paths().items():
self._logger.warning(' {:s}: {:s}'.format(str(path), json.dumps(dnode.print_dict())))
self._yang_handler.destroy()
self._logger.warning('DONE')
@property @property
def subscriptions(self): return self._subscriptions def subscriptions(self): return self._subscriptions
@@ -58,18 +67,17 @@ class GnmiSessionHandler:
with self._lock: with self._lock:
self._channel = get_grpc_channel(self._address, self._port, self._use_tls, self._logger) self._channel = get_grpc_channel(self._address, self._port, self._use_tls, self._logger)
self._stub = gNMIStub(self._channel) self._stub = gNMIStub(self._channel)
self._supported_encodings = get_supported_encodings( check_capabilities(self._stub, self._username, self._password, timeout=120)
self._stub, self._username, self._password, timeout=120) #self._monit_thread = MonitoringThread(
self._monit_thread = MonitoringThread( # self._stub, self._logger, self._settings, self._in_subscriptions, self._out_samples)
self._stub, self._logger, self._settings, self._in_subscriptions, self._out_samples) #self._monit_thread.start()
self._monit_thread.start()
self._connected.set() self._connected.set()
def disconnect(self): def disconnect(self):
if not self._connected.is_set(): return if not self._connected.is_set(): return
with self._lock: with self._lock:
self._monit_thread.stop() #self._monit_thread.stop()
self._monit_thread.join() #self._monit_thread.join()
self._channel.close() self._channel.close()
self._connected.clear() self._connected.clear()
@@ -87,9 +95,9 @@ class GnmiSessionHandler:
str_resource_name = 'resource_key[#{:d}]'.format(i) str_resource_name = 'resource_key[#{:d}]'.format(i)
try: try:
chk_string(str_resource_name, resource_key, allow_empty=False) chk_string(str_resource_name, resource_key, allow_empty=False)
self._logger.debug('[GnmiSessionHandler:get] resource_key = {:s}'.format(str(resource_key))) #self._logger.debug('[GnmiSessionHandler:get] resource_key = {:s}'.format(str(resource_key)))
str_path = get_path(resource_key) str_path = get_path(resource_key)
self._logger.debug('[GnmiSessionHandler:get] str_path = {:s}'.format(str(str_path))) #self._logger.debug('[GnmiSessionHandler:get] str_path = {:s}'.format(str(str_path)))
get_request.path.append(path_from_string(str_path)) get_request.path.append(path_from_string(str_path))
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
MSG = 'Exception parsing {:s}: {:s}' MSG = 'Exception parsing {:s}: {:s}'
@@ -130,7 +138,7 @@ class GnmiSessionHandler:
value = decode_value(update.val) value = decode_value(update.val)
#resource_key_tuple[1] = value #resource_key_tuple[1] = value
#resource_key_tuple[2] = True #resource_key_tuple[2] = True
results.extend(parse(str_path, value)) results.extend(parse(str_path, value, self._yang_handler))
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
MSG = 'Exception processing update {:s}' MSG = 'Exception processing update {:s}'
self._logger.exception(MSG.format(grpc_message_to_json_string(update))) self._logger.exception(MSG.format(grpc_message_to_json_string(update)))
@@ -159,17 +167,17 @@ class GnmiSessionHandler:
set_request = SetRequest() set_request = SetRequest()
#for resource_key in resource_keys: #for resource_key in resource_keys:
for resource_key, resource_value in resources: for resource_key, resource_value in resources:
self._logger.info('---1') #self._logger.info('---1')
self._logger.info(str(resource_key)) #self._logger.info(str(resource_key))
self._logger.info(str(resource_value)) #self._logger.info(str(resource_value))
#resource_tuple = resource_tuples.get(resource_key) #resource_tuple = resource_tuples.get(resource_key)
#if resource_tuple is None: continue #if resource_tuple is None: continue
#_, value, exists, operation_done = resource_tuple #_, value, exists, operation_done = resource_tuple
if isinstance(resource_value, str): resource_value = json.loads(resource_value) if isinstance(resource_value, str): resource_value = json.loads(resource_value)
str_path, str_data = compose(resource_key, resource_value, delete=False) str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=False)
self._logger.info('---3') #self._logger.info('---3')
self._logger.info(str(str_path)) #self._logger.info(str(str_path))
self._logger.info(str(str_data)) #self._logger.info(str(str_data))
set_request_list = set_request.update #if exists else set_request.replace set_request_list = set_request.update #if exists else set_request.replace
set_request_entry = set_request_list.add() set_request_entry = set_request_list.add()
set_request_entry.path.CopyFrom(path_from_string(str_path)) set_request_entry.path.CopyFrom(path_from_string(str_path))
@@ -228,18 +236,19 @@ class GnmiSessionHandler:
set_request = SetRequest() set_request = SetRequest()
#for resource_key in resource_keys: #for resource_key in resource_keys:
for resource_key, resource_value in resources: for resource_key, resource_value in resources:
self._logger.info('---1') #self._logger.info('---1')
self._logger.info(str(resource_key)) #self._logger.info(str(resource_key))
self._logger.info(str(resource_value)) #self._logger.info(str(resource_value))
#resource_tuple = resource_tuples.get(resource_key) #resource_tuple = resource_tuples.get(resource_key)
#if resource_tuple is None: continue #if resource_tuple is None: continue
#_, value, exists, operation_done = resource_tuple #_, value, exists, operation_done = resource_tuple
#if not exists: continue #if not exists: continue
if isinstance(resource_value, str): resource_value = json.loads(resource_value) if isinstance(resource_value, str): resource_value = json.loads(resource_value)
str_path, str_data = compose(resource_key, resource_value, delete=True) # pylint: disable=unused-variable
self._logger.info('---3') str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=True)
self._logger.info(str(str_path)) #self._logger.info('---3')
self._logger.info(str(str_data)) #self._logger.info(str(str_path))
#self._logger.info(str(str_data))
set_request_entry = set_request.delete.add() set_request_entry = set_request.delete.add()
set_request_entry.CopyFrom(path_from_string(str_path)) set_request_entry.CopyFrom(path_from_string(str_path))
...
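The Get flow above now hands the shared, libyang-based YangHandler to the per-path handler instead of relying on pyangbind. A minimal sketch of that pattern, assuming the absolute import paths below (the package layout is inferred from the relative imports in the hunk) and a gNMI GetResponse in get_reply:

from typing import Any, Dict, List, Tuple
from device.service.drivers.gnmi_openconfig.handlers import parse
from device.service.drivers.gnmi_openconfig.handlers.YangHandler import YangHandler
from device.service.drivers.gnmi_openconfig.tools.Path import path_to_string
from device.service.drivers.gnmi_openconfig.tools.Value import decode_value

def updates_to_resources(get_reply, yang_handler : YangHandler) -> List[Tuple[str, Dict[str, Any]]]:
    # Mirrors the loop in GnmiSessionHandler.get(): decode each JSON_IETF update and
    # delegate to the handler that matches its path, passing the shared YangHandler.
    results : List[Tuple[str, Dict[str, Any]]] = []
    for notification in get_reply.notification:
        for update in notification.update:
            str_path = path_to_string(update.path)
            value    = decode_value(update.val)
            results.extend(parse(str_path, value, yang_handler))
    return results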
@@ -12,37 +12,44 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging #, json import json, logging # libyang
import pyangbind.lib.pybindJSON as pybindJSON
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_sample_types_pb2 import KpiSampleType
from . import openconfig
from ._Handler import _Handler from ._Handler import _Handler
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
PATH_IF_CTR = "/openconfig-interfaces:interfaces/interface[name={:s}]/state/counters/{:s}" PATH_IF_CTR = '/openconfig-interfaces:interfaces/interface[name={:s}]/state/counters/{:s}'
#pylint: disable=abstract-method #pylint: disable=abstract-method
class ComponentHandler(_Handler): class ComponentHandler(_Handler):
def get_resource_key(self) -> str: return '/endpoints/endpoint' def get_resource_key(self) -> str: return '/endpoints/endpoint'
def get_path(self) -> str: return '/openconfig-platform:components' def get_path(self) -> str: return '/openconfig-platform:components'
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: def parse(
#LOGGER.info('json_data = {:s}'.format(json.dumps(json_data))) self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.debug('json_data = {:s}'.format(json.dumps(json_data)))
oc_components = pybindJSON.loads_ietf(json_data, openconfig.components, 'components') yang_components_path = self.get_path()
#LOGGER.info('oc_components = {:s}'.format(pybindJSON.dumps(oc_components, mode='ietf'))) json_data_valid = yang_handler.parse_to_dict(yang_components_path, json_data, fmt='json')
entries = [] entries = []
for component_key, oc_component in oc_components.component.items(): for component in json_data_valid['components']['component']:
#LOGGER.info('component_key={:s} oc_component={:s}'.format( LOGGER.debug('component={:s}'.format(str(component)))
# component_key, pybindJSON.dumps(oc_component, mode='ietf')
#))
component_name = oc_component.config.name component_name = component['name']
#component_config = component.get('config', {})
component_type = oc_component.state.type #yang_components : libyang.DContainer = yang_handler.get_data_path(yang_components_path)
#yang_component_path = 'component[name="{:s}"]'.format(component_name)
#yang_component : libyang.DContainer = yang_components.create_path(yang_component_path)
#yang_component.merge_data_dict(component, strict=True, validate=False)
component_state = component.get('state', {})
component_type = component_state.get('type')
if component_type is None: continue
component_type = component_type.split(':')[-1] component_type = component_type.split(':')[-1]
if component_type not in {'PORT'}: continue if component_type not in {'PORT'}: continue
@@ -58,8 +65,6 @@ class ComponentHandler(_Handler):
KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED: PATH_IF_CTR.format(interface_name, 'out-pkts' ), KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED: PATH_IF_CTR.format(interface_name, 'out-pkts' ),
} }
if len(endpoint) == 0: continue
entries.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint)) entries.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint))
return entries return entries
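ComponentHandler.parse() now validates the raw reply against the openconfig-platform model through the shared YangHandler before walking it. A hedged usage sketch; the import path and the sample payload are illustrative assumptions, and the OpenConfig YANG modules must be available under the driver's search path:

from device.service.drivers.gnmi_openconfig.handlers.YangHandler import YangHandler

yang_handler = YangHandler()
try:
    # Illustrative body for /openconfig-platform:components (not captured from a real device)
    reply = {'component': [{'name': 'Ethernet1', 'state': {'type': 'openconfig-platform-types:PORT'}}]}
    valid = yang_handler.parse_to_dict('/openconfig-platform:components', reply, fmt='json')
    for component in valid['components']['component']:
        component_type = component.get('state', {}).get('type', '')
        if component_type.split(':')[-1] != 'PORT': continue
        print(component['name'])   # only PORT components become endpoint entries
finally:
    yang_handler.destroy()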
@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json, logging import json, libyang, logging
import pyangbind.lib.pybindJSON as pybindJSON
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
from . import openconfig
from ._Handler import _Handler from ._Handler import _Handler
from .Tools import get_bool, get_int, get_str
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@@ -24,9 +24,11 @@ class InterfaceHandler(_Handler):
def get_resource_key(self) -> str: return '/interface' def get_resource_key(self) -> str: return '/interface'
def get_path(self) -> str: return '/openconfig-interfaces:interfaces' def get_path(self) -> str: return '/openconfig-interfaces:interfaces'
def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: def compose(
if_name = str (resource_value['name' ]) # ethernet-1/1 self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
sif_index = int (resource_value.get('sub_if_index' , 0 )) # 0 ) -> Tuple[str, str]:
if_name = get_str(resource_value, 'name' ) # ethernet-1/1
sif_index = get_int(resource_value, 'sub_if_index', 0) # 0
if delete: if delete:
PATH_TMPL = '/interfaces/interface[name={:s}]/subinterfaces/subinterface[index={:d}]' PATH_TMPL = '/interfaces/interface[name={:s}]/subinterfaces/subinterface[index={:d}]'
@@ -34,118 +36,166 @@ class InterfaceHandler(_Handler):
str_data = json.dumps({}) str_data = json.dumps({})
return str_path, str_data return str_path, str_data
if_enabled = bool(resource_value.get('enabled' , True)) # True/False if_enabled = get_bool(resource_value, 'enabled', True) # True/False
sif_enabled = bool(resource_value.get('sub_if_enabled' , True)) # True/False sif_enabled = get_bool(resource_value, 'sub_if_enabled', True) # True/False
sif_ipv4_enabled = bool(resource_value.get('sub_if_ipv4_enabled', True)) # True/False sif_vlan_id = get_int (resource_value, 'sif_vlan_id', ) # 127
sif_ipv4_address = str (resource_value['sub_if_ipv4_address' ]) # 172.16.0.1 sif_ipv4_enabled = get_bool(resource_value, 'sub_if_ipv4_enabled', True) # True/False
sif_ipv4_prefix = int (resource_value['sub_if_ipv4_prefix' ]) # 24 sif_ipv4_address = get_str (resource_value, 'sub_if_ipv4_address' ) # 172.16.0.1
sif_ipv4_prefix = get_int (resource_value, 'sub_if_ipv4_prefix' ) # 24
yang_ifs : libyang.DContainer = yang_handler.get_data_path('/openconfig-interfaces:interfaces')
yang_if_path = 'interface[name="{:s}"]'.format(if_name)
yang_if : libyang.DContainer = yang_ifs.create_path(yang_if_path)
yang_if.create_path('config/name', if_name )
if if_enabled is not None: yang_if.create_path('config/enabled', if_enabled)
yang_sifs : libyang.DContainer = yang_if.create_path('subinterfaces')
yang_sif_path = 'subinterface[index="{:d}"]'.format(sif_index)
yang_sif : libyang.DContainer = yang_sifs.create_path(yang_sif_path)
yang_sif.create_path('config/index', sif_index)
if sif_enabled is not None: yang_sif.create_path('config/enabled', sif_enabled)
if sif_vlan_id is not None:
yang_subif_vlan : libyang.DContainer = yang_sif.create_path('openconfig-vlan:vlan')
yang_subif_vlan.create_path('match/single-tagged/config/vlan-id', sif_vlan_id)
yang_ipv4 : libyang.DContainer = yang_sif.create_path('openconfig-if-ip:ipv4')
if sif_ipv4_enabled is not None: yang_ipv4.create_path('config/enabled', sif_ipv4_enabled)
if sif_ipv4_address is not None:
yang_ipv4_addrs : libyang.DContainer = yang_ipv4.create_path('addresses')
yang_ipv4_addr_path = 'address[ip="{:s}"]'.format(sif_ipv4_address)
yang_ipv4_addr : libyang.DContainer = yang_ipv4_addrs.create_path(yang_ipv4_addr_path)
yang_ipv4_addr.create_path('config/ip', sif_ipv4_address)
yang_ipv4_addr.create_path('config/prefix-length', sif_ipv4_prefix )
str_path = '/interfaces/interface[name={:s}]'.format(if_name) str_path = '/interfaces/interface[name={:s}]'.format(if_name)
str_data = json.dumps({ str_data = yang_if.print_mem('json')
'name': if_name, json_data = json.loads(str_data)
'config': {'name': if_name, 'enabled': if_enabled}, json_data = json_data['openconfig-interfaces:interface'][0]
'subinterfaces': { str_data = json.dumps(json_data)
'subinterface': {
'index': sif_index,
'config': {'index': sif_index, 'enabled': sif_enabled},
'ipv4': {
'config': {'enabled': sif_ipv4_enabled},
'addresses': {
'address': {
'ip': sif_ipv4_address,
'config': {'ip': sif_ipv4_address, 'prefix_length': sif_ipv4_prefix},
}
}
}
}
}
})
return str_path, str_data return str_path, str_data
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: def parse(
#LOGGER.info('json_data = {:s}'.format(json.dumps(json_data))) self, json_data : Dict, yang_handler : YangHandler
oc_interfaces = pybindJSON.loads_ietf(json_data, openconfig.interfaces, 'interfaces') ) -> List[Tuple[str, Dict[str, Any]]]:
#LOGGER.info('oc_interfaces = {:s}'.format(pybindJSON.dumps(oc_interfaces, mode='ietf'))) LOGGER.debug('json_data = {:s}'.format(json.dumps(json_data)))
yang_interfaces_path = self.get_path()
json_data_valid = yang_handler.parse_to_dict(yang_interfaces_path, json_data, fmt='json')
entries = [] entries = []
for interface_key, oc_interface in oc_interfaces.interface.items(): for interface in json_data_valid['interfaces']['interface']:
#LOGGER.info('interface_key={:s} oc_interfaces={:s}'.format( LOGGER.debug('interface={:s}'.format(str(interface)))
# interface_key, pybindJSON.dumps(oc_interface, mode='ietf')
#)) interface_name = interface['name']
interface_config = interface.get('config', {})
interface = {}
interface['name'] = oc_interface.config.name #yang_interfaces : libyang.DContainer = yang_handler.get_data_path(yang_interfaces_path)
#yang_interface_path = 'interface[name="{:s}"]'.format(interface_name)
interface_type = oc_interface.config.type #yang_interface : libyang.DContainer = yang_interfaces.create_path(yang_interface_path)
interface_type = interface_type.replace('ianaift:', '') #yang_interface.merge_data_dict(interface, strict=True, validate=False)
interface_type = interface_type.replace('iana-if-type:', '')
interface['type'] = interface_type interface_state = interface.get('state', {})
interface_type = interface_state.get('type')
interface['mtu' ] = oc_interface.config.mtu if interface_type is None: continue
interface['enabled' ] = oc_interface.config.enabled interface_type = interface_type.split(':')[-1]
interface['description' ] = oc_interface.config.description if interface_type not in {'ethernetCsmacd'}: continue
interface['admin-status'] = oc_interface.state.admin_status
interface['oper-status' ] = oc_interface.state.oper_status _interface = {
interface['management' ] = oc_interface.state.management 'name' : interface_name,
'type' : interface_type,
entry_interface_key = '/interface[{:s}]'.format(interface['name']) 'mtu' : interface_state['mtu'],
entries.append((entry_interface_key, interface)) 'ifindex' : interface_state['ifindex'],
'admin-status' : interface_state['admin-status'],
for subinterface_key, oc_subinterface in oc_interface.subinterfaces.subinterface.items(): 'oper-status' : interface_state['oper-status'],
#LOGGER.info('subinterface_key={:d} oc_subinterfaces={:s}'.format( 'management' : interface_state['management'],
# subinterface_key, pybindJSON.dumps(oc_subinterface, mode='ietf') }
#)) if 'description' in interface_config:
_interface['description'] = interface_config['description']
subinterface = {} if 'enabled' in interface_config:
subinterface['index' ] = oc_subinterface.state.index _interface['enabled'] = interface_config['enabled']
subinterface['name' ] = oc_subinterface.state.name if 'hardware-port' in interface_state:
subinterface['enabled'] = oc_subinterface.state.enabled _interface['hardware-port'] = interface_state['hardware-port']
if 'transceiver' in interface_state:
entry_subinterface_key = '{:s}/subinterface[{:d}]'.format(entry_interface_key, subinterface['index']) _interface['transceiver'] = interface_state['transceiver']
entries.append((entry_subinterface_key, subinterface))
entry_interface_key = '/interface[{:s}]'.format(interface_name)
#VLAN_FIELDS = ('vlan', 'openconfig-vlan:vlan', 'ocv:vlan') entries.append((entry_interface_key, _interface))
#json_vlan = dict_get_first(json_subinterface, VLAN_FIELDS, default={})
if interface_type == 'ethernetCsmacd':
#MATCH_FIELDS = ('match', 'openconfig-vlan:match', 'ocv:match') ethernet_state = interface['ethernet']['state']
#json_vlan = dict_get_first(json_vlan, MATCH_FIELDS, default={})
_ethernet = {
#SIN_TAG_FIELDS = ('single-tagged', 'openconfig-vlan:single-tagged', 'ocv:single-tagged') 'mac-address' : ethernet_state['mac-address'],
#json_vlan = dict_get_first(json_vlan, SIN_TAG_FIELDS, default={}) 'hw-mac-address' : ethernet_state['hw-mac-address'],
'port-speed' : ethernet_state['port-speed'].split(':')[-1],
#CONFIG_FIELDS = ('config', 'openconfig-vlan:config', 'ocv:config') 'negotiated-port-speed' : ethernet_state['negotiated-port-speed'].split(':')[-1],
#json_vlan = dict_get_first(json_vlan, CONFIG_FIELDS, default={}) }
entry_ethernet_key = '{:s}/ethernet'.format(entry_interface_key)
#VLAN_ID_FIELDS = ('vlan-id', 'openconfig-vlan:vlan-id', 'ocv:vlan-id') entries.append((entry_ethernet_key, _ethernet))
#subinterface_vlan_id = dict_get_first(json_vlan, VLAN_ID_FIELDS)
#if subinterface_vlan_id is not None: subinterface['vlan_id'] = subinterface_vlan_id subinterfaces = interface.get('subinterfaces', {}).get('subinterface', [])
for subinterface in subinterfaces:
for address_key, oc_address in oc_subinterface.ipv4.addresses.address.items(): LOGGER.debug('subinterface={:s}'.format(str(subinterface)))
#LOGGER.info('ipv4: address_key={:s} oc_address={:s}'.format(
# address_key, pybindJSON.dumps(oc_address, mode='ietf') subinterface_index = subinterface['index']
#)) subinterface_state = subinterface.get('state', {})
address_ipv4 = { _subinterface = {'index': subinterface_index}
'ip' : oc_address.state.ip, if 'name' in subinterface_state:
'origin': oc_address.state.origin, _subinterface['name'] = subinterface_state['name']
'prefix': oc_address.state.prefix_length, if 'enabled' in subinterface_state:
} _subinterface['enabled'] = subinterface_state['enabled']
entry_subinterface_key = '{:s}/subinterface[{:d}]'.format(entry_interface_key, subinterface_index)
entry_address_ipv4_key = '{:s}/ipv4[{:s}]'.format(entry_subinterface_key, address_ipv4['ip']) entries.append((entry_subinterface_key, _subinterface))
entries.append((entry_address_ipv4_key, address_ipv4))
if 'vlan' in subinterface:
for address_key, oc_address in oc_subinterface.ipv6.addresses.address.items(): vlan = subinterface['vlan']
#LOGGER.info('ipv6: address_key={:s} oc_address={:s}'.format( vlan_match = vlan['match']
# address_key, pybindJSON.dumps(oc_address, mode='ietf')
#)) single_tagged = vlan_match.pop('single-tagged', None)
if single_tagged is not None:
address_ipv6 = { single_tagged_config = single_tagged['config']
'ip' : oc_address.state.ip, vlan_id = single_tagged_config['vlan-id']
'origin': oc_address.state.origin,
'prefix': oc_address.state.prefix_length, _vlan = {'vlan_id': vlan_id}
} entry_vlan_key = '{:s}/vlan[single:{:s}]'.format(entry_subinterface_key, vlan_id)
entries.append((entry_vlan_key, _vlan))
entry_address_ipv6_key = '{:s}/ipv6[{:s}]'.format(entry_subinterface_key, address_ipv6['ip'])
entries.append((entry_address_ipv6_key, address_ipv6)) if len(vlan_match) > 0:
raise Exception('Unsupported VLAN schema: {:s}'.format(str(vlan)))
ipv4_addresses = subinterface.get('ipv4', {}).get('addresses', {}).get('address', [])
for ipv4_address in ipv4_addresses:
LOGGER.debug('ipv4_address={:s}'.format(str(ipv4_address)))
ipv4_address_ip = ipv4_address['ip']
ipv4_address_state = ipv4_address.get('state', {})
_ipv4_address = {'ip': ipv4_address_ip}
if 'origin' in ipv4_address_state:
_ipv4_address['origin'] = ipv4_address_state['origin']
if 'prefix-length' in ipv4_address_state:
_ipv4_address['prefix'] = ipv4_address_state['prefix-length']
entry_ipv4_address_key = '{:s}/ipv4[{:s}]'.format(entry_subinterface_key, ipv4_address_ip)
entries.append((entry_ipv4_address_key, _ipv4_address))
ipv6_addresses = subinterface.get('ipv6', {}).get('addresses', {}).get('address', [])
for ipv6_address in ipv6_addresses:
LOGGER.debug('ipv6_address={:s}'.format(str(ipv6_address)))
ipv6_address_ip = ipv6_address['ip']
ipv6_address_state = ipv6_address.get('state', {})
_ipv6_address = {'ip': ipv6_address_ip}
if 'origin' in ipv6_address_state:
_ipv6_address['origin'] = ipv6_address_state['origin']
if 'prefix-length' in ipv6_address_state:
_ipv6_address['prefix'] = ipv6_address_state['prefix-length']
entry_ipv6_address_key = '{:s}/ipv6[{:s}]'.format(entry_subinterface_key, ipv6_address_ip)
entries.append((entry_ipv6_address_key, _ipv6_address))
return entries return entries
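On the Set side, InterfaceHandler.compose() now builds the interface/subinterface tree with libyang create_path() calls and serializes it with print_mem('json') instead of hand-writing nested dictionaries. A hedged usage sketch; the import paths are assumed and the resource_value keys match those read through get_str/get_int/get_bool:

from device.service.drivers.gnmi_openconfig.handlers.Interface import InterfaceHandler
from device.service.drivers.gnmi_openconfig.handlers.YangHandler import YangHandler

yang_handler = YangHandler()
try:
    handler = InterfaceHandler()
    resource_value = {
        'name': 'ethernet-1/1', 'enabled': True,
        'sub_if_index': 0, 'sub_if_enabled': True,
        'sub_if_ipv4_enabled': True, 'sub_if_ipv4_address': '172.16.0.1', 'sub_if_ipv4_prefix': 24,
    }
    str_path, str_data = handler.compose('/interface[ethernet-1/1]', resource_value, yang_handler)
    # str_path -> '/interfaces/interface[name=ethernet-1/1]'
    # str_data -> JSON body of the single interface entry, printed from the libyang data tree
finally:
    yang_handler.destroy()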
@@ -12,20 +12,40 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json, logging import json, libyang, logging
import pyangbind.lib.pybindJSON as pybindJSON import operator
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
from . import openconfig
from ._Handler import _Handler from ._Handler import _Handler
from .Tools import get_bool, get_int, get_str
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
MAP_NETWORK_INSTANCE_TYPE = {
# special routing instance; acts as default/global routing instance for a network device
'DEFAULT': 'openconfig-network-instance-types:DEFAULT_INSTANCE',
# private L3-only routing instance; formed of one or more RIBs
'L3VRF': 'openconfig-network-instance-types:L3VRF',
# private L2-only switch instance; formed of one or more L2 forwarding tables
'L2VSI': 'openconfig-network-instance-types:L2VSI',
# private L2-only forwarding instance; point to point connection between two endpoints
'L2P2P': 'openconfig-network-instance-types:L2P2P',
# private Layer 2 and Layer 3 forwarding instance
'L2L3': 'openconfig-network-instance-types:L2L3',
}
class NetworkInstanceHandler(_Handler): class NetworkInstanceHandler(_Handler):
def get_resource_key(self) -> str: return '/network_instance' def get_resource_key(self) -> str: return '/network_instance'
def get_path(self) -> str: return '/openconfig-network-instance:network-instances' def get_path(self) -> str: return '/openconfig-network-instance:network-instances'
def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: def compose(
ni_name = str(resource_value['name']) # test-svc self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
ni_name = get_str(resource_value, 'name') # test-svc
if delete: if delete:
PATH_TMPL = '/network-instances/network-instance[name={:s}]' PATH_TMPL = '/network-instances/network-instance[name={:s}]'
@@ -33,15 +53,11 @@ class NetworkInstanceHandler(_Handler):
str_data = json.dumps({}) str_data = json.dumps({})
return str_path, str_data return str_path, str_data
ni_type = str(resource_value['type']) # L3VRF / L2VSI / ... ni_type = get_str(resource_value, 'type') # L3VRF / L2VSI / ...
ni_type = MAP_NETWORK_INSTANCE_TYPE.get(ni_type, ni_type)
# not works: [FailedPrecondition] unsupported identifier 'DIRECTLY_CONNECTED' # 'DIRECTLY_CONNECTED' is implicitly added
#protocols = [self._compose_directly_connected()]
MAP_OC_NI_TYPE = {
'L3VRF': 'openconfig-network-instance-types:L3VRF',
}
ni_type = MAP_OC_NI_TYPE.get(ni_type, ni_type)
str_path = '/network-instances/network-instance[name={:s}]'.format(ni_name) str_path = '/network-instances/network-instance[name={:s}]'.format(ni_name)
str_data = json.dumps({ str_data = json.dumps({
@@ -51,19 +67,92 @@ class NetworkInstanceHandler(_Handler):
}) })
return str_path, str_data return str_path, str_data
def _compose_directly_connected(self, name=None, enabled=True) -> Dict: def parse(
identifier = 'DIRECTLY_CONNECTED' self, json_data : Dict, yang_handler : YangHandler
if name is None: name = 'DIRECTLY_CONNECTED' ) -> List[Tuple[str, Dict[str, Any]]]:
return { LOGGER.debug('json_data = {:s}'.format(json.dumps(json_data)))
'identifier': identifier, 'name': name,
'config': {'identifier': identifier, 'name': name, 'enabled': enabled}, # Arista Parsing Fixes:
} # - Default instance comes with mpls/signaling-protocols/rsvp-te/global/hellos/state/hello-interval set to 0
# overwrite with .../hellos/config/hello-interval
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: network_instances = json_data.get('openconfig-network-instance:network-instance', [])
LOGGER.info('json_data = {:s}'.format(json.dumps(json_data))) for network_instance in network_instances:
oc_network_instances = pybindJSON.loads_ietf(json_data, openconfig., 'interfaces') if network_instance['name'] != 'default': continue
#LOGGER.info('oc_interfaces = {:s}'.format(pybindJSON.dumps(oc_interfaces, mode='ietf'))) mpls_rsvp_te = network_instance.get('mpls', {}).get('signaling-protocols', {}).get('rsvp-te', {})
response = [] mpls_rsvp_te_hellos = mpls_rsvp_te.get('global', {}).get('hellos', {})
return response hello_interval = mpls_rsvp_te_hellos.get('config', {}).get('hello-interval', 9000)
mpls_rsvp_te_hellos.get('state', {})['hello-interval'] = hello_interval
openconfig-network-instance:network-instance
\ No newline at end of file
yang_network_instances_path = self.get_path()
json_data_valid = yang_handler.parse_to_dict(yang_network_instances_path, json_data, fmt='json', strict=False)
entries = []
for network_instance in json_data_valid['network-instances']['network-instance']:
LOGGER.debug('network_instance={:s}'.format(str(network_instance)))
ni_name = network_instance['name']
ni_config = network_instance['config']
ni_type = ni_config['type'].split(':')[-1]
_net_inst = {'name': ni_name, 'type': ni_type}
entry_net_inst_key = '/network_instance[{:s}]'.format(ni_name)
entries.append((entry_net_inst_key, _net_inst))
ni_protocols = network_instance.get('protocols', {}).get('protocol', [])
for ni_protocol in ni_protocols:
ni_protocol_id = ni_protocol['identifier'].split(':')[-1]
ni_protocol_name = ni_protocol['name']
_protocol = {'id': ni_protocol_id, 'name': ni_protocol_name}
entry_protocol_key = '{:s}/protocol[{:s}]'.format(entry_net_inst_key, ni_protocol_id)
entries.append((entry_protocol_key, _protocol))
if ni_protocol_id == 'STATIC':
static_routes = ni_protocol.get('static-routes', {}).get('static', [])
for static_route in static_routes:
static_route_prefix = static_route['prefix']
next_hops = static_route.get('next-hops', {}).get('next-hop', [])
_next_hops = [
{
'index' : next_hop['index'],
'gateway': next_hop['config']['next-hop'],
'metric' : next_hop['config']['metric'],
}
for next_hop in next_hops
]
_next_hops = sorted(_next_hops, key=operator.itemgetter('index'))
_static_route = {'prefix': static_route_prefix, 'next_hops': _next_hops}
entry_static_route_key = '{:s}/static_routes[{:s}]'.format(
entry_protocol_key, static_route_prefix
)
entries.append((entry_static_route_key, _static_route))
ni_tables = network_instance.get('tables', {}).get('table', [])
for ni_table in ni_tables:
ni_table_protocol = ni_table['protocol'].split(':')[-1]
ni_table_address_family = ni_table['address-family'].split(':')[-1]
_table = {'protocol': ni_table_protocol, 'address_family': ni_table_address_family}
entry_table_key = '{:s}/table[{:s},{:s}]'.format(
entry_net_inst_key, ni_table_protocol, ni_table_address_family
)
entries.append((entry_table_key, _table))
ni_vlans = network_instance.get('vlans', {}).get('vlan', [])
for ni_vlan in ni_vlans:
ni_vlan_id = ni_vlan['vlan-id']
#ni_vlan_config = ni_vlan['config']
ni_vlan_state = ni_vlan['state']
ni_vlan_name = ni_vlan_state['name']
_members = [
member['state']['interface']
for member in ni_vlan.get('members', {}).get('member', [])
]
_vlan = {'vlan_id': ni_vlan_id, 'name': ni_vlan_name, 'members': _members}
entry_vlan_key = '{:s}/vlan[{:d}]'.format(entry_net_inst_key, ni_vlan_id)
entries.append((entry_vlan_key, _vlan))
return entries
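NetworkInstanceHandler.compose() follows the same pattern, expanding the short type alias through MAP_NETWORK_INSTANCE_TYPE before building the Set body. A hedged sketch with an assumed import path:

from device.service.drivers.gnmi_openconfig.handlers.NetworkInstance import NetworkInstanceHandler
from device.service.drivers.gnmi_openconfig.handlers.YangHandler import YangHandler

yang_handler = YangHandler()
try:
    handler = NetworkInstanceHandler()
    str_path, str_data = handler.compose(
        '/network_instance[test-svc]', {'name': 'test-svc', 'type': 'L3VRF'}, yang_handler)
    # str_path -> '/network-instances/network-instance[name=test-svc]'
    # 'L3VRF' is expanded to 'openconfig-network-instance-types:L3VRF' in the request body
finally:
    yang_handler.destroy()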
@@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import re import re
from typing import Any, Dict, Iterable, Optional from typing import Any, Callable, Dict, Iterable, Optional
RE_REMOVE_FILTERS = re.compile(r'\[[^\]]+\]') RE_REMOVE_FILTERS = re.compile(r'\[[^\]]+\]')
RE_REMOVE_NAMESPACES = re.compile(r'\/[a-zA-Z0-9\_\-]+:') RE_REMOVE_NAMESPACES = re.compile(r'\/[a-zA-Z0-9\_\-]+:')
@@ -40,3 +40,22 @@ def container_get_first(
if namespace_key_name in container: return container[namespace_key_name] if namespace_key_name in container: return container[namespace_key_name]
return default return default
def get_value(
resource_value : Dict, field_name : str, cast_func : Callable = lambda x:x, default : Optional[Any] = None
) -> Optional[Any]:
field_value = resource_value.get(field_name, default)
if field_value is not None: field_value = cast_func(field_value)
return field_value
def get_bool(resource_value : Dict, field_name : bool, default : Optional[Any] = None) -> bool:
return get_value(resource_value, field_name, cast_func=bool, default=default)
def get_float(resource_value : Dict, field_name : float, default : Optional[Any] = None) -> float:
return get_value(resource_value, field_name, cast_func=float, default=default)
def get_int(resource_value : Dict, field_name : int, default : Optional[Any] = None) -> int:
return get_value(resource_value, field_name, cast_func=int, default=default)
def get_str(resource_value : Dict, field_name : str, default : Optional[Any] = None) -> str:
return get_value(resource_value, field_name, cast_func=str, default=default)
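The typed getters wrap resource_value lookups with an optional default plus a cast; an absent field returns the default without casting. A minimal standalone illustration, assuming the helpers above are in scope:

resource_value = {'name': 'ethernet-1/1', 'sub_if_index': '0', 'enabled': 'true'}

if_name   = get_str (resource_value, 'name')             # 'ethernet-1/1'
sif_index = get_int (resource_value, 'sub_if_index', 0)  # 0, cast from the string '0'
mtu       = get_int (resource_value, 'mtu')              # None: field absent, default is None
enabled   = get_bool(resource_value, 'enabled', True)    # True: bool() of a non-empty string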
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, libyang, logging, os
from typing import Dict, Optional
YANG_BASE_PATH = os.path.join(os.path.dirname(__file__), '..', 'git', 'openconfig', 'public')
YANG_SEARCH_PATHS = ':'.join([
os.path.join(YANG_BASE_PATH, 'release'),
os.path.join(YANG_BASE_PATH, 'third_party'),
])
YANG_MODULES = [
'iana-if-type',
'openconfig-vlan-types',
'openconfig-interfaces',
'openconfig-if-8021x',
'openconfig-if-aggregate',
'openconfig-if-ethernet-ext',
'openconfig-if-ethernet',
'openconfig-if-ip-ext',
'openconfig-if-ip',
'openconfig-if-poe',
'openconfig-if-sdn-ext',
'openconfig-if-tunnel',
'openconfig-vlan',
'openconfig-types',
'openconfig-policy-types',
'openconfig-mpls-types',
'openconfig-network-instance-types',
'openconfig-network-instance',
'openconfig-platform',
'openconfig-platform-controller-card',
'openconfig-platform-cpu',
'openconfig-platform-ext',
'openconfig-platform-fabric',
'openconfig-platform-fan',
'openconfig-platform-integrated-circuit',
'openconfig-platform-linecard',
'openconfig-platform-pipeline-counters',
'openconfig-platform-port',
'openconfig-platform-psu',
'openconfig-platform-software',
'openconfig-platform-transceiver',
'openconfig-platform-types',
]
LOGGER = logging.getLogger(__name__)
class YangHandler:
def __init__(self) -> None:
self._yang_context = libyang.Context(YANG_SEARCH_PATHS)
self._loaded_modules = set()
for yang_module_name in YANG_MODULES:
LOGGER.info('Loading module: {:s}'.format(str(yang_module_name)))
self._yang_context.load_module(yang_module_name).feature_enable_all()
self._loaded_modules.add(yang_module_name)
self._data_path_instances = dict()
def get_data_paths(self) -> Dict[str, libyang.DNode]:
return self._data_path_instances
def get_data_path(self, path : str) -> libyang.DNode:
data_path_instance = self._data_path_instances.get(path)
if data_path_instance is None:
data_path_instance = self._yang_context.create_data_path(path)
self._data_path_instances[path] = data_path_instance
return data_path_instance
def parse_to_dict(
self, request_path : str, json_data : Dict, fmt : str = 'json', strict : bool = True
) -> Dict:
if fmt != 'json': raise Exception('Unsupported format: {:s}'.format(str(fmt)))
LOGGER.debug('request_path = {:s}'.format(str(request_path)))
LOGGER.debug('json_data = {:s}'.format(str(json_data)))
LOGGER.debug('format = {:s}'.format(str(fmt)))
parent_path_parts = list(filter(lambda s: len(s) > 0, request_path.split('/')))
for parent_path_part in reversed(parent_path_parts):
json_data = {parent_path_part: json_data}
str_data = json.dumps(json_data)
dnode : Optional[libyang.DNode] = self._yang_context.parse_data_mem(
str_data, fmt, strict=strict, parse_only=True, #validate_present=True, #validate=True,
)
if dnode is None: raise Exception('Unable to parse Data({:s})'.format(str(json_data)))
parsed = dnode.print_dict()
LOGGER.debug('parsed = {:s}'.format(json.dumps(parsed)))
dnode.free()
return parsed
def destroy(self) -> None:
self._yang_context.destroy()
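One YangHandler, and therefore one libyang.Context with every module in YANG_MODULES loaded, is created per GnmiSessionHandler and destroyed explicitly. A hedged lifecycle sketch, assuming the class above is in scope and the OpenConfig YANG tree is checked out under the git/openconfig/public directory referenced by YANG_BASE_PATH:

yang_handler = YangHandler()   # loads the YANG_MODULES list and enables their features
try:
    # Illustrative payload for /openconfig-interfaces:interfaces (not from a real device)
    reply = {'interface': [{'name': 'ethernet-1/1', 'config': {'name': 'ethernet-1/1', 'enabled': True}}]}
    valid = yang_handler.parse_to_dict('/openconfig-interfaces:interfaces', reply, fmt='json')
    print(valid['interfaces']['interface'][0]['name'])   # -> ethernet-1/1
finally:
    yang_handler.destroy()                               # frees the underlying libyang context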
@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
from .YangHandler import YangHandler
class _Handler: class _Handler:
def get_resource_key(self) -> str: def get_resource_key(self) -> str:
@@ -23,10 +24,14 @@ class _Handler:
# Retrieve the OpenConfig path schema used to interrogate the device # Retrieve the OpenConfig path schema used to interrogate the device
raise NotImplementedError() raise NotImplementedError()
def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: def compose(
self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
# Compose a Set/Delete message based on the resource_key/resource_value fields, and the delete flag # Compose a Set/Delete message based on the resource_key/resource_value fields, and the delete flag
raise NotImplementedError() raise NotImplementedError()
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
# Parse a Reply from the device and return a list of resource_key/resource_value pairs # Parse a Reply from the device and return a list of resource_key/resource_value pairs
raise NotImplementedError() raise NotImplementedError()
@@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from ._Handler import _Handler from ._Handler import _Handler
from .Component import ComponentHandler from .Component import ComponentHandler
@@ -23,6 +23,7 @@ from .NetworkInstance import NetworkInstanceHandler
from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler
from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler
from .Tools import get_schema from .Tools import get_schema
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@@ -71,7 +72,8 @@ PATH_TO_HANDLER = {
} }
def get_handler( def get_handler(
resource_key : Optional[str] = None, path : Optional[str] = None, raise_if_not_found=True resource_key : Optional[str] = None, path : Optional[str] = None,
raise_if_not_found=True
) -> Optional[_Handler]: ) -> Optional[_Handler]:
if (resource_key is None) == (path is None): if (resource_key is None) == (path is None):
MSG = 'Exactly one of resource_key({:s}) or path({:s}) must be specified' MSG = 'Exactly one of resource_key({:s}) or path({:s}) must be specified'
@@ -95,10 +97,18 @@ def get_handler(
return handler return handler
def get_path(resource_key : str) -> str: def get_path(resource_key : str) -> str:
return get_handler(resource_key=resource_key).get_path() handler = get_handler(resource_key=resource_key)
return handler.get_path()
def parse(str_path : str, value : Union[Dict, List]): def parse(
return get_handler(path=str_path).parse(value) str_path : str, value : Union[Dict, List], yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
handler = get_handler(path=str_path)
return handler.parse(value, yang_handler)
def compose(resource_key : str, resource_value : Union[Dict, List], delete : bool = False) -> Tuple[str, str]: def compose(
return get_handler(resource_key=resource_key).compose(resource_key, resource_value, delete=delete) resource_key : str, resource_value : Union[Dict, List],
yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
handler = get_handler(resource_key=resource_key)
return handler.compose(resource_key, resource_value, yang_handler, delete=delete)
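These module-level helpers remain a thin dispatch layer: resolve the handler from a resource_key or a path, then delegate together with the shared YangHandler. A hedged sketch of the calling convention, assuming the helpers above are in scope; the resource keys are illustrative and the exact key-to-handler matching lives in code elided from this hunk:

yang_handler = YangHandler()
try:
    str_path = get_path('/interface[ethernet-1/1]')
    # expected: '/openconfig-interfaces:interfaces', the schema path used in the gNMI GetRequest

    str_path, str_data = compose(
        '/interface[ethernet-1/1]',
        {'name': 'ethernet-1/1', 'sub_if_index': 0,
         'sub_if_ipv4_address': '172.16.0.1', 'sub_if_ipv4_prefix': 24},
        yang_handler, delete=False)
    # expected: SetRequest path and JSON_IETF body for the interface update
finally:
    yang_handler.destroy()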
@@ -17,7 +17,7 @@ from common.tools.grpc.Tools import grpc_message_to_json
from ..gnmi.gnmi_pb2 import CapabilityRequest # pylint: disable=no-name-in-module from ..gnmi.gnmi_pb2 import CapabilityRequest # pylint: disable=no-name-in-module
from ..gnmi.gnmi_pb2_grpc import gNMIStub from ..gnmi.gnmi_pb2_grpc import gNMIStub
def get_supported_encodings( def check_capabilities(
stub : gNMIStub, username : str, password : str, timeout : Optional[int] = None stub : gNMIStub, username : str, password : str, timeout : Optional[int] = None
) -> Set[Union[str, int]]: ) -> Set[Union[str, int]]:
metadata = [('username', username), ('password', password)] metadata = [('username', username), ('password', password)]
@@ -25,6 +25,17 @@ def get_supported_encodings(
reply = stub.Capabilities(req, metadata=metadata, timeout=timeout) reply = stub.Capabilities(req, metadata=metadata, timeout=timeout)
data = grpc_message_to_json(reply) data = grpc_message_to_json(reply)
gnmi_version = data.get('gNMI_version')
if gnmi_version is None or gnmi_version != '0.7.0':
raise Exception('Unsupported gNMI version: {:s}'.format(str(gnmi_version)))
#supported_models = {
# supported_model['name']: supported_model['version']
# for supported_model in data.get('supported_models', [])
#}
# TODO: check supported models and versions
supported_encodings = { supported_encodings = {
supported_encoding supported_encoding
for supported_encoding in data.get('supported_encodings', []) for supported_encoding in data.get('supported_encodings', [])
@@ -33,4 +44,6 @@ def get_supported_encodings(
if len(supported_encodings) == 0: if len(supported_encodings) == 0:
# pylint: disable=broad-exception-raised # pylint: disable=broad-exception-raised
raise Exception('No supported encodings found') raise Exception('No supported encodings found')
return supported_encodings if 'JSON_IETF' not in supported_encodings:
# pylint: disable=broad-exception-raised
raise Exception('JSON_IETF encoding not supported')
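check_capabilities() replaces get_supported_encodings(): rather than returning the encoding set, it raises if the advertised gNMI version is not 0.7.0 or JSON_IETF is not supported, so connect() fails fast. A hedged sketch of that call; the import paths, address and credentials are illustrative (they mirror the test script below):

import logging
from device.service.drivers.gnmi_openconfig.gnmi.gnmi_pb2_grpc import gNMIStub
from device.service.drivers.gnmi_openconfig.tools.Capabilities import check_capabilities
from device.service.drivers.gnmi_openconfig.tools.Channel import get_grpc_channel

logger  = logging.getLogger(__name__)
channel = get_grpc_channel('172.20.20.101', 6030, False, logger)   # address, port, use_tls, logger
stub    = gNMIStub(channel)
check_capabilities(stub, 'admin', 'admin', timeout=120)            # raises on unsupported version/encoding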
@@ -61,7 +61,7 @@ def decode_value(value : TypedValue) -> Any:
str_value : str = value.json_ietf_val.decode('UTF-8') str_value : str = value.json_ietf_val.decode('UTF-8')
try: try:
# Cleanup and normalize the records according to OpenConfig # Cleanup and normalize the records according to OpenConfig
str_value = str_value.replace('openconfig-platform-types:', 'oc-platform-types:') #str_value = str_value.replace('openconfig-platform-types:', 'oc-platform-types:')
json_value = json.loads(str_value) json_value = json.loads(str_value)
recursive_remove_keys(json_value) recursive_remove_keys(json_value)
return json_value return json_value
...
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, os, sys, time
from typing import Dict, Tuple
os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
from device.service.drivers.gnmi_openconfig.GnmiOpenConfigDriver import GnmiOpenConfigDriver # pylint: disable=wrong-import-position
from device.service.driver_api._Driver import (
RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, RESOURCE_ROUTING_POLICIES, RESOURCE_SERVICES
)
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
# +---+---------------------------+--------------+---------------------------------+-------+---------+--------------------+--------------+
# | # | Name | Container ID | Image | Kind | State | IPv4 Address | IPv6 Address |
# +---+---------------------------+--------------+---------------------------------+-------+---------+--------------------+--------------+
# | 1 | clab-tfs-scenario-client1 | a8d48ec3265a | ghcr.io/hellt/network-multitool | linux | running | 172.100.100.201/24 | N/A |
# | 2 | clab-tfs-scenario-client2 | fc88436d2b32 | ghcr.io/hellt/network-multitool | linux | running | 172.100.100.202/24 | N/A |
# | 3 | clab-tfs-scenario-srl1 | b995b9bdadda | ghcr.io/nokia/srlinux | srl | running | 172.100.100.101/24 | N/A |
# | 4 | clab-tfs-scenario-srl2 | aacfc38cc376 | ghcr.io/nokia/srlinux | srl | running | 172.100.100.102/24 | N/A |
# +---+---------------------------+--------------+---------------------------------+-------+---------+--------------------+--------------+
def interface(if_name, sif_index, ipv4_address, ipv4_prefix, enabled) -> Tuple[str, Dict]:
str_path = '/interface[{:s}]'.format(if_name)
str_data = {'name': if_name, 'enabled': enabled, 'sub_if_index': sif_index, 'sub_if_enabled': enabled,
'sub_if_ipv4_enabled': enabled, 'sub_if_ipv4_address': ipv4_address, 'sub_if_ipv4_prefix': ipv4_prefix}
return str_path, str_data
def network_instance(ni_name, ni_type) -> Tuple[str, Dict]:
str_path = '/network_instance[{:s}]'.format(ni_name)
str_data = {'name': ni_name, 'type': ni_type}
return str_path, str_data
def network_instance_static_route(ni_name, prefix, next_hop, next_hop_index=0) -> Tuple[str, Dict]:
str_path = '/network_instance[{:s}]/static_route[{:s}]'.format(ni_name, prefix)
str_data = {'name': ni_name, 'prefix': prefix, 'next_hop': next_hop, 'next_hop_index': next_hop_index}
return str_path, str_data
def network_instance_interface(ni_name, if_name, sif_index) -> Tuple[str, Dict]:
str_path = '/network_instance[{:s}]/interface[{:s}.{:d}]'.format(ni_name, if_name, sif_index)
str_data = {'name': ni_name, 'if_name': if_name, 'sif_index': sif_index}
return str_path, str_data
def main():
driver_settings = {
'protocol': 'gnmi',
'username': 'admin',
'password': 'admin',
'use_tls' : False,
}
driver = GnmiOpenConfigDriver('172.20.20.101', 6030, **driver_settings)
driver.Connect()
#resources_to_get = []
#resources_to_get = [RESOURCE_ENDPOINTS]
#resources_to_get = [RESOURCE_INTERFACES]
resources_to_get = [RESOURCE_NETWORK_INSTANCES]
#resources_to_get = [RESOURCE_ROUTING_POLICIES]
#resources_to_get = [RESOURCE_SERVICES]
LOGGER.info('resources_to_get = {:s}'.format(str(resources_to_get)))
results_getconfig = driver.GetConfig(resources_to_get)
LOGGER.info('results_getconfig = {:s}'.format(str(results_getconfig)))
#resources_to_set = [
# network_instance('test-svc', 'L3VRF'),
#
# interface('ethernet-1/1', 0, '172.16.0.1', 24, True),
# network_instance_interface('test-svc', 'ethernet-1/1', 0),
#
# interface('ethernet-1/2', 0, '172.0.0.1', 24, True),
# network_instance_interface('test-svc', 'ethernet-1/2', 0),
#
# network_instance_static_route('test-svc', '172.0.0.0/24', '172.16.0.2'),
# network_instance_static_route('test-svc', '172.2.0.0/24', '172.16.0.3'),
#]
#LOGGER.info('resources_to_set = {:s}'.format(str(resources_to_set)))
#results_setconfig = driver.SetConfig(resources_to_set)
#LOGGER.info('results_setconfig = {:s}'.format(str(results_setconfig)))
#resources_to_delete = [
# #network_instance_static_route('d35fc1d9', '172.0.0.0/24', '172.16.0.2'),
# #network_instance_static_route('d35fc1d9', '172.2.0.0/24', '172.16.0.3'),
#
# #network_instance_interface('d35fc1d9', 'ethernet-1/1', 0),
# #network_instance_interface('d35fc1d9', 'ethernet-1/2', 0),
#
# #interface('ethernet-1/1', 0, '172.16.1.1', 24, True),
# #interface('ethernet-1/2', 0, '172.0.0.2', 24, True),
#
# #network_instance('20f66fb5', 'L3VRF'),
#]
#LOGGER.info('resources_to_delete = {:s}'.format(str(resources_to_delete)))
#results_deleteconfig = driver.DeleteConfig(resources_to_delete)
#LOGGER.info('results_deleteconfig = {:s}'.format(str(results_deleteconfig)))
time.sleep(1)
driver.Disconnect()
return 0
if __name__ == '__main__':
sys.exit(main())