Commit 161e2f94 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Updated Telemetry Backend

- Add GNMI OpenConfig collector
- Modify _Collector class
- Added test script
- Updated requirement file
parent 6573fa12
Loading
Loading
Loading
Loading
+29 −0
Original line number Diff line number Diff line
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


PROJECTDIR=`pwd`

cd $PROJECTDIR/src
# RCFILE=$PROJECTDIR/coverage/.coveragerc

# export KFK_SERVER_ADDRESS='127.0.0.1:9092'
# CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
# export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
RCFILE=$PROJECTDIR/coverage/.coveragerc

python3 -m pytest --log-level=debug --log-cli-level=info --verbose \
    telemetry/backend/tests/test_gnmi_collector.py
+13 −19
Original line number Diff line number Diff line
@@ -135,26 +135,20 @@ class _Collector:
        """
        raise NotImplementedError()

    def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \
    def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> \
                List[Union[bool, Exception]]:
        """ Subscribe to state information of the entire device or selected resources. 
            Subscriptions are incremental, and the collector should keep track of requested resources.
                    List of tuples, each containing:
                        - resource_id (str): Identifier pointing to the resource to be subscribed.
                        - resource_dict (dict): Dictionary containing resource name, KPI to be subscribed, and type.
                        - sampling_duration (float): Duration (in seconds) for how long monitoring should last.
                        - sampling_interval (float): Desired monitoring interval (in seconds) for the specified resource.
                    List of results for the requested resource key subscriptions. 
                    The return values are in the same order as the requested resource keys.
                    - True if a resource is successfully subscribed.
                    - Exception if an error occurs during the subscription process.
            List[Union[bool, Exception]]:
        """ Subscribe to state information of entire device or
        selected resources. Subscriptions are incremental.
            Collector should keep track of requested resources.
            Parameters:
                subscriptions : List[Tuple[str, float, float]]
                    List of tuples, each containing a resource_key pointing the
                    resource to be subscribed, a sampling_duration, and a
                    sampling_interval (both in seconds with float
                    representation) defining, respectively, for how long
                    monitoring should last, and the desired monitoring interval
                    for the resource specified.
            Returns:
                results : List[Union[bool, Exception]]
                    List of results for resource key subscriptions requested.
                    Return values must be in the same order as the resource keys
                    requested. If a resource is properly subscribed,
                    True must be retrieved; otherwise, the Exception that is
                    raised during the processing must be retrieved.
        """
        raise NotImplementedError()

+25 −0
Original line number Diff line number Diff line
This code is partially based on:
https://github.com/nokia/pygnmi/blob/master/gNMI_Subscribe.py


MIT License

Copyright (c) 2017 Nokia

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
 No newline at end of file
+81 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, queue, threading
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_type
from telemetry.backend.collector_api._Collector import _Collector
from .GnmiSessionHandler import GnmiSessionHandler

COLLECTOR_NAME = 'gnmi_openconfig'
METRICS_POOL   = MetricsPool('Telemtery', 'Collectors', labels={'collector': COLLECTOR_NAME})

class GnmiOpenConfigCollector(_Collector):
    def __init__(self, address : str, port : int, **settings) -> None:
        super().__init__(COLLECTOR_NAME, address, port, **settings)
        self.__logger      = logging.getLogger('{:s}:[{:s}:{:s}]'.format(str(__name__), str(self.address), str(self.port)))
        self.__lock        = threading.Lock()
        self.__started     = threading.Event()
        self.__terminate   = threading.Event()
        self.__handler     = GnmiSessionHandler(self.address, self.port, settings, self.__logger)
        self.__out_samples = self.__handler.out_samples

    def Connect(self) -> bool:
        with self.__lock:
            if self.__started.is_set(): return True
            self.__handler.connect()
            self.__started.set()
            return True

    def Disconnect(self) -> bool:
        with self.__lock:
            # Trigger termination of loops and processes
            self.__terminate.set()
            # If not started, assume it is already disconnected
            if not self.__started.is_set(): return True
            self.__handler.disconnect()
            return True

    @metered_subclass_method(METRICS_POOL)
    def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
        chk_type('resources', resource_keys, list)
        with self.__lock:
            return self.__handler.get(resource_keys)

    @metered_subclass_method(METRICS_POOL)
    def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('subscriptions', subscriptions, list)
        if len(subscriptions) == 0: return []
        with self.__lock:
            return self.__handler.subscribe(subscriptions)

    @metered_subclass_method(METRICS_POOL)
    def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        chk_type('subscriptions', subscriptions, list)
        if len(subscriptions) == 0: return []
        with self.__lock:
            return self.__handler.unsubscribe(subscriptions)

    def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[float, str, Any]]:
        while True:
            if self.__terminate.is_set(): break
            if terminate is not None and terminate.is_set(): break
            try:
                sample = self.__out_samples.get(block=blocking, timeout=0.1)
            except queue.Empty:
                if blocking: continue
                return
            if sample is None: continue
            yield sample
+171 −0
Original line number Diff line number Diff line
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy, grpc, json, logging, queue, threading
from typing import Any, Dict, List, Optional, Tuple, Union
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
from .gnmi.gnmi_pb2_grpc import gNMIStub
from .gnmi.gnmi_pb2 import Encoding, GetRequest
from .handlers import ALL_RESOURCE_KEYS, compose, get_path, parse
from .handlers.YangHandler import YangHandler
from .tools.Capabilities import check_capabilities
from .tools.Channel import get_grpc_channel
from .tools.Path import path_from_string, path_to_string #, compose_path
from .tools.Subscriptions import Subscriptions
from .tools.Value import decode_value #, value_exists

class GnmiSessionHandler:
    def __init__(self, address : str, port : int, settings : Dict, logger : logging.Logger) -> None:
        self._address   = address
        self._port      = port
        self._settings  = copy.deepcopy(settings)
        self._logger    = logger
        self._lock      = threading.Lock()
        self._connected = threading.Event()
        self._username  = settings.get('username')
        self._password  = settings.get('password')
        self._use_tls   = settings.get('use_tls', False)
        self._channel : Optional[grpc.Channel] = None
        self._stub : Optional[gNMIStub] = None
        self._yang_handler = YangHandler()
        self._subscriptions = Subscriptions()
        self._in_subscriptions = queue.Queue()
        self._out_samples = queue.Queue()

    def __del__(self) -> None:
        self._logger.info('Destroying YangValidator...')
        if self._yang_handler is not None:
            self._logger.debug('yang_validator.data:')
            for path, dnode in self._yang_handler.get_data_paths().items():
                self._logger.debug('  {:s}: {:s}'.format(str(path), json.dumps(dnode.print_dict())))
            self._yang_handler.destroy()
        self._logger.info('DONE')

    @property
    def subscriptions(self): return self._subscriptions

    @property
    def in_subscriptions(self): return self._in_subscriptions

    @property
    def out_samples(self): return self._out_samples

    def connect(self):
        with self._lock:
            self._channel = get_grpc_channel(self._address, self._port, self._use_tls, self._logger)
            self._stub = gNMIStub(self._channel)
            check_capabilities(self._stub, self._username, self._password, timeout=120) # type: ignore
            self._connected.set()

    def disconnect(self):
        if not self._connected.is_set(): return
        with self._lock:
            self._channel.close() # type: ignore
            self._connected.clear()

    def get(self, resource_keys : List[str]) -> List[Tuple[str, Union[Any, None, Exception]]]:
        if len(resource_keys) == 0: resource_keys = ALL_RESOURCE_KEYS
        chk_type('resources', resource_keys, list)

        parsing_results = []

        get_request = GetRequest()
        get_request.type = GetRequest.DataType.ALL
        get_request.encoding = Encoding.JSON_IETF
        #get_request.use_models.add() # kept empty: return for all models supported
        for i,resource_key in enumerate(resource_keys):
            str_resource_name = 'resource_key[#{:d}]'.format(i)
            try:
                chk_string(str_resource_name, resource_key, allow_empty=False)
                self._logger.debug('[GnmiSessionHandler:get] resource_key = {:s}'.format(str(resource_key)))
                str_path = get_path(resource_key)
                self._logger.debug('[GnmiSessionHandler:get] str_path = {:s}'.format(str(str_path)))
                get_request.path.append(path_from_string(str_path))
            except Exception as e: # pylint: disable=broad-except
                MSG = 'Exception parsing {:s}: {:s}'
                self._logger.exception(MSG.format(str_resource_name, str(resource_key)))
                parsing_results.append((resource_key, e)) # if validation fails, store the exception

        self._logger.debug('parsing_results={:s}'.format(str(parsing_results)))

        if len(parsing_results) > 0:
            return parsing_results

        metadata = [('username', self._username), ('password', self._password)]
        timeout = None # GNMI_SUBSCRIPTION_TIMEOUT = int(sampling_duration)
        get_reply = self._stub.Get(get_request, metadata=metadata, timeout=timeout)
        self._logger.debug('get_reply={:s}'.format(grpc_message_to_json_string(get_reply)))

        results = []
        #results[str_filter] = [i, None, False]  # (index, value, processed?)

        for notification in get_reply.notification:
            for update in notification.update:
                self._logger.debug('update={:s}'.format(grpc_message_to_json_string(update)))
                str_path = path_to_string(update.path)
                try:
                    value = decode_value(update.val)
                    #resource_key_tuple[1] = value
                    #resource_key_tuple[2] = True
                    results.extend(parse(str_path, value, self._yang_handler))
                except Exception as e: # pylint: disable=broad-except
                    MSG = 'Exception processing update {:s}'
                    self._logger.exception(MSG.format(grpc_message_to_json_string(update)))
                    results.append((str_path, e)) # if validation fails, store the exception
        return results

    def subscribe(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        results = []
        for i,subscription in enumerate(subscriptions):
            str_subscription_name = 'subscriptions[#{:d}]'.format(i)
            try:
                chk_type(str_subscription_name, subscription, {list, tuple})
                chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
                resource_key, sampling_duration, sampling_interval = subscription
                chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
            except Exception as e: # pylint: disable=broad-except
                MSG = 'Exception validating {:s}: {:s}'
                self._logger.exception(MSG.format(str_subscription_name, str(resource_key)))
                results.append(e) # if validation fails, store the exception
                continue

            subscription = 'subscribe', resource_key, sampling_duration, sampling_interval
            self._in_subscriptions.put_nowait(subscription)
            results.append(True)
        return results

    def unsubscribe(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
        results = []
        for i,subscription in enumerate(subscriptions):
            str_subscription_name = 'subscriptions[#{:d}]'.format(i)
            try:
                chk_type(str_subscription_name, subscription, {list, tuple})
                chk_length(str_subscription_name, subscription, min_length=3, max_length=3)
                resource_key, sampling_duration, sampling_interval = subscription
                chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False)
                chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0)
                chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0)
            except Exception as e: # pylint: disable=broad-except
                MSG = 'Exception validating {:s}: {:s}'
                self._logger.exception(MSG.format(str_subscription_name, str(resource_key)))
                results.append(e) # if validation fails, store the exception
                continue

            subscription = 'unsubscribe', resource_key, sampling_duration, sampling_interval
            self._in_subscriptions.put_nowait(subscription)
            results.append(True)
        return results
Loading