Commit 9cc0143d authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Device component - gNMI OpenConfig driver:

- Driver now discovers device vendor, platform, model, etc. from initial discovery
- If cEOS, use REPLACE to configure interfaces with vlans, otherwise, classical update
parent f1556580
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@ from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_type
from device.service.driver_api._Driver import _Driver
from .GnmiSessionHandler import GnmiSessionHandler
from .GnmiSessionHandler import GnmiSessionHandler, INITIAL_TARGET_INFO_RESOURCE_KEY

DRIVER_NAME = 'gnmi_openconfig'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
@@ -51,7 +51,9 @@ class GnmiOpenConfigDriver(_Driver):
    @metered_subclass_method(METRICS_POOL)
    def GetInitialConfig(self) -> List[Tuple[str, Any]]:
        with self.__lock:
            return []
            target_facts = self.__handler.target_facts
            if len(target_facts) == 0: return []
            return [(INITIAL_TARGET_INFO_RESOURCE_KEY, target_facts)]

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
+171 −9
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk
from .gnmi.gnmi_pb2_grpc import gNMIStub
from .gnmi.gnmi_pb2 import Encoding, GetRequest, SetRequest, UpdateResult   # pylint: disable=no-name-in-module
from .handlers import ALL_RESOURCE_KEYS, compose, get_path, parse
from .handlers.Interface import EOS_TAGGED_L3_REPLACE_FIELD
from .handlers.YangHandler import YangHandler
from .tools.Capabilities import check_capabilities
from .tools.Channel import get_grpc_channel
@@ -27,6 +28,91 @@ from .tools.Subscriptions import Subscriptions
from .tools.Value import decode_value #, value_exists
from .MonitoringThread import MonitoringThread

INITIAL_TARGET_INFO_RESOURCE_KEY = '_info/gnmi/target'

DISCOVERY_PATH_SYSTEM_STATE = '/openconfig-system:system/state'
DISCOVERY_PATH_CHASSIS_STATE = '/openconfig-platform:components/component[name=Chassis]/state'


def _normalize_hint(value: Any) -> Optional[str]:
    if not isinstance(value, str): return None
    value = value.strip()
    if len(value) == 0: return None
    return value.lower()

def _extract_target_facts_from_system_state(system_state : Dict[str, Any]) -> Dict[str, str]:
    facts : Dict[str, str] = dict()
    hostname = system_state.get('openconfig-system:hostname')
    if isinstance(hostname, str) and len(hostname) > 0:
        facts['hostname'] = hostname

    software_version = system_state.get('openconfig-system:software-version')
    if isinstance(software_version, str) and len(software_version) > 0:
        facts['software_version'] = software_version

    return facts

def _extract_target_facts_from_chassis_state(chassis_state : Dict[str, Any]) -> Dict[str, str]:
    facts : Dict[str, str] = dict()

    vendor = chassis_state.get('openconfig-platform:mfg-name')
    if isinstance(vendor, str) and len(vendor) > 0:
        facts['vendor'] = vendor

    model = chassis_state.get('openconfig-platform:part-no')
    if isinstance(model, str) and len(model) > 0:
        facts['model'] = model

    description = chassis_state.get('openconfig-platform:description')
    if isinstance(description, str) and len(description) > 0:
        facts['description'] = description

    serial_no = chassis_state.get('openconfig-platform:serial-no')
    if isinstance(serial_no, str) and len(serial_no) > 0:
        facts['serial_no'] = serial_no

    return facts


def _is_arista_eos_target(settings: Dict[str, Any]) -> bool:
    hints = []
    target_facts = settings.get('_target_facts', {})
    if isinstance(target_facts, dict):
        for value in target_facts.values():
            normalized_value = _normalize_hint(value)
            if normalized_value is not None:
                hints.append(normalized_value)
    for key in ('vendor', 'platform', 'type', 'device_type', 'kind', 'model'):
        normalized_value = _normalize_hint(settings.get(key))
        if normalized_value is not None:
            hints.append(normalized_value)
    hint_blob = ' '.join(hints)
    return any(token in hint_blob for token in ('arista', 'ceos', 'eos'))


def _is_tagged_l3_access_subinterface(resource_key: str, resource_value: Dict[str, Any]) -> bool:
    if not resource_key.startswith('/interface['):
        return False
    if resource_value.get('vlan_id') is None:
        return False
    if resource_value.get('index') != 0:
        return False
    if resource_value.get('address_ip') is None or resource_value.get('address_prefix') is None:
        return False
    if resource_value.get('type') != 'l3ipvlan':
        return False
    return True


def _prepare_set_operation(
    settings: Dict[str, Any], resource_key: str, resource_value: Dict[str, Any]
) -> Tuple[str, Dict[str, Any]]:
    prepared_value = copy.deepcopy(resource_value)
    if _is_arista_eos_target(settings) and _is_tagged_l3_access_subinterface(resource_key, prepared_value):
        prepared_value[EOS_TAGGED_L3_REPLACE_FIELD] = True
        return 'replace', prepared_value
    return 'update', prepared_value

class GnmiSessionHandler:
    def __init__(self, address : str, port : int, settings : Dict, logger : logging.Logger) -> None:
        self._address   = address
@@ -45,6 +131,7 @@ class GnmiSessionHandler:
        self._subscriptions = Subscriptions()
        self._in_subscriptions = queue.Queue()
        self._out_samples = queue.Queue()
        self._target_facts : Dict[str, Any] = dict()

    def __del__(self) -> None:
        self._logger.info('Destroying YangValidator...')
@@ -64,11 +151,64 @@ class GnmiSessionHandler:
    @property
    def out_samples(self): return self._out_samples

    @property
    def target_facts(self): return copy.deepcopy(self._target_facts)

    def _merge_target_facts(self, target_facts : Optional[Dict[str, Any]]) -> None:
        if not isinstance(target_facts, dict): return
        for key, value in target_facts.items():
            if value is None: continue
            if isinstance(value, str) and len(value) == 0: continue
            self._target_facts[key] = value
        self._settings['_target_facts'] = copy.deepcopy(self._target_facts)

    def _get_discovery_value(self, str_path : str) -> Optional[Dict[str, Any]]:
        metadata = [('username', self._username), ('password', self._password)]

        get_request = GetRequest()
        get_request.type = GetRequest.DataType.STATE
        get_request.encoding = Encoding.JSON_IETF
        get_request.path.append(path_from_string(str_path))

        try:
            get_reply = self._stub.Get(get_request, metadata=metadata, timeout=30)
        except grpc.RpcError:
            self._logger.debug('Discovery probe failed for path=%s', str_path, exc_info=True)
            return None

        for notification in get_reply.notification:
            for update in notification.update:
                try:
                    value = decode_value(update.val)
                except Exception: # pylint: disable=broad-except
                    self._logger.debug('Discovery decode failed for path=%s', str_path, exc_info=True)
                    continue
                if isinstance(value, dict):
                    return value

        return None

    def _discover_target_facts(self, capability_info : Dict[str, Any]) -> None:
        self._merge_target_facts(capability_info.get('target_facts'))

        gnmi_version = capability_info.get('gnmi_version')
        if isinstance(gnmi_version, str) and len(gnmi_version) > 0:
            self._merge_target_facts({'gnmi_version': gnmi_version})

        system_state = self._get_discovery_value(DISCOVERY_PATH_SYSTEM_STATE)
        if isinstance(system_state, dict):
            self._merge_target_facts(_extract_target_facts_from_system_state(system_state))

        chassis_state = self._get_discovery_value(DISCOVERY_PATH_CHASSIS_STATE)
        if isinstance(chassis_state, dict):
            self._merge_target_facts(_extract_target_facts_from_chassis_state(chassis_state))

    def connect(self):
        with self._lock:
            self._channel = get_grpc_channel(self._address, self._port, self._use_tls, self._logger)
            self._stub = gNMIStub(self._channel)
            check_capabilities(self._stub, self._username, self._password, timeout=120)
            capability_info = check_capabilities(self._stub, self._username, self._password, timeout=120)
            self._discover_target_facts(capability_info)
            self._monit_thread = MonitoringThread(
                self._stub, self._logger, self._settings, self._in_subscriptions, self._out_samples)
            self._monit_thread.start()
@@ -178,16 +318,17 @@ class GnmiSessionHandler:
            #if resource_tuple is None: continue
            #_, value, exists, operation_done = resource_tuple
            if isinstance(resource_value, str): resource_value = json.loads(resource_value)
            str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=False)
            operation, prepared_value = _prepare_set_operation(self._settings, resource_key, resource_value)
            str_path, str_data = compose(resource_key, prepared_value, self._yang_handler, delete=False)
            if str_path is None: continue # nothing to set
            #self._logger.info('---3')
            #self._logger.info(str(str_path))
            #self._logger.info(str(str_data))
            set_request_list = set_request.update #if exists else set_request.replace
            set_request_list = set_request.replace if operation == 'replace' else set_request.update
            set_request_entry = set_request_list.add()
            set_request_entry.path.CopyFrom(path_from_string(str_path))
            set_request_entry.val.json_val = str_data.encode('UTF-8')
            resources_requested.append((resource_key, resource_value))
            resources_requested.append((len(resources_requested), resource_key, str_path, operation))

        self._logger.debug('set_request={:s}'.format(grpc_message_to_json_string(set_request)))
        metadata = [('username', self._username), ('password', self._password)]
@@ -195,13 +336,34 @@ class GnmiSessionHandler:
        set_reply = self._stub.Set(set_request, metadata=metadata, timeout=timeout)
        self._logger.debug('set_reply={:s}'.format(grpc_message_to_json_string(set_reply)))

        results = []
        for (resource_key, resource_value), update_result in zip(resources_requested, set_reply.response):
        results = [
            (resource_key, Exception('Not Processed'))
            for _, resource_key, _, _ in resources_requested
        ]
        pending_requests = list(resources_requested)
        for update_result in set_reply.response:
            operation = update_result.op
            str_path = path_to_string(update_result.path)
            expected_operation = None
            if operation == UpdateResult.UPDATE:
                results.append((resource_key, True))
            else:
                results.append((resource_key, Exception('Unexpected')))
                expected_operation = 'update'
            elif operation == UpdateResult.REPLACE:
                expected_operation = 'replace'

            matched = False
            if expected_operation is not None:
                for j, (resource_idx, resource_key, resource_path, requested_operation) in enumerate(pending_requests):
                    if resource_path == str_path and requested_operation == expected_operation:
                        results[resource_idx] = (resource_key, True)
                        pending_requests.pop(j)
                        matched = True
                        break

            if not matched:
                self._logger.warning(
                    'Unexpected Set reply operation=%s path=%s pending=%s',
                    str(operation), str(str_path), str(pending_requests)
                )

            #str_path = path_to_string(update_result.path)
            #resource_tuple = resource_tuples.get(str_path)
+58 −21
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)

MIN_MTU = 68
EOS_TAGGED_L3_REPLACE_FIELD = '_eos_tagged_l3_replace'


def _normalize_interface_type(if_type: str, sif_index: int) -> str:
@@ -34,13 +35,59 @@ class InterfaceHandler(_Handler):
    def get_resource_key(self) -> str: return '/interface/subinterface'
    def get_path(self) -> str: return '/openconfig-interfaces:interfaces'

    @staticmethod
    def _create_subinterface(
        yang_sifs: libyang.DContainer, sif_index: int, enabled: bool = None,
        vlan_id: int = None, address_ip: str = None, address_prefix: int = None
    ) -> libyang.DContainer:
        yang_sif_path = 'subinterface[index="{:d}"]'.format(sif_index)
        yang_sif: libyang.DContainer = yang_sifs.create_path(yang_sif_path)
        yang_sif.create_path('config/index', sif_index)
        if enabled is not None:
            yang_sif.create_path('config/enabled', enabled)

        if vlan_id is not None:
            yang_subif_vlan : libyang.DContainer = yang_sif.create_path('openconfig-vlan:vlan')
            yang_subif_vlan.create_path('match/single-tagged/config/vlan-id', vlan_id)

        yang_ipv4 : libyang.DContainer = yang_sif.create_path('openconfig-if-ip:ipv4')
        if enabled is not None:
            yang_ipv4.create_path('config/enabled', enabled)

        if address_ip is not None and address_prefix is not None:
            yang_ipv4_addrs : libyang.DContainer = yang_ipv4.create_path('addresses')
            yang_ipv4_addr_path = 'address[ip="{:s}"]'.format(address_ip)
            yang_ipv4_addr : libyang.DContainer = yang_ipv4_addrs.create_path(yang_ipv4_addr_path)
            yang_ipv4_addr.create_path('config/ip',            address_ip)
            yang_ipv4_addr.create_path('config/prefix-length', address_prefix)

        return yang_sif

    def compose(
        self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
    ) -> Tuple[str, str]:
        if_name   = get_str(resource_value, 'name'       )  # ethernet-1/1
        sif_index = get_int(resource_value, 'index', None)  # 0
        vlan_id   = get_int(resource_value, 'vlan_id', None)
        eos_tagged_l3_replace = get_bool(resource_value, EOS_TAGGED_L3_REPLACE_FIELD, False)

        if delete:
            if eos_tagged_l3_replace:
                root_node : libyang.DContainer = yang_handler.get_data_path(
                    '/openconfig-interfaces:interfaces'
                )
                str_path = '/interfaces/interface[name={:s}]'.format(if_name)
                yang_if = root_node.find_path('/'.join([
                    '',
                    'openconfig-interfaces:interfaces',
                    'interface[name="{:s}"]'.format(if_name),
                ]))
                if yang_if is not None:
                    yang_if.unlink()
                    yang_if.free()
                str_data = json.dumps({})
                return str_path, str_data

            if sif_index is None:
                return None, None

@@ -87,7 +134,6 @@ class InterfaceHandler(_Handler):

        enabled        = get_bool(resource_value, 'enabled'       ) # True/False
        if_type        = get_str (resource_value, 'type'          ) # 'l3ipvlan'
        vlan_id        = get_int (resource_value, 'vlan_id',      ) # 127
        address_ip     = get_str (resource_value, 'address_ip'    ) # 172.16.0.1
        address_prefix = get_int (resource_value, 'address_prefix') # 24
        mtu            = get_int (resource_value, 'mtu'           ) # 1500
@@ -113,26 +159,17 @@ class InterfaceHandler(_Handler):
            return str_path, str_data

        yang_sifs : libyang.DContainer = yang_if.create_path('subinterfaces')
        yang_sif_path = 'subinterface[index="{:d}"]'.format(sif_index)
        yang_sif : libyang.DContainer = yang_sifs.create_path(yang_sif_path)
        yang_sif.create_path('config/index', sif_index)
        if enabled is not None:
            yang_sif.create_path('config/enabled', enabled)

        if vlan_id is not None:
            yang_subif_vlan : libyang.DContainer = yang_sif.create_path('openconfig-vlan:vlan')
            yang_subif_vlan.create_path('match/single-tagged/config/vlan-id', vlan_id)

        yang_ipv4 : libyang.DContainer = yang_sif.create_path('openconfig-if-ip:ipv4')
        if enabled is not None:
            yang_ipv4.create_path('config/enabled', enabled)

        if address_ip is not None and address_prefix is not None:
            yang_ipv4_addrs : libyang.DContainer = yang_ipv4.create_path('addresses')
            yang_ipv4_addr_path = 'address[ip="{:s}"]'.format(address_ip)
            yang_ipv4_addr : libyang.DContainer = yang_ipv4_addrs.create_path(yang_ipv4_addr_path)
            yang_ipv4_addr.create_path('config/ip',            address_ip)
            yang_ipv4_addr.create_path('config/prefix-length', address_prefix)
        if eos_tagged_l3_replace and sif_index == 0 and vlan_id is not None:
            self._create_subinterface(yang_sifs, 0, enabled=enabled)
            self._create_subinterface(
                yang_sifs, vlan_id, enabled=enabled, vlan_id=vlan_id,
                address_ip=address_ip, address_prefix=address_prefix
            )
        else:
            self._create_subinterface(
                yang_sifs, sif_index, enabled=enabled, vlan_id=vlan_id,
                address_ip=address_ip, address_prefix=address_prefix
            )

        str_path = '/interfaces/interface[name={:s}]'.format(if_name)
        str_data = yang_if.print_mem('json')
+45 −7
Original line number Diff line number Diff line
@@ -12,14 +12,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, Set, Union
from typing import Any, Dict, List, Optional, Set
from common.tools.grpc.Tools import grpc_message_to_json
from ..gnmi.gnmi_pb2 import CapabilityRequest   # pylint: disable=no-name-in-module
from ..gnmi.gnmi_pb2_grpc import gNMIStub

def _normalize_string(value : Any) -> Optional[str]:
    if not isinstance(value, str): return None
    value = value.strip()
    if len(value) == 0: return None
    return value

def _infer_target_facts(supported_models : List[Dict[str, Any]]) -> Dict[str, str]:
    facts : Dict[str, str] = dict()

    names = list()
    organizations = list()
    for supported_model in supported_models:
        name = _normalize_string(supported_model.get('name'))
        if name is not None: names.append(name)

        organization = _normalize_string(supported_model.get('organization'))
        if organization is not None: organizations.append(organization)

    signatures = [*names, *organizations]
    signature_blob = ' '.join(signatures).lower()

    if 'arista' in signature_blob:
        facts['vendor'] = 'Arista'
        if any(
            ('eos' in name.lower()) or name.lower().startswith('arista-')
            for name in names
        ):
            facts['platform'] = 'EOS'

    return facts

def check_capabilities(
    stub : gNMIStub, username : str, password : str, timeout : Optional[int] = None
) -> Set[Union[str, int]]:
) -> Dict[str, Any]:
    metadata = [('username', username), ('password', password)]
    req = CapabilityRequest()
    reply = stub.Capabilities(req, metadata=metadata, timeout=timeout)
@@ -30,11 +61,11 @@ def check_capabilities(
    if gnmi_version is None or gnmi_version != '0.7.0':
        raise Exception('Unsupported gNMI version: {:s}'.format(str(gnmi_version)))

    #supported_models = {
    #    supported_model['name']: supported_model['version']
    #    for supported_model in data.get('supported_models', [])
    #}
    # TODO: check supported models and versions
    supported_models = [
        supported_model
        for supported_model in data.get('supported_models', [])
        if isinstance(supported_model, dict)
    ]

    supported_encodings = {
        supported_encoding
@@ -47,3 +78,10 @@ def check_capabilities(
    if 'JSON_IETF' not in supported_encodings:
        # pylint: disable=broad-exception-raised
        raise Exception('JSON_IETF encoding not supported')

    return {
        'gnmi_version': gnmi_version,
        'supported_models': supported_models,
        'supported_encodings': sorted(supported_encodings),
        'target_facts': _infer_target_facts(supported_models),
    }
+65 −0
Original line number Diff line number Diff line
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from device.service.drivers.gnmi_openconfig.GnmiOpenConfigDriver import GnmiOpenConfigDriver
from device.service.drivers.gnmi_openconfig.GnmiSessionHandler import INITIAL_TARGET_INFO_RESOURCE_KEY
from device.service.drivers.gnmi_openconfig.gnmi.gnmi_pb2 import CapabilityResponse, Encoding, ModelData
from device.service.drivers.gnmi_openconfig.tools.Capabilities import check_capabilities


class _MockGnmiStub:
    def __init__(self, reply):
        self._reply = reply

    def Capabilities(self, req, metadata=None, timeout=None): # pylint: disable=unused-argument
        return self._reply


def test_check_capabilities_extracts_arista_target_facts() -> None:
    reply = CapabilityResponse(
        gNMI_version='0.7.0',
        supported_models=[
            ModelData(name='openconfig-system', organization='OpenConfig working group', version='2.0.0'),
            ModelData(name='arista-exp-eos', organization='Arista Networks <http://arista.com/>', version=''),
        ],
        supported_encodings=[Encoding.JSON_IETF, Encoding.JSON],
    )

    capability_info = check_capabilities(_MockGnmiStub(reply), 'admin', 'admin', timeout=120)

    assert capability_info['gnmi_version'] == '0.7.0'
    assert capability_info['target_facts']['vendor'] == 'Arista'
    assert capability_info['target_facts']['platform'] == 'EOS'


def test_driver_get_initial_config_returns_target_facts() -> None:
    driver = GnmiOpenConfigDriver('127.0.0.1', 6030, username='admin', password='admin')
    driver._GnmiOpenConfigDriver__handler._target_facts = { # pylint: disable=protected-access
        'vendor': 'Arista',
        'platform': 'EOS',
        'model': 'cEOSLab',
        'software_version': '4.32.2F',
    }

    initial_config = driver.GetInitialConfig()

    assert initial_config == [(
        INITIAL_TARGET_INFO_RESOURCE_KEY,
        {
            'vendor': 'Arista',
            'platform': 'EOS',
            'model': 'cEOSLab',
            'software_version': '4.32.2F',
        }
    )]
Loading