diff --git a/manifests/monitoringservice.yaml b/manifests/monitoringservice.yaml index e6fa36d1a68e4e0f85776b511631b0b619ec100c..7f0bee9efc68e66c72487624241e763dccb2fc76 100644 --- a/manifests/monitoringservice.yaml +++ b/manifests/monitoringservice.yaml @@ -29,19 +29,23 @@ spec: terminationGracePeriodSeconds: 5 restartPolicy: Always containers: - - name: influxdb - image: influxdb:1.8 + - name: metricsdb + image: questdb/questdb ports: - - containerPort: 8086 - envFrom: - - secretRef: - name: influxdb-secrets + - containerPort: 9000 + - containerPort: 9009 + - containerPort: 9003 + env: + - name: QDB_CAIRO_COMMIT_LAG + value: "1000" + - name: QDB_CAIRO_MAX_UNCOMMITTED_ROWS + value: "100000" readinessProbe: exec: - command: ["curl", "-XGET", "localhost:8086/health"] + command: ["curl", "-XGET", "localhost:9000"] livenessProbe: exec: - command: ["curl", "-XGET", "localhost:8086/health"] + command: ["curl", "-XGET", "localhost:9003/metrics"] resources: requests: cpu: 250m @@ -54,9 +58,15 @@ spec: imagePullPolicy: Always ports: - containerPort: 7070 - envFrom: - - secretRef: - name: monitoring-secrets + env: + - name: METRICSDB_HOSTNAME + value: "localhost" + - name: METRICSDB_ILP_PORT + value: "9009" + - name: METRICSDB_REST_PORT + value: "9000" + - name: METRICSDB_TABLE + value: "monitoring" readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:7070"] @@ -70,6 +80,7 @@ spec: limits: cpu: 700m memory: 1024Mi + --- apiVersion: v1 kind: Service @@ -84,7 +95,7 @@ spec: protocol: TCP port: 7070 targetPort: 7070 - - name: influxdb + - name: questdb protocol: TCP - port: 8086 - targetPort: 8086 + port: 9000 + targetPort: 9000 \ No newline at end of file diff --git a/src/monitoring/.gitlab-ci.yml b/src/monitoring/.gitlab-ci.yml index 69337996af00f724fc48d6b6749a4ba6f0a56dff..19706902bc3a47b32106a24c18b36ece5eb7cb73 100644 --- a/src/monitoring/.gitlab-ci.yml +++ b/src/monitoring/.gitlab-ci.yml @@ -49,14 +49,14 @@ unit test monitoring: before_script: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi - - if docker container ls | grep influxdb; then docker rm -f influxdb; else echo "influxdb image is not in the system"; fi + - if docker container ls | grep questdb; then docker rm -f questdb; else echo "questdb image is not in the system"; fi - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi script: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - - docker pull "influxdb:1.8" - - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge influxdb:1.8 + - docker pull questdb/questdb + - docker run --name questdb -d -p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 -e QDB_CAIRO_COMMIT_LAG=1000 -e QDB_CAIRO_MAX_UNCOMMITTED_ROWS=100000 --network=teraflowbridge --rm questdb/questdb - sleep 10 - - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker run --name $IMAGE_NAME -d -p 7070:7070 --env METRICSDB_HOSTNAME=localhost --env METRICSDB_ILP_PORT=9009 --env METRICSDB_REST_PORT=9000 --env METRICSDB_TABLE=monitoring -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 30 - docker ps -a - docker logs $IMAGE_NAME @@ -65,7 +65,7 @@ unit test monitoring: coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: - docker rm -f $IMAGE_NAME - - docker rm -f influxdb + - docker rm -f questdb - docker network rm teraflowbridge rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index 1b5459e32c326893f89df02bd1c96fb459577a36..bbd6f5d75c75e9148af74fcd0a37651643f90e39 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -9,7 +9,8 @@ Jinja2==3.0.3 ncclient==0.6.13 p4runtime==1.3.0 paramiko==2.9.2 -influxdb +# influxdb +influx_line_protocol python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2021.3 diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py new file mode 100644 index 0000000000000000000000000000000000000000..ea6180aa072bd48a04f26d019ba1e4ab9e08af88 --- /dev/null +++ b/src/monitoring/service/MetricsDBTools.py @@ -0,0 +1,52 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from influx_line_protocol import Metric +import socket +import requests +import json +import sys + +class MetricsDB(): + def __init__(self, host, ilp_port, rest_port, table): + self.socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.host=host + self.ilp_port=ilp_port + self.rest_port=rest_port + self.table=table + + def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): + self.socket.connect((self.host,self.ilp_port)) + metric = Metric(self.table) + metric.with_timestamp(time) + metric.add_tag('kpi_id', kpi_id) + metric.add_tag('kpi_sample_type', kpi_sample_type) + metric.add_tag('device_id', device_id) + metric.add_tag('endpoint_id', endpoint_id) + metric.add_tag('service_id', service_id) + metric.add_value('kpi_value', kpi_value) + str_metric = str(metric) + str_metric += "\n" + self.socket.sendall((str_metric).encode()) + self.socket.close() + + def run_query(self, sql_query): + query_params = {'query': sql_query, 'fmt' : 'json'} + url = f"http://{self.host}:{self.rest_port}/exec" + try: + response = requests.get(url, params=query_params) + json_response = json.loads(response.text) + print(json_response) + except requests.exceptions.RequestException as e: + print(f'Error: {e}', file=sys.stderr) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 79a0519293307902fe1e6a405929ef7b9e2ada59..8068e8f6178e25e5452722f37fa30d6123060b4d 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -28,7 +28,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDL from common.rpc_method_wrapper.ServiceExceptions import ServiceException from common.tools.timestamp.Converters import timestamp_float_to_string -from monitoring.service import SqliteTools, InfluxTools +from monitoring.service import SqliteTools, MetricsDBTools from device.client.DeviceClient import DeviceClient from prometheus_client import Counter, Summary @@ -39,10 +39,11 @@ MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") +METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") +METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") +METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") +METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") + DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) ) DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE)) @@ -56,8 +57,8 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): self.sql_db = SqliteTools.SQLite('monitoring.db') self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client - # Create influx_db client - self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) + # Create metrics_db client + self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE) # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi( @@ -188,8 +189,8 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): time_stamp = timestamp_float_to_string(request.timestamp.timestamp) kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) - # Build the structure to be included as point in the influxDB - self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) + # Build the structure to be included as point in the MetricsDB + self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) #self.influx_db.read_KPI_points() diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 913c6003b56689cdf74cbc04142a8bd9858becfc..f5f92986081ce2cf4d70cc55f121253c7f68e90e 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -37,7 +37,7 @@ from device.service.drivers import DRIVERS from monitoring.client.MonitoringClient import MonitoringClient from common.proto import context_pb2, monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType -from monitoring.service import SqliteTools, InfluxTools +from monitoring.service import SqliteTools, MetricsDBTools from monitoring.service.MonitoringService import MonitoringService from monitoring.service.EventTools import EventsDeviceCollector from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request @@ -65,11 +65,11 @@ MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORI os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT) -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") +METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") +METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") +METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") +METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") + @pytest.fixture(scope='session') def context_db_mb() -> Tuple[Database, MessageBroker]: @@ -151,10 +151,11 @@ def sql_db(): return _sql_db @pytest.fixture(scope='session') -def influx_db(): - _influx_db = InfluxTools.Influx( - INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE) - return _influx_db +def metrics_db(): + _metrics_db = MetricsDBTools.MetricsDB( + METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE) + return _metrics_db + ########################### @@ -302,11 +303,13 @@ def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-oute assert response -def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_influxdb_tools_write_kpi begin') +def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_metric_sdb_tools_write_kpi begin') + + +def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') -def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_influxdb_tools_read_kpi_points begin') def test_events_tools(