Skip to content
Snippets Groups Projects
MetricsDBTools.py 2.93 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    
    import datetime
    import json
    import logging

    import requests
    from questdb.ingress import Sender, IngressError

    from common.tools.timestamp.Converters import timestamp_float_to_string
    
    
    LOGGER = logging.getLogger(__name__)
    
    
    class MetricsDB():
      def __init__(self, host, ilp_port, rest_port, table):
        """Store the connection settings and make sure the KPI table exists.

        Args:
          host: host name used for both ILP ingestion and REST queries.
          ilp_port: port passed to the ILP ``Sender``; coerced to ``int``.
          rest_port: port interpolated into the REST ``/exec`` URL.
          table: name of the table that KPI rows are written to and read from.
        """
        self.host = host
        self.ilp_port = int(ilp_port)
        self.rest_port, self.table = rest_port, table

        # Construction issues a CREATE TABLE IF NOT EXISTS over the REST API.
        self.create_table()
    
    
      def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value):
        """Write one KPI sample to the table over ILP, retrying on ingestion errors.

        A new ``Sender`` connection is opened for every attempt. The write is
        best-effort: ``IngressError`` is never propagated to the caller; each
        failed attempt is logged as a warning and, once all attempts are used
        up, a final error is logged and the method returns normally.

        Args:
          time: sample time as a POSIX timestamp, fed to
            ``datetime.datetime.fromtimestamp``.
          kpi_id: stored in the ``kpi_id`` symbol column.
          kpi_sample_type: stored in the ``kpi_sample_type`` symbol column.
          device_id: stored in the ``device_id`` symbol column.
          endpoint_id: stored in the ``endpoint_id`` symbol column.
          service_id: stored in the ``service_id`` symbol column.
          kpi_value: stored in the ``kpi_value`` column.
        """
        number_of_retries=10

        # A bounded for-loop guarantees termination even if every attempt fails.
        for attempt in range(1, number_of_retries + 1):
          try:
            with Sender(self.host, self.ilp_port) as sender:
              sender.row(
                self.table,
                symbols={
                    'kpi_id': kpi_id,
                    'kpi_sample_type': kpi_sample_type,
                    'device_id': device_id,
                    'endpoint_id': endpoint_id,
                    'service_id': service_id},
                columns={
                    'kpi_value': kpi_value},
                # NOTE(review): fromtimestamp() without tz yields a naive local-time
                # datetime; confirm this matches what the database expects.
                at=datetime.datetime.fromtimestamp(time))
              sender.flush()
          except IngressError as ierr:
            LOGGER.warning(
              "KPI write attempt %d/%d failed: %s", attempt, number_of_retries, ierr)
          else:
            LOGGER.info("KPI written")
            return

        LOGGER.error("KPI not written after %d attempts", number_of_retries)
    
    
      def run_query(self, sql_query):
        """Execute a SQL statement through the REST ``/exec`` endpoint.

        Args:
          sql_query: SQL text, sent as the ``query`` URL parameter together
            with ``fmt=json``.

        Returns:
          The response body decoded from JSON.

        Raises:
          requests.RequestException: if the HTTP request itself fails.
          json.JSONDecodeError: if the response body is not valid JSON.
        """
        query_params = {'query': sql_query, 'fmt' : 'json'}
        url = f"http://{self.host}:{self.rest_port}/exec"

        # NOTE(review): no timeout is set, so an unresponsive server blocks the caller.
        response = requests.get(url, params=query_params)
        json_response = json.loads(response.text)
        LOGGER.info(f"Query executed, result:{json_response}")

        # Hand the decoded result back; get_subscription_data indexes into it.
        return json_response
    
      
      def create_table(self):
        """Issue ``CREATE TABLE IF NOT EXISTS`` for ``self.table`` via ``run_query``.

        The table has five SYMBOL columns (kpi_id, kpi_sample_type, device_id,
        endpoint_id, service_id), a ``timestamp`` TIMESTAMP column declared as
        the designated timestamp, and a ``kpi_value`` DOUBLE column.
        """
        column_defs = (
          'kpi_id SYMBOL',
          'kpi_sample_type SYMBOL',
          'device_id SYMBOL',
          'endpoint_id SYMBOL',
          'service_id SYMBOL',
          'timestamp TIMESTAMP',
          'kpi_value DOUBLE',
        )
        columns_sql = ','.join(column_defs)
        ddl = f'CREATE TABLE IF NOT EXISTS {self.table}({columns_sql})TIMESTAMP(timestamp);'

        self.run_query(ddl)
        # Logged after every call, whether or not the table already existed.
        LOGGER.info(f"Table {self.table} created")
    
    
      def get_subscription_data(self, kpi_id, end_date, sampling_interval_s):
          start_date = end_date-sampling_interval_s
          query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')"
          response=self.run_query(query)
          kpi_list=response['dataset']