Skip to content
Snippets Groups Projects
MetricsExporter.py 5.74 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime, logging, os, requests
from typing import Any, Literal, Union
from questdb.ingress import Sender, IngressError # pylint: disable=no-name-in-module

LOGGER = logging.getLogger(__name__)

MAX_RETRIES = 10
DELAY_RETRIES = 0.5

MSG_EXPORT_EXECUTED   = '[rest_request] Export(timestamp={:s}, symbols={:s}, columns={:s}) executed'
MSG_EXPORT_FAILED     = '[rest_request] Export(timestamp={:s}, symbols={:s}, columns={:s}) failed, retry={:d}/{:d}...'
MSG_REST_BAD_STATUS   = '[rest_request] Bad Reply url="{:s}" params="{:s}": status_code={:d} content={:s}'
MSG_REST_EXECUTED     = '[rest_request] Query({:s}) executed, result: {:s}'
MSG_REST_FAILED       = '[rest_request] Query({:s}) failed, retry={:d}/{:d}...'
MSG_ERROR_MAX_RETRIES = 'Maximum number of retries achieved: {:d}'

METRICSDB_HOSTNAME  = os.environ.get('METRICSDB_HOSTNAME')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICSDB_ILP_PORT  = int(os.environ.get('METRICSDB_ILP_PORT', 0))
METRICSDB_REST_PORT = int(os.environ.get('METRICSDB_REST_PORT', 0))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
METRICSDB_TABLE_SLICE_GROUPS = os.environ.get('METRICSDB_TABLE_SLICE_GROUPS')
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

COLORS = {
    'platinum': '#E5E4E2',
    'gold'    : '#FFD700',
    'silver'  : '#808080',
    'bronze'  : '#CD7F32',
}
DEFAULT_COLOR = '#000000' # black

SQL_MARK_DELETED = "UPDATE {:s} SET is_deleted='true' WHERE slice_uuid='{:s}';"

class MetricsExporter():
    def create_table(self) -> None:
        sql_query = ' '.join([
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            'CREATE TABLE IF NOT EXISTS {:s} ('.format(str(METRICSDB_TABLE_SLICE_GROUPS)),
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            ','.join([
                'timestamp TIMESTAMP',
                'slice_uuid SYMBOL',
                'slice_group SYMBOL',
                'slice_color SYMBOL',
                'is_deleted SYMBOL',
                'slice_availability DOUBLE',
                'slice_capacity_center DOUBLE',
                'slice_capacity DOUBLE',
            ]),
            ') TIMESTAMP(timestamp);'
        ])
        try:
            result = self.rest_request(sql_query)
            if not result: raise Exception
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            LOGGER.info('Table {:s} created'.format(str(METRICSDB_TABLE_SLICE_GROUPS)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        except Exception as e:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            LOGGER.warning('Table {:s} cannot be created. {:s}'.format(str(METRICSDB_TABLE_SLICE_GROUPS), str(e)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            raise

    def export_point(
        self, slice_uuid : str, slice_group : str, slice_availability : float, slice_capacity : float,
        is_center : bool = False
    ) -> None:
        dt_timestamp = datetime.datetime.utcnow()
        slice_color = COLORS.get(slice_group, DEFAULT_COLOR)
        symbols = dict(slice_uuid=slice_uuid, slice_group=slice_group, slice_color=slice_color, is_deleted='false')
        columns = dict(slice_availability=slice_availability)
        columns['slice_capacity_center' if is_center else 'slice_capacity'] = slice_capacity

        for retry in range(MAX_RETRIES):
            try:
                with Sender(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT) as sender:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    sender.row(METRICSDB_TABLE_SLICE_GROUPS, symbols=symbols, columns=columns, at=dt_timestamp)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    sender.flush()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                LOGGER.debug(MSG_EXPORT_EXECUTED.format(str(dt_timestamp), str(symbols), str(columns)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                return
            except (Exception, IngressError): # pylint: disable=broad-except
                LOGGER.exception(MSG_EXPORT_FAILED.format(
                    str(dt_timestamp), str(symbols), str(columns), retry+1, MAX_RETRIES))

        raise Exception(MSG_ERROR_MAX_RETRIES.format(MAX_RETRIES))

    def delete_point(self, slice_uuid : str) -> None:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        sql_query = SQL_MARK_DELETED.format(str(METRICSDB_TABLE_SLICE_GROUPS), slice_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        try:
            result = self.rest_request(sql_query)
            if not result: raise Exception
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            LOGGER.debug('Point {:s} deleted'.format(str(slice_uuid)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        except Exception as e:
            LOGGER.warning('Point {:s} cannot be deleted. {:s}'.format(str(slice_uuid), str(e)))
            raise

    def rest_request(self, rest_query : str) -> Union[Any, Literal[True]]:
        url = 'http://{:s}:{:d}/exec'.format(METRICSDB_HOSTNAME, METRICSDB_REST_PORT)
        params = {'query': rest_query, 'fmt': 'json'}

        for retry in range(MAX_RETRIES):
            try:
                response = requests.get(url, params=params)
                status_code = response.status_code
                if status_code not in {200}:
                    str_content = response.content.decode('UTF-8')
                    raise Exception(MSG_REST_BAD_STATUS.format(str(url), str(params), status_code, str_content))

                json_response = response.json()
                if 'ddl' in json_response:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    LOGGER.debug(MSG_REST_EXECUTED.format(str(rest_query), str(json_response['ddl'])))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    return True
                elif 'dataset' in json_response:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    LOGGER.debug(MSG_REST_EXECUTED.format(str(rest_query), str(json_response['dataset'])))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
                    return json_response['dataset']

            except Exception: # pylint: disable=broad-except
                LOGGER.exception(MSG_REST_FAILED.format(str(rest_query), retry+1, MAX_RETRIES))

        raise Exception(MSG_ERROR_MAX_RETRIES.format(MAX_RETRIES))