diff --git a/my_deploy.sh b/my_deploy.sh index 9f671be3b853b48a65e0a443cd16cfc3aae780bf..85b50d397f265dbd33a2d49bcbaa287eb23fdd67 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -20,7 +20,8 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" +#export TFS_COMPONENTS="context device automation monitoring pathcomp service slice compute webui load_generator" +export TFS_COMPONENTS="context device monitoring pathcomp service slice webui" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" diff --git a/src/device/service/drivers/openconfig/GnmiTelemetry.py b/src/device/service/drivers/openconfig/GnmiTelemetry.py new file mode 100644 index 0000000000000000000000000000000000000000..44b15d519d934fc5b464cf048145eebdcf8437d7 --- /dev/null +++ b/src/device/service/drivers/openconfig/GnmiTelemetry.py @@ -0,0 +1,166 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Ref: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md +# Ref: https://github.com/openconfig/gnmi/blob/master/proto/gnmi/gnmi.proto + +import anytree, logging, queue, threading +from typing import Any, Iterator, List, Optional, Tuple, Union +from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type +from device.service.driver_api._Driver import ( + RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES, + _Driver) + +LOGGER = logging.getLogger(__name__) + +SPECIAL_RESOURCE_MAPPINGS = { + RESOURCE_ENDPOINTS : '/endpoints', + RESOURCE_INTERFACES : '/interfaces', + RESOURCE_NETWORK_INSTANCES: '/net-instances', +} + +class MonitoringThread(threading.Thread): + def __init__(self, in_subscriptions : queue.Queue, out_samples : queue.Queue) -> None: + super().__init__(daemon=True) + self._in_subscriptions = in_subscriptions + self._out_samples = out_samples + + def run(self) -> None: + while True: + # TODO: req_iterator = generate_requests(self._in_subscriptions) + # TODO: stub.Subscribe(req_iterator) + self._out_samples.put_nowait((timestamp, resource_key, value)) + +class EmulatedDriver(_Driver): + def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called + self.__lock = threading.Lock() + + # endpoints = settings.get('endpoints', []) + + self.__started = threading.Event() + self.__terminate = threading.Event() + + self.__in_subscriptions = queue.Queue() + self.__out_samples = queue.Queue() + + self.__monitoring_thread = MonitoringThread(self.__in_subscriptions, self.__out_samples) + + def Connect(self) -> bool: + # If started, assume it is already connected + if self.__started.is_set(): return True + + # TODO: check capabilities + self.__monitoring_thread.start() + + # Indicate the driver is now connected to the device + self.__started.set() + return True + + def Disconnect(self) -> bool: + # Trigger termination of loops and processes + self.__terminate.set() + + # TODO: send unsubscriptions + # TODO: terminate monitoring thread + # TODO: disconnect gRPC + self.__monitoring_thread.join() + + # If not started, assume it is already disconnected + if not self.__started.is_set(): return True + return True + + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + with self.__lock: + return [] + + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: + chk_type('resources', resource_keys, list) + with self.__lock: + results = [] + for i,resource_key in enumerate(resource_keys): + str_resource_name = 'resource_key[#{:d}]'.format(i) + try: + chk_string(str_resource_name, resource_key, allow_empty=False) + resource_key = SPECIAL_RESOURCE_MAPPINGS.get(resource_key, resource_key) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) + results.append((resource_key, e)) # if validation fails, store the exception + continue + + # TODO: if resource_key == '/endpoints': retornar lista de endpoints + # results.extend(endpoints) + return results + + def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + chk_type('subscriptions', subscriptions, list) + if len(subscriptions) == 0: return [] + results = [] + with self.__lock: + for i,subscription in enumerate(subscriptions): + str_subscription_name = 'subscriptions[#{:d}]'.format(i) + try: + chk_type(str_subscription_name, subscription, (list, tuple)) + chk_length(str_subscription_name, subscription, min_length=3, max_length=3) + resource_key,sampling_duration,sampling_interval = subscription + chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) + resource_path = resource_key.split('/') + chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) + chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) + results.append(e) # if validation fails, store the exception + continue + + # TODO: format subscription + # TODO: self.__in_subscriptions.put_nowait(subscription) + results.append(True) + return results + + def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + chk_type('subscriptions', subscriptions, list) + if len(subscriptions) == 0: return [] + results = [] + resolver = anytree.Resolver(pathattr='name') + with self.__lock: + for i,resource in enumerate(subscriptions): + str_subscription_name = 'resources[#{:d}]'.format(i) + try: + chk_type(str_subscription_name, resource, (list, tuple)) + chk_length(str_subscription_name, resource, min_length=3, max_length=3) + resource_key,sampling_duration,sampling_interval = resource + chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) + resource_path = resource_key.split('/') + chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) + chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) + results.append(e) # if validation fails, store the exception + continue + + # TODO: format unsubscription + # TODO: self.__in_subscriptions.put_nowait(unsubscription) + results.append(True) + return results + + def GetState(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Tuple[str, Any]]: + while True: + if self.__terminate.is_set(): break + if terminate is not None and terminate.is_set(): break + try: + sample = self.__out_samples.get(block=blocking, timeout=0.1) + except queue.Empty: + if blocking: continue + return + if sample is None: continue + yield sample diff --git a/src/device/service/drivers/openconfig/NetConfTelemetry.py b/src/device/service/drivers/openconfig/NetConfTelemetry.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/device/service/drivers/openconfig/NetConfTelemetry.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/device/service/drivers/openconfig/NetconfSessionHandler.py b/src/device/service/drivers/openconfig/NetconfSessionHandler.py new file mode 100644 index 0000000000000000000000000000000000000000..746f11d120088429b5bf007839400e09842c0c5c --- /dev/null +++ b/src/device/service/drivers/openconfig/NetconfSessionHandler.py @@ -0,0 +1,129 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, threading +from typing import Any, List, Tuple +from ncclient.manager import Manager, connect_ssh +from common.tools.client.RetryDecorator import delay_exponential +from common.type_checkers.Checkers import chk_length, chk_string, chk_type +from device.service.driver_api.Exceptions import UnsupportedResourceKeyException +from .templates import EMPTY_CONFIG, compose_config +from .RetryDecorator import retry + +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +LOGGER = logging.getLogger(__name__) + +class NetconfSessionHandler: + def __init__(self, address : str, port : int, **settings) -> None: + self.__lock = threading.RLock() + self.__connected = threading.Event() + self.__address = address + self.__port = int(port) + self.__username = settings.get('username') + self.__password = settings.get('password') + self.__vendor = settings.get('vendor') + self.__key_filename = settings.get('key_filename') + self.__hostkey_verify = settings.get('hostkey_verify', True) + self.__look_for_keys = settings.get('look_for_keys', True) + self.__allow_agent = settings.get('allow_agent', True) + self.__force_running = settings.get('force_running', False) + self.__commit_per_delete = settings.get('delete_rule', False) + self.__device_params = settings.get('device_params', {}) + self.__manager_params = settings.get('manager_params', {}) + self.__nc_params = settings.get('nc_params', {}) + self.__manager : Manager = None + self.__candidate_supported = False + + def connect(self): + with self.__lock: + self.__manager = connect_ssh( + host=self.__address, port=self.__port, username=self.__username, password=self.__password, + device_params=self.__device_params, manager_params=self.__manager_params, nc_params=self.__nc_params, + key_filename=self.__key_filename, hostkey_verify=self.__hostkey_verify, allow_agent=self.__allow_agent, + look_for_keys=self.__look_for_keys) + self.__candidate_supported = ':candidate' in self.__manager.server_capabilities + self.__connected.set() + + def disconnect(self): + if not self.__connected.is_set(): return + with self.__lock: + self.__manager.close_session() + + @property + def use_candidate(self): return self.__candidate_supported and not self.__force_running + + @property + def commit_per_rule(self): return self.__commit_per_delete + + @property + def vendor(self): return self.__vendor + + @RETRY_DECORATOR + def get(self, filter=None, with_defaults=None): # pylint: disable=redefined-builtin + with self.__lock: + return self.__manager.get(filter=filter, with_defaults=with_defaults) + + @RETRY_DECORATOR + def edit_config( + self, config, target='running', default_operation=None, test_option=None, + error_option=None, format='xml' # pylint: disable=redefined-builtin + ): + if config == EMPTY_CONFIG: return + with self.__lock: + self.__manager.edit_config( + config, target=target, default_operation=default_operation, test_option=test_option, + error_option=error_option, format=format) + + def locked(self, target): + return self.__manager.locked(target=target) + + def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): + return self.__manager.commit(confirmed=confirmed, timeout=timeout, persist=persist, persist_id=persist_id) + +def edit_config( + netconf_handler : NetconfSessionHandler, resources : List[Tuple[str, Any]], delete=False, commit_per_rule= False, + target='running', default_operation='merge', test_option=None, error_option=None, + format='xml' # pylint: disable=redefined-builtin +): + str_method = 'DeleteConfig' if delete else 'SetConfig' + LOGGER.info('[{:s}] resources = {:s}'.format(str_method, str(resources))) + results = [None for _ in resources] + for i,resource in enumerate(resources): + str_resource_name = 'resources[#{:d}]'.format(i) + try: + LOGGER.info('[{:s}] resource = {:s}'.format(str_method, str(resource))) + chk_type(str_resource_name, resource, (list, tuple)) + chk_length(str_resource_name, resource, min_length=2, max_length=2) + resource_key,resource_value = resource + chk_string(str_resource_name + '.key', resource_key, allow_empty=False) + str_config_message = compose_config( + resource_key, resource_value, delete=delete, vendor=netconf_handler.vendor) + if str_config_message is None: raise UnsupportedResourceKeyException(resource_key) + LOGGER.info('[{:s}] str_config_message[{:d}] = {:s}'.format( + str_method, len(str_config_message), str(str_config_message))) + netconf_handler.edit_config( + config=str_config_message, target=target, default_operation=default_operation, + test_option=test_option, error_option=error_option, format=format) + if commit_per_rule: + netconf_handler.commit() + results[i] = True + except Exception as e: # pylint: disable=broad-except + str_operation = 'preparing' if target == 'candidate' else ('deleting' if delete else 'setting') + msg = '[{:s}] Exception {:s} {:s}: {:s}' + LOGGER.exception(msg.format(str_method, str_operation, str_resource_name, str(resource))) + results[i] = e # if validation fails, store the exception + return results diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py index d128a15e575e060a696c2f64529751c7c923b8dc..f96faf9c327886552ec5e73847a3b3ced4391ee9 100644 --- a/src/device/service/drivers/openconfig/OpenConfigDriver.py +++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py @@ -652,4 +652,4 @@ class OpenConfigDriver(_Driver): if blocking: continue return if sample is None: continue - yield sample + yield sample \ No newline at end of file diff --git a/src/device/service/drivers/openconfig/SamplesCache.py b/src/device/service/drivers/openconfig/SamplesCache.py new file mode 100644 index 0000000000000000000000000000000000000000..24dc33663f1676e0ccf2096ec71b7f6448322740 --- /dev/null +++ b/src/device/service/drivers/openconfig/SamplesCache.py @@ -0,0 +1,100 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Collection of samples through NetConf is very slow and each request collects all the data. +# Populate a cache periodically (when first interface is interrogated). +# Evict data after some seconds, when data is considered as outdated + +import copy, queue, logging, re, threading +from datetime import datetime +from typing import Dict, Tuple +from .templates import get_filter, parse +from .NetconfSessionHandler import NetconfSessionHandler + +SAMPLE_EVICTION_SECONDS = 30.0 # seconds +SAMPLE_RESOURCE_KEY = 'interfaces/interface/state/counters' + +RE_GET_ENDPOINT_FROM_INTERFACE_KEY = re.compile(r'.*interface\[([^\]]+)\].*') +RE_GET_ENDPOINT_FROM_INTERFACE_XPATH = re.compile(r".*interface\[oci\:name\='([^\]]+)'\].*") + +LOGGER = logging.getLogger(__name__) + +def compute_delta_sample(previous_sample, previous_timestamp, current_sample, current_timestamp): + if previous_sample is None: return None + if previous_timestamp is None: return None + if current_sample is None: return None + if current_timestamp is None: return None + delay = current_timestamp - previous_timestamp + field_keys = set(previous_sample.keys()).union(current_sample.keys()) + field_keys.discard('name') + delta_sample = {'name': previous_sample['name']} + for field_key in field_keys: + previous_sample_value = previous_sample[field_key] + if not isinstance(previous_sample_value, (int, float)): continue + current_sample_value = current_sample[field_key] + if not isinstance(current_sample_value, (int, float)): continue + delta_value = current_sample_value - previous_sample_value + if delta_value < 0: continue + delta_sample[field_key] = delta_value / delay + return delta_sample + +class SamplesCache: + def __init__(self, netconf_handler : NetconfSessionHandler) -> None: + self.__netconf_handler = netconf_handler + self.__lock = threading.Lock() + self.__timestamp = None + self.__absolute_samples = {} + self.__delta_samples = {} + + def _refresh_samples(self) -> None: + with self.__lock: + try: + now = datetime.timestamp(datetime.utcnow()) + if self.__timestamp is not None and (now - self.__timestamp) < SAMPLE_EVICTION_SECONDS: return + str_filter = get_filter(SAMPLE_RESOURCE_KEY) + xml_data = self.__netconf_handler.get(filter=str_filter).data_ele + interface_samples = parse(SAMPLE_RESOURCE_KEY, xml_data) + for interface,samples in interface_samples: + match = RE_GET_ENDPOINT_FROM_INTERFACE_KEY.match(interface) + if match is None: continue + interface = match.group(1) + delta_sample = compute_delta_sample( + self.__absolute_samples.get(interface), self.__timestamp, samples, now) + if delta_sample is not None: self.__delta_samples[interface] = delta_sample + self.__absolute_samples[interface] = samples + self.__timestamp = now + except: # pylint: disable=bare-except + LOGGER.exception('Error collecting samples') + + def get(self, resource_key : str) -> Tuple[float, Dict]: + self._refresh_samples() + match = RE_GET_ENDPOINT_FROM_INTERFACE_XPATH.match(resource_key) + with self.__lock: + if match is None: return self.__timestamp, {} + interface = match.group(1) + return self.__timestamp, copy.deepcopy(self.__delta_samples.get(interface, {})) + +def do_sampling(samples_cache : SamplesCache, resource_key : str, out_samples : queue.Queue) -> None: + try: + timestamp, samples = samples_cache.get(resource_key) + counter_name = resource_key.split('/')[-1].split(':')[-1] + value = samples.get(counter_name) + if value is None: + LOGGER.warning('[do_sampling] value not found for {:s}'.format(resource_key)) + return + # resource_key template: //oci:interfaces/oci:interface[oci:name='{:s}']/state/counters/{:s} + sample = (timestamp, resource_key, value) + out_samples.put_nowait(sample) + except: # pylint: disable=bare-except + LOGGER.exception('Error retrieving samples') diff --git a/src/device/service/drivers/openconfig/TelemetryProtocolEnum.py b/src/device/service/drivers/openconfig/TelemetryProtocolEnum.py new file mode 100644 index 0000000000000000000000000000000000000000..e5927848fee942cc37ab883a8c9b29b0efbe33ad --- /dev/null +++ b/src/device/service/drivers/openconfig/TelemetryProtocolEnum.py @@ -0,0 +1,27 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum +from typing import Optional + +class TelemetryProtocolEnum(enum.Enum): + GNMI = 'gnmi' + NETCONF = 'netconf' + +DEFAULT_TELEMETRY_PROTOCOL = TelemetryProtocolEnum.NETCONF + +def parse_telemetry_protocol(telemetry_protocol : Optional[str] = None) -> TelemetryProtocolEnum: + if telemetry_protocol is None: return DEFAULT_TELEMETRY_PROTOCOL + # pylint: disable=no-member + return TelemetryProtocolEnum._member_map_.get(telemetry_protocol, DEFAULT_TELEMETRY_PROTOCOL) diff --git a/src/device/service/drivers/openconfig/_Telemetry.py b/src/device/service/drivers/openconfig/_Telemetry.py new file mode 100644 index 0000000000000000000000000000000000000000..efd05993b2bbcac8441f3022b659e40a8c54323e --- /dev/null +++ b/src/device/service/drivers/openconfig/_Telemetry.py @@ -0,0 +1,27 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class _Telemetry: + def __init__(self) -> None: + pass + + def subscribe(self) -> None: + pass + + def unsubscribe(self) -> None: + pass + + def get_samples_queue(self) -> None: + pass