diff --git a/common_requirements.in b/common_requirements.in index 8a027cfbd1ecf8bf4adc535dd9d5e3a769a2f2f8..772c1115d857664ed113007b89a6f7f9d9c48b99 100644 --- a/common_requirements.in +++ b/common_requirements.in @@ -6,3 +6,4 @@ prometheus-client==0.13.0 protobuf==3.20.* pytest==6.2.5 pytest-benchmark==3.4.1 +python-dateutil==2.8.2 diff --git a/manifests/monitoringservice.yaml b/manifests/monitoringservice.yaml index e6fa36d1a68e4e0f85776b511631b0b619ec100c..7f0bee9efc68e66c72487624241e763dccb2fc76 100644 --- a/manifests/monitoringservice.yaml +++ b/manifests/monitoringservice.yaml @@ -29,19 +29,23 @@ spec: terminationGracePeriodSeconds: 5 restartPolicy: Always containers: - - name: influxdb - image: influxdb:1.8 + - name: metricsdb + image: questdb/questdb ports: - - containerPort: 8086 - envFrom: - - secretRef: - name: influxdb-secrets + - containerPort: 9000 + - containerPort: 9009 + - containerPort: 9003 + env: + - name: QDB_CAIRO_COMMIT_LAG + value: "1000" + - name: QDB_CAIRO_MAX_UNCOMMITTED_ROWS + value: "100000" readinessProbe: exec: - command: ["curl", "-XGET", "localhost:8086/health"] + command: ["curl", "-XGET", "localhost:9000"] livenessProbe: exec: - command: ["curl", "-XGET", "localhost:8086/health"] + command: ["curl", "-XGET", "localhost:9003/metrics"] resources: requests: cpu: 250m @@ -54,9 +58,15 @@ spec: imagePullPolicy: Always ports: - containerPort: 7070 - envFrom: - - secretRef: - name: monitoring-secrets + env: + - name: METRICSDB_HOSTNAME + value: "localhost" + - name: METRICSDB_ILP_PORT + value: "9009" + - name: METRICSDB_REST_PORT + value: "9000" + - name: METRICSDB_TABLE + value: "monitoring" readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:7070"] @@ -70,6 +80,7 @@ spec: limits: cpu: 700m memory: 1024Mi + --- apiVersion: v1 kind: Service @@ -84,7 +95,7 @@ spec: protocol: TCP port: 7070 targetPort: 7070 - - name: influxdb + - name: questdb protocol: TCP - port: 8086 - targetPort: 8086 + port: 9000 + targetPort: 9000 \ No newline at end of file diff --git a/proto/monitoring.proto b/proto/monitoring.proto index 2c7b98d2ca3392906e0f42896907bb887b45e80b..8b83afa47b49c130d37dcbcc1024f079ebc2a2fe 100644 --- a/proto/monitoring.proto +++ b/proto/monitoring.proto @@ -19,56 +19,35 @@ import "context.proto"; import "kpi_sample_types.proto"; service MonitoringService { - rpc CreateKpi (KpiDescriptor ) returns (KpiId ) {} - rpc EditKpiDescriptor (EditedKpiDescriptor ) returns (context.Empty ) {} - rpc DeleteKpi (KpiId ) returns (context.Empty ) {} - rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList) {} - rpc CreateBundleKpi (BundleKpiDescriptor ) returns (KpiId ) {} - rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} - rpc IncludeKpi (Kpi ) returns (context.Empty ) {} - rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} - rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} - rpc SubscribeKpi (SubsDescriptor ) returns (stream KpiList ) {} - rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} - rpc GetSubscriptions (context.Empty ) returns (SubsIDList ) {} - rpc EditKpiSubscription (SubsDescriptor ) returns (context.Empty ) {} - rpc CreateKpiAlarm (AlarmDescriptor ) returns ( AlarmID ) {} - rpc EditKpiAlarm (AlarmDescriptor ) returns (context.Empty ) {} - rpc GetAlarms (context.Empty ) returns (AlarmIDList ) {} - rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {} - rpc GetAlarmResponseStream (AlarmID ) returns (stream AlarmResponse ) {} - // rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} - // rpc GetInstantKpi (KpiId ) returns (KpiList ) {} + rpc SetKpi (KpiDescriptor ) returns (KpiId ) {} + rpc DeleteKpi (KpiId ) returns (context.Empty ) {} + rpc GetKpiDescriptor (KpiId ) returns (KpiDescriptor ) {} + rpc GetKpiDescriptorList (context.Empty ) returns (KpiDescriptorList ) {} + rpc IncludeKpi (Kpi ) returns (context.Empty ) {} + rpc MonitorKpi (MonitorKpiRequest ) returns (context.Empty ) {} + rpc QueryKpiData (KpiQuery ) returns (KpiList ) {} + rpc SetKpiSubscription (SubsDescriptor ) returns (stream KpiList ) {} + rpc GetSubsDescriptor (SubscriptionID ) returns (SubsDescriptor ) {} + rpc GetSubscriptions (context.Empty ) returns (SubsIDList ) {} + rpc DeleteSubscription (SubscriptionID ) returns (context.Empty ) {} + rpc SetKpiAlarm (AlarmDescriptor ) returns (AlarmID ) {} + rpc GetAlarms (context.Empty ) returns (AlarmIDList ) {} + rpc GetAlarmDescriptor (AlarmID ) returns (AlarmDescriptor ) {} + rpc GetAlarmResponseStream(AlarmSubscription ) returns (stream AlarmResponse) {} + rpc DeleteAlarm (AlarmID ) returns (context.Empty ) {} + rpc GetStreamKpi (KpiId ) returns (stream Kpi ) {} + rpc GetInstantKpi (KpiId ) returns (KpiList ) {} } message KpiDescriptor { - string kpi_description = 1; - kpi_sample_types.KpiSampleType kpi_sample_type = 2; - context.DeviceId device_id = 3; - context.EndPointId endpoint_id = 4; - context.ServiceId service_id = 5; - context.SliceId slice_id = 6; -} - -message BundleKpiDescriptor { - string kpi_description = 1; - repeated KpiId kpi_id_list = 2; - kpi_sample_types.KpiSampleType kpi_sample_type = 3; - context.DeviceId device_id = 4; - context.EndPointId endpoint_id = 5; - context.ServiceId service_id = 6; - context.SliceId slice_id = 7; -} - -message EditedKpiDescriptor { - KpiId kpi_id = 1; - string kpi_description = 2; - repeated KpiId kpi_id_list = 3; - kpi_sample_types.KpiSampleType kpi_sample_type = 4; - context.DeviceId device_id = 5; - context.EndPointId endpoint_id = 6; - context.ServiceId service_id = 7; - context.SliceId slice_id = 8; + KpiId kpi_id = 1; + string kpi_description = 2; + repeated KpiId kpi_id_list = 3; + kpi_sample_types.KpiSampleType kpi_sample_type = 4; + context.DeviceId device_id = 5; + context.EndPointId endpoint_id = 6; + context.ServiceId service_id = 7; + context.SliceId slice_id = 8; } message MonitorKpiRequest { @@ -79,12 +58,12 @@ message MonitorKpiRequest { } message KpiQuery { - repeated KpiId kpi_id = 1; - float monitoring_window_s = 2; - float sampling_rate_s = 3; - uint32 last_n_samples = 4; // used when you want something like "get the last N many samples - string start_date = 5; // used when you want something like "get the samples since X date/time" - string end_date = 6; // used when you want something like "get the samples until X date/time" + repeated KpiId kpi_id = 1; + float monitoring_window_s = 2; + float sampling_rate_s = 3; + uint32 last_n_samples = 4; // used when you want something like "get the last N many samples + context.Timestamp start_timestamp = 5; // used when you want something like "get the samples since X date/time" + context.Timestamp end_timestamp = 6; // used when you want something like "get the samples until X date/time" // Pending add field to reflect Available Device Protocols } @@ -93,39 +72,47 @@ message KpiId { } message Kpi { - KpiId kpi_id = 1; - string timestamp = 2; - KpiValue kpi_value = 3; + KpiId kpi_id = 1; + context.Timestamp timestamp = 2; + KpiValue kpi_value = 3; } message KpiValueRange { - KpiValue kpiMinValue = 1; - KpiValue kpiMaxValue = 2; + KpiValue kpiMinValue = 1; + KpiValue kpiMaxValue = 2; + bool inRange = 3; // by default True + bool includeMinValue = 4; // False is outside the interval + bool includeMaxValue = 5; // False is outside the interval } message KpiValue { oneof value { - uint32 intVal = 1; - float floatVal = 2; - string stringVal = 3; - bool boolVal = 4; + int32 int32Val = 1; + uint32 uint32Val = 2; + int64 int64Val = 3; + uint64 uint64Val = 4; + float floatVal = 5; + string stringVal = 6; + bool boolVal = 7; } } + message KpiList { - repeated Kpi kpi_list = 1; + repeated Kpi kpi_list = 1; } message KpiDescriptorList { - repeated KpiDescriptor kpi_descriptor_list = 1; + repeated KpiDescriptor kpi_descriptor_list = 1; } message SubsDescriptor{ - KpiId kpi_id = 1; - float sampling_duration_s = 2; - float sampling_interval_s = 3; - string start_date = 4; // used when you want something like "get the samples since X date/time" - string end_date = 5; // used when you want something like "get the samples until X date/time" + SubscriptionID subs_id = 1; + KpiId kpi_id = 2; + float sampling_duration_s = 3; + float sampling_interval_s = 4; + context.Timestamp start_timestamp = 5; // used when you want something like "get the samples since X date/time" + context.Timestamp end_timestamp = 6; // used when you want something like "get the samples until X date/time" // Pending add field to reflect Available Device Protocols } @@ -134,30 +121,38 @@ message SubscriptionID { } message SubsResponse { - SubscriptionID subs_id = 1; + SubscriptionID subs_id = 1; repeated KpiList kpi_list = 2; } message SubsIDList { - repeated SubscriptionID subs_list = 1; + repeated SubscriptionID subs_list = 1; } message AlarmDescriptor { - string alarm_description = 1; - string name = 2; - KpiId kpi_id = 3; - KpiValueRange kpi_value_range = 4; - string timestamp = 5; + AlarmID alarm_id = 1; + string alarm_description = 2; + string name = 3; + repeated KpiId kpi_id = 4; + repeated KpiValueRange kpi_value_range = 5; + context.Timestamp timestamp = 6; } message AlarmID{ context.Uuid alarm_id = 1; } +message AlarmSubscription{ + AlarmID alarmID = 1; + float subscription_timeout_s = 2; + float subscription_frequency_ms = 3; +} + message AlarmResponse { - AlarmID alarm_id = 1; - string text = 2; - KpiValue kpi_value = 3; + AlarmID alarm_id = 1; + string text = 2; + KpiValue kpi_value = 3; + context.Timestamp timestamp = 4; } message AlarmIDList { diff --git a/proto/te.proto b/proto/te.proto index f85f94f48322d85e2c6bd7e667f7dfc9cb2febda..d639cb7f45f682d3e95d77e8b6d10d404ee51b9f 100644 --- a/proto/te.proto +++ b/proto/te.proto @@ -18,7 +18,7 @@ package te; import "context.proto"; service TEService { - rpc RequestLSP (context.Service) returns (context.ServiceStatus) {} + rpc RequestLSP(context.Service ) returns (context.ServiceStatus) {} rpc UpdateLSP (context.ServiceId) returns (context.ServiceStatus) {} - rpc DeleteLSP (context.ServiceId) returns (context.Empty) {} + rpc DeleteLSP (context.ServiceId) returns (context.Empty ) {} } diff --git a/src/monitoring/Config.py b/src/common/tools/timestamp/Converters.py similarity index 56% rename from src/monitoring/Config.py rename to src/common/tools/timestamp/Converters.py index cbae00509d8196b69bc2d6bacb39bfa5918be495..46a1bcb01a83f1ff4fc2d54cd8fb606dbe7f2bcc 100644 --- a/src/monitoring/Config.py +++ b/src/common/tools/timestamp/Converters.py @@ -12,29 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - -# General settings -LOG_LEVEL = logging.WARNING - -# gRPC settings -GRPC_SERVICE_PORT = 7070 -GRPC_MAX_WORKERS = 10 -GRPC_GRACE_PERIOD = 60 -GRPC_SERVICE_HOST = '127.0.0.1' - -# Prometheus settings -METRICS_PORT = 9192 - -# Dependency micro-service connection settings -CONTEXT_SERVICE_HOST = '127.0.0.1' -CONTEXT_GRPC_SERVICE_PORT = 1010 - -DEVICE_SERVICE_HOST = '127.0.0.1' -DEVICE_GRPC_SERVICE_PORT = 2020 -DEVICE_GRPC_MAX_WORKERS = 10 -DEVICE_GRPC_GRACE_PERIOD = 60 +import dateutil.parser +from datetime import datetime, timezone +def timestamp_string_to_float(str_timestamp : str) -> float: + return datetime.timestamp(dateutil.parser.isoparse(str_timestamp)) +def timestamp_float_to_string(flt_timestamp : float) -> str: + return datetime.utcfromtimestamp(flt_timestamp).isoformat() + 'Z' +def timestamp_utcnow_to_float() -> float: + return datetime.timestamp(datetime.now(tz=timezone.utc)) diff --git a/src/common/tools/timestamp/__init__.py b/src/common/tools/timestamp/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..70a33251242c51f49140e596b8208a19dd5245f7 --- /dev/null +++ b/src/common/tools/timestamp/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/device/requirements.in b/src/device/requirements.in index e5aaddecb4901da55a411d83c8c1df26f8d47069..10506fbd42c5b7a64afb3cc7c6ea32e0f1fa49f6 100644 --- a/src/device/requirements.in +++ b/src/device/requirements.in @@ -5,7 +5,6 @@ Jinja2==3.0.3 ncclient==0.6.13 p4runtime==1.3.0 paramiko==2.9.2 -python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2021.3 redis==4.1.2 diff --git a/src/monitoring/.gitignore b/src/monitoring/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..ac497892a71b6dda0ef118a1d174a5409849a201 --- /dev/null +++ b/src/monitoring/.gitignore @@ -0,0 +1,3 @@ +# Ignoring specific folders/files used internally while coding Monitoring component +proto/ +genproto_win.sh diff --git a/src/monitoring/.gitlab-ci.yml b/src/monitoring/.gitlab-ci.yml index fac3f967bbf531ef5cc9b67b2a1afe47fb9990d5..246b29bd42a889b0662a8ab0cb8b198e8f4b92ab 100644 --- a/src/monitoring/.gitlab-ci.yml +++ b/src/monitoring/.gitlab-ci.yml @@ -49,14 +49,14 @@ unit test monitoring: before_script: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi - - if docker container ls | grep influxdb; then docker rm -f influxdb; else echo "influxdb image is not in the system"; fi + - if docker container ls | grep questdb; then docker rm -f questdb; else echo "questdb image is not in the system"; fi - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi script: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - - docker pull "influxdb:1.8" - - docker run --name influxdb -d -p 8086:8086 -e INFLUXDB_DB=$INFLUXDB_DATABASE -e INFLUXDB_ADMIN_USER=$INFLUXDB_USER -e INFLUXDB_ADMIN_PASSWORD=$INFLUXDB_PASSWORD -e INFLUXDB_HTTP_AUTH_ENABLED=True --network=teraflowbridge influxdb:1.8 + - docker pull questdb/questdb + - docker run --name questdb -d -p 9000:9000 -p 9009:9009 -p 8812:8812 -p 9003:9003 -e QDB_CAIRO_COMMIT_LAG=1000 -e QDB_CAIRO_MAX_UNCOMMITTED_ROWS=100000 --network=teraflowbridge --rm questdb/questdb - sleep 10 - - docker run --name $IMAGE_NAME -d -p 7070:7070 --env INFLUXDB_USER=$INFLUXDB_USER --env INFLUXDB_PASSWORD=$INFLUXDB_PASSWORD --env INFLUXDB_DATABASE=$INFLUXDB_DATABASE --env INFLUXDB_HOSTNAME=influxdb --env INFLUXDB_PORT=8086 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker run --name $IMAGE_NAME -d -p 7070:7070 --env METRICSDB_HOSTNAME=localhost --env METRICSDB_ILP_PORT=9009 --env METRICSDB_REST_PORT=9000 --env METRICSDB_TABLE=monitoring -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 30 - docker ps -a - docker logs $IMAGE_NAME @@ -65,7 +65,7 @@ unit test monitoring: coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: - docker rm -f $IMAGE_NAME - - docker rm -f influxdb + - docker rm -f questdb - docker network rm teraflowbridge rules: - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' diff --git a/src/monitoring/client/MonitoringClient.py b/src/monitoring/client/MonitoringClient.py index 7042042dac6c070f79885226ec9a576e10c38f40..f65072f19013b820312aa56b7f0062f9c95f712c 100644 --- a/src/monitoring/client/MonitoringClient.py +++ b/src/monitoring/client/MonitoringClient.py @@ -16,11 +16,14 @@ import grpc, logging from typing import Iterator from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc -from common.proto.context_pb2 import Empty -from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest -from common.proto.monitoring_pb2_grpc import MonitoringServiceStub + from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string +from common.proto.context_pb2 import Empty +from common.proto.monitoring_pb2 import Kpi, KpiDescriptor, KpiId, MonitorKpiRequest, \ + KpiDescriptorList, KpiQuery, KpiList, SubsDescriptor, SubscriptionID, SubsIDList, \ + AlarmDescriptor, AlarmID, AlarmIDList, AlarmResponse, AlarmSubscription +from common.proto.monitoring_pb2_grpc import MonitoringServiceStub LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -48,10 +51,17 @@ class MonitoringClient: self.stub = None @RETRY_DECORATOR - def CreateKpi(self, request : KpiDescriptor) -> KpiId: - LOGGER.debug('CreateKpi: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.CreateKpi(request) - LOGGER.debug('CreateKpi result: {:s}'.format(grpc_message_to_json_string(response))) + def SetKpi(self, request : KpiDescriptor) -> KpiId: + LOGGER.debug('SetKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SetKpi(request) + LOGGER.debug('SetKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def DeleteKpi(self,request : KpiId) -> Empty: + LOGGER.debug('DeleteKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.DeleteKpi(request) + LOGGER.info('DeleteKpi result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR @@ -61,6 +71,13 @@ class MonitoringClient: LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) return response + @RETRY_DECORATOR + def GetKpiDescriptorList(self, request : Empty) -> KpiDescriptorList: + LOGGER.debug('GetKpiDescriptorList: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetKpiDescriptorList(request) + LOGGER.debug('GetKpiDescriptorList result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def IncludeKpi(self, request : Kpi) -> Empty: LOGGER.debug('IncludeKpi: {:s}'.format(grpc_message_to_json_string(request))) @@ -75,6 +92,73 @@ class MonitoringClient: LOGGER.debug('MonitorKpi result: {:s}'.format(grpc_message_to_json_string(response))) return response + @RETRY_DECORATOR + def QueryKpiData(self, request : KpiQuery) -> KpiList: + LOGGER.debug('QueryKpiData: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.QueryKpiData(request) + LOGGER.debug('QueryKpiData result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SubscribeKpi(self, request : SubsDescriptor) -> Iterator[KpiList]: + LOGGER.debug('SubscribeKpi: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SubscribeKpi(request) + LOGGER.debug('SubscribeKpi result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetSubsDescriptor(self, request : SubscriptionID) -> SubsDescriptor: + LOGGER.debug('GetSubsDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetSubsDescriptor(request) + LOGGER.debug('GetSubsDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetSubscriptions(self, request : Empty) -> SubsIDList: + LOGGER.debug('GetSubscriptions: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetSubscriptions(request) + LOGGER.debug('GetSubscriptions result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def DeleteSubscription(self, request : SubscriptionID) -> Empty: + LOGGER.debug('DeleteSubscription: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.DeleteSubscription(request) + LOGGER.debug('DeleteSubscription result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SetKpiAlarm(self, request : AlarmDescriptor) -> AlarmID: + LOGGER.debug('SetKpiAlarm: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SetKpiAlarm(request) + LOGGER.debug('SetKpiAlarm result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetAlarms(self, request : Empty) -> AlarmIDList: + LOGGER.debug('GetAlarms: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetAlarms(request) + LOGGER.debug('GetAlarms result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + def GetAlarmDescriptor(self, request : AlarmID) -> AlarmDescriptor: + LOGGER.debug('GetAlarmDescriptor: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetAlarmDescriptor(request) + LOGGER.debug('GetAlarmDescriptor result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + def GetAlarmResponseStream(self, request : AlarmSubscription) -> AlarmResponse: + LOGGER.debug('GetAlarmResponseStream: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetAlarmResponseStream(request) + LOGGER.debug('GetAlarmResponseStream result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + def DeleteAlarm(self, request : AlarmID) -> Empty: + LOGGER.debug('DeleteAlarm: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.DeleteAlarm(request) + LOGGER.debug('DeleteAlarm result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def GetStreamKpi(self, request : KpiId) -> Iterator[Kpi]: LOGGER.debug('GetStreamKpi: {:s}'.format(grpc_message_to_json_string(request))) diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index 1b5459e32c326893f89df02bd1c96fb459577a36..c77d9683a2372435779db520f9f4c537d5e012b0 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -9,7 +9,7 @@ Jinja2==3.0.3 ncclient==0.6.13 p4runtime==1.3.0 paramiko==2.9.2 -influxdb +influx-line-protocol==0.1.4 python-dateutil==2.8.2 python-json-logger==2.0.2 pytz==2021.3 diff --git a/src/monitoring/service/EventTools.py b/src/monitoring/service/EventTools.py index 6d017b627e2d464efbb67c7903afa529176bccd3..cbcf920f1c5dc98a18b0e48a123bc6490f55737c 100644 --- a/src/monitoring/service/EventTools.py +++ b/src/monitoring/service/EventTools.py @@ -91,14 +91,11 @@ class EventsDeviceCollector: kpi_descriptor.device_id.CopyFrom(device.device_id) kpi_descriptor.endpoint_id.CopyFrom(end_point.endpoint_id) - kpi_id = self._monitoring_client.CreateKpi(kpi_descriptor) + kpi_id = self._monitoring_client.SetKpi(kpi_descriptor) kpi_id_list.append(kpi_id) - return kpi_id_list - except ServiceException as e: LOGGER.exception('ListenEvents exception') - except Exception as e: # pragma: no cover LOGGER.exception('ListenEvents exception') diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py new file mode 100644 index 0000000000000000000000000000000000000000..ea6180aa072bd48a04f26d019ba1e4ab9e08af88 --- /dev/null +++ b/src/monitoring/service/MetricsDBTools.py @@ -0,0 +1,52 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from influx_line_protocol import Metric +import socket +import requests +import json +import sys + +class MetricsDB(): + def __init__(self, host, ilp_port, rest_port, table): + self.socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.host=host + self.ilp_port=ilp_port + self.rest_port=rest_port + self.table=table + + def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): + self.socket.connect((self.host,self.ilp_port)) + metric = Metric(self.table) + metric.with_timestamp(time) + metric.add_tag('kpi_id', kpi_id) + metric.add_tag('kpi_sample_type', kpi_sample_type) + metric.add_tag('device_id', device_id) + metric.add_tag('endpoint_id', endpoint_id) + metric.add_tag('service_id', service_id) + metric.add_value('kpi_value', kpi_value) + str_metric = str(metric) + str_metric += "\n" + self.socket.sendall((str_metric).encode()) + self.socket.close() + + def run_query(self, sql_query): + query_params = {'query': sql_query, 'fmt' : 'json'} + url = f"http://{self.host}:{self.rest_port}/exec" + try: + response = requests.get(url, params=query_params) + json_response = json.loads(response.text) + print(json_response) + except requests.exceptions.RequestException as e: + print(f'Error: {e}', file=sys.stderr) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 00dbf7c8ce84c618581762090ddfca663e304814..d9f8b1e100bada795f8d6c91a796f458da8d212f 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -13,47 +13,62 @@ # limitations under the License. import os, grpc, logging -from prometheus_client import Counter, Summary -from common.proto import context_pb2 -from common.proto import device_pb2 -from common.proto import monitoring_pb2 -from common.proto import monitoring_pb2_grpc + +from typing import Iterator + +from common.Constants import ServiceNameEnum +from common.Settings import get_setting, get_service_port_grpc, get_service_host +from common.proto.context_pb2 import Empty +from common.proto.device_pb2 import MonitoringSettings from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.monitoring_pb2_grpc import MonitoringServiceServicer +from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDList, SubsIDList, KpiId, \ + KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ + MonitorKpiRequest, Kpi, AlarmSubscription from common.rpc_method_wrapper.ServiceExceptions import ServiceException -from monitoring.service import SqliteTools, InfluxTools +from common.tools.timestamp.Converters import timestamp_float_to_string + +from monitoring.service import SqliteTools, MetricsDBTools from device.client.DeviceClient import DeviceClient +from prometheus_client import Counter, Summary + LOGGER = logging.getLogger(__name__) MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") +METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") +METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") +METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") +METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") -class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer): +DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) ) +DEVICESERVICE_SERVICE_PORT_GRPC = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=get_service_port_grpc(ServiceNameEnum.DEVICE)) + + +class MonitoringServiceServicerImpl(MonitoringServiceServicer): def __init__(self): LOGGER.info('Init monitoringService') # Init sqlite monitoring db self.sql_db = SqliteTools.SQLite('monitoring.db') + self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client - # Create influx_db client - self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE) + # Set metrics_db client + self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE) - # CreateKpi (CreateKpiRequest) returns (KpiId) {} - def CreateKpi( - self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext - ) -> monitoring_pb2.KpiId: + # SetKpi (SetKpiRequest) returns (KpiId) {} + def SetKpi( + self, request : KpiDescriptor, grpc_context : grpc.ServicerContext + ) -> KpiId: # CREATEKPI_COUNTER_STARTED.inc() - LOGGER.info('CreateKpi') + LOGGER.info('SetKpi') try: # Here the code to create a sqlite query to crete a KPI and return a KpiID - kpi_id = monitoring_pb2.KpiId() + kpi_id = KpiId() kpi_description = request.kpi_description kpi_sample_type = request.kpi_sample_type @@ -69,30 +84,52 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService # CREATEKPI_COUNTER_COMPLETED.inc() return kpi_id except ServiceException as e: - LOGGER.exception('CreateKpi exception') + LOGGER.exception('SetKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover - LOGGER.exception('CreateKpi exception') + LOGGER.exception('SetKpi exception') # CREATEKPI_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) + def DeleteKpi ( self, request : KpiId, grpc_context : grpc.ServicerContext) -> Empty: + + LOGGER.info('DeleteKpi') + try: + # TBC + return Empty() + except ServiceException as e: + LOGGER.exception('DeleteKpi exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('DeleteKpi exception') + + def GetKpiDescriptorList ( self, request : Empty, grpc_context : grpc.ServicerContext) -> KpiDescriptorList: + + LOGGER.info('GetKpiDescriptorList') + try: + # TBC + return KpiDescriptorList() + except ServiceException as e: + LOGGER.exception('GetKpiDescriptorList exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('GetKpiDescriptorList exception') + # rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {} - def MonitorKpi( - self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext - ) -> context_pb2.Empty: + def MonitorKpi ( self, request : MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> Empty: LOGGER.info('MonitorKpi') try: - # Creates the request to send to the device service - monitor_device_request = device_pb2.MonitoringSettings() + # Sets the request to send to the device service + monitor_device_request = MonitoringSettings() kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor) monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid - monitor_device_request.sampling_duration_s = request.sampling_duration_s - monitor_device_request.sampling_interval_s = request.sampling_interval_s + monitor_device_request.sampling_duration_s = request.monitoring_window_s + monitor_device_request.sampling_interval_s = request.sampling_rate_s device_client = DeviceClient() device_client.MonitorDeviceKpi(monitor_device_request) @@ -106,10 +143,10 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # CREATEKPI_COUNTER_FAILED.inc() - return context_pb2.Empty() + return Empty() # rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {} - def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty: + def IncludeKpi(self, request : Kpi, grpc_context : grpc.ServicerContext) -> Empty: LOGGER.info('IncludeKpi') @@ -117,18 +154,18 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context) if kpiDescriptor is None: LOGGER.warning('Ignoring sample with KPIId({:s}): not found in database'.format(str(request.kpi_id))) - return context_pb2.Empty() + return Empty() kpiSampleType = KpiSampleType.Name(kpiDescriptor.kpi_sample_type).upper().replace('KPISAMPLETYPE_', '') kpiId = request.kpi_id.kpi_id.uuid deviceId = kpiDescriptor.device_id.device_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid - time_stamp = request.timestamp + time_stamp = timestamp_float_to_string(request.timestamp.timestamp) kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) - # Build the structure to be included as point in the influxDB - self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) + # Build the structure to be included as point in the MetricsDB + self.metrics_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value) #self.influx_db.read_KPI_points() @@ -139,23 +176,21 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception: # pragma: no cover LOGGER.exception('IncludeKpi exception') # CREATEKPI_COUNTER_FAILED.inc() - return context_pb2.Empty() - - def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext): - # receives monitoring.KpiId returns stream monitoring.Kpi - LOGGER.info('GetStreamKpi') - yield monitoring_pb2.Kpi() + return Empty() - @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() - def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext): - # receives monitoring.KpiId returns monitoring.Kpi - LOGGER.info('GetInstantKpi') - return monitoring_pb2.Kpi() + # def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext): + # + # LOGGER.info('GetStreamKpi') + # yield monitoring_pb2.Kpi() + # + # @MONITORING_GETINSTANTKPI_REQUEST_TIME.time() + # def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext): + # + # LOGGER.info('GetInstantKpi') + # return monitoring_pb2.Kpi() - def GetKpiDescriptor( - self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext - ) -> monitoring_pb2.KpiDescriptor: + def GetKpiDescriptor(self, request : KpiId, grpc_context : grpc.ServicerContext) -> KpiDescriptor: LOGGER.info('getting Kpi by KpiID') try: kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid)) @@ -163,7 +198,7 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService #LOGGER.info('kpi_db={:s}'.format(str(kpi_db))) if kpi_db is None: return None - kpiDescriptor = monitoring_pb2.KpiDescriptor() + kpiDescriptor = KpiDescriptor() kpiDescriptor.kpi_description = kpi_db[1] kpiDescriptor.kpi_sample_type = kpi_db[2] @@ -178,3 +213,125 @@ class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceService except Exception: # pragma: no cover LOGGER.exception('GetKpiDescriptor exception') + + def QueryKpiData ( self, request : KpiQuery, grpc_context : grpc.ServicerContext) -> KpiList: + + LOGGER.info('QueryKpiData') + try: + # TBC + return KpiQuery() + except ServiceException as e: + LOGGER.exception('QueryKpiData exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('QueryKpiData exception') + + def SubscribeKpi ( self, request : SubsDescriptor, grpc_context : grpc.ServicerContext) -> KpiList: + + LOGGER.info('SubscribeKpi') + try: + # TBC + yield KpiList() + except ServiceException as e: + LOGGER.exception('SubscribeKpi exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('SubscribeKpi exception') + + + def GetSubsDescriptor ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> SubsDescriptor: + + LOGGER.info('GetSubsDescriptor') + try: + # TBC + return SubsDescriptor() + except ServiceException as e: + LOGGER.exception('GetSubsDescriptor exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('GetSubsDescriptor exception') + + def GetSubscriptions ( self, request : Empty, grpc_context : grpc.ServicerContext) -> SubsIDList: + + LOGGER.info('GetSubscriptions') + try: + # TBC + return SubsIDList() + except ServiceException as e: + LOGGER.exception('GetSubscriptions exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('GetSubscriptions exception') + + def DeleteSubscription ( self, request : SubscriptionID, grpc_context : grpc.ServicerContext) -> Empty: + + LOGGER.info('DeleteSubscription') + try: + # TBC + return Empty() + except ServiceException as e: + LOGGER.exception('DeleteSubscription exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('DeleteSubscription exception') + + def SetKpiAlarm ( self, request : AlarmDescriptor, grpc_context : grpc.ServicerContext) -> AlarmResponse: + + LOGGER.info('SetKpiAlarm') + try: + # TBC + return AlarmResponse() + except ServiceException as e: + LOGGER.exception('SetKpiAlarm exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('SetKpiAlarm exception') + + + def GetAlarms ( self, request : Empty, grpc_context : grpc.ServicerContext) -> AlarmIDList: + + LOGGER.info('GetAlarms') + try: + # TBC + return AlarmIDList() + except ServiceException as e: + LOGGER.exception('GetAlarms exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('GetAlarms exception') + + def GetAlarmDescriptor ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> AlarmDescriptor: + + LOGGER.info('GetAlarmDescriptor') + try: + # TBC + return AlarmDescriptor() + except ServiceException as e: + LOGGER.exception('GetAlarmDescriptor exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('GetAlarmDescriptor exception') + + def GetAlarmResponseStream(self, request : AlarmSubscription, grpc_context : grpc.ServicerContext) -> Iterator[AlarmResponse]: + + LOGGER.info('GetAlarmResponseStream') + try: + # TBC + yield AlarmResponse() + except ServiceException as e: + LOGGER.exception('GetAlarmResponseStream exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('GetAlarmResponseStream exception') + + def DeleteAlarm ( self, request : AlarmID, grpc_context : grpc.ServicerContext) -> Empty: + + LOGGER.info('DeleteAlarm') + try: + # TBC + return Empty() + except ServiceException as e: + LOGGER.exception('DeleteAlarm exception') + grpc_context.abort(e.code, e.details) + except Exception as e: # pragma: no cover + LOGGER.exception('DeleteAlarm exception') diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index 94fcc78c1a408f21e1b16316237560e329cb78b9..7b7f4150e5c084bbf25c6a4d9c1c47b70e3f76a0 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -14,6 +14,8 @@ from common.proto import monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.tools.timestamp.Converters import timestamp_string_to_float + def kpi(): _kpi = monitoring_pb2.Kpi() @@ -34,16 +36,16 @@ def create_kpi_request(): _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' # pylint: disable=maybe-no-member return _create_kpi_request -def monitor_kpi_request(kpi_uuid, sampling_duration_s, sampling_interval_s): +def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): _monitor_kpi_request = monitoring_pb2.MonitorKpiRequest() _monitor_kpi_request.kpi_id.kpi_id.uuid = kpi_uuid # pylint: disable=maybe-no-member - _monitor_kpi_request.sampling_duration_s = sampling_duration_s - _monitor_kpi_request.sampling_interval_s = sampling_interval_s + _monitor_kpi_request.monitoring_window_s = monitoring_window_s + _monitor_kpi_request.sampling_rate_s = sampling_rate_s return _monitor_kpi_request def include_kpi_request(): - _include_kpi_request = monitoring_pb2.Kpi() - _include_kpi_request.kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member - _include_kpi_request.timestamp = "2021-10-12T13:14:42Z" - _include_kpi_request.kpi_value.intVal = 500 # pylint: disable=maybe-no-member + _include_kpi_request = monitoring_pb2.Kpi() + _include_kpi_request.kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member + _include_kpi_request.timestamp.timestamp = timestamp_string_to_float("2021-10-12T13:14:42Z") + _include_kpi_request.kpi_value.int32Val = 500 # pylint: disable=maybe-no-member return _include_kpi_request diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ee7d6f51eb81bbe1c6a7b005f1e02108d14bc65e..f8b4d59bdf61b6897da36258496eb5be7faaf8a2 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -13,6 +13,7 @@ # limitations under the License. import copy, logging, os, pytest +from time import sleep from typing import Tuple from common.Constants import ServiceNameEnum from common.Settings import ( @@ -21,10 +22,11 @@ from common.orm.Database import Database from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.MessageBroker import MessageBroker +from common.proto.monitoring_pb2 import KpiId, KpiDescriptor from context.client.ContextClient import ContextClient from context.service.grpc_server.ContextService import ContextService -from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device +from common.proto.context_pb2 import EventTypeEnum, DeviceEvent, Device, Empty from device.client.DeviceClient import DeviceClient from device.service.DeviceService import DeviceService @@ -35,7 +37,7 @@ from device.service.drivers import DRIVERS from monitoring.client.MonitoringClient import MonitoringClient from common.proto import context_pb2, monitoring_pb2 from common.proto.kpi_sample_types_pb2 import KpiSampleType -from monitoring.service import SqliteTools, InfluxTools +from monitoring.service import SqliteTools, MetricsDBTools from monitoring.service.MonitoringService import MonitoringService from monitoring.service.EventTools import EventsDeviceCollector from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request @@ -63,11 +65,11 @@ MONITORING_SERVICE_PORT = 10000 + get_service_port_grpc(ServiceNameEnum.MONITORI os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(MONITORING_SERVICE_PORT) -INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME") -INFLUXDB_PORT = os.environ.get("INFLUXDB_PORT") -INFLUXDB_USER = os.environ.get("INFLUXDB_USER") -INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD") -INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE") +METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") +METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") +METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") +METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") + @pytest.fixture(scope='session') def context_db_mb() -> Tuple[Database, MessageBroker]: @@ -149,10 +151,11 @@ def sql_db(): return _sql_db @pytest.fixture(scope='session') -def influx_db(): - _influx_db = InfluxTools.Influx( - INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USER, INFLUXDB_PASSWORD, INFLUXDB_DATABASE) - return _influx_db +def metrics_db(): + _metrics_db = MetricsDBTools.MetricsDB( + METRICSDB_HOSTNAME, METRICSDB_ILP_PORT, METRICSDB_REST_PORT, METRICSDB_TABLE) + return _metrics_db + ########################### @@ -160,12 +163,12 @@ def influx_db(): ########################### # Test case that makes use of client fixture to test server's CreateKpi method -def test_create_kpi(monitoring_client): # pylint: disable=redefined-outer-name +def test_set_kpi(monitoring_client): # pylint: disable=redefined-outer-name # make call to server LOGGER.warning('test_create_kpi requesting') - response = monitoring_client.CreateKpi(create_kpi_request()) + response = monitoring_client.SetKpi(create_kpi_request()) LOGGER.debug(str(response)) - assert isinstance(response, monitoring_pb2.KpiId) + assert isinstance(response, KpiId) # Test case that makes use of client fixture to test server's MonitorKpi method def test_monitor_kpi( @@ -196,11 +199,11 @@ def test_monitor_kpi( response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID - response = monitoring_client.CreateKpi(create_kpi_request()) + response = monitoring_client.SetKpi(create_kpi_request()) _monitor_kpi_request = monitor_kpi_request(response.kpi_id.uuid, 120, 5) # pylint: disable=maybe-no-member response = monitoring_client.MonitorKpi(_monitor_kpi_request) LOGGER.debug(str(response)) - assert isinstance(response, context_pb2.Empty) + assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's IncludeKpi method @@ -209,29 +212,29 @@ def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_include_kpi requesting') response = monitoring_client.IncludeKpi(include_kpi_request()) LOGGER.debug(str(response)) - assert isinstance(response, context_pb2.Empty) + assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's GetStreamKpi method def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getstream_kpi begin') response = monitoring_client.GetStreamKpi(kpi()) LOGGER.debug(str(response)) - #assert isinstance(response, monitoring_pb2.Kpi) + #assert isinstance(response, Kpi) # Test case that makes use of client fixture to test server's GetInstantKpi method -def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name - LOGGER.warning('test_getinstant_kpi begin') - response = monitoring_client.GetInstantKpi(kpi_id()) - LOGGER.debug(str(response)) - assert isinstance(response, monitoring_pb2.Kpi) +# def test_get_instant_kpi(monitoring_client): # pylint: disable=redefined-outer-name +# LOGGER.warning('test_getinstant_kpi begin') +# response = monitoring_client.GetInstantKpi(kpi_id()) +# LOGGER.debug(str(response)) +# # assert isinstance(response, Kpi) # Test case that makes use of client fixture to test server's GetInstantKpi method def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_getkpidescritor_kpi begin') - response = monitoring_client.CreateKpi(create_kpi_request()) + response = monitoring_client.SetKpi(create_kpi_request()) response = monitoring_client.GetKpiDescriptor(response) LOGGER.debug(str(response)) - assert isinstance(response, monitoring_pb2.KpiDescriptor) + assert isinstance(response, KpiDescriptor) def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name LOGGER.warning('test_sqlitedb_tools_insert_kpi begin') @@ -300,11 +303,13 @@ def test_sqlitedb_tools_delete_kpid_id(sql_db): # pylint: disable=redefined-oute assert response -def test_influxdb_tools_write_kpi(influx_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_influxdb_tools_write_kpi begin') +def test_metrics_db_tools_write_kpi(metrics_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_metric_sdb_tools_write_kpi begin') + + +def test_metrics_db_tools_read_kpi_points(metrics_db): # pylint: disable=redefined-outer-name + LOGGER.warning('test_metrics_db_tools_read_kpi_points begin') -def test_influxdb_tools_read_kpi_points(influx_db): # pylint: disable=redefined-outer-name - LOGGER.warning('test_influxdb_tools_read_kpi_points begin') def test_events_tools( @@ -414,6 +419,9 @@ def test_listen_events( response = device_client.AddDevice(Device(**device_with_connect_rules)) assert response.device_uuid.uuid == DEVICE_DEV1_UUID + sleep(0.1) + kpi_id_list = events_collector.listen_events() assert len(kpi_id_list) > 0 + events_collector.stop()