diff --git a/src/kpi_value_api/Dockerfile b/src/kpi_value_api/Dockerfile index 7dd8d307b8338c4a29e97c742ca12a49c4611e0a..25b8da931f88000dd229c536456a3eb1fa7f56db 100644 --- a/src/kpi_value_api/Dockerfile +++ b/src/kpi_value_api/Dockerfile @@ -63,6 +63,8 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow COPY src/kpi_value_api/. kpi_value_api/ +COPY src/kpi_manager/__init__.py kpi_manager/__init__.py +COPY src/kpi_manager/client/. kpi_manager/client/ # Start the service ENTRYPOINT ["python", "-m", "kpi_value_api.service"] diff --git a/src/kpi_value_api/requirements.in b/src/kpi_value_api/requirements.in index 7e4694109dc4e1d31b86abfc03162494faafcdaf..f5695906a8d02d55e15960a76986b8d03f02dba1 100644 --- a/src/kpi_value_api/requirements.in +++ b/src/kpi_value_api/requirements.in @@ -14,3 +14,4 @@ confluent-kafka==2.3.* requests==2.27.* +prometheus-api-client==0.5.3 \ No newline at end of file diff --git a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py index 1559457d7fc80c3e9718c182f736b3f44abdeaea..b2ebecad00e65ac63f3681d43c317585f5dbe7ed 100644 --- a/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py +++ b/src/kpi_value_api/service/KpiValueApiServiceServicerImpl.py @@ -12,18 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, grpc, requests +import logging, grpc from typing import Tuple, Any -from datetime import datetime from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.proto.context_pb2 import Empty +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType from confluent_kafka import Producer as KafkaProducer +from prometheus_api_client import PrometheusConnect +from prometheus_api_client.utils import parse_datetime + +from kpi_manager.client.KpiManagerClient import KpiManagerClient LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC') @@ -63,40 +68,67 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ) -> KpiValueList: LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request)) response = KpiValueList() - metrics = [kpi.kpi_id for kpi in request.kpi_id] - start_timestamps = [timestamp for timestamp in request.start_timestamp] - end_timestamps = [timestamp for timestamp in request.end_timestamp] - results = [] + + kpi_manager_client = KpiManagerClient() + prom_connect = PrometheusConnect(url=PROM_URL) - for start, end in zip(start_timestamps, end_timestamps): - start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z" - end_str = datetime.fromtimestamp(end.seconds).isoformat() + "Z" + metrics = [self.GetKpiSampleType(kpi, kpi_manager_client) for kpi in request.kpi_id] + start_timestamps = [parse_datetime(timestamp) for timestamp in request.start_timestamp] + end_timestamps = [parse_datetime(timestamp) for timestamp in request.end_timestamp] + prom_response = [] + for start_time, end_time in zip(start_timestamps, end_timestamps): for metric in metrics: - url = f'{PROM_URL}/api/v1/query_range' - params = { - 'query': metric, - 'start': start_str, - 'end' : end_str, - 'step' : '30s' # or any other step you need - } - response = requests.get(url, params=params) - if response.status_code == 200: - data = response.json() - for result in data['data']['result']: - for value in result['values']: - kpi_value = KpiValue( - kpi_id=metric, - timestamp=str(seconds=value[0]), - kpi_value_type=self._convert_value_to_kpi_value_type(value[1]) - ) - results.append(kpi_value) + # print(start_time, end_time, metric) + prom_response.append( + prom_connect.custom_query_range( + query = metric, # this is the metric name and label config + start_time = start_time, + end_time = end_time, + step = 30, # or any other step value (missing in gRPC Filter request) + ) + ) + + for single_resposne in prom_response: + # print ("{:}".format(single_resposne)) + for record in single_resposne: + # print("Record >>> kpi: {:} >>> time & values set: {:}".format(record['metric']['__name__'], record['values'])) + for value in record['values']: + # print("{:} - {:}".format(record['metric']['__name__'], value)) + kpi_value = KpiValue() + kpi_value.kpi_id.kpi_id = record['metric']['__name__'], + kpi_value.timestamp = value[0], + kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1]) + response.kpi_value_list.append(kpi_value) + return response + + def GetKpiSampleType(self, kpi_value: str, kpi_manager_client): + print("--- START -----") - def _convert_value_to_kpi_value_type(self, value): + kpi_id = KpiId() + kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid + # print("KpiId generated: {:}".format(kpi_id)) + + try: + kpi_descriptor_object = KpiDescriptor() + kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) + # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object??? + if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: + LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) + print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) + return KpiSampleType.Name(kpi_descriptor_object.kpi_sample_type) # extract and return the name of KpiSampleType + else: + LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + except Exception as e: + LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) + print ("Unable to get KpiDescriptor. Error: {:}".format(e)) + + def ConverValueToKpiValueType(self, value): # Check if the value is an integer (int64) try: - int64_value = int(value) - return KpiValueType(int64Val=int64_value) + int_value = int(value) + return KpiValueType(int64Val=int_value) except ValueError: pass # Check if the value is a float @@ -112,7 +144,6 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): # If none of the above, treat it as a string return KpiValueType(stringVal=value) - def delivery_callback(self, err, msg): if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index b681164786bd310d457998bae55b836522888b94..40bffa06ef162abdc55e2d4171dfe287ce8febb7 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -93,4 +93,4 @@ class MetricWriterToPrometheus: print("Metric {:} is already registered. Skipping.".format(metric_name)) else: LOGGER.error("Error while pushing metric: {}".format(e)) - raise \ No newline at end of file + raise