Commit d34a33b9 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Slice component:

- Added old test files
parent 909433c2
Loading
Loading
Loading
Loading
+98 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, os, pandas, random, sys, time
#from matplotlib import pyplot as plt
from sklearn.cluster import KMeans
from typing import Dict, List, Tuple

os.environ['METRICSDB_HOSTNAME' ] = '127.0.0.1' #'questdb-public.qdb.svc.cluster.local'
os.environ['METRICSDB_ILP_PORT' ] = '9009'
os.environ['METRICSDB_REST_PORT'] = '9000'

from .MetricsExporter import MetricsExporter # pylint: disable=wrong-import-position

logging.basicConfig(level=logging.DEBUG)
LOGGER : logging.Logger = logging.getLogger(__name__)

def get_random_slices(count : int) -> List[Tuple[str, float, float]]:
    slices = list()
    for i in range(count):
        slice_name          = 'slice-{:03d}'.format(i)
        slice_availability  = random.uniform(00.0, 99.99)
        slice_capacity_gbps = random.uniform(0.1, 100.0)
        slices.append((slice_name, slice_availability, slice_capacity_gbps))
    return slices

def init_kmeans() -> Tuple[KMeans, Dict[str, int]]:
    groups = [
        # Name, avail[0..100], bw_gbps[0..100]
        ('bronze',   10.0,  10.0), # ('silver',   25.0,  25.0),
        ('silver',   30.0,  40.0), # ('silver',   25.0,  25.0),
        ('gold',     70.0,  50.0), # ('gold',     90.0,  50.0),
        ('platinum', 99.0, 100.0),
    ]
    df_groups = pandas.DataFrame(groups, columns=['name', 'availability', 'capacity'])

    num_clusters = len(groups)
    k_means = KMeans(n_clusters=num_clusters)
    k_means.fit(df_groups[['availability', 'capacity']])

    df_groups['label'] = k_means.predict(df_groups[['availability', 'capacity']])
    mapping = {
        group['name']:{k:v for k,v in group.items() if k != 'name'}
        for group in list(df_groups.to_dict('records'))
    }

    return k_means, mapping

def main():
    LOGGER.info('Starting...')
    metrics_exporter = MetricsExporter()
    metrics_exporter.create_table()

    k_means, mapping = init_kmeans()
    label_to_group = {}
    for group_name,group_attrs in mapping.items():
        label = group_attrs['label']
        availability = group_attrs['availability']
        capacity = group_attrs['capacity']
        metrics_exporter.export_point(group_name, group_name, availability, capacity, is_center=True)
        label_to_group[label] = group_name

    slices = get_random_slices(10000)
    for slice_ in slices:
        sample = pandas.DataFrame([slice_[1:3]], columns=['availability', 'capacity'])
        sample['label'] = k_means.predict(sample)
        sample = sample.to_dict('records')[0]
        label = sample['label']
        availability = sample['availability']
        capacity = sample['capacity']
        group_name = label_to_group[label]
        metrics_exporter.export_point(slice_[0], group_name, availability, capacity, is_center=False)
        time.sleep(0.01)

    #df_silver   = df_slices[df_slices['group']==mapping['silver']]
    #df_gold     = df_slices[df_slices['group']==mapping['gold']]
    #df_platinum = df_slices[df_slices['group']==mapping['platinum']]
    #plt.scatter(df_silver.availability,         df_silver.capacity,             s=25,  c='black' )
    #plt.scatter(df_gold.availability,           df_gold.capacity,               s=25,  c='gold'  )
    #plt.scatter(df_platinum.availability,       df_platinum.capacity,           s=25,  c='silver')
    #plt.scatter(k_means.cluster_centers_[:, 0], k_means.cluster_centers_[:, 1], s=100, c='red'   )

    LOGGER.info('Bye')
    return 0

if __name__ == '__main__':
    sys.exit(main())
+116 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime, logging, os, requests
from typing import Any, Literal, Union
from questdb.ingress import Sender, IngressError # pylint: disable=no-name-in-module

LOGGER = logging.getLogger(__name__)

MAX_RETRIES = 10
DELAY_RETRIES = 0.5

MSG_EXPORT_EXECUTED   = '[rest_request] Export(timestamp={:s}, symbols={:s}, columns={:s}) executed'
MSG_EXPORT_FAILED     = '[rest_request] Export(timestamp={:s}, symbols={:s}, columns={:s}) failed, retry={:d}/{:d}...'
MSG_REST_BAD_STATUS   = '[rest_request] Bad Reply url="{:s}" params="{:s}": status_code={:d} content={:s}'
MSG_REST_EXECUTED     = '[rest_request] Query({:s}) executed, result: {:s}'
MSG_REST_FAILED       = '[rest_request] Query({:s}) failed, retry={:d}/{:d}...'
MSG_ERROR_MAX_RETRIES = 'Maximum number of retries achieved: {:d}'

METRICSDB_HOSTNAME  = os.environ.get('METRICSDB_HOSTNAME')
METRICSDB_ILP_PORT  = int(os.environ.get('METRICSDB_ILP_PORT'))
METRICSDB_REST_PORT = int(os.environ.get('METRICSDB_REST_PORT'))
METRICSDB_TABLE_SLICE_GROUPS = 'slice_groups'

COLORS = {
    'platinum': '#E5E4E2',
    'gold'    : '#FFD700',
    'silver'  : '#808080',
    'bronze'  : '#CD7F32',
}
DEFAULT_COLOR = '#000000' # black

class MetricsExporter():
    def __init__(self) -> None:
        pass

    def create_table(self) -> None:
        sql_query = ' '.join([
            'CREATE TABLE IF NOT EXISTS {:s} ('.format(str(METRICSDB_TABLE_SLICE_GROUPS)),
            ','.join([
                'timestamp TIMESTAMP',
                'slice_uuid SYMBOL',
                'slice_group SYMBOL',
                'slice_color SYMBOL',
                'slice_availability DOUBLE',
                'slice_capacity_center DOUBLE',
                'slice_capacity DOUBLE',
            ]),
            ') TIMESTAMP(timestamp);'
        ])
        try:
            result = self.rest_request(sql_query)
            if not result: raise Exception
            LOGGER.info('Table {:s} created'.format(str(METRICSDB_TABLE_SLICE_GROUPS)))
        except Exception as e:
            LOGGER.warning('Table {:s} cannot be created. {:s}'.format(str(METRICSDB_TABLE_SLICE_GROUPS), str(e)))
            raise

    def export_point(
        self, slice_uuid : str, slice_group : str, slice_availability : float, slice_capacity : float,
        is_center : bool = False
    ) -> None:
        dt_timestamp = datetime.datetime.utcnow()
        slice_color = COLORS.get(slice_group, DEFAULT_COLOR)
        symbols = dict(slice_uuid=slice_uuid, slice_group=slice_group, slice_color=slice_color)
        columns = dict(slice_availability=slice_availability)
        columns['slice_capacity_center' if is_center else 'slice_capacity'] = slice_capacity

        for retry in range(MAX_RETRIES):
            try:
                with Sender(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT) as sender:
                    sender.row(METRICSDB_TABLE_SLICE_GROUPS, symbols=symbols, columns=columns, at=dt_timestamp)
                    sender.flush()
                LOGGER.info(MSG_EXPORT_EXECUTED.format(str(dt_timestamp), str(symbols), str(columns)))
                return
            except (Exception, IngressError): # pylint: disable=broad-except
                LOGGER.exception(MSG_EXPORT_FAILED.format(
                    str(dt_timestamp), str(symbols), str(columns), retry+1, MAX_RETRIES))

        raise Exception(MSG_ERROR_MAX_RETRIES.format(MAX_RETRIES))

    def rest_request(self, rest_query : str) -> Union[Any, Literal[True]]:
        url = 'http://{:s}:{:d}/exec'.format(METRICSDB_HOSTNAME, METRICSDB_REST_PORT)
        params = {'query': rest_query, 'fmt': 'json'}

        for retry in range(MAX_RETRIES):
            try:
                response = requests.get(url, params=params)
                status_code = response.status_code
                if status_code not in {200}:
                    str_content = response.content.decode('UTF-8')
                    raise Exception(MSG_REST_BAD_STATUS.format(str(url), str(params), status_code, str_content))

                json_response = response.json()
                if 'ddl' in json_response:
                    LOGGER.info(MSG_REST_EXECUTED.format(str(rest_query), str(json_response['ddl'])))
                    return True
                elif 'dataset' in json_response:
                    LOGGER.info(MSG_REST_EXECUTED.format(str(rest_query), str(json_response['dataset'])))
                    return json_response['dataset']

            except Exception: # pylint: disable=broad-except
                LOGGER.exception(MSG_REST_FAILED.format(str(rest_query), retry+1, MAX_RETRIES))

        raise Exception(MSG_ERROR_MAX_RETRIES.format(MAX_RETRIES))
+77 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import pandas, random, sys
from matplotlib import pyplot as plt
from sklearn.cluster import KMeans
from typing import Dict, List, Tuple

def get_random_slices(count : int) -> List[Tuple[str, float, float]]:
    slices = list()
    for i in range(count):
        slice_name          = 'slice-{:03d}'.format(i)
        slice_availability  = random.uniform(00.0, 99.99)
        slice_capacity_gbps = random.uniform(0.1, 100.0)
        slices.append((slice_name, slice_availability, slice_capacity_gbps))
    return slices

