Commit 494b4cd7 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Improvement in SelectKpiValues method.

- Added "GetKpiSampleType" method to extract KpiSampleType based on KpiId.
- Added PromtheusConnect method to query Prometheus from prometheus_api_client library.
- KpiManagerClient added in DockerFile
- prometheus_api_client added in requirement file.
parent 70677712
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -63,6 +63,8 @@ RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/kpi_value_api/. kpi_value_api/
COPY src/kpi_manager/__init__.py kpi_manager/__init__.py
COPY src/kpi_manager/client/. kpi_manager/client/

# Start the service
ENTRYPOINT ["python", "-m", "kpi_value_api.service"]
+1 −0
Original line number Diff line number Diff line
@@ -14,3 +14,4 @@

confluent-kafka==2.3.*
requests==2.27.*
prometheus-api-client==0.5.3
 No newline at end of file
+62 −31
Original line number Diff line number Diff line
@@ -12,18 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, grpc, requests
import logging, grpc
from typing import Tuple, Any
from datetime import datetime
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

from common.proto.context_pb2 import Empty
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType

from confluent_kafka import Producer as KafkaProducer

from prometheus_api_client import PrometheusConnect
from prometheus_api_client.utils import parse_datetime

from kpi_manager.client.KpiManagerClient import KpiManagerClient

LOGGER       = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC')
@@ -63,40 +68,67 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
                        ) -> KpiValueList:
        LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
        response = KpiValueList()
        metrics          = [kpi.kpi_id for kpi in request.kpi_id]
        start_timestamps = [timestamp for timestamp in request.start_timestamp]
        end_timestamps   = [timestamp for timestamp in request.end_timestamp]
        results = []
        
        for start, end in zip(start_timestamps, end_timestamps):
            start_str = datetime.fromtimestamp(start.seconds).isoformat() + "Z"
            end_str = datetime.fromtimestamp(end.seconds).isoformat() + "Z"
        kpi_manager_client = KpiManagerClient()
        prom_connect       = PrometheusConnect(url=PROM_URL)

        metrics          = [self.GetKpiSampleType(kpi, kpi_manager_client) for kpi       in request.kpi_id]
        start_timestamps = [parse_datetime(timestamp)                      for timestamp in request.start_timestamp]
        end_timestamps   = [parse_datetime(timestamp)                      for timestamp in request.end_timestamp]

        prom_response = []
        for start_time, end_time in zip(start_timestamps, end_timestamps):
            for metric in metrics:
                url    = f'{PROM_URL}/api/v1/query_range'
                params = {
                    'query': metric,
                    'start': start_str,
                    'end'  : end_str,
                    'step' : '30s'           # or any other step you need
                }
                response = requests.get(url, params=params)
                if response.status_code == 200:
                    data = response.json()
                    for result in data['data']['result']:
                        for value in result['values']:
                            kpi_value = KpiValue(
                                kpi_id=metric,
                                timestamp=str(seconds=value[0]),
                                kpi_value_type=self._convert_value_to_kpi_value_type(value[1])
                # print(start_time, end_time, metric)
                prom_response.append(
                    prom_connect.custom_query_range(
                    query      = metric,        # this is the metric name and label config
                    start_time = start_time,
                    end_time   = end_time,
                    step       = 30,            # or any other step value (missing in gRPC Filter request)
                    )
                )
                            results.append(kpi_value)
        
    def _convert_value_to_kpi_value_type(self, value):
        for single_resposne in prom_response:
            # print ("{:}".format(single_resposne))
            for record in single_resposne:
                # print("Record >>> kpi: {:} >>> time & values set: {:}".format(record['metric']['__name__'], record['values']))
                for value in record['values']:
                    # print("{:} - {:}".format(record['metric']['__name__'], value))
                    kpi_value = KpiValue()
                    kpi_value.kpi_id.kpi_id  = record['metric']['__name__'],      
                    kpi_value.timestamp      = value[0],      
                    kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1])
                    response.kpi_value_list.append(kpi_value)
        return response
    
    def GetKpiSampleType(self, kpi_value: str, kpi_manager_client):
        print("--- START -----")

        kpi_id = KpiId()
        kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
        # print("KpiId generated: {:}".format(kpi_id))

        try:
            kpi_descriptor_object = KpiDescriptor()
            kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
            # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object???
            if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid:
                LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
                print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object))
                return KpiSampleType.Name(kpi_descriptor_object.kpi_sample_type)    # extract and return the name of KpiSampleType
            else:
                LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
                print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id))
        except Exception as e:
            LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
            print ("Unable to get KpiDescriptor. Error: {:}".format(e))

    def ConverValueToKpiValueType(self, value):
        # Check if the value is an integer (int64)
        try:
            int64_value = int(value)
            return KpiValueType(int64Val=int64_value)
            int_value = int(value)
            return KpiValueType(int64Val=int_value)
        except ValueError:
            pass
        # Check if the value is a float
@@ -112,7 +144,6 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
        # If none of the above, treat it as a string
        return KpiValueType(stringVal=value)


    def delivery_callback(self, err, msg):
        if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
        else:   LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
+1 −1

File changed.

Contains only whitespace changes.