Commit 505db5ca authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Slice component:

- first complete implementation of slice grouper (under debug)
parent 315bd644
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -14,4 +14,7 @@

#deepdiff==5.8.*
numpy==1.23.*
pandas==1.5.*
questdb==1.0.1
requests==2.27.*
scikit-learn==1.1.*
+8 −0
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from interdomain.client.InterdomainClient import InterdomainClient
from service.client.ServiceClient import ServiceClient
from .slice_grouper.SliceGrouper import SliceGrouper

LOGGER = logging.getLogger(__name__)

@@ -36,6 +37,7 @@ METRICS_POOL = MetricsPool('Slice', 'RPC')
class SliceServiceServicerImpl(SliceServiceServicer):
    def __init__(self):
        LOGGER.debug('Creating Servicer...')
        self._slice_grouper = SliceGrouper()
        LOGGER.debug('Servicer Created')

    def create_update(self, request : Slice) -> SliceId:
@@ -82,6 +84,9 @@ class SliceServiceServicerImpl(SliceServiceServicer):
            context_client.SetSlice(slice_active)
            return slice_id

        if self._slice_grouper.is_enabled:
            grouped = self._slice_grouper.group(slice_with_uuids)

        # Local domain slice
        service_id = ServiceId()
        # pylint: disable=no-member
@@ -202,6 +207,9 @@ class SliceServiceServicerImpl(SliceServiceServicer):
            current_slice.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_DEINIT # pylint: disable=no-member
            context_client.SetSlice(current_slice)

            if self._slice_grouper.is_enabled:
                ungrouped = self._slice_grouper.ungroup(current_slice)

            service_client = ServiceClient()
            for service_id in _slice.slice_service_ids:
                current_slice = Slice()
+22 −0
Original line number Diff line number Diff line
@@ -12,50 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

#import numpy as np
#import pandas as pd
from matplotlib import pyplot as plt
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from common.proto.context_pb2 import ContextId
from context.client.ContextClient import ContextClient

class SliceGrouper:
    def __init__(self) -> None:
        pass

    def load_slices(self, context_uuid : str) -> None:
        context_client = ContextClient()

        
        context_client.ListSlices(ContextId)

X, y = make_blobs(n_samples=300, n_features=2, cluster_std=[(10,.1),(100,.01)],centers= [(10,.9), (100,.99)])

plt.scatter(X[:,0], X[:,1])
plt.show()


wcss = []
for i in range(1, 11):
    kmeans = KMeans(n_clusters=i, init='k-means++', max_iter=300, n_init=10, random_state=0)
    kmeans.fit(X)
    wcss.append(kmeans.inertia_)
plt.plot(range(1, 11), wcss)
plt.title('Elbow Method')
plt.xlabel('Number of clusters')
plt.ylabel('WCSS')
plt.show()


kmeans = KMeans(n_clusters=2, init='k-means++', max_iter=300, n_init=10, random_state=0)
pred_y = kmeans.fit_predict(X)
plt.scatter(X[:,0], X[:,1])
plt.scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1], s=300, c='red')
plt.ylabel('service-slo-availability')
plt.xlabel('service-slo-one-way-bandwidth')
ax = plt.subplot(1, 1, 1)

ax.set_ylim(bottom=0., top=1.)
ax.set_xlim(left=0.)
plt.show()
# TODO: define by means of settings
SLICE_GROUPS = [
    ('bronze',   10.0,  10.0), # Bronze   (10%, 10Gb/s)
    ('silver',   30.0,  40.0), # Silver   (30%, 40Gb/s)
    ('gold',     70.0,  50.0), # Gold     (70%, 50Gb/s)
    ('platinum', 99.0, 100.0), # Platinum (99%, 100Gb/s)
]
SLICE_GROUP_NAMES = {slice_group[0] for slice_group in SLICE_GROUPS}
+126 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime, logging, os, requests
from typing import Any, Literal, Union
from questdb.ingress import Sender, IngressError # pylint: disable=no-name-in-module

LOGGER = logging.getLogger(__name__)

MAX_RETRIES = 10
DELAY_RETRIES = 0.5

