diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index c77d9683a2372435779db520f9f4c537d5e012b0..e0176e0266ad6239dabb3aeedc273ddc0b638ded 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -9,13 +9,14 @@ Jinja2==3.0.3 ncclient==0.6.13 p4runtime==1.3.0 paramiko==2.9.2 -influx-line-protocol==0.1.4 +# influx-line-protocol==0.1.4 python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2021.3 redis==4.1.2 requests==2.27.1 xmltodict==0.12.0 +questdb==1.0.1 # pip's dependency resolver does not take into account installed packages. # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index d7d219a95130d19a352e2307f35b87e0b6a07be8..dc194c430c9700a2d89e0757c75c64025082ac29 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -12,18 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from influx_line_protocol import Metric -import socket +from questdb.ingress import Sender, IngressError import requests import json -import sys import logging +import datetime LOGGER = logging.getLogger(__name__) class MetricsDB(): def __init__(self, host, ilp_port, rest_port, table): - self.socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.host=host self.ilp_port=int(ilp_port) self.rest_port=rest_port @@ -31,19 +29,30 @@ class MetricsDB(): self.create_table() def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): - self.socket.connect((self.host,self.ilp_port)) - metric = Metric(self.table) - metric.with_timestamp(time) - metric.add_tag('kpi_id', kpi_id) - metric.add_tag('kpi_sample_type', kpi_sample_type) - metric.add_tag('device_id', device_id) - metric.add_tag('endpoint_id', endpoint_id) - metric.add_tag('service_id', service_id) - metric.add_value('kpi_value', kpi_value) - str_metric = str(metric) - str_metric += "\n" - self.socket.sendall((str_metric).encode()) - self.socket.close() + counter=0 + number_of_retries=10 + while (counter<number_of_retries): + try: + with Sender(self.host, self.ilp_port) as sender: + sender.row( + self.table, + symbols={ + 'kpi_id': kpi_id, + 'kpi_sample_type': kpi_sample_type, + 'device_id': device_id, + 'endpoint_id': endpoint_id, + 'service_id': service_id}, + columns={ + 'kpi_value': kpi_value}, + at=datetime.datetime.fromtimestamp(time)) + sender.flush() + counter=number_of_retries + LOGGER.info(f"KPI written") + except IngressError as ierr: + # LOGGER.info(ierr) + # LOGGER.info(f"Ingress Retry number {counter}") + counter=counter+1 + def run_query(self, sql_query): query_params = {'query': sql_query, 'fmt' : 'json'}