Newer
Older
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas
from typing import Dict, List, Tuple
from common.proto.context_pb2 import Empty, LinkId
from common.proto.monitoring_pb2 import KpiId, KpiQuery
from monitoring.client.MonitoringClient import MonitoringClient
class KpiManager:
    """Resolve KPI identifiers for links and fetch their samples via the Monitoring client."""

    def __init__(self) -> None:
        # Single client instance reused by every request issued through this manager.
        self._monitoring_client = MonitoringClient()

    def get_kpi_ids_from_link_ids(
        self, link_ids : List[LinkId]
    ) -> Dict[Tuple[str, int], KpiId]:
        """Return the KPI identifiers whose descriptors reference any of the given links.

        The full KPI descriptor list is requested from the Monitoring client and
        filtered locally by link UUID.

        Args:
            link_ids: Links whose KPIs are of interest.

        Returns:
            A dictionary keyed by ``(link_uuid, kpi_sample_type)`` whose values are the
            matching ``KpiId`` messages. If several descriptors share the same key,
            the last one listed wins.
        """
        # A set gives O(1) membership tests while scanning the descriptor list.
        link_uuids = {link_id.link_uuid.uuid for link_id in link_ids}
        kpi_descriptors = self._monitoring_client.GetKpiDescriptorList(Empty())
        kpi_ids : Dict[Tuple[str, int], KpiId] = {
            (kpi_descriptor.link_id.link_uuid.uuid, kpi_descriptor.kpi_sample_type) : kpi_descriptor.kpi_id
            for kpi_descriptor in kpi_descriptors.kpi_descriptor_list
            if kpi_descriptor.link_id.link_uuid.uuid in link_uuids
        }
        return kpi_ids

    def get_kpi_id_samples(
        self, kpi_ids : List[KpiId], start_timestamp : float, end_timestamp : float
    ) -> pandas.DataFrame:
        """Fetch the samples of the given KPIs within a time window.

        Args:
            kpi_ids: KPI identifiers to query.
            start_timestamp: Lower bound of the query window, forwarded unchanged to the
                Monitoring query (presumably seconds since the epoch -- confirm with caller).
            end_timestamp: Upper bound of the query window, forwarded unchanged to the
                Monitoring query (same unit as ``start_timestamp``).

        Returns:
            A DataFrame with columns ``timestamp`` (timezone-aware UTC datetimes),
            ``kpi_id`` (KPI UUID) and ``value`` (float), sorted by ascending timestamp.
        """
        kpi_query = KpiQuery()
        for kpi_id in kpi_ids:
            kpi_query.kpi_ids.add().kpi_id.uuid = kpi_id.kpi_id.uuid
        kpi_query.start_timestamp.timestamp = start_timestamp # pylint: disable=no-member
        kpi_query.end_timestamp.timestamp   = end_timestamp   # pylint: disable=no-member

        raw_kpi_table = self._monitoring_client.QueryKpiData(kpi_query)

        # Each row is (timestamp, kpi_uuid, value), in the same order as the DataFrame columns.
        data : List[Tuple[float, str, float]] = []
        for raw_kpi_list in raw_kpi_table.raw_kpi_lists:
            kpi_uuid = raw_kpi_list.kpi_id.kpi_id.uuid
            for raw_kpi in raw_kpi_list.raw_kpis:
                timestamp = raw_kpi.timestamp.timestamp
                # The value is carried in a oneof; read whichever field is populated as a float.
                value = float(getattr(raw_kpi.kpi_value, raw_kpi.kpi_value.WhichOneof('value')))
                data.append((timestamp, kpi_uuid, value))

        df = pandas.DataFrame(data, columns=['timestamp', 'kpi_id', 'value'])
        # Timestamps are cast to integers (dropping any fractional part) and then
        # interpreted as seconds since the epoch in UTC.
        df['timestamp'] = pandas.to_datetime(df['timestamp'].astype('int'), unit='s', utc=True)
        return df.sort_values('timestamp', ascending=True)