Skip to content
Snippets Groups Projects
Commit f1a182d3 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Added gNMI-OpenConfig Collector in Telemetry Backend

- Added GNMI collector basic skeleton
- Updated test script
- Remove obsolete files
- Restructure the collector directory inside the Telemetry backend.
parent a8c40070
No related branches found
No related tags found
1 merge request!289Draft: Resolve "(CTTC) Implement Telemetry Backend Collector gNMI/OpenConfig"
Showing
with 464 additions and 11 deletions
...@@ -24,6 +24,6 @@ cd $PROJECTDIR/src ...@@ -24,6 +24,6 @@ cd $PROJECTDIR/src
# export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" # export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc RCFILE=$PROJECTDIR/coverage/.coveragerc
python3 -m pytest --log-level=debug --log-cli-level=info --verbose \ python3 -m pytest --log-level=info --log-cli-level=info --verbose \
telemetry/backend/tests/test_gnmi_collector.py telemetry/backend/tests/gnmi_openconfig/test_gnmi_openconfig_collector.py
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import anytree
from typing import Any, List, Optional, Union
from apscheduler.job import Job
class TreeNode(anytree.node.Node):
def __init__(self, name, parent=None, children=None, **kwargs) -> None:
super().__init__(name, parent=parent, children=children, **kwargs)
self.value : Optional[Any] = None
def get_full_path(self):
return self.separator.join([''] + [str(node.name) for node in self.path])
class RawStyle(anytree.render.AbstractStyle):
def __init__(self):
"""
Raw style.
>>> from anytree import Node, RenderTree
>>> root = Node("root")
>>> s0 = Node("sub0", parent=root)
>>> s0b = Node("sub0B", parent=s0)
>>> s0a = Node("sub0A", parent=s0)
>>> s1 = Node("sub1", parent=root)
>>> print(RenderTree(root, style=RawStyle()))
Node('/root')
Node('/root/sub0')
Node('/root/sub0/sub0B')
Node('/root/sub0/sub0A')
Node('/root/sub1')
"""
super(RawStyle, self).__init__('', '', '')
def get_subnode(
resolver : anytree.Resolver, root : TreeNode, key_or_path : Union[str, List[str]], default : Optional[Any] = None):
if isinstance(key_or_path, str): key_or_path = key_or_path.split('/')
node = root
for path_item in key_or_path:
try:
node = resolver.get(node, path_item)
except anytree.ChildResolverError:
return default
return node
def set_subnode_value(resolver : anytree.Resolver, root : TreeNode, key_or_path : Union[str, List[str]], value : Any):
if isinstance(key_or_path, str): key_or_path = key_or_path.split('/')
node = root
for path_item in key_or_path:
try:
node = resolver.get(node, path_item)
except anytree.ChildResolverError:
node = TreeNode(path_item, parent=node)
if isinstance(node.value, dict) and isinstance(value, dict):
node.value.update(value)
else:
node.value = value
def dump_subtree(root : TreeNode):
if not isinstance(root, TreeNode): raise Exception('root must be a TreeNode')
results = []
for row in anytree.RenderTree(root, style=RawStyle()):
node : TreeNode = row.node
path = node.get_full_path()[2:] # get full path except the heading root placeholder "/."
if len(path) == 0: continue
value = node.value
if value is None: continue
if isinstance(value, Job): value = str(value)
results.append((path, value))
return results
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, operator
from enum import Enum
from typing import Any, Dict, Iterable, List, Set, Tuple
from ._Collector import _Collector
from .Exceptions import (
UnsatisfiedFilterException, UnsupportedDriverClassException, UnsupportedFilterFieldException,
UnsupportedFilterFieldValueException)
from .FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum
LOGGER = logging.getLogger(__name__)
class DriverFactory:
def __init__(self, drivers : List[Tuple[type, List[Dict[FilterFieldEnum, Any]]]]) -> None:
self.__indices : Dict[str, Dict[str, Set[_Collector]]] = {} # Dict{field_name => Dict{field_value => Set{Driver}}}
for driver_class,filter_field_sets in drivers:
for filter_fields in filter_field_sets:
filter_fields = {k.value:v for k,v in filter_fields.items()}
self.register_driver_class(driver_class, **filter_fields)
def register_driver_class(self, driver_class, **filter_fields):
if not issubclass(driver_class, _Collector): raise UnsupportedDriverClassException(str(driver_class))
driver_name = driver_class.__name__
supported_filter_fields = set(FILTER_FIELD_ALLOWED_VALUES.keys())
unsupported_filter_fields = set(filter_fields.keys()).difference(supported_filter_fields)
if len(unsupported_filter_fields) > 0:
raise UnsupportedFilterFieldException(unsupported_filter_fields, driver_class_name=driver_name)
for field_name, field_values in filter_fields.items():
field_indice = self.__indices.setdefault(field_name, dict())
field_enum_values = FILTER_FIELD_ALLOWED_VALUES.get(field_name)
if not isinstance(field_values, Iterable) or isinstance(field_values, str):
field_values = [field_values]
for field_value in field_values:
if isinstance(field_value, Enum): field_value = field_value.value
if field_enum_values is not None and field_value not in field_enum_values:
raise UnsupportedFilterFieldValueException(
field_name, field_value, field_enum_values, driver_class_name=driver_name)
field_indice_drivers = field_indice.setdefault(field_value, set())
field_indice_drivers.add(driver_class)
def get_driver_class(self, **filter_fields) -> _Collector:
supported_filter_fields = set(FILTER_FIELD_ALLOWED_VALUES.keys())
unsupported_filter_fields = set(filter_fields.keys()).difference(supported_filter_fields)
if len(unsupported_filter_fields) > 0: raise UnsupportedFilterFieldException(unsupported_filter_fields)
candidate_driver_classes : Dict[_Collector, int] = None # number of filter hits per driver
for field_name, field_values in filter_fields.items():
field_indice = self.__indices.get(field_name)
if field_indice is None: continue
field_enum_values = FILTER_FIELD_ALLOWED_VALUES.get(field_name)
if not isinstance(field_values, Iterable) or isinstance(field_values, str):
field_values = [field_values]
field_candidate_driver_classes = set()
for field_value in field_values:
if field_enum_values is not None and field_value not in field_enum_values:
raise UnsupportedFilterFieldValueException(field_name, field_value, field_enum_values)
field_indice_drivers = field_indice.get(field_value)
if field_indice_drivers is None: continue
field_candidate_driver_classes = field_candidate_driver_classes.union(field_indice_drivers)
if candidate_driver_classes is None:
if len(field_candidate_driver_classes) == 0: continue
candidate_driver_classes = {k:1 for k in field_candidate_driver_classes}
else:
for candidate_driver_class in candidate_driver_classes:
if candidate_driver_class not in field_candidate_driver_classes: continue
candidate_driver_classes[candidate_driver_class] += 1
if len(candidate_driver_classes) == 0: raise UnsatisfiedFilterException(filter_fields)
candidate_driver_classes = sorted(candidate_driver_classes.items(), key=operator.itemgetter(1), reverse=True)
return candidate_driver_classes[0][0]
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, threading
from typing import Any, Dict, Optional
from common.method_wrappers.ServiceExceptions import InvalidArgumentException
from common.proto.context_pb2 import Device, Empty
from context.client.ContextClient import ContextClient
from telemetry.backend.Tools import get_connect_rules
from ._Collector import _Collector
from .DriverFactory import DriverFactory
from .Exceptions import DriverInstanceCacheTerminatedException
from .FilterFields import FilterFieldEnum, get_device_driver_filter_fields
LOGGER = logging.getLogger(__name__)
class DriverInstanceCache:
def __init__(self, driver_factory : DriverFactory) -> None:
self._lock = threading.Lock()
self._terminate = threading.Event()
self._device_uuid__to__driver_instance : Dict[str, _Collector] = {}
self._driver_factory = driver_factory
def get(
self, device_uuid : str, filter_fields : Dict[FilterFieldEnum, Any] = {}, address : Optional[str] = None,
port : Optional[int] = None, settings : Dict[str, Any] = {}
) -> _Collector:
if self._terminate.is_set():
raise DriverInstanceCacheTerminatedException()
filter_fields = {k.value:v for k,v in filter_fields.items()}
with self._lock:
driver_instance = self._device_uuid__to__driver_instance.get(device_uuid)
if driver_instance is not None: return driver_instance
if len(filter_fields) == 0: return None
MSG = 'Selecting driver for device({:s}) with filter_fields({:s})...'
LOGGER.info(MSG.format(str(device_uuid), str(filter_fields)))
driver_class = self._driver_factory.get_driver_class(**filter_fields)
MSG = 'Driver({:s}) selected for device({:s}) with filter_fields({:s})...'
LOGGER.info(MSG.format(str(driver_class.__name__), str(device_uuid), str(filter_fields)))
if driver_class.__name__ == "OCDriver":
driver_instance : _Collector = driver_class(address, port, device_uuid=device_uuid, **settings)
else:
driver_instance : _Collector = driver_class(address, port, **settings)
self._device_uuid__to__driver_instance[device_uuid] = driver_instance
return driver_instance
def delete(self, device_uuid : str) -> None:
with self._lock:
device_driver = self._device_uuid__to__driver_instance.pop(device_uuid, None)
if device_driver is None: return
device_driver.Disconnect()
def terminate(self) -> None:
self._terminate.set()
with self._lock:
while len(self._device_uuid__to__driver_instance) > 0:
device_uuid,device_driver = self._device_uuid__to__driver_instance.popitem()
try:
device_driver.Disconnect()
except: # pylint: disable=bare-except
msg = 'Error disconnecting Driver({:s}) from device. Will retry later...'
LOGGER.exception(msg.format(device_uuid))
# re-adding to retry disconnect
self._device_uuid__to__driver_instance[device_uuid] = device_driver
def get_driver(driver_instance_cache : DriverInstanceCache, device : Device) -> _Collector:
device_uuid = device.device_id.device_uuid.uuid
driver : _Collector = driver_instance_cache.get(device_uuid)
if driver is not None: return driver
driver_filter_fields = get_device_driver_filter_fields(device)
connect_rules = get_connect_rules(device.device_config)
#LOGGER.info('[get_driver] connect_rules = {:s}'.format(str(connect_rules)))
address = connect_rules.get('address', '127.0.0.1')
port = connect_rules.get('port', '0')
settings = connect_rules.get('settings', '{}')
try:
settings = json.loads(settings)
except ValueError as e:
raise InvalidArgumentException(
'device.device_config.config_rules[settings]', settings,
extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.'
) from e
driver : _Collector = driver_instance_cache.get(
device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
driver.Connect()
return driver
def preload_drivers(driver_instance_cache : DriverInstanceCache) -> None:
context_client = ContextClient()
devices = context_client.ListDevices(Empty())
for device in devices.devices: get_driver(driver_instance_cache, device)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class UnsatisfiedFilterException(Exception):
def __init__(self, filter_fields):
msg = 'No Driver satisfies FilterFields({:s})'
super().__init__(msg.format(str(filter_fields)))
class UnsupportedDriverClassException(Exception):
def __init__(self, driver_class_name):
msg = 'Class({:s}) is not a subclass of _Driver'
super().__init__(msg.format(str(driver_class_name)))
class UnsupportedFilterFieldException(Exception):
def __init__(self, unsupported_filter_fields, driver_class_name=None):
if driver_class_name:
msg = 'FilterFields({:s}) specified by Driver({:s}) are not supported'
msg = msg.format(str(unsupported_filter_fields), str(driver_class_name))
else:
msg = 'FilterFields({:s}) specified in Filter are not supported'
msg = msg.format(str(unsupported_filter_fields))
super().__init__(msg)
class UnsupportedFilterFieldValueException(Exception):
def __init__(self, filter_field_name, filter_field_value, allowed_filter_field_values, driver_class_name=None):
if driver_class_name:
msg = 'FilterField({:s}={:s}) specified by Driver({:s}) is not supported. Allowed values are {:s}'
msg = msg.format(
str(filter_field_name), str(filter_field_value), str(driver_class_name),
str(allowed_filter_field_values))
else:
msg = 'FilterField({:s}={:s}) specified in Filter is not supported. Allowed values are {:s}'
msg = msg.format(str(filter_field_name), str(filter_field_value), str(allowed_filter_field_values))
super().__init__(msg)
class DriverInstanceCacheTerminatedException(Exception):
def __init__(self):
msg = 'DriverInstanceCache is terminated. No new instances can be processed.'
super().__init__(msg)
class UnsupportedResourceKeyException(Exception):
def __init__(self, resource_key):
msg = 'ResourceKey({:s}) not supported'
msg = msg.format(str(resource_key))
super().__init__(msg)
class ConfigFieldNotFoundException(Exception):
def __init__(self, config_field_name):
msg = 'ConfigField({:s}) not specified in resource'
msg = msg.format(str(config_field_name))
super().__init__(msg)
class ConfigFieldsNotSupportedException(Exception):
def __init__(self, config_fields):
msg = 'ConfigFields({:s}) not supported in resource'
msg = msg.format(str(config_fields))
super().__init__(msg)
...@@ -11,3 +11,31 @@ ...@@ -11,3 +11,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from enum import Enum
from typing import Any, Dict, Optional
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import Device, DeviceDriverEnum
class FilterFieldEnum(Enum):
DEVICE_TYPE = 'device_type'
DRIVER = 'driver'
VENDOR = 'vendor'
MODEL = 'model'
SERIAL_NUMBER = 'serial_number'
# Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified
FILTER_FIELD_ALLOWED_VALUES = {
FilterFieldEnum.DEVICE_TYPE.value : {i.value for i in DeviceTypeEnum},
FilterFieldEnum.DRIVER.value : set(DeviceDriverEnum.values()),
FilterFieldEnum.VENDOR.value : None,
FilterFieldEnum.MODEL.value : None,
FilterFieldEnum.SERIAL_NUMBER.value : None,
}
def get_device_driver_filter_fields(device : Optional[Device]) -> Dict[FilterFieldEnum, Any]:
if device is None: return {}
return {
FilterFieldEnum.DEVICE_TYPE: device.device_type,
FilterFieldEnum.DRIVER : [driver for driver in device.device_drivers],
}
...@@ -18,14 +18,14 @@ from typing import Any, Iterator, List, Optional, Tuple, Union ...@@ -18,14 +18,14 @@ from typing import Any, Iterator, List, Optional, Tuple, Union
# Special resource names to request to the collector to retrieve the specified # Special resource names to request to the collector to retrieve the specified
# configuration/structural resources. # configuration/structural resources.
# These resource names should be used with GetConfig() method. # These resource names should be used with GetConfig() method.
RESOURCE_ENDPOINTS = '__endpoints__' RESOURCE_ENDPOINTS = '__endpoints__'
RESOURCE_INTERFACES = '__interfaces__' RESOURCE_INTERFACES = '__interfaces__'
RESOURCE_NETWORK_INSTANCES = '__network_instances__' RESOURCE_NETWORK_INSTANCES = '__network_instances__'
RESOURCE_ROUTING_POLICIES = '__routing_policies__' RESOURCE_ROUTING_POLICIES = '__routing_policies__'
RESOURCE_SERVICES = '__services__' RESOURCE_SERVICES = '__services__'
RESOURCE_ACL = '__acl__' RESOURCE_ACL = '__acl__'
RESOURCE_INVENTORY = '__inventory__' RESOURCE_INVENTORY = '__inventory__'
RESOURCE_RULES = "__rules__"
class _Collector: class _Collector:
def __init__(self, name : str, address: str, port: int, **settings) -> None: def __init__(self, name : str, address: str, port: int, **settings) -> None:
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import DeviceDriverEnum
from telemetry.backend.Config import LOAD_ALL_DEVICE_DRIVERS
from ..collector_api.FilterFields import FilterFieldEnum
DRIVERS = []
from .emulated.EmulatedCollector import EmulatedCollector # pylint: disable=wrong-import-position
DRIVERS.append(
(EmulatedCollector, [
# TODO: multi-filter is not working
{
FilterFieldEnum.DEVICE_TYPE: [
DeviceTypeEnum.EMULATED_P4_SWITCH,
DeviceTypeEnum.EMULATED_PACKET_ROUTER,
DeviceTypeEnum.EMULATED_PACKET_SWITCH,
],
FilterFieldEnum.DRIVER: [
DeviceDriverEnum.DEVICEDRIVER_UNDEFINED,
],
},
]))
if LOAD_ALL_DEVICE_DRIVERS:
from .gnmi_openconfig.GnmiOpenConfigCollector import GnmiOpenConfigCollector # pylint: disable=wrong-import-position
DRIVERS.append(
(GnmiOpenConfigCollector, [
{
# Real Packet Router, specifying OpenConfig Driver => use OpenConfigDriver
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.PACKET_ROUTER,
FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG,
}
]))
if LOAD_ALL_DEVICE_DRIVERS:
from .p4.p4_collector import P4Collector # pylint: disable=wrong-import-position
DRIVERS.append(
(P4Collector, [
{
# Real P4 Switch, specifying P4 Collector => use P4Collector
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.P4_SWITCH,
FilterFieldEnum.DRIVER : DeviceDriverEnum.DEVICEDRIVER_P4,
}
]))
...@@ -21,7 +21,7 @@ from apscheduler.jobstores.memory import MemoryJobStore ...@@ -21,7 +21,7 @@ from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Any, Iterator, List, Tuple, Union, Optional from typing import Any, Iterator, List, Tuple, Union, Optional
from telemetry.backend.collector_api._Collector import _Collector from telemetry.backend.service.collector_api._Collector import _Collector
from .EmulatedHelper import EmulatedCollectorHelper from .EmulatedHelper import EmulatedCollectorHelper
from .SyntheticMetricsGenerator import SyntheticMetricsGenerator from .SyntheticMetricsGenerator import SyntheticMetricsGenerator
......
...@@ -16,7 +16,7 @@ import logging, queue, threading ...@@ -16,7 +16,7 @@ import logging, queue, threading
from typing import Any, Iterator, List, Optional, Tuple, Union from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_type from common.type_checkers.Checkers import chk_type
from telemetry.backend.collector_api._Collector import _Collector from telemetry.backend.service.collector_api._Collector import _Collector
from .GnmiSessionHandler import GnmiSessionHandler from .GnmiSessionHandler import GnmiSessionHandler
COLLECTOR_NAME = 'gnmi_openconfig' COLLECTOR_NAME = 'gnmi_openconfig'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment