Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas
from typing import Dict, List, Tuple
from common.proto.context_pb2 import Empty, LinkId
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.monitoring_pb2 import KpiId, KpiQuery
from monitoring.client.MonitoringClient import MonitoringClient
class KpiManager:
def __init__(self) -> None:
self._monitoring_client = MonitoringClient()
def get_kpi_ids_from_link_ids(
self, link_ids : List[LinkId]
) -> Dict[Tuple[LinkId, KpiSampleType], KpiId]:
link_uuids = {link_id.link_uuid.uuid for link_id in link_ids}
kpi_descriptors = self._monitoring_client.GetKpiDescriptorList(Empty())
kpi_ids : Dict[Tuple[LinkId, KpiSampleType], KpiId] = {
(kpi_descriptor.link_id, kpi_descriptor.kpi_sample_type) : kpi_descriptor.kpi_id
for kpi_descriptor in kpi_descriptors
if kpi_descriptor.link_id.link_uuid.uuid in link_uuids
}
return kpi_ids
def get_kpi_id_samples(
self, kpi_ids : List[KpiId], start_timestamp : float, end_timestamp : float
) -> pandas.DataFrame:
kpi_query = KpiQuery()
kpi_query.kpi_ids.extend(kpi_ids) # pylint: disable=no-member
kpi_query.start_timestamp.timestamp = start_timestamp # pylint: disable=no-member
kpi_query.end_timestamp.timestamp = end_timestamp # pylint: disable=no-member
raw_kpi_table = self._monitoring_client.QueryKpiData(kpi_query)
data : List[Tuple[str, float, float]] = list()
for raw_kpi_list in raw_kpi_table.raw_kpi_lists:
kpi_uuid = raw_kpi_list.kpi_id.kpi_id.uuid
for raw_kpi in raw_kpi_list.raw_kpis:
timestamp = raw_kpi.timestamp.timestamp
value = float(getattr(raw_kpi.kpi_value, raw_kpi.kpi_value.WhichOneof('value')))
data.append((timestamp, kpi_uuid, value))
df = pandas.DataFrame(data, columns=['timestamp', 'kpi_id', 'value'])
df['timestamp'] = pandas.to_datetime(df['timestamp'].astype('int'), unit='s', utc=True)
df.sort_values('timestamp', ascending=True, inplace=True)
return df