Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Showing
with 950 additions and 443 deletions
......@@ -13,13 +13,15 @@
# limitations under the License.
import copy
from typing import Any, Dict, Tuple, Union
from typing import Any, Dict, Optional, Tuple, Union
class DeltaSampleCache:
def __init__(self) -> None:
self._previous_samples : Dict[str, Tuple[float, Union[int, float]]] = dict()
def get_delta(self, path : str, current_timestamp : float, current_value : Any) -> None:
def get_delta(
self, path : str, current_timestamp : float, current_value : Any
) -> Optional[Tuple[float, Optional[Any]]]:
previous_sample = copy.deepcopy(self._previous_samples.get(path))
self._previous_samples[path] = current_timestamp, current_value
......@@ -30,6 +32,10 @@ class DeltaSampleCache:
delta_value = max(0, current_value - previous_value)
delay = current_timestamp - previous_timestamp
delta_sample = current_timestamp, delta_value / delay
return delta_sample
if delay < 1.e-12:
# return a special value meaning, at that timestamp,
# computed value is not a number, e.g., division by zero
# also, recover previuos samples to do not miss any packet/byte
self._previous_samples[path] = previous_sample
return current_timestamp, None
return current_timestamp, delta_value / delay
......@@ -19,7 +19,8 @@ from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk
from .gnmi.gnmi_pb2_grpc import gNMIStub
from .gnmi.gnmi_pb2 import Encoding, GetRequest, SetRequest, UpdateResult # pylint: disable=no-name-in-module
from .handlers import ALL_RESOURCE_KEYS, compose, get_path, parse
from .tools.Capabilities import get_supported_encodings
from .handlers.YangHandler import YangHandler
from .tools.Capabilities import check_capabilities
from .tools.Channel import get_grpc_channel
from .tools.Path import path_from_string, path_to_string #, compose_path
from .tools.Subscriptions import Subscriptions
......@@ -39,12 +40,22 @@ class GnmiSessionHandler:
self._use_tls = settings.get('use_tls', False)
self._channel : Optional[grpc.Channel] = None
self._stub : Optional[gNMIStub] = None
self._yang_handler = None
self._monit_thread = None
self._supported_encodings = None
self._yang_handler = YangHandler()
self._subscriptions = Subscriptions()
self._in_subscriptions = queue.Queue()
self._out_samples = queue.Queue()
def __del__(self) -> None:
self._logger.info('Destroying YangValidator...')
if self._yang_handler is not None:
self._logger.debug('yang_validator.data:')
for path, dnode in self._yang_handler.get_data_paths().items():
self._logger.debug(' {:s}: {:s}'.format(str(path), json.dumps(dnode.print_dict())))
self._yang_handler.destroy()
self._logger.info('DONE')
@property
def subscriptions(self): return self._subscriptions
......@@ -58,8 +69,7 @@ class GnmiSessionHandler:
with self._lock:
self._channel = get_grpc_channel(self._address, self._port, self._use_tls, self._logger)
self._stub = gNMIStub(self._channel)
self._supported_encodings = get_supported_encodings(
self._stub, self._username, self._password, timeout=120)
check_capabilities(self._stub, self._username, self._password, timeout=120)
self._monit_thread = MonitoringThread(
self._stub, self._logger, self._settings, self._in_subscriptions, self._out_samples)
self._monit_thread.start()
......@@ -96,13 +106,15 @@ class GnmiSessionHandler:
self._logger.exception(MSG.format(str_resource_name, str(resource_key)))
parsing_results.append((resource_key, e)) # if validation fails, store the exception
self._logger.debug('parsing_results={:s}'.format(str(parsing_results)))
if len(parsing_results) > 0:
return parsing_results
metadata = [('username', self._username), ('password', self._password)]
timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration)
get_reply = self._stub.Get(get_request, metadata=metadata, timeout=timeout)
#self._logger.info('get_reply={:s}'.format(grpc_message_to_json_string(get_reply)))
self._logger.debug('get_reply={:s}'.format(grpc_message_to_json_string(get_reply)))
results = []
#results[str_filter] = [i, None, False] # (index, value, processed?)
......@@ -119,7 +131,7 @@ class GnmiSessionHandler:
# resource_key_tuple[2] = True
for update in notification.update:
#self._logger.info('update={:s}'.format(grpc_message_to_json_string(update)))
self._logger.debug('update={:s}'.format(grpc_message_to_json_string(update)))
str_path = path_to_string(update.path)
#resource_key_tuple = results.get(str_path)
#if resource_key_tuple is None:
......@@ -130,10 +142,10 @@ class GnmiSessionHandler:
value = decode_value(update.val)
#resource_key_tuple[1] = value
#resource_key_tuple[2] = True
results.extend(parse(str_path, value))
results.extend(parse(str_path, value, self._yang_handler))
except Exception as e: # pylint: disable=broad-except
MSG = 'Exception processing notification {:s}'
self._logger.exception(MSG.format(grpc_message_to_json_string(notification)))
MSG = 'Exception processing update {:s}'
self._logger.exception(MSG.format(grpc_message_to_json_string(update)))
results.append((str_path, e)) # if validation fails, store the exception
#_results = sorted(results.items(), key=lambda x: x[1][0])
......@@ -158,31 +170,34 @@ class GnmiSessionHandler:
set_request = SetRequest()
#for resource_key in resource_keys:
resources_requested = list()
for resource_key, resource_value in resources:
self._logger.info('---1')
self._logger.info(str(resource_key))
self._logger.info(str(resource_value))
#self._logger.info('---1')
#self._logger.info(str(resource_key))
#self._logger.info(str(resource_value))
#resource_tuple = resource_tuples.get(resource_key)
#if resource_tuple is None: continue
#_, value, exists, operation_done = resource_tuple
if isinstance(resource_value, str): resource_value = json.loads(resource_value)
str_path, str_data = compose(resource_key, resource_value, delete=False)
self._logger.info('---3')
self._logger.info(str(str_path))
self._logger.info(str(str_data))
str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=False)
if str_path is None: continue # nothing to set
#self._logger.info('---3')
#self._logger.info(str(str_path))
#self._logger.info(str(str_data))
set_request_list = set_request.update #if exists else set_request.replace
set_request_entry = set_request_list.add()
set_request_entry.path.CopyFrom(path_from_string(str_path))
set_request_entry.val.json_val = str_data.encode('UTF-8')
resources_requested.append((resource_key, resource_value))
self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request)))
self._logger.debug('set_request={:s}'.format(grpc_message_to_json_string(set_request)))
metadata = [('username', self._username), ('password', self._password)]
timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration)
set_reply = self._stub.Set(set_request, metadata=metadata, timeout=timeout)
self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply)))
self._logger.debug('set_reply={:s}'.format(grpc_message_to_json_string(set_reply)))
results = []
for (resource_key, resource_value), update_result in zip(resources, set_reply.response):
for (resource_key, resource_value), update_result in zip(resources_requested, set_reply.response):
operation = update_result.op
if operation == UpdateResult.UPDATE:
results.append((resource_key, True))
......@@ -227,30 +242,34 @@ class GnmiSessionHandler:
set_request = SetRequest()
#for resource_key in resource_keys:
resources_requested = list()
for resource_key, resource_value in resources:
self._logger.info('---1')
self._logger.info(str(resource_key))
self._logger.info(str(resource_value))
#self._logger.info('---1')
#self._logger.info(str(resource_key))
#self._logger.info(str(resource_value))
#resource_tuple = resource_tuples.get(resource_key)
#if resource_tuple is None: continue
#_, value, exists, operation_done = resource_tuple
#if not exists: continue
if isinstance(resource_value, str): resource_value = json.loads(resource_value)
str_path, str_data = compose(resource_key, resource_value, delete=True)
self._logger.info('---3')
self._logger.info(str(str_path))
self._logger.info(str(str_data))
# pylint: disable=unused-variable
str_path, str_data = compose(resource_key, resource_value, self._yang_handler, delete=True)
if str_path is None: continue # nothing to do with this resource_key
#self._logger.info('---3')
#self._logger.info(str(str_path))
#self._logger.info(str(str_data))
set_request_entry = set_request.delete.add()
set_request_entry.CopyFrom(path_from_string(str_path))
resources_requested.append((resource_key, resource_value))
self._logger.info('set_request={:s}'.format(grpc_message_to_json_string(set_request)))
self._logger.debug('set_request={:s}'.format(grpc_message_to_json_string(set_request)))
metadata = [('username', self._username), ('password', self._password)]
timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration)
set_reply = self._stub.Set(set_request, metadata=metadata, timeout=timeout)
self._logger.info('set_reply={:s}'.format(grpc_message_to_json_string(set_reply)))
self._logger.debug('set_reply={:s}'.format(grpc_message_to_json_string(set_reply)))
results = []
for (resource_key, resource_value), update_result in zip(resources, set_reply.response):
for (resource_key, resource_value), update_result in zip(resources_requested, set_reply.response):
operation = update_result.op
if operation == UpdateResult.DELETE:
results.append((resource_key, True))
......
......@@ -94,9 +94,14 @@ class MonitoringThread(threading.Thread):
subscriptions = []
while not self._terminate.is_set():
try:
subscription = self._in_subscriptions.get(block=True, timeout=0.1)
# Some devices do not support to process multiple
# SubscriptionList requests in a bidirectional channel.
# Increased timeout to 5 seconds assuming it should
# bring enough time to receive all the subscriptions in
# the queue and process them in bulk.
subscription = self._in_subscriptions.get(block=True, timeout=5.0)
operation, resource_key, sampling_duration, sampling_interval = subscription # pylint: disable=unused-variable
if operation != 'subscribe': continue # Unsubscribe not supported by gNM, needs to cancel entire connection
if operation != 'subscribe': continue # Unsubscribe not supported by gNMI, needs to cancel entire connection
# options.timeout = int(sampling_duration)
#_path = parse_xpath(resource_key)
path = path_from_string(resource_key)
......@@ -107,15 +112,15 @@ class MonitoringThread(threading.Thread):
subscriptions.append(subscription)
except queue.Empty:
if len(subscriptions) == 0: continue
#self._logger.warning('[generate_requests] process')
self._logger.debug('[generate_requests] process')
prefix = path_from_string(GNMI_PATH_PREFIX) if GNMI_PATH_PREFIX is not None else None
qos = QOSMarking(marking=GNMI_QOS_MARKING) if GNMI_QOS_MARKING is not None else None
subscriptions_list = SubscriptionList(
prefix=prefix, mode=GNMI_SUBSCRIPTION_LIST_MODE, allow_aggregation=GNMI_ALLOW_AGGREGATION,
encoding=GNMI_ENCODING, subscription=subscriptions, qos=qos)
subscribe_request = SubscribeRequest(subscribe=subscriptions_list)
#str_subscribe_request = grpc_message_to_json_string(subscribe_request)
#self._logger.warning('[generate_requests] subscribe_request={:s}'.format(str_subscribe_request))
str_subscribe_request = grpc_message_to_json_string(subscribe_request)
self._logger.debug('[generate_requests] subscribe_request={:s}'.format(str_subscribe_request))
yield subscribe_request
subscriptions = []
except: # pylint: disable=bare-except
......@@ -134,7 +139,7 @@ class MonitoringThread(threading.Thread):
self._response_iterator = self._stub.Subscribe(request_iterator, metadata=metadata, timeout=timeout)
for subscribe_response in self._response_iterator:
str_subscribe_response = grpc_message_to_json_string(subscribe_response)
self._logger.warning('[run] subscribe_response={:s}'.format(str_subscribe_response))
self._logger.debug('[run] subscribe_response={:s}'.format(str_subscribe_response))
update = subscribe_response.update
timestamp_device = float(update.timestamp) / 1.e9
timestamp_local = datetime.timestamp(datetime.utcnow())
......@@ -145,25 +150,37 @@ class MonitoringThread(threading.Thread):
else:
# might be clocks are not synchronized, use local timestamp
timestamp = timestamp_local
str_prefix = path_to_string(update.prefix) if len(update.prefix.elem) > 0 else ''
for update_entry in update.update:
str_path = path_to_string(update_entry.path)
if len(str_prefix) > 0:
str_path = '{:s}/{:s}'.format(str_prefix, str_path)
str_path = str_path.replace('//', '/')
if str_path.startswith('/interfaces/'):
# Add namespace, if missing
str_path_parts = str_path.split('/')
str_path_parts[1] = 'openconfig-interfaces:interfaces'
str_path = '/'.join(str_path_parts)
#if str_path != '/system/name/host-name': continue
#counter_name = update_entry.path[-1].name
value_type = update_entry.val.WhichOneof('value')
value = getattr(update_entry.val, value_type)
if re.match(r'^[0-9]+$', value) is not None:
value = int(value)
elif re.match(r'^[0-9]*\.[0-9]*$', value) is not None:
value = float(value)
else:
value = str(value)
if isinstance(value, str):
if re.match(r'^[0-9]+$', value) is not None:
value = int(value)
elif re.match(r'^[0-9]*\.[0-9]*$', value) is not None:
value = float(value)
else:
value = str(value)
delta_sample = self._delta_sample_cache.get_delta(str_path, timestamp, value)
if delta_sample is None:
sample = (timestamp, str_path, value)
else:
sample = (delta_sample[0], str_path, delta_sample[1])
self._logger.warning('[run] sample={:s}'.format(str(sample)))
self._out_samples.put_nowait(sample)
self._logger.debug('[run] sample={:s}'.format(str(sample)))
if sample[2] is not None:
# Skip not-a-number (e.g., division by zero) samples
self._out_samples.put_nowait(sample)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.CANCELLED: raise # pylint: disable=no-member
if e.details() != 'Locally cancelled by application!': raise # pylint: disable=no-member
......
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
BASE_PATH=~/tfs-ctrl/src/device/service/drivers/gnmi_openconfig
GIT_BASE_PATH=${BASE_PATH}/git/openconfig
rm -rf ${GIT_BASE_PATH}
OC_PUBLIC_PATH=${GIT_BASE_PATH}/public
mkdir -p ${OC_PUBLIC_PATH}
git clone https://github.com/openconfig/public.git ${OC_PUBLIC_PATH}
#OC_HERCULES_PATH=${GIT_BASE_PATH}/hercules
#mkdir -p ${OC_HERCULES_PATH}
#git clone https://github.com/openconfig/hercules.git ${OC_HERCULES_PATH}
......@@ -12,45 +12,52 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json, logging, re # libyang
from typing import Any, Dict, List, Tuple
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from ._Handler import _Handler
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
PATH_IF_CTR = "/interfaces/interface[name={:s}]/state/counters/{:s}"
PATH_IF_CTR = '/openconfig-interfaces:interfaces/interface[name={:s}]/state/counters/{:s}'
#pylint: disable=abstract-method
class ComponentHandler(_Handler):
def get_resource_key(self) -> str: return '/endpoints/endpoint'
def get_path(self) -> str: return '/components/component'
def get_path(self) -> str: return '/openconfig-platform:components'
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]:
#LOGGER.info('json_data = {:s}'.format(json.dumps(json_data)))
json_component_list : List[Dict] = json_data.get('component', [])
response = []
for json_component in json_component_list:
#LOGGER.info('json_component = {:s}'.format(json.dumps(json_component)))
def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.debug('json_data = {:s}'.format(json.dumps(json_data)))
endpoint = {}
yang_components_path = self.get_path()
json_data_valid = yang_handler.parse_to_dict(yang_components_path, json_data, fmt='json')
component_type = json_component.get('state', {}).get('type')
if component_type is None: continue
component_type = component_type.replace('oc-platform-types:', '')
component_type = component_type.replace('openconfig-platform-types:', '')
if component_type not in {'PORT'}: continue
endpoint['type'] = '-'
entries = []
for component in json_data_valid['components']['component']:
LOGGER.debug('component={:s}'.format(str(component)))
component_name = component['name']
#component_config = component.get('config', {})
#LOGGER.info('PORT json_component = {:s}'.format(json.dumps(json_component)))
#yang_components : libyang.DContainer = yang_handler.get_data_path(yang_components_path)
#yang_component_path = 'component[name="{:s}"]'.format(component_name)
#yang_component : libyang.DContainer = yang_components.create_path(yang_component_path)
#yang_component.merge_data_dict(component, strict=True, validate=False)
component_name = json_component.get('name')
if component_name is None: continue
component_state = component.get('state', {})
component_type = component_state.get('type')
if component_type is None: continue
component_type = component_type.split(':')[-1]
if component_type not in {'PORT'}: continue
# TODO: improve mapping between interface name and component name
# By now, computed by time for the sake of saving time for the Hackfest.
interface_name = component_name.lower().replace('-port', '')
interface_name = re.sub(r'\-[pP][oO][rR][tT]', '', component_name)
endpoint['uuid'] = interface_name
endpoint = {'uuid': interface_name, 'type': '-'}
endpoint['sample_types'] = {
KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED : PATH_IF_CTR.format(interface_name, 'in-octets' ),
KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED : PATH_IF_CTR.format(interface_name, 'out-octets'),
......@@ -58,6 +65,6 @@ class ComponentHandler(_Handler):
KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED: PATH_IF_CTR.format(interface_name, 'out-pkts' ),
}
if len(endpoint) == 0: continue
response.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint))
return response
entries.append(('/endpoints/endpoint[{:s}]'.format(endpoint['uuid']), endpoint))
return entries
......@@ -12,69 +12,53 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging
import json, libyang, logging
from typing import Any, Dict, List, Tuple
from ._Handler import _Handler
from .Tools import dict_get_first
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
#pylint: disable=abstract-method
class InterfaceCounterHandler(_Handler):
def get_resource_key(self) -> str: return '/interface/counters'
def get_path(self) -> str: return '/interfaces/interface/state/counters'
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.info('[parse] json_data = {:s}'.format(json.dumps(json_data)))
json_interface_list : List[Dict] = json_data.get('interface', [])
response = []
for json_interface in json_interface_list:
LOGGER.info('[parse] json_interface = {:s}'.format(json.dumps(json_interface)))
interface = {}
NAME_FIELDS = ('name', 'openconfig-interface:name', 'oci:name')
interface_name = dict_get_first(json_interface, NAME_FIELDS)
if interface_name is None: continue
interface['name'] = interface_name
STATE_FIELDS = ('state', 'openconfig-interface:state', 'oci:state')
json_state = dict_get_first(json_interface, STATE_FIELDS, default={})
COUNTERS_FIELDS = ('counters', 'openconfig-interface:counters', 'oci:counters')
json_counters = dict_get_first(json_state, COUNTERS_FIELDS, default={})
IN_PKTS_FIELDS = ('in-pkts', 'openconfig-interface:in-pkts', 'oci:in-pkts')
interface_in_pkts = dict_get_first(json_counters, IN_PKTS_FIELDS)
if interface_in_pkts is not None: interface['in-pkts'] = int(interface_in_pkts)
IN_OCTETS_FIELDS = ('in-octets', 'openconfig-interface:in-octets', 'oci:in-octets')
interface_in_octets = dict_get_first(json_counters, IN_OCTETS_FIELDS)
if interface_in_octets is not None: interface['in-octets'] = int(interface_in_octets)
IN_ERRORS_FIELDS = ('in-errors', 'openconfig-interface:in-errors', 'oci:in-errors')
interface_in_errors = dict_get_first(json_counters, IN_ERRORS_FIELDS)
if interface_in_errors is not None: interface['in-errors'] = int(interface_in_errors)
OUT_OCTETS_FIELDS = ('out-octets', 'openconfig-interface:out-octets', 'oci:out-octets')
interface_out_octets = dict_get_first(json_counters, OUT_OCTETS_FIELDS)
if interface_out_octets is not None: interface['out-octets'] = int(interface_out_octets)
OUT_PKTS_FIELDS = ('out-pkts', 'openconfig-interface:out-pkts', 'oci:out-pkts')
interface_out_pkts = dict_get_first(json_counters, OUT_PKTS_FIELDS)
if interface_out_pkts is not None: interface['out-pkts'] = int(interface_out_pkts)
OUT_ERRORS_FIELDS = ('out-errors', 'openconfig-interface:out-errors', 'oci:out-errors')
interface_out_errors = dict_get_first(json_counters, OUT_ERRORS_FIELDS)
if interface_out_errors is not None: interface['out-errors'] = int(interface_out_errors)
OUT_DISCARDS_FIELDS = ('out-discards', 'openconfig-interface:out-discards', 'oci:out-discards')
interface_out_discards = dict_get_first(json_counters, OUT_DISCARDS_FIELDS)
if interface_out_discards is not None: interface['out-discards'] = int(interface_out_discards)
#LOGGER.info('[parse] interface = {:s}'.format(str(interface)))
if len(interface) == 0: continue
response.append(('/interface[{:s}]'.format(interface['name']), interface))
return response
def get_path(self) -> str: return '/openconfig-interfaces:interfaces/interface/state/counters'
def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.debug('json_data = {:s}'.format(json.dumps(json_data)))
yang_interfaces_path = self.get_path()
json_data_valid = yang_handler.parse_to_dict(yang_interfaces_path, json_data, fmt='json')
entries = []
for interface in json_data_valid['interfaces']['interface']:
LOGGER.debug('interface={:s}'.format(str(interface)))
interface_name = interface['name']
interface_counters = interface.get('state', {}).get('counters', {})
_interface = {
'name' : interface_name,
'in-broadcast-pkts' : interface_counters['in_broadcast_pkts' ],
'in-discards' : interface_counters['in_discards' ],
'in-errors' : interface_counters['in_errors' ],
'in-fcs-errors' : interface_counters['in_fcs_errors' ],
'in-multicast-pkts' : interface_counters['in_multicast_pkts' ],
'in-octets' : interface_counters['in_octets' ],
'in-pkts' : interface_counters['in_pkts' ],
'in-unicast-pkts' : interface_counters['in_unicast_pkts' ],
'out-broadcast-pkts': interface_counters['out_broadcast_pkts'],
'out-discards' : interface_counters['out_discards' ],
'out-errors' : interface_counters['out_errors' ],
'out-multicast-pkts': interface_counters['out_multicast_pkts'],
'out-octets' : interface_counters['out_octets' ],
'out-pkts' : interface_counters['out_pkts' ],
'out-unicast-pkts' : interface_counters['out_unicast_pkts' ],
}
LOGGER.debug('interface = {:s}'.format(str(interface)))
entry_interface_key = '/interface[{:s}]'.format(interface_name)
entries.append((entry_interface_key, _interface))
return entries
......@@ -12,18 +12,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging
import json, libyang, logging
from typing import Any, Dict, List, Tuple
from ._Handler import _Handler
from .Tools import get_str
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
MAP_NETWORK_INSTANCE_TYPE = {
# special routing instance; acts as default/global routing instance for a network device
'DEFAULT': 'openconfig-network-instance-types:DEFAULT_INSTANCE',
# private L3-only routing instance; formed of one or more RIBs
'L3VRF': 'openconfig-network-instance-types:L3VRF',
# private L2-only switch instance; formed of one or more L2 forwarding tables
'L2VSI': 'openconfig-network-instance-types:L2VSI',
# private L2-only forwarding instance; point to point connection between two endpoints
'L2P2P': 'openconfig-network-instance-types:L2P2P',
# private Layer 2 and Layer 3 forwarding instance
'L2L3': 'openconfig-network-instance-types:L2L3',
}
class NetworkInstanceHandler(_Handler):
def get_resource_key(self) -> str: return '/network_instance'
def get_path(self) -> str: return '/network-instances/network-instance'
def get_path(self) -> str: return '/openconfig-network-instance:network-instances'
def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]:
ni_name = str(resource_value['name']) # test-svc
def compose(
self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
ni_name = get_str(resource_value, 'name') # test-svc
if delete:
PATH_TMPL = '/network-instances/network-instance[name={:s}]'
......@@ -31,32 +52,124 @@ class NetworkInstanceHandler(_Handler):
str_data = json.dumps({})
return str_path, str_data
ni_type = str(resource_value['type']) # L3VRF / L2VSI / ...
ni_type = get_str(resource_value, 'type') # L3VRF / L2VSI / ...
ni_type = MAP_NETWORK_INSTANCE_TYPE.get(ni_type, ni_type)
str_path = '/network-instances/network-instance[name={:s}]'.format(ni_name)
#str_data = json.dumps({
# 'name': ni_name,
# 'config': {'name': ni_name, 'type': ni_type},
#})
# not works: [FailedPrecondition] unsupported identifier 'DIRECTLY_CONNECTED'
#protocols = [self._compose_directly_connected()]
yang_nis : libyang.DContainer = yang_handler.get_data_path('/openconfig-network-instance:network-instances')
yang_ni_path = 'network-instance[name="{:s}"]'.format(ni_name)
yang_ni : libyang.DContainer = yang_nis.create_path(yang_ni_path)
yang_ni.create_path('config/name', ni_name)
yang_ni.create_path('config/type', ni_type)
MAP_OC_NI_TYPE = {
'L3VRF': 'openconfig-network-instance-types:L3VRF',
}
ni_type = MAP_OC_NI_TYPE.get(ni_type, ni_type)
# 'DIRECTLY_CONNECTED' is implicitly added
#'protocols': {'protocol': protocols},
str_path = '/network-instances/network-instance[name={:s}]'.format(ni_name)
str_data = json.dumps({
'name': ni_name,
'config': {'name': ni_name, 'type': ni_type},
#'protocols': {'protocol': protocols},
})
str_data = yang_ni.print_mem('json')
json_data = json.loads(str_data)
json_data = json_data['openconfig-network-instance:network-instance'][0]
str_data = json.dumps(json_data)
return str_path, str_data
def _compose_directly_connected(self, name=None, enabled=True) -> Dict:
identifier = 'DIRECTLY_CONNECTED'
if name is None: name = 'DIRECTLY_CONNECTED'
return {
'identifier': identifier, 'name': name,
'config': {'identifier': identifier, 'name': name, 'enabled': enabled},
}
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]:
response = []
return response
def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.debug('json_data = {:s}'.format(json.dumps(json_data)))
# Arista Parsing Fixes:
# - Default instance comes with mpls/signaling-protocols/rsvp-te/global/hellos/state/hello-interval set to 0
# overwrite with .../hellos/config/hello-interval
network_instances = json_data.get('openconfig-network-instance:network-instance', [])
for network_instance in network_instances:
if network_instance['name'] != 'default': continue
mpls_rsvp_te = network_instance.get('mpls', {}).get('signaling-protocols', {}).get('rsvp-te', {})
mpls_rsvp_te_hellos = mpls_rsvp_te.get('global', {}).get('hellos', {})
hello_interval = mpls_rsvp_te_hellos.get('config', {}).get('hello-interval', 9000)
mpls_rsvp_te_hellos.get('state', {})['hello-interval'] = hello_interval
yang_network_instances_path = self.get_path()
json_data_valid = yang_handler.parse_to_dict(yang_network_instances_path, json_data, fmt='json', strict=False)
entries = []
for network_instance in json_data_valid['network-instances']['network-instance']:
LOGGER.debug('network_instance={:s}'.format(str(network_instance)))
ni_name = network_instance['name']
ni_config = network_instance['config']
ni_type = ni_config['type'].split(':')[-1]
_net_inst = {'name': ni_name, 'type': ni_type}
entry_net_inst_key = '/network_instance[{:s}]'.format(ni_name)
entries.append((entry_net_inst_key, _net_inst))
ni_interfaces = network_instance.get('interfaces', {}).get('interface', [])
for ni_interface in ni_interfaces:
#ni_if_id = ni_interface['id']
ni_if_config = ni_interface['config']
ni_if_name = ni_if_config['interface']
ni_sif_index = ni_if_config['subinterface']
ni_if_id = '{:s}.{:d}'.format(ni_if_name, ni_sif_index)
_interface = {'name': ni_name, 'id': ni_if_id, 'if_name': ni_if_name, 'sif_index': ni_sif_index}
entry_interface_key = '{:s}/interface[{:s}]'.format(entry_net_inst_key, ni_if_id)
entries.append((entry_interface_key, _interface))
ni_protocols = network_instance.get('protocols', {}).get('protocol', [])
for ni_protocol in ni_protocols:
ni_protocol_id = ni_protocol['identifier'].split(':')[-1]
ni_protocol_name = ni_protocol['name']
_protocol = {'name': ni_name, 'identifier': ni_protocol_id, 'protocol_name': ni_protocol_name}
entry_protocol_key = '{:s}/protocols[{:s}]'.format(entry_net_inst_key, ni_protocol_id)
entries.append((entry_protocol_key, _protocol))
if ni_protocol_id == 'STATIC':
static_routes = ni_protocol.get('static-routes', {}).get('static', [])
for static_route in static_routes:
static_route_prefix = static_route['prefix']
for next_hop in static_route.get('next-hops', {}).get('next-hop', []):
static_route_metric = next_hop['config']['metric']
_static_route = {
'prefix' : static_route_prefix,
'index' : next_hop['index'],
'next_hop': next_hop['config']['next-hop'],
'metric' : static_route_metric,
}
_static_route.update(_protocol)
entry_static_route_key = '{:s}/static_route[{:s}:{:d}]'.format(
entry_protocol_key, static_route_prefix, static_route_metric
)
entries.append((entry_static_route_key, _static_route))
ni_tables = network_instance.get('tables', {}).get('table', [])
for ni_table in ni_tables:
ni_table_protocol = ni_table['protocol'].split(':')[-1]
ni_table_address_family = ni_table['address-family'].split(':')[-1]
_table = {'protocol': ni_table_protocol, 'address_family': ni_table_address_family}
entry_table_key = '{:s}/table[{:s},{:s}]'.format(
entry_net_inst_key, ni_table_protocol, ni_table_address_family
)
entries.append((entry_table_key, _table))
ni_vlans = network_instance.get('vlans', {}).get('vlan', [])
for ni_vlan in ni_vlans:
ni_vlan_id = ni_vlan['vlan-id']
#ni_vlan_config = ni_vlan['config']
ni_vlan_state = ni_vlan['state']
ni_vlan_name = ni_vlan_state['name']
_members = [
member['state']['interface']
for member in ni_vlan.get('members', {}).get('member', [])
]
_vlan = {'vlan_id': ni_vlan_id, 'name': ni_vlan_name, 'members': _members}
entry_vlan_key = '{:s}/vlan[{:d}]'.format(entry_net_inst_key, ni_vlan_id)
entries.append((entry_vlan_key, _vlan))
return entries
......@@ -12,35 +12,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging
import json, libyang, logging
from typing import Any, Dict, List, Tuple
from ._Handler import _Handler
from .Tools import get_int, get_str
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
IS_CEOS = True
class NetworkInstanceInterfaceHandler(_Handler):
def get_resource_key(self) -> str: return '/network_instance/interface'
def get_path(self) -> str: return '/network-instances/network-instance/interfaces'
def get_path(self) -> str: return '/openconfig-network-instance:network-instances/network-instance/interfaces'
def compose(
self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
ni_name = get_str(resource_value, 'name' ) # test-svc
ni_if_id = get_str(resource_value, 'id' ) # ethernet-1/1.0
if_name = get_str(resource_value, 'interface' ) # ethernet-1/1
sif_index = get_int(resource_value, 'subinterface', 0) # 0
def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]:
ni_name = str(resource_value['name' ]) # test-svc
if_name = str(resource_value['if_name' ]) # ethernet-1/1
sif_index = int(resource_value['sif_index']) # 0
if_id = '{:s}.{:d}'.format(if_name, sif_index)
if IS_CEOS: ni_if_id = if_name
if delete:
PATH_TMPL = '/network-instances/network-instance[name={:s}]/interfaces/interface[id={:s}]'
str_path = PATH_TMPL.format(ni_name, if_id)
str_path = PATH_TMPL.format(ni_name, ni_if_id)
str_data = json.dumps({})
return str_path, str_data
str_path = '/network-instances/network-instance[name={:s}]/interfaces/interface[id={:s}]'.format(ni_name, if_id)
str_data = json.dumps({
'id': if_id,
'config': {'id': if_id, 'interface': if_name, 'subinterface': sif_index},
})
str_path = '/network-instances/network-instance[name={:s}]/interfaces/interface[id={:s}]'.format(
ni_name, ni_if_id
)
#str_data = json.dumps({
# 'id': if_id,
# 'config': {'id': if_id, 'interface': if_name, 'subinterface': sif_index},
#})
yang_nis : libyang.DContainer = yang_handler.get_data_path('/openconfig-network-instance:network-instances')
yang_ni : libyang.DContainer = yang_nis.create_path('network-instance[name="{:s}"]'.format(ni_name))
yang_ni_ifs : libyang.DContainer = yang_ni.create_path('interfaces')
yang_ni_if_path = 'interface[id="{:s}"]'.format(ni_if_id)
yang_ni_if : libyang.DContainer = yang_ni_ifs.create_path(yang_ni_if_path)
yang_ni_if.create_path('config/id', ni_if_id)
yang_ni_if.create_path('config/interface', if_name)
yang_ni_if.create_path('config/subinterface', sif_index)
str_data = yang_ni_if.print_mem('json')
json_data = json.loads(str_data)
json_data = json_data['openconfig-network-instance:interface'][0]
str_data = json.dumps(json_data)
return str_path, str_data
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]:
def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.debug('[parse] json_data = {:s}'.format(str(json_data)))
response = []
return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, libyang, logging
from typing import Any, Dict, List, Tuple
from ._Handler import _Handler
from .Tools import get_str
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
class NetworkInstanceProtocolHandler(_Handler):
def get_resource_key(self) -> str: return '/network_instance/protocols'
def get_path(self) -> str:
return '/openconfig-network-instance:network-instances/network-instance/protocols/protocol'
def compose(
self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
ni_name = get_str(resource_value, 'name' ) # test-svc
identifier = get_str(resource_value, 'identifier') # 'STATIC'
proto_name = get_str(resource_value, 'protocol_name') # 'STATIC'
if ':' not in identifier:
identifier = 'openconfig-policy-types:{:s}'.format(identifier)
PATH_TMPL = '/network-instances/network-instance[name={:s}]/protocols/protocol[identifier={:s}][name={:s}]'
str_path = PATH_TMPL.format(ni_name, identifier, proto_name)
if delete:
str_data = json.dumps({})
return str_path, str_data
#str_data = json.dumps({
# 'identifier': identifier, 'name': name,
# 'config': {'identifier': identifier, 'name': name, 'enabled': True},
# 'static_routes': {'static': [{
# 'prefix': prefix,
# 'config': {'prefix': prefix},
# 'next_hops': {
# 'next-hop': [{
# 'index': next_hop_index,
# 'config': {'index': next_hop_index, 'next_hop': next_hop}
# }]
# }
# }]}
#})
yang_nis : libyang.DContainer = yang_handler.get_data_path('/openconfig-network-instance:network-instances')
yang_ni : libyang.DContainer = yang_nis.create_path('network-instance[name="{:s}"]'.format(ni_name))
yang_ni_prs : libyang.DContainer = yang_ni.create_path('protocols')
yang_ni_pr_path = 'protocol[identifier="{:s}"][name="{:s}"]'.format(identifier, proto_name)
yang_ni_pr : libyang.DContainer = yang_ni_prs.create_path(yang_ni_pr_path)
yang_ni_pr.create_path('config/identifier', identifier)
yang_ni_pr.create_path('config/name', proto_name)
yang_ni_pr.create_path('config/enabled', True )
str_data = yang_ni_pr.print_mem('json')
json_data = json.loads(str_data)
json_data = json_data['openconfig-network-instance:protocol'][0]
str_data = json.dumps(json_data)
return str_path, str_data
def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.debug('[parse] json_data = {:s}'.format(str(json_data)))
response = []
return response
......@@ -12,50 +12,90 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging
import json, libyang, logging
from typing import Any, Dict, List, Tuple
from ._Handler import _Handler
from .Tools import get_int, get_str
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
class NetworkInstanceStaticRouteHandler(_Handler):
def get_resource_key(self) -> str: return '/network_instance/static_route'
def get_path(self) -> str: return '/network-instances/network-instance/static_route'
def get_resource_key(self) -> str: return '/network_instance/protocols/static_route'
def get_path(self) -> str:
return '/openconfig-network-instance:network-instances/network-instance/protocols/protocol/static-routes'
def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]:
ni_name = str(resource_value['name' ]) # test-svc
prefix = str(resource_value['prefix' ]) # '172.0.1.0/24'
def compose(
self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
ni_name = get_str(resource_value, 'name' ) # test-svc
identifier = get_str(resource_value, 'identifier') # 'STATIC'
proto_name = get_str(resource_value, 'protocol_name') # 'STATIC'
prefix = get_str(resource_value, 'prefix') # '172.0.1.0/24'
if ':' not in identifier:
identifier = 'openconfig-policy-types:{:s}'.format(identifier)
identifier = 'STATIC'
name = 'static'
if delete:
PATH_TMPL = '/network-instances/network-instance[name={:s}]/protocols'
PATH_TMPL += '/protocol[identifier={:s}][name={:s}]/static-routes/static[prefix={:s}]'
str_path = PATH_TMPL.format(ni_name, identifier, name, prefix)
str_path = PATH_TMPL.format(ni_name, identifier, proto_name, prefix)
str_data = json.dumps({})
return str_path, str_data
next_hop = str(resource_value['next_hop' ]) # '172.0.0.1'
next_hop_index = int(resource_value.get('next_hop_index', 0)) # 0
next_hop = get_str(resource_value, 'next_hop') # '172.0.0.1'
metric = get_int(resource_value, 'metric' ) # 20
index = get_str(resource_value, 'index' ) # AUTO_1_172-0-0-1
if index is None:
index = 'AUTO_{:d}_{:s}'.format(metric, next_hop)
PATH_TMPL = '/network-instances/network-instance[name={:s}]/protocols/protocol[identifier={:s}][name={:s}]'
str_path = PATH_TMPL.format(ni_name, identifier, name)
str_data = json.dumps({
'identifier': identifier, 'name': name,
'config': {'identifier': identifier, 'name': name, 'enabled': True},
'static_routes': {'static': [{
'prefix': prefix,
'config': {'prefix': prefix},
'next_hops': {
'next-hop': [{
'index': next_hop_index,
'config': {'index': next_hop_index, 'next_hop': next_hop}
}]
}
}]}
})
str_path = PATH_TMPL.format(ni_name, identifier, proto_name)
#str_data = json.dumps({
# 'identifier': identifier, 'name': name,
# 'config': {'identifier': identifier, 'name': name, 'enabled': True},
# 'static_routes': {'static': [{
# 'prefix': prefix,
# 'config': {'prefix': prefix},
# 'next_hops': {
# 'next-hop': [{
# 'index': next_hop_index,
# 'config': {'index': next_hop_index, 'next_hop': next_hop}
# }]
# }
# }]}
#})
yang_nis : libyang.DContainer = yang_handler.get_data_path('/openconfig-network-instance:network-instances')
yang_ni : libyang.DContainer = yang_nis.create_path('network-instance[name="{:s}"]'.format(ni_name))
yang_ni_prs : libyang.DContainer = yang_ni.create_path('protocols')
yang_ni_pr_path = 'protocol[identifier="{:s}"][name="{:s}"]'.format(identifier, proto_name)
yang_ni_pr : libyang.DContainer = yang_ni_prs.create_path(yang_ni_pr_path)
yang_ni_pr.create_path('config/identifier', identifier)
yang_ni_pr.create_path('config/name', proto_name)
yang_ni_pr.create_path('config/enabled', True )
yang_ni_pr_srs : libyang.DContainer = yang_ni_pr.create_path('static-routes')
yang_ni_pr_sr_path = 'static[prefix="{:s}"]'.format(prefix)
yang_ni_pr_sr : libyang.DContainer = yang_ni_pr_srs.create_path(yang_ni_pr_sr_path)
yang_ni_pr_sr.create_path('config/prefix', prefix)
yang_ni_pr_sr_nhs : libyang.DContainer = yang_ni_pr_sr.create_path('next-hops')
yang_ni_pr_sr_nh_path = 'next-hop[index="{:s}"]'.format(index)
yang_ni_pr_sr_nh : libyang.DContainer = yang_ni_pr_sr_nhs.create_path(yang_ni_pr_sr_nh_path)
yang_ni_pr_sr_nh.create_path('config/index', index)
yang_ni_pr_sr_nh.create_path('config/next-hop', next_hop)
yang_ni_pr_sr_nh.create_path('config/metric', metric)
str_data = yang_ni_pr.print_mem('json')
json_data = json.loads(str_data)
json_data = json_data['openconfig-network-instance:protocol'][0]
str_data = json.dumps(json_data)
return str_path, str_data
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]:
def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
LOGGER.debug('[parse] json_data = {:s}'.format(str(json_data)))
response = []
return response
......@@ -13,7 +13,7 @@
# limitations under the License.
import re
from typing import Any, Dict, Iterable
from typing import Any, Callable, Dict, Iterable, Optional
RE_REMOVE_FILTERS = re.compile(r'\[[^\]]+\]')
RE_REMOVE_NAMESPACES = re.compile(r'\/[a-zA-Z0-9\_\-]+:')
......@@ -23,8 +23,39 @@ def get_schema(resource_key : str):
resource_key = RE_REMOVE_NAMESPACES.sub('/', resource_key)
return resource_key
def dict_get_first(d : Dict, field_names : Iterable[str], default=None) -> Any:
for field_name in field_names:
if field_name not in d: continue
return d[field_name]
def container_get_first(
container : Dict[str, Any], key_name : str, namespace : Optional[str]=None, namespaces : Iterable[str]=tuple(),
default : Optional[Any] = None
) -> Any:
value = container.get(key_name)
if value is not None: return value
if namespace is not None:
if len(namespaces) > 0:
raise Exception('At maximum, one of namespace or namespaces can be specified')
namespaces = (namespace,)
for namespace in namespaces:
namespace_key_name = '{:s}:{:s}'.format(namespace, key_name)
if namespace_key_name in container: return container[namespace_key_name]
return default
def get_value(
resource_value : Dict, field_name : str, cast_func : Callable = lambda x:x, default : Optional[Any] = None
) -> Optional[Any]:
field_value = resource_value.get(field_name, default)
if field_value is not None: field_value = cast_func(field_value)
return field_value
def get_bool(resource_value : Dict, field_name : bool, default : Optional[Any] = None) -> bool:
return get_value(resource_value, field_name, cast_func=bool, default=default)
def get_float(resource_value : Dict, field_name : float, default : Optional[Any] = None) -> float:
return get_value(resource_value, field_name, cast_func=float, default=default)
def get_int(resource_value : Dict, field_name : int, default : Optional[Any] = None) -> int:
return get_value(resource_value, field_name, cast_func=int, default=default)
def get_str(resource_value : Dict, field_name : str, default : Optional[Any] = None) -> str:
return get_value(resource_value, field_name, cast_func=str, default=default)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, libyang, logging, os
from typing import Dict, Optional
YANG_BASE_PATH = os.path.join(os.path.dirname(__file__), '..', 'git', 'openconfig', 'public')
YANG_SEARCH_PATHS = ':'.join([
os.path.join(YANG_BASE_PATH, 'release'),
os.path.join(YANG_BASE_PATH, 'third_party'),
])
YANG_MODULES = [
'iana-if-type',
'openconfig-bgp-types',
'openconfig-vlan-types',
'openconfig-interfaces',
'openconfig-if-8021x',
'openconfig-if-aggregate',
'openconfig-if-ethernet-ext',
'openconfig-if-ethernet',
'openconfig-if-ip-ext',
'openconfig-if-ip',
'openconfig-if-poe',
'openconfig-if-sdn-ext',
'openconfig-if-tunnel',
'openconfig-vlan',
'openconfig-types',
'openconfig-policy-types',
'openconfig-mpls-types',
'openconfig-network-instance-types',
'openconfig-network-instance',
'openconfig-platform',
'openconfig-platform-controller-card',
'openconfig-platform-cpu',
'openconfig-platform-ext',
'openconfig-platform-fabric',
'openconfig-platform-fan',
'openconfig-platform-integrated-circuit',
'openconfig-platform-linecard',
'openconfig-platform-pipeline-counters',
'openconfig-platform-port',
'openconfig-platform-psu',
'openconfig-platform-software',
'openconfig-platform-transceiver',
'openconfig-platform-types',
]
LOGGER = logging.getLogger(__name__)
class YangHandler:
def __init__(self) -> None:
self._yang_context = libyang.Context(YANG_SEARCH_PATHS)
self._loaded_modules = set()
for yang_module_name in YANG_MODULES:
LOGGER.info('Loading module: {:s}'.format(str(yang_module_name)))
self._yang_context.load_module(yang_module_name).feature_enable_all()
self._loaded_modules.add(yang_module_name)
self._data_path_instances = dict()
def get_data_paths(self) -> Dict[str, libyang.DNode]:
return self._data_path_instances
def get_data_path(self, path : str) -> libyang.DNode:
data_path_instance = self._data_path_instances.get(path)
if data_path_instance is None:
data_path_instance = self._yang_context.create_data_path(path)
self._data_path_instances[path] = data_path_instance
return data_path_instance
def parse_to_dict(
self, request_path : str, json_data : Dict, fmt : str = 'json', strict : bool = True
) -> Dict:
if fmt != 'json': raise Exception('Unsupported format: {:s}'.format(str(fmt)))
LOGGER.debug('request_path = {:s}'.format(str(request_path)))
LOGGER.debug('json_data = {:s}'.format(str(json_data)))
LOGGER.debug('format = {:s}'.format(str(fmt)))
parent_path_parts = list(filter(lambda s: len(s) > 0, request_path.split('/')))
for parent_path_part in reversed(parent_path_parts):
json_data = {parent_path_part: json_data}
str_data = json.dumps(json_data)
dnode : Optional[libyang.DNode] = self._yang_context.parse_data_mem(
str_data, fmt, strict=strict, parse_only=True, #validate_present=True, #validate=True,
)
if dnode is None: raise Exception('Unable to parse Data({:s})'.format(str(json_data)))
parsed = dnode.print_dict()
LOGGER.debug('parsed = {:s}'.format(json.dumps(parsed)))
dnode.free()
return parsed
def destroy(self) -> None:
self._yang_context.destroy()
......@@ -13,6 +13,7 @@
# limitations under the License.
from typing import Any, Dict, List, Tuple
from .YangHandler import YangHandler
class _Handler:
def get_resource_key(self) -> str:
......@@ -23,10 +24,14 @@ class _Handler:
# Retrieve the OpenConfig path schema used to interrogate the device
raise NotImplementedError()
def compose(self, resource_key : str, resource_value : Dict, delete : bool = False) -> Tuple[str, str]:
def compose(
self, resource_key : str, resource_value : Dict, yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
# Compose a Set/Delete message based on the resource_key/resource_value fields, and the delete flag
raise NotImplementedError()
def parse(self, json_data : Dict) -> List[Tuple[str, Dict[str, Any]]]:
def parse(
self, json_data : Dict, yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
# Parse a Reply from the device and return a list of resource_key/resource_value pairs
raise NotImplementedError()
......@@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from device.service.driver_api._Driver import RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from ._Handler import _Handler
from .Component import ComponentHandler
......@@ -21,8 +21,10 @@ from .Interface import InterfaceHandler
from .InterfaceCounter import InterfaceCounterHandler
from .NetworkInstance import NetworkInstanceHandler
from .NetworkInstanceInterface import NetworkInstanceInterfaceHandler
from .NetworkInstanceProtocol import NetworkInstanceProtocolHandler
from .NetworkInstanceStaticRoute import NetworkInstanceStaticRouteHandler
from .Tools import get_schema
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
......@@ -31,6 +33,7 @@ ifaceh = InterfaceHandler()
ifctrh = InterfaceCounterHandler()
nih = NetworkInstanceHandler()
niifh = NetworkInstanceInterfaceHandler()
niph = NetworkInstanceProtocolHandler()
nisrh = NetworkInstanceStaticRouteHandler()
ALL_RESOURCE_KEYS = [
......@@ -46,9 +49,10 @@ RESOURCE_KEY_MAPPER = {
}
PATH_MAPPER = {
'/components' : comph.get_path(),
'/interfaces' : ifaceh.get_path(),
'/network-instances' : nih.get_path(),
'/components' : comph.get_path(),
'/components/component' : comph.get_path(),
'/interfaces' : ifaceh.get_path(),
'/network-instances' : nih.get_path(),
}
RESOURCE_KEY_TO_HANDLER = {
......@@ -57,6 +61,7 @@ RESOURCE_KEY_TO_HANDLER = {
ifctrh.get_resource_key() : ifctrh,
nih.get_resource_key() : nih,
niifh.get_resource_key() : niifh,
niph.get_resource_key() : niph,
nisrh.get_resource_key() : nisrh,
}
......@@ -66,11 +71,13 @@ PATH_TO_HANDLER = {
ifctrh.get_path() : ifctrh,
nih.get_path() : nih,
niifh.get_path() : niifh,
niph.get_path() : niph,
nisrh.get_path() : nisrh,
}
def get_handler(
resource_key : Optional[str] = None, path : Optional[str] = None, raise_if_not_found=True
resource_key : Optional[str] = None, path : Optional[str] = None,
raise_if_not_found=True
) -> Optional[_Handler]:
if (resource_key is None) == (path is None):
MSG = 'Exactly one of resource_key({:s}) or path({:s}) must be specified'
......@@ -88,16 +95,24 @@ def get_handler(
path_schema = PATH_MAPPER.get(path_schema, path_schema)
handler = PATH_TO_HANDLER.get(path_schema)
if handler is None and raise_if_not_found:
MSG = 'Handler not found: resource_key={:s} resource_key_schema={:s}'
MSG = 'Handler not found: path={:s} path_schema={:s}'
# pylint: disable=broad-exception-raised
raise Exception(MSG.format(str(resource_key), str(resource_key_schema)))
raise Exception(MSG.format(str(path), str(path_schema)))
return handler
def get_path(resource_key : str) -> str:
return get_handler(resource_key=resource_key).get_path()
handler = get_handler(resource_key=resource_key)
return handler.get_path()
def parse(str_path : str, value : Union[Dict, List]):
return get_handler(path=str_path).parse(value)
def parse(
str_path : str, value : Union[Dict, List], yang_handler : YangHandler
) -> List[Tuple[str, Dict[str, Any]]]:
handler = get_handler(path=str_path)
return handler.parse(value, yang_handler)
def compose(resource_key : str, resource_value : Union[Dict, List], delete : bool = False) -> Tuple[str, str]:
return get_handler(resource_key=resource_key).compose(resource_key, resource_value, delete=delete)
def compose(
resource_key : str, resource_value : Union[Dict, List],
yang_handler : YangHandler, delete : bool = False
) -> Tuple[str, str]:
handler = get_handler(resource_key=resource_key)
return handler.compose(resource_key, resource_value, yang_handler, delete=delete)
......@@ -17,7 +17,7 @@ from common.tools.grpc.Tools import grpc_message_to_json
from ..gnmi.gnmi_pb2 import CapabilityRequest # pylint: disable=no-name-in-module
from ..gnmi.gnmi_pb2_grpc import gNMIStub
def get_supported_encodings(
def check_capabilities(
stub : gNMIStub, username : str, password : str, timeout : Optional[int] = None
) -> Set[Union[str, int]]:
metadata = [('username', username), ('password', password)]
......@@ -25,6 +25,17 @@ def get_supported_encodings(
reply = stub.Capabilities(req, metadata=metadata, timeout=timeout)
data = grpc_message_to_json(reply)
gnmi_version = data.get('gNMI_version')
if gnmi_version is None or gnmi_version != '0.7.0':
raise Exception('Unsupported gNMI version: {:s}'.format(str(gnmi_version)))
#supported_models = {
# supported_model['name']: supported_model['version']
# for supported_model in data.get('supported_models', [])
#}
# TODO: check supported models and versions
supported_encodings = {
supported_encoding
for supported_encoding in data.get('supported_encodings', [])
......@@ -33,4 +44,6 @@ def get_supported_encodings(
if len(supported_encodings) == 0:
# pylint: disable=broad-exception-raised
raise Exception('No supported encodings found')
return supported_encodings
if 'JSON_IETF' not in supported_encodings:
# pylint: disable=broad-exception-raised
raise Exception('JSON_IETF encoding not supported')
......@@ -19,8 +19,8 @@ from ..gnmi.gnmi_pb2 import Path, PathElem
RE_PATH_SPLIT = re.compile(r'/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)')
RE_PATH_KEYS = re.compile(r'\[(.*?)\]')
def path_from_string(path='/'):
if not path: return Path(elem=[])
def path_from_string(path='/'): #, origin='openconfig'
if not path: return Path(elem=[]) #, origin=origin
if path[0] == '/':
if path[-1] == '/':
......@@ -40,7 +40,7 @@ def path_from_string(path='/'):
dict_keys = dict(x.split('=', 1) for x in elem_keys)
path.append(PathElem(name=elem_name, key=dict_keys))
return Path(elem=path)
return Path(elem=path) #, origin=origin
def path_to_string(path : Path) -> str:
path_parts = list()
......
......@@ -13,9 +13,36 @@
# limitations under the License.
import base64, json
from typing import Any
from typing import Any, Dict, List, Union
from ..gnmi.gnmi_pb2 import TypedValue
REMOVE_NAMESPACES = (
'arista-intf-augments',
'arista-netinst-augments',
'openconfig-hercules-platform',
)
def remove_fields(key : str) -> bool:
parts = key.split(':')
if len(parts) == 1: return False
namespace = parts[0].lower()
return namespace in REMOVE_NAMESPACES
def recursive_remove_keys(container : Union[Dict, List, Any]) -> None:
if isinstance(container, dict):
remove_keys = [
key
for key in container.keys()
if remove_fields(key)
]
for key in remove_keys:
container.pop(key, None)
for value in container.values():
recursive_remove_keys(value)
elif isinstance(container, list):
for value in container:
recursive_remove_keys(value)
def decode_value(value : TypedValue) -> Any:
encoding = value.WhichOneof('value')
if encoding == 'json_val':
......@@ -31,9 +58,13 @@ def decode_value(value : TypedValue) -> Any:
raise NotImplementedError()
#return value
elif encoding == 'json_ietf_val':
value : str = value.json_ietf_val
str_value : str = value.json_ietf_val.decode('UTF-8')
try:
return json.loads(value)
# Cleanup and normalize the records according to OpenConfig
#str_value = str_value.replace('openconfig-platform-types:', 'oc-platform-types:')
json_value = json.loads(str_value)
recursive_remove_keys(json_value)
return json_value
except json.decoder.JSONDecodeError:
# Assume is Base64-encoded
b_b64_value = value.encode('UTF-8')
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .StorageEndpoints import StorageEndpoints
from .StorageInterface import StorageInterface
from .StorageNetworkInstance import StorageNetworkInstance
class Storage:
def __init__(self) -> None:
self.endpoints = StorageEndpoints()
self.interfaces = StorageInterface()
self.network_instances = StorageNetworkInstance()