Skip to content
Snippets Groups Projects
Commit 8d398733 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

minor changes in TelemetryFrontend

parent 10e29ea8
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!207Resolve "(CTTC) Separation of Monitoring"
......@@ -28,26 +28,26 @@ LOGGER = logging.getLogger(__name__)
# Tests Implementation of Telemetry Backend
###########################
# def test_verify_kafka_topics():
# LOGGER.info('test_verify_kafka_topics requesting')
# TelemetryBackendServiceObj = TelemetryBackendService()
# KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled']
# response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
# def test_run_kafka_listener():
# LOGGER.info('test_receive_kafka_request requesting')
# TelemetryBackendServiceObj = TelemetryBackendService()
# response = TelemetryBackendServiceObj.run_kafka_listener()
# LOGGER.debug(str(response))
# assert isinstance(response, bool)
def test_verify_kafka_topics():
LOGGER.info('test_verify_kafka_topics requesting')
TelemetryBackendServiceObj = TelemetryBackendService()
KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled']
response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
LOGGER.debug(str(response))
assert isinstance(response, bool)
def test_run_kafka_listener():
LOGGER.info('test_receive_kafka_request requesting')
TelemetryBackendServiceObj = TelemetryBackendService()
response = TelemetryBackendServiceObj.run_kafka_listener()
LOGGER.debug(str(response))
assert isinstance(response, bool)
# def test_fetch_node_exporter_metrics():
# LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ')
# TelemetryBackendService.fetch_single_node_exporter_metric()
def test_stream_node_export_metrics_to_raw_topic():
LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ')
threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start()
# def test_stream_node_export_metrics_to_raw_topic():
# LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ')
# threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start()
......@@ -17,55 +17,29 @@ from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.orm import registry
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
# Create a base class for declarative models
Base = declarative_base()
class Kpi(Base):
__tablename__ = 'kpi'
kpi_id = Column(UUID(as_uuid=False), primary_key=True)
kpi_description = Column(Text)
kpi_sample_type = Column(Integer)
device_id = Column(String)
endpoint_id = Column(String)
service_id = Column(String)
slice_id = Column(String)
connection_id = Column(String)
link_id = Column(String)
# monitor_flag = Column(String)
# Relationship to Collector model: allows access to related Collector objects from a Kpi object
collectors = relationship('Collector', back_populates='kpi')
# helps in logging the information
def __repr__(self):
return (f"<Kpi(kpi_id='{self.kpi_id}', kpi_description='{self.kpi_description}', "
f"kpi_sample_type='{self.kpi_sample_type}', device_id='{self.device_id}', "
f"endpoint_id='{self.endpoint_id}', service_id='{self.service_id}', "
f"slice_id='{self.slice_id}', connection_id='{self.connection_id}', "
f"link_id='{self.link_id}')>")
Base = registry().generate_base()
# Base = declarative_base()
class Collector(Base):
__tablename__ = 'collector'
collector_id = Column(UUID(as_uuid=False), primary_key=True)
kpi_id = Column(UUID(as_uuid=False), ForeignKey('kpi.kpi_id'))
collector = Column(String)
sampling_duration_s = Column(Float)
sampling_interval_s = Column(Float)
start_timestamp = Column(Float)
end_timestamp = Column(Float)
collector_id = Column(UUID(as_uuid=False), primary_key=True)
kpi_id = Column(UUID(as_uuid=False))
collector_decription = Column(String)
sampling_duration_s = Column(Float)
sampling_interval_s = Column(Float)
start_timestamp = Column(Float)
end_timestamp = Column(Float)
# Relationship to Kpi model: allows access to the related Kpi object from a Collector object
kpi = relationship('Kpi', back_populates='collectors')
def __repr__(self):
return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', "
f"collector='{self.collector}', sampling_duration_s='{self.sampling_duration_s}', "
f"collector='{self.collector_decription}', sampling_duration_s='{self.sampling_duration_s}', "
f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', "
f"end_timestamp='{self.end_timestamp}')>")
\ No newline at end of file
......@@ -13,16 +13,18 @@
# limitations under the License.
import logging, time
import sqlalchemy
import sqlalchemy_utils
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine
from telemetry.database.TelemetryModel import Base
LOGGER = logging.getLogger(__name__)
TELEMETRY_DB_NAME = "telemetryfrontend"
DB_NAME = "telemetryfrontend"
# Create a base class for declarative models
Base = declarative_base()
# # Create a base class for declarative models
# Base = declarative_base()
class managementDB:
def __init__(self):
......@@ -30,23 +32,35 @@ class managementDB:
if self.db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return False
self.db_name = TELEMETRY_DB_NAME
self.db_name = DB_NAME
self.Session = sessionmaker(bind=self.db_engine)
def create_database(self):
try:
with self.db_engine.connect() as connection:
connection.execute(f"CREATE DATABASE {self.db_name};")
LOGGER.info('managementDB initalizes database. Name: {self.db_name}')
return True
except:
LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
return False
@staticmethod
def create_database(engine : sqlalchemy.engine.Engine) -> None:
if not sqlalchemy_utils.database_exists(engine.url):
LOGGER.info("Database created. {:}".format(engine.url))
sqlalchemy_utils.create_database(engine.url)
@staticmethod
def drop_database(engine : sqlalchemy.engine.Engine) -> None:
if sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.drop_database(engine.url)
# def create_database(self):
# try:
# with self.db_engine.connect() as connection:
# connection.execute(f"CREATE DATABASE {self.db_name};")
# LOGGER.info('managementDB initalizes database. Name: {self.db_name}')
# return True
# except:
# LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
# return False
def create_tables(self):
@staticmethod
def create_tables(engine : sqlalchemy.engine.Engine):
try:
Base.metadata.create_all(self.db_engine) # type: ignore
LOGGER.info("Tables created in the DB Name: {:}".format(self.db_name))
Base.metadata.create_all(engine) # type: ignore
LOGGER.info("Tables created in the DB Name: {:}".format(DB_NAME))
except Exception as e:
LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
......@@ -59,6 +73,7 @@ class managementDB:
except Exception as e:
LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
@staticmethod
def add_row_to_db(self, row):
session = self.Session()
try:
......@@ -103,7 +118,7 @@ class managementDB:
LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
finally:
session.close()
def select_with_filter(self, model, **filters):
session = self.Session()
try:
......
......@@ -55,12 +55,12 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
collector_to_insert = CollectorModel()
collector_to_insert.collector_id = request.collector_id.collector_id.uuid
collector_to_insert.kpi_id = request.kpi_id.kpi_id.uuid
collector_to_insert.collector = "DESC 1"
# collector_to_insert.collector_decription= request.collector
collector_to_insert.sampling_duration_s = request.duration_s
collector_to_insert.sampling_interval_s = request.interval_s
collector_to_insert.start_timestamp = time.time()
collector_to_insert.end_timestamp = time.time()
self.managementDBobj.add_row_to_db(collector_to_insert)
managementDB.add_row_to_db(collector_to_insert)
except Exception as e:
LOGGER.info("Unable to create collectorModel class object. {:}".format(e))
......
......@@ -17,6 +17,8 @@ import random
from common.proto import telemetry_frontend_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType
# ----------------------- "2nd" Iteration --------------------------------
def create_collector_id():
_collector_id = telemetry_frontend_pb2.CollectorId()
_collector_id.collector_id.uuid = uuid.uuid4()
......@@ -31,16 +33,18 @@ def create_collector_request():
_create_collector_request = telemetry_frontend_pb2.Collector()
_create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
_create_collector_request.kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f"
# _create_collector_request.collector = "collector description"
_create_collector_request.duration_s = float(random.randint(8, 16))
_create_collector_request.interval_s = float(random.randint(2, 4))
return _create_collector_request
def create_collector_filter():
_create_collector_filter = telemetry_frontend_pb2.CollectorFilter()
new_kpi_id = _create_collector_filter.kpi_id.add()
new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f"
new_kpi_id = _create_collector_filter.kpi_id.add()
new_kpi_id.kpi_id.uuid = "165d20c5-a446-42fa-812f-e2b7ed283c6f"
return _create_collector_filter
# ----------------------- "First" Iteration --------------------------------
# def create_collector_request_a():
# _create_collector_request_a = telemetry_frontend_pb2.Collector()
# _create_collector_request_a.collector_id.collector_id.uuid = "-1"
......
......@@ -32,6 +32,8 @@ from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendC
from telemetry.frontend.service.TelemetryFrontendService import TelemetryFrontendService
from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import TelemetryFrontendServiceServicerImpl
from telemetry.frontend.tests.Messages import ( create_collector_request, create_collector_filter)
from telemetry.database.managementDB import managementDB
from telemetry.database.TelemetryEngine import TelemetryEngine
from device.client.DeviceClient import DeviceClient
from device.service.DeviceService import DeviceService
......@@ -166,6 +168,12 @@ def telemetryFrontend_client(
# Tests Implementation of Telemetry Frontend
###########################
def test_verify_db_and_table():
LOGGER.info(' >>> test_verify_database_and_tables START: <<< ')
_engine = TelemetryEngine.get_engine()
managementDB.create_database(_engine)
managementDB.create_tables(_engine)
def test_StartCollector(telemetryFrontend_client):
LOGGER.info(' >>> test_StartCollector START: <<< ')
response = telemetryFrontend_client.StartCollector(create_collector_request())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment