diff --git a/manifests/deviceservice.yaml b/manifests/deviceservice.yaml index ca2c81f0f2e5d874066464ab0537adeec734cfbb..ddcc997cd9a23c99fae4001e3594957f867b3061 100644 --- a/manifests/deviceservice.yaml +++ b/manifests/deviceservice.yaml @@ -36,7 +36,7 @@ spec: - containerPort: 9192 env: - name: LOG_LEVEL - value: "INFO" + value: "DEBUG" readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:2020"] diff --git a/manifests/monitoringservice.yaml b/manifests/monitoringservice.yaml index 4447a1427980be6554228087924bf8e4ca775758..06ac823a169c2b08f46a225db3fe04defe7e87f4 100644 --- a/manifests/monitoringservice.yaml +++ b/manifests/monitoringservice.yaml @@ -36,7 +36,7 @@ spec: - containerPort: 9192 env: - name: LOG_LEVEL - value: "INFO" + value: "DEBUG" envFrom: - secretRef: name: qdb-data diff --git a/my_deploy.sh b/my_deploy.sh index 6f0e64afe311b8e56446caabfac6329024c207a9..9f671be3b853b48a65e0a443cd16cfc3aae780bf 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -57,7 +57,7 @@ export CRDB_DATABASE="tfs" export CRDB_DEPLOY_MODE="single" # Disable flag for dropping database, if exists. -export CRDB_DROP_DATABASE_IF_EXISTS="" +export CRDB_DROP_DATABASE_IF_EXISTS="YES" # Disable flag for re-deploying CockroachDB from scratch. export CRDB_REDEPLOY="" @@ -87,7 +87,7 @@ export QDB_PASSWORD="quest" export QDB_TABLE="tfs_monitoring" ## If not already set, disable flag for dropping table if exists. -#export QDB_DROP_TABLE_IF_EXISTS="" +export QDB_DROP_TABLE_IF_EXISTS="" # If not already set, disable flag for re-deploying QuestDB from scratch. 
-export QDB_REDEPLOY="" +export QDB_REDEPLOY="YES" diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py index ef3d0728d5ed02ea4a15ba0c3ccd6f1428cab7df..d128a15e575e060a696c2f64529751c7c923b8dc 100644 --- a/src/device/service/drivers/openconfig/OpenConfigDriver.py +++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import anytree, copy, logging, pytz, queue, re, threading +import anytree, copy, logging, pytz, queue, re, threading, json, os, sys #import lxml.etree as ET from datetime import datetime, timedelta from typing import Any, Dict, Iterator, List, Optional, Tuple, Union @@ -31,6 +31,15 @@ from device.service.driver_api.AnyTreeTools import TreeNode, get_subnode, set_su from .templates import ALL_RESOURCE_KEYS, EMPTY_CONFIG, compose_config, get_filter, parse from .RetryDecorator import retry +import grpc +from google.protobuf.json_format import MessageToJson + +gnmi_path__ = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(gnmi_path__) +import gnmi_pb2_grpc +import gnmi_pb2 + + DEBUG_MODE = False logging.getLogger('ncclient.manager').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING) logging.getLogger('ncclient.transport.ssh').setLevel(logging.DEBUG if DEBUG_MODE else logging.WARNING) @@ -56,6 +65,7 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, class NetconfSessionHandler: def __init__(self, address : str, port : int, **settings) -> None: + mensaje = f"__init__: address={address}, port={port}, settings={settings}" self.__lock = threading.RLock() self.__connected = threading.Event() self.__address = address @@ -121,6 +131,182 @@ class NetconfSessionHandler: def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None): return self.__manager.commit(confirmed=confirmed, 
# Splits a gNMI path on every '/' that is not inside a [key=value] selector.
_PATH_SEPARATOR_RE = re.compile(r'/(?=(?:[^\[\]]|\[[^\[\]]+\])*$)')
# Captures the text inside each [key=value] selector of one path element.
_PATH_KEY_RE = re.compile(r'\[(.*?)\]')
# Matches the prefixed interface selector used in resource keys.
_OCI_NAME_KEY_RE = re.compile(r"\[oci:name='(.*?)'\]")
# Captures whatever follows '/state/counters/' in a path.
_COUNTER_RE = re.compile(r'/state/counters/(.*)')
# Subscription.sample_interval is built from seconds multiplied by this factor.
_NANOSECONDS_PER_SECOND = 1000000000


class Options:
    """Plain attribute bag with the parameters used to build a gNMI Subscribe request.

    gen_request() reads xpaths, prefix, mode, submode, suppress, interval, encoding,
    heartbeat, qos and aggregate; subscribeStreaming() sets xpaths, timeout and interval.
    server, username and password are stored but not read by the code in this block.
    """

    def __init__(self, xpaths=None, prefix=None, mode=0, submode=0, suppress=False, interval=0,
                 encoding='JSON', heartbeat=0, qos=None, aggregate=False, server=None,
                 username='admin', password='admin', timeout=None):
        self.xpaths = xpaths
        self.prefix = prefix
        self.mode = mode
        self.submode = submode
        self.suppress = suppress
        self.interval = interval        # seconds; converted to nanoseconds in gen_request()
        self.encoding = encoding
        self.heartbeat = heartbeat
        self.qos = qos
        self.aggregate = aggregate
        self.server = server
        self.username = username
        self.password = password
        self.timeout = timeout          # seconds, or None for no deadline


class gNMISessionHandler:
    """Owns the gRPC channel/stub used to collect monitoring samples through gNMI.

    Only connect(), disconnect() and subscribeStreaming() talk to the device. The
    NETCONF-style members (get, edit_config, locked, commit) exist so the handler
    exposes the same method names as NetconfSessionHandler, but gNMI configuration
    is not implemented here.
    """

    def __init__(self, address : str, **settings) -> None:
        """Store connection settings; no network activity happens until connect().

        Settings read: 'gnmi_port', 'username', 'password', 'vendor',
        'force_running' (default False) and 'delete_rule' (default False).
        """
        self.__lock = threading.RLock()
        self.__connected = threading.Event()
        self.__address = address
        self.__port = settings.get('gnmi_port')
        self.__username = settings.get('username')
        self.__password = settings.get('password')
        self.__vendor = settings.get('vendor')
        self.__force_running = settings.get('force_running', False)
        self.__commit_per_delete = settings.get('delete_rule', False)
        self.__stub = None
        self.__channel = None
        self.__candidate_supported = False
        self.__supported_encodings = None
        self.__options = Options()

    def __metadata(self):
        """Return the per-call credentials sent as gRPC metadata."""
        return [('username', self.__username), ('password', self.__password)]

    def connect(self):
        """Open an insecure gRPC channel and query the device capabilities.

        Raises:
            ValueError: if the 'gnmi_port' setting was not provided.
        """
        if self.__port is None:
            raise ValueError("Setting 'gnmi_port' is required to open a gNMI session")
        with self.__lock:
            # str() on both parts: the port may arrive as an int from the device settings.
            target = f'{str(self.__address)}:{str(self.__port)}'
            channel = grpc.insecure_channel(target)
            try:
                stub = gnmi_pb2_grpc.gNMIStub(channel)
                response = stub.Capabilities(gnmi_pb2.CapabilityRequest(), metadata=self.__metadata())
                capabilities = json.loads(MessageToJson(response))
            except Exception:
                # Do not leak the channel when the capability exchange fails.
                channel.close()
                raise
            self.__channel = channel
            self.__stub = stub
            self.__supported_encodings = capabilities.get('supportedEncodings', [])
            # TODO: self.__candidate_supported =
            self.__connected.set()

    def disconnect(self):
        """Close the channel if connected; calling it again is a no-op."""
        with self.__lock:
            if not self.__connected.is_set(): return
            self.__connected.clear()
            channel, self.__channel, self.__stub = self.__channel, None, None
            channel.close()

    def subscribeStreaming(self, subscription : Tuple[str, float, float], out_samples : queue.Queue) -> None:
        """Run one gNMI Subscribe stream and push rate samples into out_samples.

        Blocks until the stream ends, so callers run it in its own thread.

        Args:
            subscription: (resource_key, sampling_duration, sampling_interval); both
                times are in seconds. A sampling_duration <= 0 means no deadline.
            out_samples: queue receiving (timestamp, resource_key, delta) tuples, where
                delta is the counter increase per second since the previous sample
                (0.0 for the first sample).

        Raises:
            RuntimeError: if connect() has not been called successfully.
            grpc.RpcError: for any stream failure other than deadline expiry or
                cancellation, which are treated as the normal end of the stream.
        """
        resource_key, sampling_duration, sampling_interval = subscription
        with self.__lock:
            stub = self.__stub
        if stub is None:
            raise RuntimeError('gNMI session is not connected')

        options = copy.deepcopy(self.__options)
        options.xpaths = [parse_xpath(resource_key)]
        # Keep fractional seconds; truncating to int turned sub-second values into 0.
        options.timeout = sampling_duration if sampling_duration > 0 else None
        options.interval = sampling_interval
        # The deadline must come from the per-subscription options, not the shared defaults.
        responses = stub.Subscribe(gen_request(options), options.timeout, metadata=self.__metadata())

        wanted_counter = split_resource_key(resource_key)
        previous_sample = None
        delta = 0.0
        # NOTE(review): timestamp() on a naive utcnow() is interpreted as local time, so
        # this is a true epoch only on hosts running in UTC. Kept as-is in case other
        # sample producers in this module use the same convention -- confirm.
        previous_timestamp = datetime.timestamp(datetime.utcnow())
        try:
            for response in responses:
                data = json.loads(MessageToJson(response))
                updates = (data.get('update') or {}).get('update')
                if updates is None: continue
                now = datetime.timestamp(datetime.utcnow())
                for element in updates:
                    if split_resource_key(dict_to_xpath(element['path'])) != wanted_counter: continue
                    value = int(element['val']['uintVal'])
                    elapsed = now - previous_timestamp
                    # elapsed is 0 when one response carries several matching updates;
                    # keep the previous rate instead of dividing by zero.
                    if previous_sample is not None and elapsed > 0:
                        delta = (value - previous_sample) / elapsed
                    previous_sample = value
                    previous_timestamp = now
                    out_samples.put_nowait((now, resource_key, delta))
        except grpc.RpcError as error:
            expected_end = (grpc.StatusCode.DEADLINE_EXCEEDED, grpc.StatusCode.CANCELLED)
            if error.code() not in expected_end: raise

    @property
    def use_candidate(self): return self.__candidate_supported and not self.__force_running

    @property
    def commit_per_rule(self): return self.__commit_per_delete

    @property
    def vendor(self): return self.__vendor

    def get(self): # pylint: disable=redefined-builtin
        """Placeholder: retrieval through gNMI is not implemented; always returns False."""
        return False

    def edit_config(
        self, config, target='running', default_operation=None, test_option=None,
        error_option=None, format='xml'                                             # pylint: disable=redefined-builtin
    ):
        """Accept an empty configuration as a no-op; anything else is unsupported.

        Raises:
            NotImplementedError: for any config other than EMPTY_CONFIG.
        """
        if config == EMPTY_CONFIG: return
        raise NotImplementedError('edit_config is not supported by gNMISessionHandler')

    def locked(self, target):
        """Unsupported: datastore locking is a NETCONF operation."""
        raise NotImplementedError('locked is not supported by gNMISessionHandler')

    def commit(self, confirmed=False, timeout=None, persist=None, persist_id=None):
        """Unsupported: commit is a NETCONF operation."""
        raise NotImplementedError('commit is not supported by gNMISessionHandler')