def init_kmeans() -> Tuple[KMeans, Dict[str, int]]:
    groups = [
        # Name, avail[0..100], bw_gbps[0..100]
        ('silver',   25.0,  50.0), # ('silver',   25.0,  25.0),
        ('gold',     90.0,  10.0), # ('gold',     90.0,  50.0),
        ('platinum', 99.0, 100.0),
    ]
    df_groups = pandas.DataFrame(groups, columns=['name', 'availability', 'capacity'])

    num_clusters = len(groups)
    k_means = KMeans(n_clusters=num_clusters)
    k_means.fit(df_groups[['availability', 'capacity']])

    df_groups['label'] = k_means.predict(df_groups[['availability', 'capacity']])
    mapping = {group['name']:group['label'] for group in list(df_groups.to_dict('records'))}

    return k_means, mapping

def main():
    k_means, mapping = init_kmeans()
    slices = get_random_slices(500)
    df_slices = pandas.DataFrame(slices, columns=['slice_uuid', 'availability', 'capacity'])

    # predict one
    #sample = df_slices[['availability', 'capacity']].iloc[[0]]
    #y_predicted = k_means.predict(sample)
    #y_predicted

    df_slices['group'] = k_means.predict(df_slices[['availability', 'capacity']])

    df_silver   = df_slices[df_slices['group']==mapping['silver']]
    df_gold     = df_slices[df_slices['group']==mapping['gold']]
    df_platinum = df_slices[df_slices['group']==mapping['platinum']]

    plt.scatter(df_silver.availability,         df_silver.capacity,             s=25,  c='black' )
    plt.scatter(df_gold.availability,           df_gold.capacity,               s=25,  c='gold'  )
    plt.scatter(df_platinum.availability,       df_platinum.capacity,           s=25,  c='silver')
    plt.scatter(k_means.cluster_centers_[:, 0], k_means.cluster_centers_[:, 1], s=100, c='red'   )
    plt.xlabel('service-slo-availability')
    plt.ylabel('service-slo-one-way-bandwidth')
    #ax = plt.subplot(1, 1, 1)
    #ax.set_ylim(bottom=0., top=1.)
    #ax.set_xlim(left=0.)
    plt.savefig('slice_grouping.png')
    return 0

if __name__ == '__main__':
    sys.exit(main())
+96 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import sqlalchemy, sys
from sqlalchemy import Column, ForeignKey, String, event, insert
from sqlalchemy.orm import Session, declarative_base, relationship
from typing import Dict

def _fk_pragma_on_connect(dbapi_con, con_record):
    dbapi_con.execute('pragma foreign_keys=ON')

_Base = declarative_base()

class SliceModel(_Base):
    __tablename__ = 'slice'

    slice_uuid = Column(String, primary_key=True)

    slice_subslices = relationship(
        'SliceSubSliceModel', primaryjoin='slice.c.slice_uuid == slice_subslice.c.slice_uuid')

    def dump_id(self) -> Dict:
        return {'uuid': self.slice_uuid}

    def dump(self) -> Dict:
        return {
            'slice_id': self.dump_id(),
            'slice_subslice_ids': [
                slice_subslice.subslice.dump_id()
                for slice_subslice in self.slice_subslices
            ]
        }

class SliceSubSliceModel(_Base):
    __tablename__ = 'slice_subslice'

    slice_uuid    = Column(ForeignKey('slice.slice_uuid', ondelete='CASCADE' ), primary_key=True)
    subslice_uuid = Column(ForeignKey('slice.slice_uuid', ondelete='RESTRICT'), primary_key=True)

    slice    = relationship('SliceModel', foreign_keys='SliceSubSliceModel.slice_uuid', back_populates='slice_subslices', lazy='joined')
    subslice = relationship('SliceModel', foreign_keys='SliceSubSliceModel.subslice_uuid', lazy='joined')

def main():
    engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=False, future=True)
    event.listen(engine, 'connect', _fk_pragma_on_connect)

    _Base.metadata.create_all(engine)

    slice_data = [
        {'slice_uuid': 'slice-01'},
        {'slice_uuid': 'slice-01-01'},
        {'slice_uuid': 'slice-01-02'},
    ]

    slice_subslices_data = [
        {'slice_uuid': 'slice-01', 'subslice_uuid': 'slice-01-01'},
        {'slice_uuid': 'slice-01', 'subslice_uuid': 'slice-01-02'},
    ]

    # insert
    with engine.connect() as conn:
        conn.execute(insert(SliceModel).values(slice_data))
        conn.execute(insert(SliceSubSliceModel).values(slice_subslices_data))
        conn.commit()

    # read
    with Session(engine) as session:
        obj_list = session.query(SliceModel).all()
        print([obj.dump() for obj in obj_list])
        session.commit()

    return 0

if __name__ == '__main__':
    sys.exit(main())

[
    {'slice_id': {'uuid': 'slice-01'}, 'slice_subslice_ids': [
        {'uuid': 'slice-01-01'},
        {'uuid': 'slice-01-02'}
    ]},
    {'slice_id': {'uuid': 'slice-01-01'}, 'slice_subslice_ids': []},
    {'slice_id': {'uuid': 'slice-01-02'}, 'slice_subslice_ids': []}
]