MSG_EXPORT_EXECUTED   = '[rest_request] Export(timestamp={:s}, symbols={:s}, columns={:s}) executed'
MSG_EXPORT_FAILED     = '[rest_request] Export(timestamp={:s}, symbols={:s}, columns={:s}) failed, retry={:d}/{:d}...'
MSG_REST_BAD_STATUS   = '[rest_request] Bad Reply url="{:s}" params="{:s}": status_code={:d} content={:s}'
MSG_REST_EXECUTED     = '[rest_request] Query({:s}) executed, result: {:s}'
MSG_REST_FAILED       = '[rest_request] Query({:s}) failed, retry={:d}/{:d}...'
MSG_ERROR_MAX_RETRIES = 'Maximum number of retries achieved: {:d}'

METRICSDB_HOSTNAME  = os.environ.get('METRICSDB_HOSTNAME')
METRICSDB_ILP_PORT  = int(os.environ.get('METRICSDB_ILP_PORT'))
METRICSDB_REST_PORT = int(os.environ.get('METRICSDB_REST_PORT'))
METRICSDB_TABLE     = 'slice_groups'

COLORS = {
    'platinum': '#E5E4E2',
    'gold'    : '#FFD700',
    'silver'  : '#808080',
    'bronze'  : '#CD7F32',
}
DEFAULT_COLOR = '#000000' # black

SQL_MARK_DELETED = "UPDATE {:s} SET is_deleted='true' WHERE slice_uuid='{:s}';"

class MetricsExporter():
    def create_table(self) -> None:
        sql_query = ' '.join([
            'CREATE TABLE IF NOT EXISTS {:s} ('.format(str(METRICSDB_TABLE)),
            ','.join([
                'timestamp TIMESTAMP',
                'slice_uuid SYMBOL',
                'slice_group SYMBOL',
                'slice_color SYMBOL',
                'is_deleted SYMBOL',
                'slice_availability DOUBLE',
                'slice_capacity_center DOUBLE',
                'slice_capacity DOUBLE',
            ]),
            ') TIMESTAMP(timestamp);'
        ])
        try:
            result = self.rest_request(sql_query)
            if not result: raise Exception
            LOGGER.info('Table {:s} created'.format(str(METRICSDB_TABLE)))
        except Exception as e:
            LOGGER.warning('Table {:s} cannot be created. {:s}'.format(str(METRICSDB_TABLE), str(e)))
            raise

    def export_point(
        self, slice_uuid : str, slice_group : str, slice_availability : float, slice_capacity : float,
        is_center : bool = False
    ) -> None:
        dt_timestamp = datetime.datetime.utcnow()
        slice_color = COLORS.get(slice_group, DEFAULT_COLOR)
        symbols = dict(slice_uuid=slice_uuid, slice_group=slice_group, slice_color=slice_color, is_deleted='false')
        columns = dict(slice_availability=slice_availability)
        columns['slice_capacity_center' if is_center else 'slice_capacity'] = slice_capacity

        for retry in range(MAX_RETRIES):
            try:
                with Sender(METRICSDB_HOSTNAME, METRICSDB_ILP_PORT) as sender:
                    sender.row(METRICSDB_TABLE, symbols=symbols, columns=columns, at=dt_timestamp)
                    sender.flush()
                LOGGER.info(MSG_EXPORT_EXECUTED.format(str(dt_timestamp), str(symbols), str(columns)))
                return
            except (Exception, IngressError): # pylint: disable=broad-except
                LOGGER.exception(MSG_EXPORT_FAILED.format(
                    str(dt_timestamp), str(symbols), str(columns), retry+1, MAX_RETRIES))

        raise Exception(MSG_ERROR_MAX_RETRIES.format(MAX_RETRIES))

    def delete_point(self, slice_uuid : str) -> None:
        sql_query = SQL_MARK_DELETED.format(str(METRICSDB_TABLE), slice_uuid)
        try:
            result = self.rest_request(sql_query)
            if not result: raise Exception
            LOGGER.info('Point {:s} deleted'.format(str(slice_uuid)))
        except Exception as e:
            LOGGER.warning('Point {:s} cannot be deleted. {:s}'.format(str(slice_uuid), str(e)))
            raise

    def rest_request(self, rest_query : str) -> Union[Any, Literal[True]]:
        url = 'http://{:s}:{:d}/exec'.format(METRICSDB_HOSTNAME, METRICSDB_REST_PORT)
        params = {'query': rest_query, 'fmt': 'json'}

        for retry in range(MAX_RETRIES):
            try:
                response = requests.get(url, params=params)
                status_code = response.status_code
                if status_code not in {200}:
                    str_content = response.content.decode('UTF-8')
                    raise Exception(MSG_REST_BAD_STATUS.format(str(url), str(params), status_code, str_content))

                json_response = response.json()
                if 'ddl' in json_response:
                    LOGGER.info(MSG_REST_EXECUTED.format(str(rest_query), str(json_response['ddl'])))
                    return True
                elif 'dataset' in json_response:
                    LOGGER.info(MSG_REST_EXECUTED.format(str(rest_query), str(json_response['dataset'])))
                    return json_response['dataset']

            except Exception: # pylint: disable=broad-except
                LOGGER.exception(MSG_REST_FAILED.format(str(rest_query), retry+1, MAX_RETRIES))

        raise Exception(MSG_ERROR_MAX_RETRIES.format(MAX_RETRIES))
