# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, grpc, requests
from typing import Tuple, Any
from datetime import datetime
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

from common.proto.context_pb2 import Empty
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType

from confluent_kafka import Producer as KafkaProducer


# Module-level logger, named after this module.
LOGGER       = logging.getLogger(__name__)
# Metrics pool passed to the RPC decorator on every servicer method below.
METRICS_POOL = MetricsPool('KpiValueAPI', 'NBIgRPC')
# Base URL of the Prometheus HTTP API queried by SelectKpiValues.
PROM_URL     = "http://localhost:9090"

class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
    """gRPC servicer implementing the KPI Value API.

    ``StoreKpiValues`` publishes every received KPI value to the Kafka topic
    ``KafkaTopic.VALUE``; ``SelectKpiValues`` reads samples back through the
    Prometheus HTTP API located at ``PROM_URL``.
    """

    # Kafka message key attached to every KPI value produced by this service.
    _KAFKA_MSG_KEY = "gRPC-kpivalueapi"
    # Upper bound (seconds) for one Prometheus HTTP request, so that a stalled
    # Prometheus cannot block the RPC forever.
    _PROM_TIMEOUT_SECONDS = 10
    # Resolution step used for Prometheus range queries.
    _PROM_QUERY_STEP = '30s'

    def __init__(self):
        LOGGER.debug('Init KpiValueApiService')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
                       ) -> Empty:
        """Publish every KPI value in ``request`` to Kafka.

        Each element of ``request.kpi_value_list`` is serialized with
        ``SerializeToString()`` and produced to ``KafkaTopic.VALUE``.

        Args:
            request: list of KPI values to store.
            grpc_context: gRPC servicer context (not used here).

        Returns:
            An ``Empty`` message once all queued messages have been flushed.
        """
        LOGGER.debug('StoreKpiValues: Received gRPC message object: {:}'.format(request))
        producer_obj = KafkaProducer({
            'bootstrap.servers' : KafkaConfig.SERVER_IP.value
        })
        try:
            for kpi_value in request.kpi_value_list:
                # Only used for the debug trace below; the message payload is
                # the serialized protobuf itself.
                kpi_value_to_produce : Tuple[str, Any, Any] = (
                    kpi_value.kpi_id.kpi_id,
                    kpi_value.timestamp,
                    kpi_value.kpi_value_type
                )
                LOGGER.debug('KPI to produce is {:}'.format(kpi_value_to_produce))
                producer_obj.produce(
                    KafkaTopic.VALUE.value,
                    key      = self._KAFKA_MSG_KEY,
                    value    = kpi_value.SerializeToString(),
                    callback = self.delivery_callback
                )
                # Serve pending delivery callbacks without blocking.
                producer_obj.poll(0)
        finally:
            # Flush once for the whole batch (instead of once per message) so
            # the producer can batch; done in ``finally`` so that messages
            # already queued are still delivered if a later produce() fails.
            producer_obj.flush()
        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def SelectKpiValues(self, request: KpiValueFilter, grpc_context: grpc.ServicerContext
                        ) -> KpiValueList:
        """Query Prometheus for the KPIs and time ranges given in ``request``.

        Start and end timestamps are paired positionally; for every pair, one
        range query is issued per requested KPI. Responses with a non-200
        status are logged and skipped.

        Args:
            request: filter carrying KPI ids plus start/end timestamps.
            grpc_context: gRPC servicer context (not used here).

        Returns:
            A ``KpiValueList`` holding one ``KpiValue`` per returned sample.
        """
        LOGGER.debug('SelectKpiValues: Received gRPC message object: {:}'.format(request))
        response = KpiValueList()
        metrics  = [kpi.kpi_id for kpi in request.kpi_id]
        url      = f'{PROM_URL}/api/v1/query_range'
        results  = []

        for start, end in zip(request.start_timestamp, request.end_timestamp):
            for metric in metrics:
                params = {
                    'query': metric,
                    # Prometheus accepts Unix timestamps directly; this avoids
                    # formatting a local time and labelling it as UTC ("Z").
                    'start': start.seconds,
                    'end'  : end.seconds,
                    'step' : self._PROM_QUERY_STEP,
                }
                prom_reply = requests.get(
                    url, params=params, timeout=self._PROM_TIMEOUT_SECONDS
                )
                if prom_reply.status_code != 200:
                    LOGGER.warning(
                        'SelectKpiValues: Prometheus query failed with HTTP status {:}'.format(
                            prom_reply.status_code))
                    continue
                for series in prom_reply.json()['data']['result']:
                    # Each sample is a [unix_time, "value"] pair.
                    for sample_time, sample_value in series['values']:
                        results.append(KpiValue(
                            # NOTE(review): confirm that ``kpi.kpi_id`` has the
                            # type expected by ``KpiValue.kpi_id``.
                            kpi_id=metric,
                            # NOTE(review): assumes the timestamp message has a
                            # ``seconds`` field, as the request timestamps read
                            # above do - confirm against the proto definition.
                            timestamp={'seconds': int(sample_time)},
                            kpi_value_type=self._convert_value_to_kpi_value_type(sample_value),
                        ))

        response.kpi_value_list.extend(results)
        return response

    def _convert_value_to_kpi_value_type(self, value):
        """Wrap a sample value into a ``KpiValueType``.

        Conversion is attempted in this order: integer (``int64Val``), float
        (``floatVal``), boolean for the strings 'true'/'false' in any letter
        case (``boolVal``), and finally plain string (``stringVal``).
        """
        # Check if the value is an integer (int64)
        try:
            return KpiValueType(int64Val=int(value))
        except ValueError:
            pass
        # Check if the value is a float
        try:
            return KpiValueType(floatVal=float(value))
        except ValueError:
            pass
        # Check if the value is a boolean
        lowered = value.lower()
        if lowered in ['true', 'false']:
            return KpiValueType(boolVal=(lowered == 'true'))
        # If none of the above, treat it as a string
        return KpiValueType(stringVal=value)

    def delivery_callback(self, err, msg):
        """Log the outcome of a Kafka message delivery.

        Args:
            err: delivery error, falsy when the delivery succeeded.
            msg: the Kafka message the report refers to.
        """
        if err:
            # A failed delivery means a lost KPI value; make it visible.
            LOGGER.error('Message delivery failed: {:}'.format(err))
        else:
            LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))