Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
from sqlalchemy import engine
from sqlalchemy.orm import sessionmaker
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryModel import Kpi as KpiModel
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine
LOGGER = logging.getLogger(__name__)
# Create a base class for declarative models
Base = declarative_base()
class TelemetryDB:
def __init__(self):
self.db_engine = TelemetryEngine.get_engine()
if self.db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return False
LOGGER.info('test_telemetry_DB_connection -- Engine created sucessfully')
def create_database(self):
try:
TelemetryEngine.create_database(self.db_engine)
LOGGER.info('test_telemetry_DB_connection -- DB created sucessfully')
return True
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
return False
# Function to create the collector and KPI tables in the database
def create_tables(self):
try:
Base.metadata.create_all(self.db_engine) # type: ignore
LOGGER.info("Collector and KPI tables created in the TelemetryFrontend database")
except Exception as e:
LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
# Function to insert a row into the Collector model
def insert_collector(self, kpi_id: int, collector: str, duration_s: float, interval_s: float):
# Create a session
Session = sessionmaker(bind=self.db_engine)
session = Session()
try:
# Create a new Collector instance
collectorObj = CollectorModel()
collectorObj.kpi_id = kpi_id
collectorObj.collector = collector
collectorObj.sampling_duration_s = duration_s
collectorObj.sampling_interval_s = interval_s
collectorObj.start_timestamp = time.time()
collectorObj.end_timestamp = time.time()
# Add the instance to the session
session.add(collectorObj)
# Commit the session
session.commit()
LOGGER.info("New collector inserted successfully")
except Exception as e:
session.rollback()
LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
finally:
# Close the session
session.close()
def inser_kpi(self, kpi_id, kpi_descriptor):
# Create a session
Session = sessionmaker(bind=self.db_engine)
session = Session()
try:
# Create a new Collector instance
KpiObj = KpiModel()
KpiObj.kpi_id = kpi_id
KpiObj.kpi_description = kpi_descriptor
# Add the instance to the session
session.add(KpiObj)
# Commit the session
session.commit()
LOGGER.info("New collector inserted successfully")
except Exception as e:
session.rollback()
LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
finally:
# Close the session
session.close()
def get_kpi(self, kpi_id):
# Create a session
Session = sessionmaker(bind=self.db_engine)
session = Session()
try:
kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id).first()
if kpi:
LOGGER.info("kpi ID found: {:s}".format(str(kpi)))
return kpi
else:
LOGGER.info("Kpi ID not found")
return None
except Exception as e:
LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e)))
raise
finally:
# Close the session
session.close()