+94 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, pandas, threading
from typing import Dict, Optional, Tuple
from sklearn.cluster import KMeans
from common.proto.context_pb2 import Slice
from common.tools.grpc.Tools import grpc_message_to_json_string
from .Constants import SLICE_GROUPS
from .MetricsExporter import MetricsExporter
from .Tools import (
    add_slice_to_group, create_slice_groups, get_slice_grouping_parameters, is_slice_grouping_enabled,
    remove_slice_from_group)

LOGGER = logging.getLogger(__name__)

class SliceGrouper:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._is_enabled = is_slice_grouping_enabled()
        if not self._is_enabled: return

        metrics_exporter = MetricsExporter()
        metrics_exporter.create_table()

        self._slice_groups = create_slice_groups(SLICE_GROUPS)

        # Initialize and fit K-Means with the pre-defined clusters we want, i.e., one per slice group
        df_groups = pandas.DataFrame(SLICE_GROUPS, columns=['name', 'availability', 'capacity_gbps'])
        k_means = KMeans(n_clusters=df_groups.shape[0])
        k_means.fit(df_groups[['availability', 'capacity_gbps']])
        df_groups['label'] = k_means.predict(df_groups[['availability', 'capacity_gbps']])
        self._k_means = k_means
        self._df_groups = df_groups

        self._group_mapping : Dict[str, Dict] = {
            group['name']:{k:v for k,v in group.items() if k != 'name'}
            for group in list(df_groups.to_dict('records'))
        }

        label_to_group = {}
        for group_name,group_attrs in self._group_mapping.items():
            label = group_attrs['label']
            availability = group_attrs['availability']
            capacity_gbps = group_attrs['capacity_gbps']
            metrics_exporter.export_point(
                group_name, group_name, availability, capacity_gbps, is_center=True)
            label_to_group[label] = group_name
        self._label_to_group = label_to_group

    def _select_group(self, slice_obj : Slice) -> Optional[Tuple[str, float, float]]:
        with self._lock:
            grouping_parameters = get_slice_grouping_parameters(slice_obj)
            LOGGER.debug('[_select_group] grouping_parameters={:s}'.format(str(grouping_parameters)))
            if grouping_parameters is None: return None

            sample = pandas.DataFrame([grouping_parameters], columns=['availability', 'capacity_gbps'])
            sample['label'] = self._k_means.predict(sample)
            sample = sample.to_dict('records')[0]   # pylint: disable=unsubscriptable-object
            LOGGER.debug('[_select_group] sample={:s}'.format(str(sample)))
            label = sample['label']
            availability = sample['availability']
            capacity_gbps = sample['capacity_gbps']
            group_name = self._label_to_group[label]
            LOGGER.debug('[_select_group] group_name={:s}'.format(str(group_name)))
            return group_name, availability, capacity_gbps

    @property
    def is_enabled(self): return self._is_enabled

    def group(self, slice_obj : Slice) -> bool:
        LOGGER.debug('[group] slice_obj={:s}'.format(grpc_message_to_json_string(slice_obj)))
        selected_group = self._select_group(slice_obj)
        LOGGER.debug('[group] selected_group={:s}'.format(str(selected_group)))
        if selected_group is None: return False
        return add_slice_to_group(slice_obj, selected_group)

    def ungroup(self, slice_obj : Slice) -> bool:
        LOGGER.debug('[ungroup] slice_obj={:s}'.format(grpc_message_to_json_string(slice_obj)))
        selected_group = self._select_group(slice_obj)
        LOGGER.debug('[ungroup] selected_group={:s}'.format(str(selected_group)))
        if selected_group is None: return False
        return remove_slice_from_group(slice_obj, selected_group)
Loading