def path_from_string(path='/'):
    """Convert a textual path such as '/a/b[name=x]/c' into a gnmi_pb2.Path.

    One leading and one trailing '/' are ignored. Each '[key=value]' selector becomes
    an entry of the element's key map; slashes inside selectors do not split elements.
    An empty or None path yields a Path with no elements.
    """
    if not path:
        return gnmi_pb2.Path(elem=[])

    segments = _PATH_SEPARATOR_RE.split(path)
    if path[0] == '/': segments = segments[1:]
    if path[-1] == '/': segments = segments[:-1]

    elements = []
    for segment in segments:
        name = segment.split('[', 1)[0]
        keys = dict(selector.split('=', 1) for selector in _PATH_KEY_RE.findall(segment))
        elements.append(gnmi_pb2.PathElem(name=name, key=keys))
    return gnmi_pb2.Path(elem=elements)


def gen_request(options):
    """Yield the single SubscribeRequest described by options.

    Only options.xpaths[0] is subscribed. options.interval is expressed in seconds
    (fractions allowed) and is converted to nanoseconds for sample_interval.
    """
    subscription = gnmi_pb2.Subscription(
        path=path_from_string(options.xpaths[0]), mode=options.submode,
        suppress_redundant=options.suppress,
        sample_interval=int(options.interval * _NANOSECONDS_PER_SECOND),
        heartbeat_interval=options.heartbeat)

    prefix = path_from_string(options.prefix) if options.prefix else None
    qos = gnmi_pb2.QOSMarking(marking=options.qos) if options.qos else None

    subscription_list = gnmi_pb2.SubscriptionList(
        prefix=prefix, mode=options.mode, allow_aggregation=options.aggregate,
        encoding=options.encoding, subscription=[subscription], qos=qos)
    yield gnmi_pb2.SubscribeRequest(subscribe=subscription_list)


