Commit 6a881217 authored by Sergio Gonzalez Diaz

Merge branch 'feat/monitoring-questdb' into 'feat/monitoring'

Add QuestDB to feat/monitoring branch

See merge request teraflow-h2020/controller!141
parents 2789700f fb57e05c
@@ -29,19 +29,23 @@ spec:
       terminationGracePeriodSeconds: 5
       restartPolicy: Always
       containers:
-      - name: influxdb
-        image: influxdb:1.8
+      - name: metricsdb
+        image: questdb/questdb
         ports:
-        - containerPort: 8086
-        envFrom:
-        - secretRef:
-            name: influxdb-secrets
+        - containerPort: 9000
+        - containerPort: 9009
+        - containerPort: 9003
+        env:
+        - name: QDB_CAIRO_COMMIT_LAG
+          value: "1000"
+        - name: QDB_CAIRO_MAX_UNCOMMITTED_ROWS
+          value: "100000"
         readinessProbe:
           exec:
-            command: ["curl", "-XGET", "localhost:8086/health"]
+            command: ["curl", "-XGET", "localhost:9000"]
         livenessProbe:
           exec:
-            command: ["curl", "-XGET", "localhost:8086/health"]
+            command: ["curl", "-XGET", "localhost:9003/metrics"]
         resources:
           requests:
             cpu: 250m
@@ -54,9 +58,15 @@ spec:
         imagePullPolicy: Always
         ports:
         - containerPort: 7070
-        envFrom:
-        - secretRef:
-            name: monitoring-secrets
+        env:
+        - name: METRICSDB_HOSTNAME
+          value: "localhost"
+        - name: METRICSDB_ILP_PORT
+          value: "9009"
+        - name: METRICSDB_REST_PORT
+          value: "9000"
+        - name: METRICSDB_TABLE
+          value: "monitoring"
         readinessProbe:
           exec:
             command: ["/bin/grpc_health_probe", "-addr=:7070"]
@@ -70,6 +80,7 @@ spec:
           limits:
             cpu: 700m
             memory: 1024Mi
 ---
 apiVersion: v1
 kind: Service
@@ -84,7 +95,7 @@ spec:
     protocol: TCP
     port: 7070
     targetPort: 7070
-  - name: influxdb
+  - name: questdb
     protocol: TCP
-    port: 8086
-    targetPort: 8086
+    port: 9000
+    targetPort: 9000
\ No newline at end of file
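For reference on the manifest above: the new probes rely on QuestDB's built-in HTTP endpoints, the REST API/web console on port 9000 and the metrics/min-health server on port 9003, while port 9009 receives InfluxDB-line-protocol writes. A minimal Python sketch of the same health checks (the helper names and the use of the requests library are illustrative, not part of the commit):

import requests

def questdb_is_ready(host='localhost'):
    # Equivalent to the readinessProbe: curl -XGET localhost:9000
    return requests.get(f'http://{host}:9000', timeout=2).status_code == 200

def questdb_is_alive(host='localhost'):
    # Equivalent to the livenessProbe: curl -XGET localhost:9003/metrics
    return requests.get(f'http://{host}:9003/metrics', timeout=2).status_code == 200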
@@ -49,14 +49,14 @@ unit test monitoring:
   before_script:
     - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
     - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
-    - if docker container ls | grep influxdb; then docker rm -f influxdb; else echo "influxdb image is not in the system"; fi
+    - if docker container ls | grep questdb; then docker rm -f questdb; else echo "questdb image is not in the system"; fi
     - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi
   script:
     - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
-    - docker pull "influxdb:1.8"
-    - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge influxdb:1.8
+    - docker pull questdb/questdb
+    - docker run --name questdb -d -p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 -e QDB_CAIRO_COMMIT_LAG=1000 -e QDB_CAIRO_MAX_UNCOMMITTED_ROWS=100000 --network=teraflowbridge --rm questdb/questdb
     - sleep 10
-    - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
+    - docker run --name $IMAGE_NAME -d -p 7070:7070 --env METRICSDB_HOSTNAME=localhost --env METRICSDB_ILP_PORT=9009 --env METRICSDB_REST_PORT=9000 --env METRICSDB_TABLE=monitoring -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
     - sleep 30
     - docker ps -a
     - docker logs $IMAGE_NAME
@@ -65,7 +65,7 @@ unit test monitoring:
   coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
   after_script:
     - docker rm -f $IMAGE_NAME
-    - docker rm -f influxdb
+    - docker rm -f questdb
     - docker network rm teraflowbridge
   rules:
     - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
...
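The CI job above keeps the fixed "sleep 10" between starting QuestDB and launching the monitoring container. A hedged alternative, should the fixed delay ever prove flaky, is to poll the REST port until it answers; the helper below is only a sketch and is not part of the commit:

import time
import requests

def wait_for_questdb(host='localhost', port=9000, timeout_s=60):
    # Poll the QuestDB REST port until it responds, instead of sleeping a fixed time.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            if requests.get(f'http://{host}:{port}', timeout=2).status_code == 200:
                return
        except requests.exceptions.RequestException:
            pass
        time.sleep(1)
    raise TimeoutError(f'QuestDB not reachable on {host}:{port} after {timeout_s}s')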
@@ -9,7 +9,8 @@ Jinja2==3.0.3
 ncclient==0.6.13
 p4runtime==1.3.0
 paramiko==2.9.2
-influxdb
+# influxdb
+influx_line_protocol
 python-dateutil==2.8.2
 python-json-logger==2.0.2
 pytz==2021.3
...
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from influx_line_protocol import Metric
import socket
import requests
import json
import sys

class MetricsDB():
    def __init__(self, host, ilp_port, rest_port, table):
        self.host      = host
        self.ilp_port  = int(ilp_port)   # the ILP port may arrive as a string from an env var
        self.rest_port = rest_port
        self.table     = table

    def write_KPI(self, time, kpi_id, kpi_sample_type, device_id, endpoint_id, service_id, kpi_value):
        # Build one InfluxDB line-protocol record for the KPI sample.
        metric = Metric(self.table)
        metric.with_timestamp(time)
        metric.add_tag('kpi_id', kpi_id)
        metric.add_tag('kpi_sample_type', kpi_sample_type)
        metric.add_tag('device_id', device_id)
        metric.add_tag('endpoint_id', endpoint_id)
        metric.add_tag('service_id', service_id)
        metric.add_value('kpi_value', kpi_value)
        str_metric = str(metric) + "\n"
        # Open a fresh TCP socket per write: a closed socket cannot be reconnected.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((self.host, self.ilp_port))
            sock.sendall(str_metric.encode())

    def run_query(self, sql_query):
        # Run a SQL query through the QuestDB REST API (/exec) and print the JSON response.
        query_params = {'query': sql_query, 'fmt': 'json'}
        url = f"http://{self.host}:{self.rest_port}/exec"
        try:
            response = requests.get(url, params=query_params)
            json_response = json.loads(response.text)
            print(json_response)
        except requests.exceptions.RequestException as e:
            print(f'Error: {e}', file=sys.stderr)
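A minimal usage sketch for the new MetricsDB class, assuming a QuestDB instance reachable on localhost with the ports configured above (9009 for line-protocol ingestion, 9000 for the REST /exec endpoint); the sample timestamp and tag/field values are illustrative:

from monitoring.service.MetricsDBTools import MetricsDB

metrics_db = MetricsDB('localhost', 9009, 9000, 'monitoring')

# Push one KPI sample over the InfluxDB line protocol (TCP, port 9009).
metrics_db.write_KPI(
    time=1650000000000000000, kpi_id='1', kpi_sample_type='PACKETS_TRANSMITTED',
    device_id='DEV1', endpoint_id='EP1', service_id='SVC1', kpi_value=1234)

# Read the table back through the REST API (port 9000); the JSON result is printed.
metrics_db.run_query('SELECT * FROM monitoring')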
@@ -28,7 +28,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDL...
 from common.rpc_method_wrapper.ServiceExceptions import ServiceException
 from common.tools.timestamp.Converters import timestamp_float_to_string
-from monitoring.service import SqliteTools, InfluxTools
+from monitoring.service import SqliteTools, MetricsDBTools
 from device.client.DeviceClient import DeviceClient
 from prometheus_client import Counter, Summary
@@ -39,10 +39,11 @@ MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
     'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
 MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
-INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
-INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
-INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
-INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
+METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
+METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
+METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
+METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
 DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) )
 DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE))
@@ -56,8 +57,8 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
         self.sql_db = SqliteTools.SQLite('monitoring.db')
         self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client
-        # Create influx_db client
-        self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
+        # Create metrics_db client
+        self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE)
     # CreateKpi (CreateKpiRequest) returns (KpiId) {}
     def CreateKpi(
@@ -188,8 +189,8 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
             time_stamp = timestamp_float_to_string(request.timestamp.timestamp)
             kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
-            # Build the structure to be included as point in the influxDB
-            self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
+            # Build the structure to be included as point in the MetricsDB
+            self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
             #self.influx_db.read_KPI_points()
...
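With this change, each IncludeKpi request ends up as one plain line-protocol record on the ILP socket rather than an InfluxDB client write. Purely as an illustration (the tag and field values below are made up, not taken from the commit), this is roughly what a single write_KPI call serialises:

from influx_line_protocol import Metric

metric = Metric('monitoring')
metric.with_timestamp(1650000000000000000)   # nanosecond timestamp
metric.add_tag('kpi_id', '1')
metric.add_tag('kpi_sample_type', 'PACKETS_TRANSMITTED')
metric.add_tag('device_id', 'DEV1')
metric.add_tag('endpoint_id', 'EP1')
metric.add_tag('service_id', 'SVC1')
metric.add_value('kpi_value', 1234)
# str(metric) yields one record: the table name, the comma-separated tags,
# the kpi_value field, and the timestamp, which QuestDB ingests as a row.
print(str(metric))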
@@ -37,7 +37,7 @@ from device.service.drivers import DRIVERS
 from monitoring.client.MonitoringClient import MonitoringClient
 from common.proto import context_pb2, monitoring_pb2
 from common.proto.kpi_sample_types_pb2 import KpiSampleType
-from monitoring.service import SqliteTools, InfluxTools
+from monitoring.service import SqliteTools, MetricsDBTools
 from monitoring.service.MonitoringService import MonitoringService
 from monitoring.service.EventTools import EventsDeviceCollector
 from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request
@@ -65,11 +65,11 @@ MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORING)
 os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
 os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT)
-INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
-INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT")
-INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
-INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
-INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
+METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
+METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
+METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
+METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
 @pytest.fixture(scope='session')
 def context_db_mb() -> Tuple[Database, MessageBroker]:
@@ -151,10 +151,11 @@ def sql_db():
     return _sql_db
 @pytest.fixture(scope='session')
-def influx_db():
-    _influx_db = InfluxTools.Influx(
-        INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
-    return _influx_db
+def metrics_db():
+    _metrics_db = MetricsDBTools.MetricsDB(
+        METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE)
+    return _metrics_db
 ###########################
@@ -302,11 +303,13 @@ def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-outer-name
     assert response
-def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name
-    LOGGER.warning('test_influxdb_tools_write_kpi begin')
+def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
+    LOGGER.warning('test_metrics_db_tools_write_kpi begin')
-def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name
-    LOGGER.warning('test_influxdb_tools_read_kpi_points begin')
+def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name
+    LOGGER.warning('test_metrics_db_tools_read_kpi_points begin')
 def test_events_tools(
...
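The two renamed tests are still stubs that only log a message. A hedged sketch of how the write test could be fleshed out against a running QuestDB (the extra calls and sample values are illustrative, not part of the commit):

def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name
    LOGGER.warning('test_metrics_db_tools_write_kpi begin')
    # Write one sample over ILP, then read the table back through the REST endpoint.
    metrics_db.write_KPI(1650000000000000000, '1', 'PACKETS_TRANSMITTED', 'DEV1', 'EP1', 'SVC1', 1234)
    metrics_db.run_query('SELECT * FROM monitoring')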