diff --git a/src/monitoring/.gitlab-ci.yml b/src/monitoring/.gitlab-ci.yml index 69337996af00f724fc48d6b6749a4ba6f0a56dff..0c42a4e0bdf04124e71db8958238b5fa93ed4ea4 100644 --- a/src/monitoring/.gitlab-ci.yml +++ b/src/monitoring/.gitlab-ci.yml @@ -49,14 +49,18 @@ unit test monitoring: before_script: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi - - if docker container ls | grep influxdb; then docker rm -f influxdb; else echo "influxdb image is not in the system"; fi + # - if docker container ls | grep influxdb; then docker rm -f influxdb; else echo "influxdb image is not in the system"; fi + - if docker container ls | grep questdb; then docker rm -f questdb; else echo "questdb image is not in the system"; fi - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi script: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - - docker pull "influxdb:1.8" - - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge influxdb:1.8 + # - docker pull "influxdb:1.8" + - docker pull "questdb" + # - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge influxdb:1.8 + - docker run --name questdb -p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 -e QDB_CAIRO_COMMIT_LAG=1000 -e QDB_CAIRO_MAX_UNCOMMITTED_ROWS=100000 --network=teraflowbridge --rm questdb/questdb - sleep 10 - - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + # - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker run --name $IMAGE_NAME -d -p 7070:7070 --env METRICSDB_HOSTNAME=localhost --env METRICSDB_ILP_PORT=9009 --env METRICSDB_REST_PORT=9000 --env METRICSDB_TABLE=monitoring -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 30 - docker ps -a - docker logs $IMAGE_NAME @@ -65,7 +69,8 @@ unit test monitoring: coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: - docker rm -f $IMAGE_NAME - - docker rm -f influxdb + # - docker rm -f influxdb + - docker rm -f questdb - docker network rm teraflowbridge rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index 1b5459e32c326893f89df02bd1c96fb459577a36..bbd6f5d75c75e9148af74fcd0a37651643f90e39 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -9,7 +9,8 @@ Jinja2==3.0.3 ncclient==0.6.13 p4runtime==1.3.0 paramiko==2.9.2 -influxdb +# influxdb +influx_line_protocol python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2021.3 diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py new file mode 100644 index 0000000000000000000000000000000000000000..ea6180aa072bd48a04f26d019ba1e4ab9e08af88 --- /dev/null +++ b/src/monitoring/service/MetricsDBTools.py @@ -0,0 +1,52 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from influx_line_protocol import Metric +import socket +import requests +import json +import sys + +class MetricsDB(): + def __init__(self, host, ilp_port, rest_port, table): + self.socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.host=host + self.ilp_port=ilp_port + self.rest_port=rest_port + self.table=table + + def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): + self.socket.connect((self.host,self.ilp_port)) + metric = Metric(self.table) + metric.with_timestamp(time) + metric.add_tag('kpi_id', kpi_id) + metric.add_tag('kpi_sample_type', kpi_sample_type) + metric.add_tag('device_id', device_id) + metric.add_tag('endpoint_id', endpoint_id) + metric.add_tag('service_id', service_id) + metric.add_value('kpi_value', kpi_value) + str_metric = str(metric) + str_metric += "\n" + self.socket.sendall((str_metric).encode()) + self.socket.close() + + def run_query(self, sql_query): + query_params = {'query': sql_query, 'fmt' : 'json'} + url = f"http://{self.host}:{self.rest_port}/exec" + try: + response = requests.get(url, params=query_params) + json_response = json.loads(response.text) + print(json_response) + except requests.exceptions.RequestException as e: + print(f'Error: {e}', file=sys.stderr) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index e8db5943ebe01fbf95b9557fd8545a37d6ca246f..e23c936e55eccd5fb367713407f8be4ccba8be22 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -27,7 +27,7 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDL BundleKpiDescriptor, MonitorKpiRequest, Kpi from common.rpc_method_wrapper.ServiceExceptions import ServiceException -from monitoring.service import SqliteTools, InfluxTools +from monitoring.service import SqliteTools, MetricsDBTools, InfluxTools from device.client.DeviceClient import DeviceClient from prometheus_client import Counter, Summary @@ -38,10 +38,16 @@ MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") +# INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") +# INFLUXDB_USER = os.environ.get("INFLUXDB_USER") +# INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") +# INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") + +METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") +METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") +METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") +METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") + DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) ) DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE)) @@ -56,7 +62,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client # Create influx_db client - self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) + # self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) + # Create metrics_db client + self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE) # CreateKpi (CreateKpiRequest) returns (KpiId) {} def CreateKpi( @@ -187,8 +195,9 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): time_stamp = request.timestamp kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) - # Build the structure to be included as point in the influxDB - self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) + # Build the structure to be included as point in the MetricsDB + # self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) + self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) #self.influx_db.read_KPI_points() diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index 87a4b4d3e33d3ea6c69b569f4008d794574226f6..310eb491c039d336bd187c8fa0ee38a5300a095c 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -64,11 +64,16 @@ MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORI os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT) -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") +# INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") +# INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT") +# INFLUXDB_USER = os.environ.get("INFLUXDB_USER") +# INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") +# INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") +METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") +METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") +METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") +METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") + @pytest.fixture(scope='session') def context_db_mb() -> Tuple[Database, MessageBroker]: @@ -149,11 +154,17 @@ def sql_db(): _sql_db = SqliteTools.SQLite('monitoring.db') return _sql_db +# @pytest.fixture(scope='session') +# def influx_db(): +# _influx_db = InfluxTools.Influx( +# INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE) +# return _influx_db @pytest.fixture(scope='session') -def influx_db(): - _influx_db = InfluxTools.Influx( - INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE) - return _influx_db +def metrics_db(): + _metrics_db = MetricsDBTools.MetricsDB( + METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE) + return _metrics_db + ########################### @@ -301,11 +312,17 @@ def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-oute assert response -def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_influxdb_tools_write_kpi begin') +# def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_influxdb_tools_write_kpi begin') +def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_metric_sdb_tools_write_kpi begin') + + +# def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_influxdb_tools_read_kpi_points begin') +def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') -def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_influxdb_tools_read_kpi_points begin') def test_events_tools(