def parse_xpath(xpath):
    """Rewrite a prefixed resource key into the path of its parent container.

    Collapses '//' into '/', drops the 'oci:' prefix of interface selectors, expands
    any remaining '/oci' into '/openconfig-interfaces', unquotes the interface name
    selector and removes the last path component (the counter), leaving a trailing '/'.
    """
    xpath = xpath.replace("//", "/")
    xpath = xpath.replace("oci:interface[", "interface[")
    xpath = xpath.replace("/oci", "/openconfig-interfaces")
    xpath = _OCI_NAME_KEY_RE.sub(r"[name=\1]", xpath)
    # Drop the trailing counter name.
    return xpath.rpartition("/")[0] + "/"


def split_resource_key(path):
    """Return the text after '/state/counters/' in path, or None when it is absent."""
    match = _COUNTER_RE.search(path)
    return match.group(1) if match else None


def dict_to_xpath(d: dict) -> str:
    """Render a JSON-decoded gNMI path ({'elem': [...]}) as a prefixed resource key.

    The result starts with '//'. 'interface' elements that carry a 'name' key are
    written as oci:interface[oci:name='<name>']; every other element is written by
    name. Finally 'openconfig-interfaces' is shortened to 'oci'.
    """
    parts = []
    for item in d.get('elem', []):
        name = item.get('name')
        # A missing key map or name must not crash the streaming loop.
        interface_name = (item.get('key') or {}).get('name')
        if name == 'interface' and interface_name is not None:
            parts.append(f"/oci:interface[oci:name='{interface_name}']")
        else:
            parts.append(f"/{name}")
    return ('/' + ''.join(parts)).replace('openconfig-interfaces', 'oci')
target='running', default_operation='merge', test_option=None, error_option=None, @@ -249,6 +454,7 @@ class OpenConfigDriver(_Driver): self.__subscriptions = TreeNode('.') self.__started = threading.Event() self.__terminate = threading.Event() + self.__gnmi_monitoring = settings.get('monitoring_protocol') == 'gnmi' self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events self.__scheduler.configure( jobstores = {'default': MemoryJobStore()}, @@ -257,12 +463,16 @@ class OpenConfigDriver(_Driver): timezone=pytz.utc) self.__out_samples = queue.Queue() self.__netconf_handler : NetconfSessionHandler = NetconfSessionHandler(address, port, **settings) - self.__samples_cache = SamplesCache(self.__netconf_handler) + self.__gNMI_handler : gNMISessionHandler = gNMISessionHandler(address, **settings) + self.__samples_cache = SamplesCache(self.__netconf_handler, self.__gNMI_handler) def Connect(self) -> bool: with self.__lock: if self.__started.is_set(): return True self.__netconf_handler.connect() + if self.__gnmi_monitoring: + self.__gNMI_handler.connect() + LOGGER.debug('Using gNMI as monitoring protocol') # Connect triggers activation of sampling events that will be scheduled based on subscriptions self.__scheduler.start() self.__started.set() @@ -276,7 +486,7 @@ class OpenConfigDriver(_Driver): if not self.__started.is_set(): return True # Disconnect triggers deactivation of sampling events self.__scheduler.shutdown() - self.__netconf_handler.disconnect() + if self.__gnmi_monitoring: self.__netconf_handler.disconnect() return True @metered_subclass_method(METRICS_POOL) @@ -373,7 +583,13 @@ class OpenConfigDriver(_Driver): end_date = start_date + timedelta(seconds=sampling_duration) job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval) - job = self.__scheduler.add_job( + + if self.__gnmi_monitoring: + LOGGER.debug('Processing gNMI subscription: '+ str(subscription)) + job = 
threading.Thread(target=self.__gNMI_handler.subscribeStreaming, args=(subscription, self.__out_samples)) + job.start() + else: + job = self.__scheduler.add_job( do_sampling, args=(self.__samples_cache, resource_key, self.__out_samples), kwargs={}, id=job_id, trigger='interval', seconds=sampling_interval, start_date=start_date, end_date=end_date, timezone=pytz.utc)