Commit 73fd6fe5 authored by Sergio Gonzalez Diaz's avatar Sergio Gonzalez Diaz
Browse files

Substitute InfluxDB for QuestDB

parent afd3e856
Loading
Loading
Loading
Loading
+10 −5
Original line number Diff line number Diff line
@@ -49,14 +49,18 @@ unit test monitoring:
  before_script:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
    - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
    - if docker container ls | grep influxdb; then docker rm -f influxdb; else echo "influxdb image is not in the system"; fi
    # - if docker container ls | grep influxdb; then docker rm -f influxdb; else echo "influxdb image is not in the system"; fi
    - if docker container ls | grep questdb; then docker rm -f questdb; else echo "questdb image is not in the system"; fi
    - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi
  script:
    - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
    - docker pull "influxdb:1.8"
    - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge influxdb:1.8
    # - docker pull "influxdb:1.8"
    - docker pull "questdb"
    # - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge influxdb:1.8
    - docker run --name questdb -p 9000:9000  -p 9009:9009  -p 8812:8812  -p 9003:9003  -e QDB_CAIRO_COMMIT_LAG=1000 -e QDB_CAIRO_MAX_UNCOMMITTED_ROWS=100000 --network=teraflowbridge --rm questdb/questdb
    - sleep 10
    - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    # - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    - docker run --name $IMAGE_NAME -d -p 7070:7070 --env METRICSDB_HOSTNAME=localhost --env METRICSDB_ILP_PORT=9009 --env METRICSDB_REST_PORT=9000 --env METRICSDB_TABLE=monitoring -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    - sleep 30
    - docker ps -a
    - docker logs $IMAGE_NAME
@@ -65,7 +69,8 @@ unit test monitoring:
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
    - docker rm -f $IMAGE_NAME
    - docker rm -f  influxdb
    # - docker rm -f  influxdb
    - docker rm -f  questdb
    - docker network rm teraflowbridge
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
+2 −1
Original line number Diff line number Diff line
@@ -9,7 +9,8 @@ Jinja2==3.0.3
ncclient==0.6.13
p4runtime==1.3.0
paramiko==2.9.2
influxdb
# influxdb
influx_line_protocol
python-dateutil==2.8.2
python-json-logger==2.0.2
pytz==2021.3
+52 −0
Original line number Diff line number Diff line
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from influx_line_protocol import Metric
import socket
import requests
import json
import sys

class MetricsDB():
  def __init__(self, host, ilp_port, rest_port, table):
      self.socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      self.host=host
      self.ilp_port=ilp_port
      self.rest_port=rest_port
      self.table=table

  def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value):
    self.socket.connect((self.host,self.ilp_port))
    metric = Metric(self.table)
    metric.with_timestamp(time)
    metric.add_tag('kpi_id', kpi_id)
    metric.add_tag('kpi_sample_type', kpi_sample_type)
    metric.add_tag('device_id', device_id)
    metric.add_tag('endpoint_id', endpoint_id)
    metric.add_tag('service_id', service_id)
    metric.add_value('kpi_value', kpi_value)
    str_metric = str(metric)
    str_metric += "\n"
    self.socket.sendall((str_metric).encode())
    self.socket.close()

  def run_query(self, sql_query):
      query_params = {'query': sql_query, 'fmt' : 'json'}
      url = f"http://{self.host}:{self.rest_port}/exec"
      try:
          response = requests.get(url, params=query_params)
          json_response = json.loads(response.text)
          print(json_response)
      except requests.exceptions.RequestException as e:
          print(f'Error: {e}', file=sys.stderr)
+17 −8
Original line number Diff line number Diff line
@@ -27,7 +27,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDL
    BundleKpiDescriptor, MonitorKpiRequest, Kpi
from common.rpc_method_wrapper.ServiceExceptions import ServiceException

from monitoring.service import SqliteTools, InfluxTools
from monitoring.service import SqliteTools, MetricsDBTools, InfluxTools
from device.client.DeviceClient import DeviceClient

from prometheus_client import Counter, Summary
@@ -38,10 +38,16 @@ MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
    'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')

INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
# INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
# INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
# INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
# INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")

METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")


DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST',      default=get_service_host(ServiceNameEnum.DEVICE)     )
DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE))
@@ -56,7 +62,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
        self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC)  # instantiate the client

        # Create influx_db client
        self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
        # self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
        # Create metrics_db client
        self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE)

    # CreateKpi (CreateKpiRequest) returns (KpiId) {}
    def CreateKpi(
@@ -187,8 +195,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
            time_stamp      = request.timestamp
            kpi_value       = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))

            # Build the structure to be included as point in the influxDB
            self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
            # Build the structure to be included as point in the MetricsDB
            # self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
            self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)

            #self.influx_db.read_KPI_points()

+30 −13
Original line number Diff line number Diff line
@@ -64,11 +64,16 @@ MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORI
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST     )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)

INFLUXDB_HOSTNAME   = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_PORT       = os.environ.get("INFLUXDB_PORT")
INFLUXDB_USER       = os.environ.get("INFLUXDB_USER")
INFLUXDB_PASSWORD   = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE   = os.environ.get("INFLUXDB_DATABASE")
# INFLUXDB_HOSTNAME   = os.environ.get("INFLUXDB_HOSTNAME")
# INFLUXDB_PORT       = os.environ.get("INFLUXDB_PORT")
# INFLUXDB_USER       = os.environ.get("INFLUXDB_USER")
# INFLUXDB_PASSWORD   = os.environ.get("INFLUXDB_PASSWORD")
# INFLUXDB_DATABASE   = os.environ.get("INFLUXDB_DATABASE")
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")


@pytest.fixture(scope='session')
def context_db_mb() -> Tuple[Database, MessageBroker]:
@@ -149,11 +154,17 @@ def sql_db():
    _sql_db = SqliteTools.SQLite('monitoring.db')
    return _sql_db

# @pytest.fixture(scope='session')
# def influx_db():
#     _influx_db = InfluxTools.Influx(
#         INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
#     return _influx_db
@pytest.fixture(scope='session')
def influx_db():
    _influx_db = InfluxTools.Influx(
        INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
    return _influx_db
def metrics_db():
    _metrics_db = MetricsDBTools.MetricsDB(
        METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
    return _metrics_db



###########################
@@ -301,11 +312,17 @@ def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-oute
    assert response


def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_influxdb_tools_write_kpi begin')
# def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_influxdb_tools_write_kpi begin')
def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_metric_sdb_tools_write_kpi begin')


# def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name
#     LOGGER.warning('test_influxdb_tools_read_kpi_points begin')
def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_metrics_db_tools_read_kpi_points begin')

def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_influxdb_tools_read_kpi_points begin')


def test_events_tools(