From d8243efb63f1214ab8dd2af3c8806b1bf0b583d0 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Sat, 17 Jun 2023 08:02:37 +0000 Subject: [PATCH] Device component - gNMI Driver: - First functional version of gNMI driver able to configure - Telemetry is work in progress --- src/device/service/drivers/__init__.py | 11 + .../gnmi_openconfig/GnmiOpenConfigDriver.py | 100 ++++++ .../gnmi_openconfig/GnmiSessionHandler.py | 332 ++++++++++++++++++ .../gnmi_openconfig/MonitoringThread.py | 150 ++++++++ .../drivers/gnmi_openconfig/SamplesCache.py | 101 ++++++ .../{protocols/gnmi => }/__init__.py | 1 + .../{protocols => }/gnmi/Acknowledgement.txt | 0 .../drivers/gnmi_openconfig/gnmi/__init__.py | 14 + .../{protocols => }/gnmi/gnmi.proto | 0 .../{protocols => }/gnmi/gnmi_ext.proto | 0 .../{protocols => }/gnmi/gnmi_pb2.py | 0 .../{protocols => }/gnmi/gnmi_pb2.py.old | 0 .../{protocols => }/gnmi/gnmi_pb2.pyi | 0 .../{protocols => }/gnmi/gnmi_pb2_grpc.py | 0 .../gnmi_openconfig/handlers/Component.py | 63 ++++ .../gnmi_openconfig/handlers/Interface.py | 248 +++++++++++++ .../handlers/InterfaceCounter.py | 80 +++++ .../handlers/NetworkInstance.py | 62 ++++ .../handlers/NetworkInstanceInterface.py | 46 +++ .../handlers/NetworkInstanceStaticRoute.py | 61 ++++ .../drivers/gnmi_openconfig/handlers/Tools.py | 30 ++ .../gnmi_openconfig/handlers/_Handler.py | 32 ++ .../gnmi_openconfig/handlers/__init__.py | 103 ++++++ .../handlers/old_bgp_handler.txt | 138 ++++++++ .../gnmi_openconfig/tools/Capabilities.py | 36 ++ .../drivers/gnmi_openconfig/tools/Channel.py | 34 ++ .../drivers/gnmi_openconfig/tools/Path.py | 98 ++++++ .../gnmi_openconfig/tools/Subscriptions.py | 47 +++ .../drivers/gnmi_openconfig/tools/Value.py | 52 +++ .../drivers/gnmi_openconfig/tools/__init__.py | 14 + 30 files changed, 1853 insertions(+) create mode 100644 src/device/service/drivers/gnmi_openconfig/GnmiOpenConfigDriver.py create mode 100644 src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py create mode 100644 src/device/service/drivers/gnmi_openconfig/MonitoringThread.py create mode 100644 src/device/service/drivers/gnmi_openconfig/SamplesCache.py rename src/device/service/drivers/gnmi_openconfig/{protocols/gnmi => }/__init__.py (99%) rename src/device/service/drivers/gnmi_openconfig/{protocols => }/gnmi/Acknowledgement.txt (100%) create mode 100644 src/device/service/drivers/gnmi_openconfig/gnmi/__init__.py rename src/device/service/drivers/gnmi_openconfig/{protocols => }/gnmi/gnmi.proto (100%) rename src/device/service/drivers/gnmi_openconfig/{protocols => }/gnmi/gnmi_ext.proto (100%) rename src/device/service/drivers/gnmi_openconfig/{protocols => }/gnmi/gnmi_pb2.py (100%) rename src/device/service/drivers/gnmi_openconfig/{protocols => }/gnmi/gnmi_pb2.py.old (100%) rename src/device/service/drivers/gnmi_openconfig/{protocols => }/gnmi/gnmi_pb2.pyi (100%) rename src/device/service/drivers/gnmi_openconfig/{protocols => }/gnmi/gnmi_pb2_grpc.py (100%) create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/Component.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/Interface.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/InterfaceCounter.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstance.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceInterface.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceStaticRoute.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/Tools.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/_Handler.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/__init__.py create mode 100644 src/device/service/drivers/gnmi_openconfig/handlers/old_bgp_handler.txt create mode 100644 src/device/service/drivers/gnmi_openconfig/tools/Capabilities.py create mode 100644 src/device/service/drivers/gnmi_openconfig/tools/Channel.py create mode 100644 src/device/service/drivers/gnmi_openconfig/tools/Path.py create mode 100644 src/device/service/drivers/gnmi_openconfig/tools/Subscriptions.py create mode 100644 src/device/service/drivers/gnmi_openconfig/tools/Value.py create mode 100644 src/device/service/drivers/gnmi_openconfig/tools/__init__.py diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index b3b485a47..6a9726315 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -70,6 +70,7 @@ DRIVERS.append( # DeviceDriverEnum.DEVICEDRIVER_P4, # DeviceDriverEnum.DEVICEDRIVER_IETF_NETWORK_TOPOLOGY, # DeviceDriverEnum.DEVICEDRIVER_ONF_TR_352, + # DeviceDriverEnum.DEVICEDRIVER_GNMI_OPENCONFIG, # ], #} ])) @@ -94,6 +95,16 @@ if LOAD_ALL_DEVICE_DRIVERS: } ])) + from .gnmi_openconfig.GnmiOpenConfigDriver import GnmiOpenConfigDriver # pylint: disable=wrong-import-position + DRIVERS.append( + (GnmiOpenConfigDriver, [ + { + # Real Packet Router, specifying gNMI OpenConfig Driver => use GnmiOpenConfigDriver + FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER, + FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_GNMI_OPENCONFIG, + } + ])) + if LOAD_ALL_DEVICE_DRIVERS: from .transport_api.TransportApiDriver import TransportApiDriver # pylint: disable=wrong-import-position DRIVERS.append( diff --git a/src/device/service/drivers/gnmi_openconfig/GnmiOpenConfigDriver.py b/src/device/service/drivers/gnmi_openconfig/GnmiOpenConfigDriver.py new file mode 100644 index 000000000..882c0de07 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/GnmiOpenConfigDriver.py @@ -0,0 +1,100 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, queue, threading +from typing import Any, Iterator, List, Optional, Tuple, Union +from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method +from common.type_checkers.Checkers import chk_type +from device.service.driver_api._Driver import _Driver +from .GnmiSessionHandler import GnmiSessionHandler + +DRIVER_NAME = 'gnmi_openconfig' +METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) + +class GnmiOpenConfigDriver(_Driver): + def __init__(self, address : str, port : int, **settings) -> None: + super().__init__(DRIVER_NAME, address, port, **settings) + self.__logger = logging.getLogger('{:s}:[{:s}:{:s}]'.format(str(__name__), str(self.address), str(self.port))) + self.__lock = threading.Lock() + self.__started = threading.Event() + self.__terminate = threading.Event() + self.__handler = GnmiSessionHandler(self.address, self.port, settings, self.__logger) + self.__out_samples = self.__handler.out_samples + + def Connect(self) -> bool: + with self.__lock: + if self.__started.is_set(): return True + self.__handler.connect() + self.__started.set() + return True + + def Disconnect(self) -> bool: + with self.__lock: + # Trigger termination of loops and processes + self.__terminate.set() + # If not started, assume it is already disconnected + if not self.__started.is_set(): return True + self.__handler.disconnect() + return True + + @metered_subclass_method(METRICS_POOL) + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + with self.__lock: + return [] + + @metered_subclass_method(METRICS_POOL) + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: + chk_type('resources', resource_keys, list) + with self.__lock: + return self.__handler.get(resource_keys) + + @metered_subclass_method(METRICS_POOL) + def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + chk_type('resources', resources, list) + if len(resources) == 0: return [] + with self.__lock: + return self.__handler.set(resources) + + @metered_subclass_method(METRICS_POOL) + def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + chk_type('resources', resources, list) + if len(resources) == 0: return [] + with self.__lock: + return self.__handler.delete(resources) + + @metered_subclass_method(METRICS_POOL) + def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + chk_type('subscriptions', subscriptions, list) + if len(subscriptions) == 0: return [] + with self.__lock: + return self.__handler.subscribe(subscriptions) + + @metered_subclass_method(METRICS_POOL) + def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + chk_type('subscriptions', subscriptions, list) + if len(subscriptions) == 0: return [] + with self.__lock: + return self.__handler.unsubscribe(subscriptions) + + def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + try: + sample = self.__out_samples.get(block=blocking, timeout=0.1) + except queue.Empty: + if blocking: continue + return + if sample is None: continue + yield sample diff --git a/src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py b/src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py new file mode 100644 index 000000000..04dae4f5f --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/GnmiSessionHandler.py @@ -0,0 +1,332 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy, grpc, json, logging, queue, threading +from typing import Any, Dict, List, Optional, Tuple, Union +from common.tools.grpc.Tools import grpc_message_to_json_string +from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type +from .gnmi.gnmi_pb2_grpc import gNMIStub +from .gnmi.gnmi_pb2 import Encoding, GetRequest, SetRequest, UpdateResult # pylint: disable=no-name-in-module +from .handlers import ALL_RESOURCE_KEYS, compose, get_path, parse +from .tools.Capabilities import get_supported_encodings +from .tools.Channel import get_grpc_channel +from .tools.Path import path_from_string, path_to_string #, compose_path +from .tools.Subscriptions import Subscriptions +from .tools.Value import decode_value #, value_exists +from .MonitoringThread import MonitoringThread + +class GnmiSessionHandler: + def __init__(self, address : str, port : int, settings : Dict, logger : logging.Logger) -> None: + self._address = address + self._port = port + self._settings = copy.deepcopy(settings) + self._logger = logger + self._lock = threading.Lock() + self._connected = threading.Event() + self._username = settings.get('username') + self._password = settings.get('password') + self._use_tls = settings.get('use_tls', False) + self._channel : Optional[grpc.Channel] = None + self._stub : Optional[gNMIStub] = None + self._monit_thread = None + self._supported_encodings = None + self._subscriptions = Subscriptions() + self._in_subscriptions = queue.Queue() + self._out_samples = queue.Queue() + + @property + def subscriptions(self): return self._subscriptions + + @property + def in_subscriptions(self): return self._in_subscriptions + + @property + def out_samples(self): return self._out_samples + + def connect(self): + with self._lock: + self._channel = get_grpc_channel(self._address, self._port, self._use_tls, self._logger) + self._stub = gNMIStub(self._channel) + self._supported_encodings = get_supported_encodings( + self._stub, self._username, self._password, timeout=120) + self._monit_thread = MonitoringThread( + self._stub, self._logger, self._settings, self._in_subscriptions, self._out_samples) + self._monit_thread.start() + self._connected.set() + + def disconnect(self): + if not self._connected.is_set(): return + with self._lock: + self._monit_thread.stop() + self._monit_thread.join() + self._channel.close() + self._connected.clear() + + def get(self, resource_keys : List[str]) -> List[Tuple[str, Union[Any, None, Exception]]]: + if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS + chk_type('resources', resource_keys, list) + + parsing_results = [] + + get_request = GetRequest() + get_request.type = GetRequest.DataType.ALL + get_request.encoding = Encoding.JSON_IETF + #get_request.use_models.add() # kept empty: return for all models supported + for i,resource_key in enumerate(resource_keys): + str_resource_name = 'resource_key[#{:d}]'.format(i) + try: + chk_string(str_resource_name, resource_key, allow_empty=False) + self._logger.debug('[GnmiSessionHandler:get] resource_key = {:s}'.format(str(resource_key))) + str_path = get_path(resource_key) + self._logger.debug('[GnmiSessionHandler:get] str_path = {:s}'.format(str(str_path))) + get_request.path.append(path_from_string(str_path)) + except Exception as e: # pylint: disable=broad-except + MSG = 'Exception parsing {:s}: {:s}' + self._logger.exception(MSG.format(str_resource_name, str(resource_key))) + parsing_results.append((resource_key, e)) # if validation fails, store the exception + + if len(parsing_results) > 0: + return parsing_results + + metadata = [('username', self._username), ('password', self._password)] + timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration) + get_reply = self._stub.Get(get_request, metadata=metadata, timeout=timeout) + #self._logger.info('get_reply={:s}'.format(grpc_message_to_json_string(get_reply))) + + results = [] + #results[str_filter] = [i, None, False] # (index, value, processed?) + + for notification in get_reply.notification: + #for delete_path in notification.delete: + # self._logger.info('delete_path={:s}'.format(grpc_message_to_json_string(delete_path))) + # str_path = path_to_string(delete_path) + # resource_key_tuple = results.get(str_path) + # if resource_key_tuple is None: + # # pylint: disable=broad-exception-raised + # MSG = 'Unexpected Delete Path({:s}); requested resource_keys({:s})' + # raise Exception(MSG.format(str(str_path), str(resource_keys))) + # resource_key_tuple[2] = True + + for update in notification.update: + #self._logger.info('update={:s}'.format(grpc_message_to_json_string(update))) + str_path = path_to_string(update.path) + #resource_key_tuple = results.get(str_path) + #if resource_key_tuple is None: + # # pylint: disable=broad-exception-raised + # MSG = 'Unexpected Update Path({:s}); requested resource_keys({:s})' + # raise Exception(MSG.format(str(str_path), str(resource_keys))) + try: + value = decode_value(update.val) + #resource_key_tuple[1] = value + #resource_key_tuple[2] = True + results.extend(parse(str_path, value)) + except Exception as e: # pylint: disable=broad-except + MSG = 'Exception processing notification {:s}' + self._logger.exception(MSG.format(grpc_message_to_json_string(notification))) + results.append((str_path, e)) # if validation fails, store the exception + + #_results = sorted(results.items(), key=lambda x: x[1][0]) + #results = list() + #for resource_key,resource_key_tuple in _results: + # _, value, processed = resource_key_tuple + # value = value if processed else Exception('Not Processed') + # results.append((resource_key, value)) + return results + + def set(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + #resource_keys = [key for key,_ in resources] + #current_values = self.get(resource_keys) + + #resource_tuples = { + # resource_key : [i, value, value_exists(value), None] + # for i,(resource_key,value) in enumerate(current_values) + #} + + #self._logger.info('---0') + #self._logger.info(str(resource_tuples)) + + set_request = SetRequest() + #for resource_key in resource_keys: + for resource_key, resource_value in resources: + self._logger.info('---1') + self._logger.info(str(resource_key)) + self._logger.info(str(resource_value)) + #resource_tuple = resource_tuples.get(resource_key) + #if resource_tuple is None: continue + #_, value, exists, operation_done = resource_tuple + if isinstance(resource_value, str): resource_value = json.loads(resource_value) + str_path, str_data = compose(resource_key, resource_value, delete=False) + self._logger.info('---3') + self._logger.info(str(str_path)) + self._logger.info(str(str_data)) + set_request_list = set_request.update #if exists else set_request.replace + set_request_entry = set_request_list.add() + set_request_entry.path.CopyFrom(path_from_string(str_path)) + set_request_entry.val.json_val = str_data.encode('UTF-8') + + self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request))) + metadata = [('username', self._username), ('password', self._password)] + timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration) + set_reply = self._stub.Set(set_request, metadata=metadata, timeout=timeout) + self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply))) + + results = [] + for (resource_key, resource_value), update_result in zip(resources, set_reply.response): + operation = update_result.op + if operation == UpdateResult.UPDATE: + results.append((resource_key, True)) + else: + results.append((resource_key, Exception('Unexpected'))) + + #str_path = path_to_string(update_result.path) + #resource_tuple = resource_tuples.get(str_path) + #if resource_tuple is None: continue + #resource_tuple[3] = operation + + #resource_tuples = sorted(resource_tuples.items(), key=lambda x: x[1][0]) + #results = list() + #for resource_key,resource_tuple in resource_tuples: + # _, _, exists, operation_done = resource_tuple + # desired_operation = 'update' if exists else 'replace' + # + # if operation_done == UpdateResult.INVALID: + # value = Exception('Invalid') + # elif operation_done == UpdateResult.DELETE: + # value = Exception('Unexpected Delete') + # elif operation_done == UpdateResult.REPLACE: + # value = True if desired_operation == 'replace' else Exception('Failed') + # elif operation_done == UpdateResult.UPDATE: + # value = True if desired_operation == 'update' else Exception('Failed') + # else: + # value = Exception('Unexpected') + # results.append((resource_key, value)) + return results + + def delete(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + #resource_keys = [key for key,_ in resources] + #current_values = self.get(resource_keys) + + #resource_tuples = { + # resource_key : [i, value, value_exists(value), None] + # for i,(resource_key,value) in enumerate(current_values) + #} + + #self._logger.info('---0') + #self._logger.info(str(resource_tuples)) + + set_request = SetRequest() + #for resource_key in resource_keys: + for resource_key, resource_value in resources: + self._logger.info('---1') + self._logger.info(str(resource_key)) + self._logger.info(str(resource_value)) + #resource_tuple = resource_tuples.get(resource_key) + #if resource_tuple is None: continue + #_, value, exists, operation_done = resource_tuple + #if not exists: continue + if isinstance(resource_value, str): resource_value = json.loads(resource_value) + str_path, str_data = compose(resource_key, resource_value, delete=True) + self._logger.info('---3') + self._logger.info(str(str_path)) + self._logger.info(str(str_data)) + set_request_entry = set_request.delete.add() + set_request_entry.CopyFrom(path_from_string(str_path)) + + self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request))) + metadata = [('username', self._username), ('password', self._password)] + timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration) + set_reply = self._stub.Set(set_request, metadata=metadata, timeout=timeout) + self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply))) + + results = [] + for (resource_key, resource_value), update_result in zip(resources, set_reply.response): + operation = update_result.op + if operation == UpdateResult.DELETE: + results.append((resource_key, True)) + else: + results.append((resource_key, Exception('Unexpected'))) + + #str_path = path_to_string(update_result.path) + #resource_tuple = resource_tuples.get(str_path) + #if resource_tuple is None: continue + #resource_tuple[3] = operation + + #resource_tuples = sorted(resource_tuples.items(), key=lambda x: x[1][0]) + #results = list() + #for resource_key,resource_tuple in resource_tuples: + # _, _, exists, operation_done = resource_tuple + # if operation_done == UpdateResult.INVALID: + # value = Exception('Invalid') + # elif operation_done == UpdateResult.DELETE: + # value = True + # elif operation_done == UpdateResult.REPLACE: + # value = Exception('Unexpected Replace') + # elif operation_done == UpdateResult.UPDATE: + # value = Exception('Unexpected Update') + # else: + # value = Exception('Unexpected') + # results.append((resource_key, value)) + return results + + def subscribe(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + results = [] + for i,subscription in enumerate(subscriptions): + str_subscription_name = 'subscriptions[#{:d}]'.format(i) + try: + chk_type(str_subscription_name, subscription, (list, tuple)) + chk_length(str_subscription_name, subscription, min_length=3, max_length=3) + resource_key, sampling_duration, sampling_interval = subscription + chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) + chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) + chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) + except Exception as e: # pylint: disable=broad-except + MSG = 'Exception validating {:s}: {:s}' + self._logger.exception(MSG.format(str_subscription_name, str(resource_key))) + results.append(e) # if validation fails, store the exception + continue + + #resource_path = resource_key.split('/') + #self._subscriptions.add(resource_path, sampling_duration, sampling_interval, reference) + subscription = 'subscribe', resource_key, sampling_duration, sampling_interval + self._in_subscriptions.put_nowait(subscription) + results.append(True) + return results + + def unsubscribe(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + results = [] + for i,subscription in enumerate(subscriptions): + str_subscription_name = 'subscriptions[#{:d}]'.format(i) + try: + chk_type(str_subscription_name, subscription, (list, tuple)) + chk_length(str_subscription_name, subscription, min_length=3, max_length=3) + resource_key, sampling_duration, sampling_interval = subscription + chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) + chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) + chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) + except Exception as e: # pylint: disable=broad-except + MSG = 'Exception validating {:s}: {:s}' + self._logger.exception(MSG.format(str_subscription_name, str(resource_key))) + results.append(e) # if validation fails, store the exception + continue + + #resource_path = resource_key.split('/') + #reference = self._subscriptions.get(resource_path, sampling_duration, sampling_interval) + #if reference is None: + # results.append(False) + # continue + #self._subscriptions.delete(reference) + subscription = 'unsubscribe', resource_key, sampling_duration, sampling_interval + self._in_subscriptions.put_nowait(subscription) + results.append(True) + return results diff --git a/src/device/service/drivers/gnmi_openconfig/MonitoringThread.py b/src/device/service/drivers/gnmi_openconfig/MonitoringThread.py new file mode 100644 index 000000000..5c40b13b9 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/MonitoringThread.py @@ -0,0 +1,150 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Ref: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md +# Ref: https://github.com/openconfig/gnmi/blob/master/proto/gnmi/gnmi.proto + +from __future__ import annotations +import grpc, logging, queue, threading +from collections.abc import Iterator +from datetime import datetime +from typing import Dict +from common.tools.grpc.Tools import grpc_message_to_json_string +from .gnmi.gnmi_pb2 import ( # pylint: disable=no-name-in-module + QOSMarking, SubscribeRequest, Subscription, SubscriptionList, SubscriptionMode +) +from .gnmi.gnmi_pb2_grpc import gNMIStub +from .tools.Path import path_from_string, path_to_string + + +LOGGER = logging.getLogger(__name__) + +# SubscriptionList Mode: Mode of the subscription. +# STREAM = 0: Values streamed by the target. gNMI Specification Section 3.5.1.5.2 +# ONCE = 1: Values sent once-off by the target. gNMI Specification Section 3.5.1.5.1 +# POLL = 2: Values sent in response to a poll request. gNMI Specification Section 3.5.1.5.3 +GNMI_SUBSCRIPTION_LIST_MODE = SubscriptionList.Mode.STREAM + +# Path Prefix: Prefix used for paths. +GNMI_PATH_PREFIX = None + +# QOS MArking: DSCP marking to be used. +GNMI_QOS_MARKING = None + +# Allow Aggregation: Whether elements of the schema that are marked as eligible for aggregation +# should be aggregated or not. +GNMI_ALLOW_AGGREGATION = False + +# Encoding: The encoding that the target should use within the Notifications generated +# corresponding to the SubscriptionList. +GNMI_ENCODING = 'JSON' + +#Subscription Mode: The mode of the subscription, specifying how the target must return values +# in a subscription. gNMI Specification Section 3.5.1.3 +# TARGET_DEFINED = 0: The target selects the relevant mode for each element. +# ON_CHANGE = 1: The target sends an update on element value change. +# SAMPLE = 2: The target samples values according to the interval. +GNMI_SUBSCRIPTION_MODE = SubscriptionMode.SAMPLE + +# Suppress Redundant: Indicates whether values that have not changed should be sent in a SAMPLE +# subscription. gNMI Specification Section 3.5.1.3 +GNMI_SUPPRESS_REDUNDANT = False + +# Heartbeat Interval: Specifies the maximum allowable silent period in nanoseconds when +# suppress_redundant is in use. The target should send a value at least once in the period +# specified. gNMI Specification Section 3.5.1.3 +GNMI_HEARTBEAT_INTERVAL = 10 # seconds + +GNMI_SUBSCRIPTION_TIMEOUT = None + +class MonitoringThread(threading.Thread): + def __init__( + self, stub : gNMIStub, logger : logging.Logger, settings : Dict, + in_subscriptions : queue.Queue, out_samples : queue.Queue + ) -> None: + super().__init__(daemon=True) + self._terminate = threading.Event() + self._stub = stub + self._logger = logger + self._username = settings.get('username') + self._password = settings.get('password') + self._in_subscriptions = in_subscriptions + self._out_samples = out_samples + self._response_iterator = None + + def stop(self) -> None: + self._terminate.set() + if self._response_iterator is not None: + self._response_iterator.cancel() + + def generate_requests(self) -> Iterator[SubscribeRequest]: + subscriptions = [] + while not self._terminate.is_set(): + try: + subscription = self._in_subscriptions.get(block=True, timeout=0.1) + operation, resource_key, sampling_duration, sampling_interval = subscription # pylint: disable=unused-variable + if operation != 'subscribe': continue # Unsubscribe not supported by gNM, needs to cancel entire connection + # options.timeout = int(sampling_duration) + #_path = parse_xpath(resource_key) + path = path_from_string(resource_key) + subscription = Subscription( + path=path, mode=GNMI_SUBSCRIPTION_MODE, suppress_redundant=GNMI_SUPPRESS_REDUNDANT, + sample_interval=int(sampling_interval * 1000000000), + heartbeat_interval=int(GNMI_HEARTBEAT_INTERVAL * 1000000000)) + subscriptions.append(subscription) + except queue.Empty: + if len(subscriptions) == 0: continue + #self._logger.warning('[generate_requests] process') + prefix = path_from_string(GNMI_PATH_PREFIX) if GNMI_PATH_PREFIX is not None else None + qos = QOSMarking(marking=GNMI_QOS_MARKING) if GNMI_QOS_MARKING is not None else None + subscriptions_list = SubscriptionList( + prefix=prefix, mode=GNMI_SUBSCRIPTION_LIST_MODE, allow_aggregation=GNMI_ALLOW_AGGREGATION, + encoding=GNMI_ENCODING, subscription=subscriptions, qos=qos) + subscribe_request = SubscribeRequest(subscribe=subscriptions_list) + #str_subscribe_request = grpc_message_to_json_string(subscribe_request) + #self._logger.warning('[generate_requests] subscribe_request={:s}'.format(str_subscribe_request)) + yield subscribe_request + subscriptions = [] + except: # pylint: disable=bare-except + self._logger.exception('[generate_requests] Unhandled Exception') + + def run(self) -> None: + # Add a dummy subscription to be used as keep-alive + # usable only with SRLinux native data models + #subscription = ('/system/name/host-name', None, 1) + #self._in_subscriptions.put_nowait(subscription) + + try: + request_iterator = self.generate_requests() + metadata = [('username', self._username), ('password', self._password)] + timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration) + self._response_iterator = self._stub.Subscribe(request_iterator, metadata=metadata, timeout=timeout) + for subscribe_response in self._response_iterator: + timestamp = datetime.timestamp(datetime.utcnow()) + str_subscribe_response = grpc_message_to_json_string(subscribe_response) + self._logger.warning('[run] subscribe_response={:s}'.format(str_subscribe_response)) + for update in subscribe_response.update.update: + str_path = path_to_string(update.path) + if str_path != '/system/name/host-name': continue + #counter_name = update.path[-1].name + value_type = update.val.WhichOneof('value') + value = getattr(update.val, value_type) + sample = (timestamp, str_path, value) + self._logger.warning('[run] sample={:s}'.format(str(sample))) + self._out_samples.put_nowait(sample) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.CANCELLED: raise # pylint: disable=no-member + if e.details() != 'Locally cancelled by application!': raise # pylint: disable=no-member + except: # pylint: disable=bare-except + self._logger.exception('Unhandled Exception') diff --git a/src/device/service/drivers/gnmi_openconfig/SamplesCache.py b/src/device/service/drivers/gnmi_openconfig/SamplesCache.py new file mode 100644 index 000000000..28be2d661 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/SamplesCache.py @@ -0,0 +1,101 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Collection of samples through NetConf is very slow and each request collects all the data. +# Populate a cache periodically (when first interface is interrogated). +# Evict data after some seconds, when data is considered as outdated + +import copy, queue, logging, re, threading +from datetime import datetime +from typing import Dict, Tuple +from .templates_old import get_filter, parse +from .GnmiSessionHandler import GnmiSessionHandler + +RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*') +RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*") + +SAMPLE_EVICTION_SECONDS = 30.0 # seconds +SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters' + +def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): + if previous_sample is None: return None + if previous_timestamp is None: return None + if current_sample is None: return None + if current_timestamp is None: return None + delay = current_timestamp - previous_timestamp + field_keys = set(previous_sample.keys()).union(current_sample.keys()) + field_keys.discard('name') + delta_sample = {'name': previous_sample['name']} + for field_key in field_keys: + previous_sample_value = previous_sample[field_key] + if not isinstance(previous_sample_value, (int, float)): continue + current_sample_value = current_sample[field_key] + if not isinstance(current_sample_value, (int, float)): continue + delta_value = current_sample_value - previous_sample_value + if delta_value < 0: continue + delta_sample[field_key] = delta_value / delay + return delta_sample + +class SamplesCache: + def __init__(self, handler : GnmiSessionHandler, logger : logging.Logger) -> None: + self.__handler = handler + self.__logger = logger + self.__lock = threading.Lock() + self.__timestamp = None + self.__absolute_samples = {} + self.__delta_samples = {} + + def _refresh_samples(self) -> None: + with self.__lock: + try: + now = datetime.timestamp(datetime.utcnow()) + if self.__timestamp is not None and (now - self.__timestamp) < SAMPLE_EVICTION_SECONDS: return + str_filter = get_filter(SAMPLE_RESOURCE_KEY) + xml_data = self.__handler.get(filter=str_filter).data_ele + interface_samples = parse(SAMPLE_RESOURCE_KEY, xml_data) + for interface,samples in interface_samples: + match = RE_GET_ENDPOINT_FROM_INTERFACE_KEY.match(interface) + if match is None: continue + interface = match.group(1) + delta_sample = compute_delta_sample( + self.__absolute_samples.get(interface), self.__timestamp, samples, now) + if delta_sample is not None: self.__delta_samples[interface] = delta_sample + self.__absolute_samples[interface] = samples + self.__timestamp = now + except: # pylint: disable=bare-except + self.__logger.exception('Error collecting samples') + + def get(self, resource_key : str) -> Tuple[float, Dict]: + self._refresh_samples() + match = RE_GET_ENDPOINT_FROM_INTERFACE_XPATH.match(resource_key) + with self.__lock: + if match is None: return self.__timestamp, {} + interface = match.group(1) + return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {})) + +def do_sampling( + samples_cache : SamplesCache, logger : logging.Logger, resource_key : str, out_samples : queue.Queue +) -> None: + try: + timestamp, samples = samples_cache.get(resource_key) + counter_name = resource_key.split('/')[-1].split(':')[-1] + value = samples.get(counter_name) + if value is None: + logger.warning('[do_sampling] value not found for {:s}'.format(resource_key)) + return + # resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s} + sample = (timestamp, resource_key, value) + out_samples.put_nowait(sample) + except: # pylint: disable=bare-except + logger.exception('Error retrieving samples') diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/__init__.py b/src/device/service/drivers/gnmi_openconfig/__init__.py similarity index 99% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/__init__.py rename to src/device/service/drivers/gnmi_openconfig/__init__.py index 38d04994f..1549d9811 100644 --- a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/__init__.py +++ b/src/device/service/drivers/gnmi_openconfig/__init__.py @@ -11,3 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/Acknowledgement.txt b/src/device/service/drivers/gnmi_openconfig/gnmi/Acknowledgement.txt similarity index 100% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/Acknowledgement.txt rename to src/device/service/drivers/gnmi_openconfig/gnmi/Acknowledgement.txt diff --git a/src/device/service/drivers/gnmi_openconfig/gnmi/__init__.py b/src/device/service/drivers/gnmi_openconfig/gnmi/__init__.py new file mode 100644 index 000000000..1549d9811 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/gnmi/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi.proto b/src/device/service/drivers/gnmi_openconfig/gnmi/gnmi.proto similarity index 100% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi.proto rename to src/device/service/drivers/gnmi_openconfig/gnmi/gnmi.proto diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_ext.proto b/src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_ext.proto similarity index 100% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_ext.proto rename to src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_ext.proto diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2.py b/src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2.py similarity index 100% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2.py rename to src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2.py diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2.py.old b/src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2.py.old similarity index 100% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2.py.old rename to src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2.py.old diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2.pyi b/src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2.pyi similarity index 100% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2.pyi rename to src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2.pyi diff --git a/src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2_grpc.py b/src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2_grpc.py similarity index 100% rename from src/device/service/drivers/gnmi_openconfig/protocols/gnmi/gnmi_pb2_grpc.py rename to src/device/service/drivers/gnmi_openconfig/gnmi/gnmi_pb2_grpc.py diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/Component.py b/src/device/service/drivers/gnmi_openconfig/handlers/Component.py new file mode 100644 index 000000000..0b3c1f970 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/Component.py @@ -0,0 +1,63 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Any, Dict, List, Tuple +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from ._Handler import _Handler + +LOGGER = logging.getLogger(__name__) + +PATH_IF_CTR = "/interfaces/interface[name={:s}]/state/counters/{:s}" + +class ComponentHandler(_Handler): + def get_resource_key(self) -> str: return '/endpoints/endpoint' + def get_path(self) -> str: return '/components/component' + + def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + #LOGGER.info('json_data = {:s}'.format(json.dumps(json_data))) + json_component_list : List[Dict] = json_data.get('component', []) + response = [] + for json_component in json_component_list: + #LOGGER.info('json_component = {:s}'.format(json.dumps(json_component))) + + endpoint = {} + + component_type = json_component.get('state', {}).get('type') + if component_type is None: continue + component_type = component_type.replace('oc-platform-types:', '') + component_type = component_type.replace('openconfig-platform-types:', '') + if component_type not in {'PORT'}: continue + endpoint['type'] = '-' + + #LOGGER.info('PORT json_component = {:s}'.format(json.dumps(json_component))) + + component_name = json_component.get('name') + if component_name is None: continue + + # TODO: improve mapping between interface name and component name + # By now, computed by time for the sake of saving time for the Hackfest. + interface_name = component_name.lower().replace('-port', '') + + endpoint['uuid'] = interface_name + endpoint['sample_types'] = { + KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED : PATH_IF_CTR.format(interface_name, 'in-octets' ), + KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED : PATH_IF_CTR.format(interface_name, 'out-octets'), + KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED : PATH_IF_CTR.format(interface_name, 'in-pkts' ), + KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED: PATH_IF_CTR.format(interface_name, 'out-pkts' ), + } + + if len(endpoint) == 0: continue + response.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint)) + return response diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/Interface.py b/src/device/service/drivers/gnmi_openconfig/handlers/Interface.py new file mode 100644 index 000000000..20f79b3c2 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/Interface.py @@ -0,0 +1,248 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging +from typing import Any, Dict, List, Tuple +from ._Handler import _Handler +from .Tools import dict_get_first + +LOGGER = logging.getLogger(__name__) + +class InterfaceHandler(_Handler): + def get_resource_key(self) -> str: return '/interface' + def get_path(self) -> str: return '/interfaces/interface' + + def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: + if_name = str (resource_value['name' ]) # ethernet-1/1 + sif_index = int (resource_value.get('sub_if_index' , 0 )) # 0 + + if delete: + PATH_TMPL = '/interfaces/interface[name={:s}]/subinterfaces/subinterface[index={:d}]' + str_path = PATH_TMPL.format(if_name, sif_index) + str_data = json.dumps({}) + return str_path, str_data + + if_enabled = bool(resource_value.get('enabled' , True)) # True/False + sif_enabled = bool(resource_value.get('sub_if_enabled' , True)) # True/False + sif_ipv4_enabled = bool(resource_value.get('sub_if_ipv4_enabled', True)) # True/False + sif_ipv4_address = str (resource_value['sub_if_ipv4_address' ]) # 172.16.0.1 + sif_ipv4_prefix = int (resource_value['sub_if_ipv4_prefix' ]) # 24 + + str_path = '/interfaces/interface[name={:s}]'.format(if_name) + str_data = json.dumps({ + 'name': if_name, + 'config': {'name': if_name, 'enabled': if_enabled}, + 'subinterfaces': { + 'subinterface': { + 'index': sif_index, + 'config': {'index': sif_index, 'enabled': sif_enabled}, + 'ipv4': { + 'config': {'enabled': sif_ipv4_enabled}, + 'addresses': { + 'address': { + 'ip': sif_ipv4_address, + 'config': {'ip': sif_ipv4_address, 'prefix_length': sif_ipv4_prefix}, + } + } + } + } + } + }) + return str_path, str_data + + def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + #LOGGER.info('json_data = {:s}'.format(json.dumps(json_data))) + json_interface_list : List[Dict] = json_data.get('interface', []) + + response = [] + for json_interface in json_interface_list: + #LOGGER.info('json_interface = {:s}'.format(json.dumps(json_interface))) + + interface = {} + + interface_name = json_interface.get('name') + if interface_name is None: + LOGGER.info('DISCARDED json_interface = {:s}'.format(json.dumps(json_interface))) + continue + interface['name'] = interface_name + + CONFIG_FIELDS = ('config', 'openconfig-interface:config', 'oci:config') + json_config : Dict = dict_get_first(json_interface, CONFIG_FIELDS, default={}) + + STATE_FIELDS = ('state', 'openconfig-interface:state', 'oci:state') + json_state : Dict = dict_get_first(json_interface, STATE_FIELDS, default={}) + + interface_type = json_config.get('type') + if interface_type is None: interface_type = json_state.get('type') + if interface_type is None: + LOGGER.info('DISCARDED json_interface = {:s}'.format(json.dumps(json_interface))) + continue + interface_type = interface_type.replace('ianaift:', '') + interface_type = interface_type.replace('iana-if-type:', '') + interface['type'] = interface_type + + interface_mtu = json_config.get('mtu') + if interface_mtu is None: interface_mtu = json_state.get('mtu') + if interface_mtu is not None: interface['mtu'] = int(interface_mtu) + + interface_enabled = json_config.get('enabled') + if interface_enabled is None: interface_enabled = json_state.get('enabled') + interface['enabled'] = False if interface_enabled is None else bool(interface_enabled) + + interface_management = json_config.get('management') + if interface_management is None: interface_management = json_state.get('management') + interface['management'] = False if interface_management is None else bool(interface_management) + + interface_descr = json_interface.get('config', {}).get('description') + if interface_descr is not None: interface['description'] = interface_descr + + json_subinterfaces = json_interface.get('subinterfaces', {}) + json_subinterface_list : List[Dict] = json_subinterfaces.get('subinterface', []) + + for json_subinterface in json_subinterface_list: + #LOGGER.info('json_subinterface = {:s}'.format(json.dumps(json_subinterface))) + + subinterface = {} + + subinterface_index = json_subinterface.get('state', {}).get('index') + if subinterface_index is None: continue + subinterface['index'] = int(subinterface_index) + + subinterface_name = json_subinterface.get('state', {}).get('name') + if subinterface_name is None: continue + subinterface['name'] = subinterface_name + + subinterface_enabled = json_subinterface.get('state', {}).get('enabled', False) + subinterface['enabled'] = bool(subinterface_enabled) + + VLAN_FIELDS = ('vlan', 'openconfig-vlan:vlan', 'ocv:vlan') + json_vlan = dict_get_first(json_subinterface, VLAN_FIELDS, default={}) + + MATCH_FIELDS = ('match', 'openconfig-vlan:match', 'ocv:match') + json_vlan = dict_get_first(json_vlan, MATCH_FIELDS, default={}) + + SIN_TAG_FIELDS = ('single-tagged', 'openconfig-vlan:single-tagged', 'ocv:single-tagged') + json_vlan = dict_get_first(json_vlan, SIN_TAG_FIELDS, default={}) + + CONFIG_FIELDS = ('config', 'openconfig-vlan:config', 'ocv:config') + json_vlan = dict_get_first(json_vlan, CONFIG_FIELDS, default={}) + + VLAN_ID_FIELDS = ('vlan-id', 'openconfig-vlan:vlan-id', 'ocv:vlan-id') + subinterface_vlan_id = dict_get_first(json_vlan, VLAN_ID_FIELDS) + if subinterface_vlan_id is not None: subinterface['vlan_id'] = subinterface_vlan_id + + + # TODO: implement support for multiple IP addresses per subinterface + + IPV4_FIELDS = ('ipv4', 'openconfig-if-ip:ipv4', 'ociip:ipv4') + json_ipv4 = dict_get_first(json_subinterface, IPV4_FIELDS, default={}) + + IPV4_ADDRESSES_FIELDS = ('addresses', 'openconfig-if-ip:addresses', 'ociip:addresses') + json_ipv4_addresses = dict_get_first(json_ipv4, IPV4_ADDRESSES_FIELDS, default={}) + + IPV4_ADDRESS_FIELDS = ('address', 'openconfig-if-ip:address', 'ociip:address') + json_ipv4_address_list : List[Dict] = dict_get_first(json_ipv4_addresses, IPV4_ADDRESS_FIELDS, default=[]) + + #ipv4_addresses = [] + for json_ipv4_address in json_ipv4_address_list: + #LOGGER.info('json_ipv4_address = {:s}'.format(json.dumps(json_ipv4_address))) + + STATE_FIELDS = ('state', 'openconfig-if-ip:state', 'ociip:state') + json_ipv4_address_state = dict_get_first(json_ipv4_address, STATE_FIELDS, default={}) + + #ipv4_address = {} + + #ORIGIN_FIELDS = ('origin', 'openconfig-if-ip:origin', 'ociip:origin') + #ipv4_address_origin = dict_get_first(json_ipv4_address_state, ORIGIN_FIELDS, default={}) + #if ipv4_address_origin is not None: ipv4_address['origin'] = ipv4_address_origin + + IP_FIELDS = ('ip', 'openconfig-if-ip:ip', 'ociip:ip') + ipv4_address_ip = dict_get_first(json_ipv4_address_state, IP_FIELDS) + #if ipv4_address_ip is not None: ipv4_address['address_ip'] = ipv4_address_ip + if ipv4_address_ip is not None: subinterface['address_ip'] = ipv4_address_ip + + PREFIX_FIELDS = ('prefix-length', 'openconfig-if-ip:prefix-length', 'ociip:prefix-length') + ipv4_address_prefix = dict_get_first(json_ipv4_address_state, PREFIX_FIELDS) + #if ipv4_address_prefix is not None: ipv4_address['address_prefix'] = int(ipv4_address_prefix) + if ipv4_address_prefix is not None: subinterface['address_prefix'] = int(ipv4_address_prefix) + + #if len(ipv4_address) == 0: continue + #ipv4_addresses.append(ipv4_address) + + #subinterface['ipv4_addresses'] = ipv4_addresses + + if len(subinterface) == 0: continue + resource_key = '/interface[{:s}]/subinterface[{:s}]'.format(interface['name'], str(subinterface['index'])) + response.append((resource_key, subinterface)) + + if len(interface) == 0: continue + response.append(('/interface[{:s}]'.format(interface['name']), interface)) + + return response + + def parse_counters(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + LOGGER.info('[parse_counters] json_data = {:s}'.format(json.dumps(json_data))) + json_interface_list : List[Dict] = json_data.get('interface', []) + + response = [] + for json_interface in json_interface_list: + LOGGER.info('[parse_counters] json_interface = {:s}'.format(json.dumps(json_interface))) + + interface = {} + + NAME_FIELDS = ('name', 'openconfig-interface:name', 'oci:name') + interface_name = dict_get_first(json_interface, NAME_FIELDS) + if interface_name is None: continue + interface['name'] = interface_name + + STATE_FIELDS = ('state', 'openconfig-interface:state', 'oci:state') + json_state = dict_get_first(json_interface, STATE_FIELDS, default={}) + + COUNTERS_FIELDS = ('counters', 'openconfig-interface:counters', 'oci:counters') + json_counters = dict_get_first(json_state, COUNTERS_FIELDS, default={}) + + IN_PKTS_FIELDS = ('in-pkts', 'openconfig-interface:in-pkts', 'oci:in-pkts') + interface_in_pkts = dict_get_first(json_counters, IN_PKTS_FIELDS) + if interface_in_pkts is not None: interface['in-pkts'] = int(interface_in_pkts) + + IN_OCTETS_FIELDS = ('in-octets', 'openconfig-interface:in-octets', 'oci:in-octets') + interface_in_octets = dict_get_first(json_counters, IN_OCTETS_FIELDS) + if interface_in_octets is not None: interface['in-octets'] = int(interface_in_octets) + + IN_ERRORS_FIELDS = ('in-errors', 'openconfig-interface:in-errors', 'oci:in-errors') + interface_in_errors = dict_get_first(json_counters, IN_ERRORS_FIELDS) + if interface_in_errors is not None: interface['in-errors'] = int(interface_in_errors) + + OUT_OCTETS_FIELDS = ('out-octets', 'openconfig-interface:out-octets', 'oci:out-octets') + interface_out_octets = dict_get_first(json_counters, OUT_OCTETS_FIELDS) + if interface_out_octets is not None: interface['out-octets'] = int(interface_out_octets) + + OUT_PKTS_FIELDS = ('out-pkts', 'openconfig-interface:out-pkts', 'oci:out-pkts') + interface_out_pkts = dict_get_first(json_counters, OUT_PKTS_FIELDS) + if interface_out_pkts is not None: interface['out-pkts'] = int(interface_out_pkts) + + OUT_ERRORS_FIELDS = ('out-errors', 'openconfig-interface:out-errors', 'oci:out-errors') + interface_out_errors = dict_get_first(json_counters, OUT_ERRORS_FIELDS) + if interface_out_errors is not None: interface['out-errors'] = int(interface_out_errors) + + OUT_DISCARDS_FIELDS = ('out-discards', 'openconfig-interface:out-discards', 'oci:out-discards') + interface_out_discards = dict_get_first(json_counters, OUT_DISCARDS_FIELDS) + if interface_out_discards is not None: interface['out-discards'] = int(interface_out_discards) + + #LOGGER.info('[parse_counters] interface = {:s}'.format(str(interface))) + + if len(interface) == 0: continue + response.append(('/interface[{:s}]'.format(interface['name']), interface)) + + return response diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/InterfaceCounter.py b/src/device/service/drivers/gnmi_openconfig/handlers/InterfaceCounter.py new file mode 100644 index 000000000..a45dc9e7f --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/InterfaceCounter.py @@ -0,0 +1,80 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging +from typing import Any, Dict, List, Tuple +from ._Handler import _Handler +from .Tools import dict_get_first + +LOGGER = logging.getLogger(__name__) + +class InterfaceCounterHandler(_Handler): + def get_resource_key(self) -> str: return '/interface/counters' + def get_path(self) -> str: return '/interfaces/interface/state/counters' + + def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + LOGGER.info('[parse] json_data = {:s}'.format(json.dumps(json_data))) + json_interface_list : List[Dict] = json_data.get('interface', []) + + response = [] + for json_interface in json_interface_list: + LOGGER.info('[parse] json_interface = {:s}'.format(json.dumps(json_interface))) + + interface = {} + + NAME_FIELDS = ('name', 'openconfig-interface:name', 'oci:name') + interface_name = dict_get_first(json_interface, NAME_FIELDS) + if interface_name is None: continue + interface['name'] = interface_name + + STATE_FIELDS = ('state', 'openconfig-interface:state', 'oci:state') + json_state = dict_get_first(json_interface, STATE_FIELDS, default={}) + + COUNTERS_FIELDS = ('counters', 'openconfig-interface:counters', 'oci:counters') + json_counters = dict_get_first(json_state, COUNTERS_FIELDS, default={}) + + IN_PKTS_FIELDS = ('in-pkts', 'openconfig-interface:in-pkts', 'oci:in-pkts') + interface_in_pkts = dict_get_first(json_counters, IN_PKTS_FIELDS) + if interface_in_pkts is not None: interface['in-pkts'] = int(interface_in_pkts) + + IN_OCTETS_FIELDS = ('in-octets', 'openconfig-interface:in-octets', 'oci:in-octets') + interface_in_octets = dict_get_first(json_counters, IN_OCTETS_FIELDS) + if interface_in_octets is not None: interface['in-octets'] = int(interface_in_octets) + + IN_ERRORS_FIELDS = ('in-errors', 'openconfig-interface:in-errors', 'oci:in-errors') + interface_in_errors = dict_get_first(json_counters, IN_ERRORS_FIELDS) + if interface_in_errors is not None: interface['in-errors'] = int(interface_in_errors) + + OUT_OCTETS_FIELDS = ('out-octets', 'openconfig-interface:out-octets', 'oci:out-octets') + interface_out_octets = dict_get_first(json_counters, OUT_OCTETS_FIELDS) + if interface_out_octets is not None: interface['out-octets'] = int(interface_out_octets) + + OUT_PKTS_FIELDS = ('out-pkts', 'openconfig-interface:out-pkts', 'oci:out-pkts') + interface_out_pkts = dict_get_first(json_counters, OUT_PKTS_FIELDS) + if interface_out_pkts is not None: interface['out-pkts'] = int(interface_out_pkts) + + OUT_ERRORS_FIELDS = ('out-errors', 'openconfig-interface:out-errors', 'oci:out-errors') + interface_out_errors = dict_get_first(json_counters, OUT_ERRORS_FIELDS) + if interface_out_errors is not None: interface['out-errors'] = int(interface_out_errors) + + OUT_DISCARDS_FIELDS = ('out-discards', 'openconfig-interface:out-discards', 'oci:out-discards') + interface_out_discards = dict_get_first(json_counters, OUT_DISCARDS_FIELDS) + if interface_out_discards is not None: interface['out-discards'] = int(interface_out_discards) + + #LOGGER.info('[parse] interface = {:s}'.format(str(interface))) + + if len(interface) == 0: continue + response.append(('/interface[{:s}]'.format(interface['name']), interface)) + + return response diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstance.py b/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstance.py new file mode 100644 index 000000000..aed821a06 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstance.py @@ -0,0 +1,62 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging +from typing import Any, Dict, List, Tuple +from ._Handler import _Handler + +LOGGER = logging.getLogger(__name__) + +class NetworkInstanceHandler(_Handler): + def get_resource_key(self) -> str: return '/network_instance' + def get_path(self) -> str: return '/network-instances/network-instance' + + def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: + ni_name = str(resource_value['name']) # test-svc + + if delete: + PATH_TMPL = '/network-instances/network-instance[name={:s}]' + str_path = PATH_TMPL.format(ni_name) + str_data = json.dumps({}) + return str_path, str_data + + ni_type = str(resource_value['type']) # L3VRF / L2VSI / ... + + # not works: [FailedPrecondition] unsupported identifier 'DIRECTLY_CONNECTED' + #protocols = [self._compose_directly_connected()] + + MAP_OC_NI_TYPE = { + 'L3VRF': 'openconfig-network-instance-types:L3VRF', + } + ni_type = MAP_OC_NI_TYPE.get(ni_type, ni_type) + + str_path = '/network-instances/network-instance[name={:s}]'.format(ni_name) + str_data = json.dumps({ + 'name': ni_name, + 'config': {'name': ni_name, 'type': ni_type}, + #'protocols': {'protocol': protocols}, + }) + return str_path, str_data + + def _compose_directly_connected(self, name=None, enabled=True) -> Dict: + identifier = 'DIRECTLY_CONNECTED' + if name is None: name = 'DIRECTLY_CONNECTED' + return { + 'identifier': identifier, 'name': name, + 'config': {'identifier': identifier, 'name': name, 'enabled': enabled}, + } + + def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + response = [] + return response diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceInterface.py b/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceInterface.py new file mode 100644 index 000000000..205373fca --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceInterface.py @@ -0,0 +1,46 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging +from typing import Any, Dict, List, Tuple +from ._Handler import _Handler + +LOGGER = logging.getLogger(__name__) + +class NetworkInstanceInterfaceHandler(_Handler): + def get_resource_key(self) -> str: return '/network_instance/interface' + def get_path(self) -> str: return '/network-instances/network-instance/interfaces' + + def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: + ni_name = str(resource_value['name' ]) # test-svc + if_name = str(resource_value['if_name' ]) # ethernet-1/1 + sif_index = int(resource_value['sif_index']) # 0 + if_id = '{:s}.{:d}'.format(if_name, sif_index) + + if delete: + PATH_TMPL = '/network-instances/network-instance[name={:s}]/interfaces/interface[id={:s}]' + str_path = PATH_TMPL.format(ni_name, if_id) + str_data = json.dumps({}) + return str_path, str_data + + str_path = '/network-instances/network-instance[name={:s}]/interfaces/interface[id={:s}]'.format(ni_name, if_id) + str_data = json.dumps({ + 'id': if_id, + 'config': {'id': if_id, 'interface': if_name, 'subinterface': sif_index}, + }) + return str_path, str_data + + def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + response = [] + return response diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceStaticRoute.py b/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceStaticRoute.py new file mode 100644 index 000000000..9d75e9ac6 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/NetworkInstanceStaticRoute.py @@ -0,0 +1,61 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging +from typing import Any, Dict, List, Tuple +from ._Handler import _Handler + +LOGGER = logging.getLogger(__name__) + +class NetworkInstanceStaticRouteHandler(_Handler): + def get_resource_key(self) -> str: return '/network_instance/static_route' + def get_path(self) -> str: return '/network-instances/network-instance/static_route' + + def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: + ni_name = str(resource_value['name' ]) # test-svc + prefix = str(resource_value['prefix' ]) # '172.0.1.0/24' + + identifier = 'STATIC' + name = 'static' + if delete: + PATH_TMPL = '/network-instances/network-instance[name={:s}]/protocols' + PATH_TMPL += '/protocol[identifier={:s}][name={:s}]/static-routes/static[prefix={:s}]' + str_path = PATH_TMPL.format(ni_name, identifier, name, prefix) + str_data = json.dumps({}) + return str_path, str_data + + next_hop = str(resource_value['next_hop' ]) # '172.0.0.1' + next_hop_index = int(resource_value.get('next_hop_index', 0)) # 0 + + PATH_TMPL = '/network-instances/network-instance[name={:s}]/protocols/protocol[identifier={:s}][name={:s}]' + str_path = PATH_TMPL.format(ni_name, identifier, name) + str_data = json.dumps({ + 'identifier': identifier, 'name': name, + 'config': {'identifier': identifier, 'name': name, 'enabled': True}, + 'static_routes': {'static': [{ + 'prefix': prefix, + 'config': {'prefix': prefix}, + 'next_hops': { + 'next-hop': [{ + 'index': next_hop_index, + 'config': {'index': next_hop_index, 'next_hop': next_hop} + }] + } + }]} + }) + return str_path, str_data + + def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + response = [] + return response diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/Tools.py b/src/device/service/drivers/gnmi_openconfig/handlers/Tools.py new file mode 100644 index 000000000..30343ac28 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/Tools.py @@ -0,0 +1,30 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +from typing import Any, Dict, Iterable + +RE_REMOVE_FILTERS = re.compile(r'\[[^\]]+\]') +RE_REMOVE_NAMESPACES = re.compile(r'\/[a-zA-Z0-9\_\-]+:') + +def get_schema(resource_key : str): + resource_key = RE_REMOVE_FILTERS.sub('', resource_key) + resource_key = RE_REMOVE_NAMESPACES.sub('/', resource_key) + return resource_key + +def dict_get_first(d : Dict, field_names : Iterable[str], default=None) -> Any: + for field_name in field_names: + if field_name not in d: continue + return d[field_name] + return default diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/_Handler.py b/src/device/service/drivers/gnmi_openconfig/handlers/_Handler.py new file mode 100644 index 000000000..d20c77b11 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/_Handler.py @@ -0,0 +1,32 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict, List, Tuple + +class _Handler: + def get_resource_key(self) -> str: + # Retrieve the TeraFlowSDN resource_key path schema used to point this handler + raise NotImplementedError() + + def get_path(self) -> str: + # Retrieve the OpenConfig path schema used to interrogate the device + raise NotImplementedError() + + def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]: + # Compose a Set/Delete message based on the resource_key/resource_value fields, and the delete flag + raise NotImplementedError() + + def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]: + # Parse a Reply from the device and return a list of resource_key/resource_value pairs + raise NotImplementedError() diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/__init__.py b/src/device/service/drivers/gnmi_openconfig/handlers/__init__.py new file mode 100644 index 000000000..39cd7c66a --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/__init__.py @@ -0,0 +1,103 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Dict, List, Optional, Tuple, Union +from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES +from ._Handler import _Handler +from .Component import ComponentHandler +from .Interface import InterfaceHandler +from .InterfaceCounter import InterfaceCounterHandler +from .NetworkInstance import NetworkInstanceHandler +from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler +from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler +from .Tools import get_schema + +LOGGER = logging.getLogger(__name__) + +comph = ComponentHandler() +ifaceh = InterfaceHandler() +ifctrh = InterfaceCounterHandler() +nih = NetworkInstanceHandler() +niifh = NetworkInstanceInterfaceHandler() +nisrh = NetworkInstanceStaticRouteHandler() + +ALL_RESOURCE_KEYS = [ + RESOURCE_ENDPOINTS, + RESOURCE_INTERFACES, + RESOURCE_NETWORK_INSTANCES, +] + +RESOURCE_KEY_MAPPER = { + RESOURCE_ENDPOINTS : comph.get_resource_key(), + RESOURCE_INTERFACES : ifaceh.get_resource_key(), + RESOURCE_NETWORK_INSTANCES : nih.get_resource_key(), +} + +PATH_MAPPER = { + '/components' : comph.get_path(), + '/interfaces' : ifaceh.get_path(), + '/network-instances' : nih.get_path(), +} + +RESOURCE_KEY_TO_HANDLER = { + comph.get_resource_key() : comph, + ifaceh.get_resource_key() : ifaceh, + ifctrh.get_resource_key() : ifctrh, + nih.get_resource_key() : nih, + niifh.get_resource_key() : niifh, + nisrh.get_resource_key() : nisrh, +} + +PATH_TO_HANDLER = { + comph.get_path() : comph, + ifaceh.get_path() : ifaceh, + ifctrh.get_path() : ifctrh, + nih.get_path() : nih, + niifh.get_path() : niifh, + nisrh.get_path() : nisrh, +} + +def get_handler( + resource_key : Optional[str] = None, path : Optional[str] = None, raise_if_not_found=True +) -> Optional[_Handler]: + if (resource_key is None) == (path is None): + MSG = 'Exactly one of resource_key({:s}) or path({:s}) must be specified' + raise Exception(MSG.format(str(resource_key), str(path))) # pylint: disable=broad-exception-raised + if resource_key is not None: + resource_key_schema = get_schema(resource_key) + resource_key_schema = RESOURCE_KEY_MAPPER.get(resource_key_schema, resource_key_schema) + handler = RESOURCE_KEY_TO_HANDLER.get(resource_key_schema) + if handler is None and raise_if_not_found: + MSG = 'Handler not found: resource_key={:s} resource_key_schema={:s}' + # pylint: disable=broad-exception-raised + raise Exception(MSG.format(str(resource_key), str(resource_key_schema))) + elif path is not None: + path_schema = get_schema(path) + path_schema = PATH_MAPPER.get(path_schema, path_schema) + handler = PATH_TO_HANDLER.get(path_schema) + if handler is None and raise_if_not_found: + MSG = 'Handler not found: resource_key={:s} resource_key_schema={:s}' + # pylint: disable=broad-exception-raised + raise Exception(MSG.format(str(resource_key), str(resource_key_schema))) + return handler + +def get_path(resource_key : str) -> str: + return get_handler(resource_key=resource_key).get_path() + +def parse(str_path : str, value : Union[Dict, List]): + return get_handler(path=str_path).parse(value) + +def compose(resource_key : str, resource_value : Union[Dict, List], delete : bool = False) -> Tuple[str, str]: + return get_handler(resource_key=resource_key).compose(resource_key, resource_value, delete=delete) diff --git a/src/device/service/drivers/gnmi_openconfig/handlers/old_bgp_handler.txt b/src/device/service/drivers/gnmi_openconfig/handlers/old_bgp_handler.txt new file mode 100644 index 000000000..595a19788 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/handlers/old_bgp_handler.txt @@ -0,0 +1,138 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# WARNING: this handler is work in progress. Use with care! + +import logging, json +from typing import Any, Dict, List, Tuple + +LOGGER = logging.getLogger(__name__) + +class NetworkInstanceHandler: + def get_resource_key(self) -> str: return '/network_instance' + def get_path(self) -> str: return '/network-instances/network-instance' + + def compose_set(self, resource_key : str, resource_value : Dict) -> Tuple[str, str]: + ni_name = str(resource_value['name']) # test-svc + ni_type = str(resource_value['type']) # L3VRF / + + if_name = str (resource_value['name' ]) # ethernet-1/1 + if_enabled = bool(resource_value.get('enabled' , True)) # True/False + sif_index = int (resource_value.get('sub_if_index' , 0 )) # 0 + sif_enabled = bool(resource_value.get('sub_if_enabled' , True)) # True/False + sif_ipv4_enabled = bool(resource_value.get('sub_if_ipv4_enabled', True)) # True/False + sif_ipv4_address = str (resource_value['sub_if_ipv4_address' ]) # 172.16.0.1 + sif_ipv4_prefix = int (resource_value['sub_if_ipv4_prefix' ]) # 24 + + str_path = '/interfaces/interface[name={:s}]'.format(if_name) + str_data = json.dumps({ + "name": if_name, + "config": {"name": if_name, "enabled": if_enabled}, + "subinterfaces": { + "subinterface": { + "index": sif_index, + "config": {"index": sif_index, "enabled": sif_enabled}, + "ipv4": { + "config": {"enabled": sif_ipv4_enabled}, + "addresses": { + "address": { + "ip": sif_ipv4_address, + "config": {"ip": sif_ipv4_address, "prefix_length": sif_ipv4_prefix}, + } + } + } + } + } + }) + return str_path, str_data + + + #oc_ni = openconfig_network_instance() + #ni = oc_ni.network_instances.network_instance.add(name=ni_name) + #ni.config.name = ni_name + + #ni_desc = resource_value.get('description') + #if ni_desc is not None: ni.config.description = ni_desc + + #if ni_type == 'L3VRF': + # ni.config.type = 'L3VRF' + # #ni_router_id = resource_value.get('router_id') + # #if ni_router_id is not None: ni.config.router_id = ni_router_id + + # proto_bgp = ni.protocols.protocol.add(identifier='BGP', name=ni_name) + # proto_bgp.config.identifier = 'BGP' + # proto_bgp.config.name = ni_name + # proto_bgp.config.enabled = True + # proto_bgp.bgp.global_.config.as_ = 65000 + # proto_bgp.bgp.global_.config.router_id = '172.0.0.1' + + # #ni.config.route_distinguisher = resource_value['route_distinguisher'] + #elif ni_type == 'L3VRF': + # pass + #else: + # raise NotImplementedError() + + #str_path = '/network-instances/network-instance[name={:s}]'.format(ni_name) + #str_data = pybindJSON.dumps(ni, mode='default') + + #str_path = '/network-instances/network-instance[name={:s}]/protocols/protocol[identifier=BGP][name=BGP]'.format(ni_name) + #str_data = json.dumps({ + # "identifier": "BGP", + # "name": "BGP", + # "config": {"identifier": "BGP", "name": "BGP", "enabled": True}, + # "bgp": {"global": {"config": {"as": 65000, "router-id": "5.5.5.5"}}} + #}) + + str_path = '/network-instances/network-instance[name=test-svc]' + str_data = json.dumps({ + "name": "test-svc", + "config": { + "name": "test-svc", + "type": "openconfig-network-instance-types:L3VRF" + }, + "protocols": { + "protocol": [ + { + "identifier": "DIRECTLY_CONNECTED", + "name": "DIRECTLY-CONNECTED", + "config": {"identifier": "DIRECTLY_CONNECTED", "name": "DIRECTLY-CONNECTED", "enabled": True}, + }, + { + "identifier": "STATIC", + "name": "static", + "config": {"identifier": "STATIC", "name": "static", "enabled": True}, + "static_routes": { + "static": [ + { + "prefix": "172.0.1.0/24", + "config": {"prefix": "172.0.1.0/24"}, + "next_hops": { + "next-hop": [{"index": 0, "config": {"index": 0, "next_hop": "172.0.0.1"}}] + } + } + ] + } + } + ] + }, + }) + + + #str_path = '/network-instances/network-instance[name={:s}]/protocols/protocol[identifier=DIRECTLY_CONNECTED][name=DIR]'.format(ni_name) + #str_data = json.dumps({ + # "identifier": "DIRECTLY_CONNECTED", + # "name": "DIR", + # "config": {"identifier": "DIRECTLY_CONNECTED", "name": "DIR", "enabled": True}, + #}) diff --git a/src/device/service/drivers/gnmi_openconfig/tools/Capabilities.py b/src/device/service/drivers/gnmi_openconfig/tools/Capabilities.py new file mode 100644 index 000000000..b90bf3db8 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/tools/Capabilities.py @@ -0,0 +1,36 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Set, Union +from common.tools.grpc.Tools import grpc_message_to_json +from ..gnmi.gnmi_pb2 import CapabilityRequest # pylint: disable=no-name-in-module +from ..gnmi.gnmi_pb2_grpc import gNMIStub + +def get_supported_encodings( + stub : gNMIStub, username : str, password : str, timeout : Optional[int] = None +) -> Set[Union[str, int]]: + metadata = [('username', username), ('password', password)] + req = CapabilityRequest() + reply = stub.Capabilities(req, metadata=metadata, timeout=timeout) + + data = grpc_message_to_json(reply) + supported_encodings = { + supported_encoding + for supported_encoding in data.get('supported_encodings', []) + if isinstance(supported_encoding, str) + } + if len(supported_encodings) == 0: + # pylint: disable=broad-exception-raised + raise Exception('No supported encodings found') + return supported_encodings diff --git a/src/device/service/drivers/gnmi_openconfig/tools/Channel.py b/src/device/service/drivers/gnmi_openconfig/tools/Channel.py new file mode 100644 index 000000000..264dd0321 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/tools/Channel.py @@ -0,0 +1,34 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging, ssl + +def get_grpc_channel(address : str, port : int, use_tls : bool, logger : logging.Logger) -> grpc.Channel: + endpoint = str(address) + ':' + str(port) + logger.info('Connecting gNMI {:s}...'.format(endpoint)) + if use_tls: + logger.debug('Getting server certificate...') + str_server_certificate = ssl.get_server_certificate((str(address), int(port))) + bytes_server_certificate = str_server_certificate.encode('UTF-8') + logger.debug('Using secure SSL channel...') + credentials = grpc.ssl_channel_credentials( + root_certificates=bytes_server_certificate, private_key=None, certificate_chain=None) + options = [ + #('grpc.ssl_target_name_override', options.altName,) + ] + channel = grpc.secure_channel(endpoint, credentials, options) + else: + logger.debug('Using insecure channel...') + channel = grpc.insecure_channel(endpoint) + return channel diff --git a/src/device/service/drivers/gnmi_openconfig/tools/Path.py b/src/device/service/drivers/gnmi_openconfig/tools/Path.py new file mode 100644 index 000000000..40ab28dc6 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/tools/Path.py @@ -0,0 +1,98 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +from typing import List +from ..gnmi.gnmi_pb2 import Path, PathElem + +RE_PATH_SPLIT = re.compile(r'/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)') +RE_PATH_KEYS = re.compile(r'\[(.*?)\]') + +def path_from_string(path='/'): + if not path: return Path(elem=[]) + + if path[0] == '/': + if path[-1] == '/': + path_list = RE_PATH_SPLIT.split(path)[1:-1] + else: + path_list = RE_PATH_SPLIT.split(path)[1:] + else: + if path[-1] == '/': + path_list = RE_PATH_SPLIT.split(path)[:-1] + else: + path_list = RE_PATH_SPLIT.split(path) + + path = [] + for elem in path_list: + elem_name = elem.split('[', 1)[0] + elem_keys = RE_PATH_KEYS.findall(elem) + dict_keys = dict(x.split('=', 1) for x in elem_keys) + path.append(PathElem(name=elem_name, key=dict_keys)) + + return Path(elem=path) + +def path_to_string(path : Path) -> str: + path_parts = list() + for elem in path.elem: + kv_list = list() + for key in elem.key: + value = elem.key[key] + kv = '{:s}={:s}'.format(key, value) + kv_list.append(kv) + + path_part_name = elem.name + if len(kv_list) == 0: + path_parts.append(path_part_name) + else: + str_kv = ', '.join(kv_list) + path_part = '{:s}[{:s}]'.format(path_part_name, str_kv) + path_parts.append(path_part) + + str_path = '/{:s}'.format('/'.join(path_parts)) + return str_path + +def parse_xpath(xpath : str) -> str: + xpath = xpath.replace('//', '/') + xpath = xpath.replace('oci:interface[', 'interface[') + xpath = xpath.replace('/oci', '/openconfig-interfaces') + xpath = re.sub(r"\[oci:name='(.*?)'\]", r"[name=\1]", xpath) + # Eliminar el contador del final + xpath = '/'.join(xpath.split('/')[:-1]) + '/' + return xpath + +def split_resource_key(path): + pattern = r'/state/counters/(.*)' + match = re.search(pattern, path) + if match is None: return None + return match.group(1) + +def dict_to_xpath(d: dict) -> str: + xpath = '/' + for item in d['elem']: + name = item.get('name') + if name == 'interface': + key = item.get('key') + interface_name = key.get('name') + xpath += f"/oci:interface[oci:name='{interface_name}']" + else: + xpath += f"/{name}" + xpath = xpath.replace('openconfig-interfaces', 'oci') + return xpath + +def compose_path(base_path : str, path_filters : List[str] = []): + new_path = '' if base_path is None else str(base_path) + for path_filter in path_filters: + if path_filter == '': continue + new_path = '{:s}[{:s}]'.format(new_path, path_filter) + return new_path diff --git a/src/device/service/drivers/gnmi_openconfig/tools/Subscriptions.py b/src/device/service/drivers/gnmi_openconfig/tools/Subscriptions.py new file mode 100644 index 000000000..18b6445ae --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/tools/Subscriptions.py @@ -0,0 +1,47 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Collection of samples through NetConf is very slow and each request collects all the data. +# Populate a cache periodically (when first interface is interrogated). +# Evict data after some seconds, when data is considered as outdated + +import anytree +from typing import Any, List +from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_subnode_value + +class Subscriptions: + def __init__(self) -> None: + self.__resolver = anytree.Resolver(pathattr='name') + self.__subscriptions = TreeNode('.') + + def add( + self, resource_path : List[str], sampling_duration : float, sampling_interval : float, value : Any + ) -> None: + subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] + set_subnode_value(self.__resolver, self.__subscriptions, subscription_path, value) + + def get( + self, resource_path : List[str], sampling_duration : float, sampling_interval : float + ) -> TreeNode: + subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] + value = get_subnode(self.__resolver, self.__subscriptions, subscription_path) + return value + + def delete( + self, reference : TreeNode + ) -> None: + parent : TreeNode = reference.parent + children = list(parent.children) + children.remove(reference) + parent.children = tuple(children) diff --git a/src/device/service/drivers/gnmi_openconfig/tools/Value.py b/src/device/service/drivers/gnmi_openconfig/tools/Value.py new file mode 100644 index 000000000..4797930a1 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/tools/Value.py @@ -0,0 +1,52 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64, json +from typing import Any +from ..gnmi.gnmi_pb2 import TypedValue + +def decode_value(value : TypedValue) -> Any: + encoding = value.WhichOneof('value') + if encoding == 'json_val': + value = value.json_val + #mdl, cls = self._classes[className] + #obj = json.loads(strObj) + #if isinstance(obj, (list,)): + # obj = map(lambda n: pybindJSON.loads(n, mdl, cls.__name__), obj) + # data = map(lambda n: json.loads(pybindJSON.dumps(n, mode='default')), obj) + #else: + # obj = pybindJSON.loads(obj, mdl, cls.__name__) + # data = json.loads(pybindJSON.dumps(obj, mode='default')) + raise NotImplementedError() + #return value + elif encoding == 'json_ietf_val': + value : str = value.json_ietf_val + try: + return json.loads(value) + except json.decoder.JSONDecodeError: + # Assume is Base64-encoded + b_b64_value = value.encode('UTF-8') + b_value = base64.b64decode(b_b64_value, validate=True) + value = b_value.decode('UTF-8') + return json.loads(value) + else: + MSG = 'Unsupported Encoding({:s}) in Value({:s})' + # pylint: disable=broad-exception-raised + raise Exception(MSG.format(str(encoding), str(value))) + +def value_exists(value) -> bool: + if value is None: return False + if isinstance(value, Exception): return False + if issubclass(type(value), Exception): return False + return True diff --git a/src/device/service/drivers/gnmi_openconfig/tools/__init__.py b/src/device/service/drivers/gnmi_openconfig/tools/__init__.py new file mode 100644 index 000000000..1549d9811 --- /dev/null +++ b/src/device/service/drivers/gnmi_openconfig/tools/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + -- GitLab