Compare revisions: tfs/controller

Changes are shown as if the source revision was being merged into the target revision.

Showing 616 additions and 1086 deletions.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .TelemetryBackendService import TelemetryBackendService

terminate = threading.Event()
LOGGER = None

def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    LOGGER.warning('Terminate signal received')
    terminate.set()

def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info('Starting...')

    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)

    grpc_service = TelemetryBackendService()
    grpc_service.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=1.0): pass

    LOGGER.info('Terminating...')
    grpc_service.stop()

    LOGGER.info('Bye')
    return 0

if __name__ == '__main__':
    sys.exit(main())
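Note: the shutdown path above can be exercised in isolation with a small sketch like the following (hypothetical snippet, assuming a POSIX environment where the process may signal itself):

import os, signal, threading, time

terminate = threading.Event()

def signal_handler(signum, frame):
    # Same idea as the handler above: set the event so the wait loop exits
    terminate.set()

signal.signal(signal.SIGTERM, signal_handler)

# Deliver SIGTERM to this very process after a short delay (POSIX only)
threading.Timer(0.5, lambda: os.kill(os.getpid(), signal.SIGTERM)).start()

start = time.time()
while not terminate.wait(timeout=1.0): pass
print('terminated after {:.1f}s'.format(time.time() - start))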
@@ -12,14 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
print (sys.path)
sys.path.append('/home/tfs/tfs-ctrl')
import threading
import logging
from typing import Tuple
# from common.proto.context_pb2 import Empty
from src.telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
from common.tools.kafka.Variables import KafkaTopic
from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
LOGGER = logging.getLogger(__name__)
@@ -28,26 +24,15 @@ LOGGER = logging.getLogger(__name__)
# Tests Implementation of Telemetry Backend
###########################
def test_verify_kafka_topics():
    LOGGER.info('test_verify_kafka_topics requesting')

# --- "test_validate_kafka_topics" should be run before the functionality tests ---
def test_validate_kafka_topics():
    LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
    response = KafkaTopic.create_all_topics()
    assert isinstance(response, bool)

def test_RunRequestListener():
    LOGGER.info('test_RunRequestListener')
    TelemetryBackendServiceObj = TelemetryBackendService()
    KafkaTopics = ['topic_request', 'topic_response', 'topic_raw', 'topic_labled']
    response = TelemetryBackendServiceObj.create_topic_if_not_exists(KafkaTopics)
    response = TelemetryBackendServiceObj.RunRequestListener()
    LOGGER.debug(str(response))
    assert isinstance(response, bool)

# def test_run_kafka_listener():
#     LOGGER.info('test_receive_kafka_request requesting')
#     TelemetryBackendServiceObj = TelemetryBackendService()
#     response = TelemetryBackendServiceObj.run_kafka_listener()
#     LOGGER.debug(str(response))
#     assert isinstance(response, bool)

# def test_fetch_node_exporter_metrics():
#     LOGGER.info(' >>> test_fetch_node_exporter_metrics START <<< ')
#     TelemetryBackendService.fetch_single_node_exporter_metric()

def test_stream_node_export_metrics_to_raw_topic():
    LOGGER.info(' >>> test_stream_node_export_metrics_to_raw_topic START <<< ')
    threading.Thread(target=TelemetryBackendService.stream_node_export_metrics_to_raw_topic, args=()).start()
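For context, the topic bootstrap used by these tests boils down to a single idempotent call; a minimal standalone sketch (KafkaTopic.create_all_topics is the call exercised in test_validate_kafka_topics above):

from common.tools.kafka.Variables import KafkaTopic

# Idempotently create every Kafka topic the telemetry services rely on;
# the call returns a bool, which is exactly what the test asserts on.
assert isinstance(KafkaTopic.create_all_topics(), bool)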
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
import sqlalchemy
from sqlalchemy import inspect, MetaData, Table
from sqlalchemy.orm import sessionmaker
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryModel import Kpi as KpiModel
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryEngine import TelemetryEngine
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from sqlalchemy.exc import SQLAlchemyError
from telemetry.database.TelemetryModel import Base
LOGGER = logging.getLogger(__name__)
DB_NAME = "telemetryfrontend"
class TelemetryDBmanager:
    def __init__(self):
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            return  # NOTE: __init__ must not return a value; bail out and leave the manager unusable
        self.db_name = DB_NAME
        self.Session = sessionmaker(bind=self.db_engine)

    def create_database(self):
        try:
            # with self.db_engine.connect() as connection:
            #     connection.execute(f"CREATE DATABASE {self.db_name};")
            TelemetryEngine.create_database(self.db_engine)
            LOGGER.info('TelemetryDBmanager initialized DB Name: {:}'.format(self.db_name))
            return True
        except Exception as e: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to check/create the database: {:s}'.format(str(e)))
            return False

    def create_tables(self):
        try:
            Base.metadata.create_all(self.db_engine) # type: ignore
            LOGGER.info("Tables created in database ({:}) as per models".format(self.db_name))
        except Exception as e:
            LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))

    def verify_tables(self):
        try:
            with self.db_engine.connect() as connection:
                result = connection.execute("SHOW TABLES;")
                tables = result.fetchall()
                LOGGER.info("Tables in DB: {:}".format(tables))
        except Exception as e:
            LOGGER.info("Unable to fetch table names. {:s}".format(str(e)))

    def drop_table(self, table_to_drop: str):
        try:
            inspector = inspect(self.db_engine)
            existing_tables = inspector.get_table_names()
            if table_to_drop in existing_tables:
                table = Table(table_to_drop, MetaData(), autoload_with=self.db_engine)
                table.drop(self.db_engine)
                LOGGER.info("Table {:} deleted from DB: {:}".format(table_to_drop, self.db_name))
            else:
                LOGGER.warning("No table {:} in database {:}".format(table_to_drop, DB_NAME))
        except Exception as e:
            LOGGER.info("Tables cannot be deleted in the {:} database. {:s}".format(DB_NAME, str(e)))

    def list_databases(self):
        query = "SHOW DATABASES"
        with self.db_engine.connect() as connection:
            result = connection.execute(query)
            databases = [row[0] for row in result]
        LOGGER.info("List of available DBs: {:}".format(databases))
    # ------------------ INSERT METHODs --------------------------------------

    def inser_kpi(self, request: KpiDescriptor):
        session = self.Session()
        try:
            # Create a new Kpi instance; map each request field to its matching column
            kpi_to_insert = KpiModel()
            kpi_to_insert.kpi_id          = request.kpi_id.kpi_id.uuid
            kpi_to_insert.kpi_description = request.kpi_description
            kpi_to_insert.kpi_sample_type = request.kpi_sample_type
            kpi_to_insert.device_id       = request.device_id.device_uuid.uuid
            kpi_to_insert.endpoint_id     = request.endpoint_id.endpoint_uuid.uuid
            kpi_to_insert.service_id      = request.service_id.service_uuid.uuid
            kpi_to_insert.slice_id        = request.slice_id.slice_uuid.uuid
            kpi_to_insert.connection_id   = request.connection_id.connection_uuid.uuid
            # kpi_to_insert.link_id       = request.link_id.link_id.uuid
            # Add the instance to the session
            session.add(kpi_to_insert)
            session.commit()
            LOGGER.info("Row inserted into kpi table: {:}".format(kpi_to_insert.kpi_id))
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to insert new kpi. {:s}".format(str(e)))
        finally:
            # Close the session
            session.close()

    # Function to insert a row into the Collector model
    def insert_collector(self, request: Collector):
        session = self.Session()
        try:
            # Create a new Collector instance
            collector_to_insert = CollectorModel()
            collector_to_insert.collector_id        = request.collector_id.collector_id.uuid
            collector_to_insert.kpi_id              = request.kpi_id.kpi_id.uuid
            collector_to_insert.collector           = "Test collector description"
            collector_to_insert.sampling_duration_s = request.duration_s
            collector_to_insert.sampling_interval_s = request.interval_s
            collector_to_insert.start_timestamp     = time.time()
            collector_to_insert.end_timestamp       = time.time()
            session.add(collector_to_insert)
            session.commit()
            LOGGER.info("Row inserted into collector table: {:}".format(collector_to_insert.collector_id))
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to insert new collector. {:s}".format(str(e)))
        finally:
            # Close the session
            session.close()
    # ------------------ GET METHODs --------------------------------------

    def get_kpi_descriptor(self, request: KpiId):
        session = self.Session()
        try:
            kpi_id_to_search = request.kpi_id.uuid
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_search).first()
            if kpi:
                LOGGER.info("KPI ID found: {:s}".format(str(kpi)))
                return kpi
            else:
                LOGGER.warning("KPI ID not found: {:s}".format(str(kpi_id_to_search)))
                return None
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to retrieve KPI ID. {:s}".format(str(e)))
            raise
        finally:
            session.close()

    def get_collector(self, request: CollectorId):
        session = self.Session()
        try:
            collector_id_to_search = request.collector_id.uuid
            collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_search).first()
            if collector:
                LOGGER.info("Collector ID found: {:s}".format(str(collector)))
                return collector
            else:
                LOGGER.warning("Collector ID not found: {:s}".format(str(collector_id_to_search)))
                return None
        except Exception as e:
            session.rollback()
            LOGGER.info("Failed to retrieve collector ID. {:s}".format(str(e)))
            raise
        finally:
            session.close()
    # ------------------ SELECT METHODs --------------------------------------

    def select_kpi_descriptor(self, **filters):
        session = self.Session()
        try:
            query = session.query(KpiModel)
            for column, value in filters.items():
                query = query.filter(getattr(KpiModel, column) == value)
            result = query.all()
            if len(result) != 0:
                LOGGER.info("Fetched filtered rows from KPI table with filters: {:s}".format(str(result)))
            else:
                LOGGER.warning("No matching row found: {:s}".format(str(result)))
            return result
        except SQLAlchemyError as e:
            LOGGER.error("Error fetching filtered rows from KPI table with filters {:}: {:}".format(filters, e))
            return []
        finally:
            session.close()

    def select_collector(self, **filters):
        session = self.Session()
        try:
            query = session.query(CollectorModel)
            for column, value in filters.items():
                query = query.filter(getattr(CollectorModel, column) == value)
            result = query.all()
            if len(result) != 0:
                LOGGER.info("Fetched filtered rows from Collector table with filters: {:s}".format(str(result)))
            else:
                LOGGER.warning("No matching row found: {:s}".format(str(result)))
            return result
        except SQLAlchemyError as e:
            LOGGER.error("Error fetching filtered rows from Collector table with filters {:}: {:}".format(filters, e))
            return []
        finally:
            session.close()
    # ------------------ DELETE METHODs --------------------------------------

    def delete_kpi_descriptor(self, request: KpiId):
        session = self.Session()
        try:
            kpi_id_to_delete = request.kpi_id.uuid
            kpi = session.query(KpiModel).filter_by(kpi_id=kpi_id_to_delete).first()
            if kpi:
                session.delete(kpi)
                session.commit()
                LOGGER.info("Deleted KPI with kpi_id: %s", kpi_id_to_delete)
            else:
                LOGGER.warning("KPI with kpi_id %s not found", kpi_id_to_delete)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting KPI with kpi_id %s: %s", kpi_id_to_delete, e)
        finally:
            session.close()

    def delete_collector(self, request: CollectorId):
        session = self.Session()
        try:
            collector_id_to_delete = request.collector_id.uuid
            collector = session.query(CollectorModel).filter_by(collector_id=collector_id_to_delete).first()
            if collector:
                session.delete(collector)
                session.commit()
                LOGGER.info("Deleted collector with collector_id: %s", collector_id_to_delete)
            else:
                LOGGER.warning("Collector with collector_id %s not found", collector_id_to_delete)
        except SQLAlchemyError as e:
            session.rollback()
            LOGGER.error("Error deleting collector with collector_id %s: %s", collector_id_to_delete, e)
        finally:
            session.close()
\ No newline at end of file
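A short usage sketch for the manager above (class, method, and proto names come from the code; the module path and UUID value are illustrative assumptions):

from common.proto.kpi_manager_pb2 import KpiId
from telemetry.database.TelemetryDBmanager import TelemetryDBmanager  # assumed module path

db_manager = TelemetryDBmanager()
db_manager.create_database()
db_manager.create_tables()

kpi_id = KpiId()
kpi_id.kpi_id.uuid = 'f6cdcc3d-3f8f-4c46-b041-1fc4456b8f1d'  # illustrative UUID
kpi = db_manager.get_kpi_descriptor(kpi_id)  # returns the matching row, or None if absent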
@@ -12,48 +12,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy, sqlalchemy_utils
# from common.Settings import get_setting
import logging, sqlalchemy
from common.Settings import get_setting

LOGGER = logging.getLogger(__name__)
APP_NAME = 'tfs'
ECHO = False # False: do not dump executed SQL commands and transactions
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'

class TelemetryEngine:
    # def __init__(self):
    #     self.engine = self.get_engine()
    @staticmethod
    def get_engine() -> sqlalchemy.engine.Engine:
        CRDB_NAMESPACE = "crdb"
        CRDB_SQL_PORT  = "26257"
        CRDB_DATABASE  = "telemetryfrontend"
        CRDB_USERNAME  = "tfs"
        CRDB_PASSWORD  = "tfs123"
        CRDB_SSLMODE   = "require"
        crdb_uri = CRDB_URI_TEMPLATE.format(
            CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        # crdb_uri = CRDB_URI_TEMPLATE.format(
        #     CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        crdb_uri = get_setting('CRDB_URI', default=None)
        if crdb_uri is None:
            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
            CRDB_DATABASE  = "tfs-telemetry" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
            crdb_uri = CRDB_URI_TEMPLATE.format(
                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        try:
            # engine = sqlalchemy.create_engine(
            #     crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
            LOGGER.info('TelemetryDBmanager initialized with DB URL: {:}'.format(crdb_uri))
            LOGGER.info('TelemetryDB initialized with DB URL: {:}'.format(crdb_uri))
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
            return None # type: ignore
        return engine # type: ignore

    @staticmethod
    def create_database(engine : sqlalchemy.engine.Engine) -> None:
        if not sqlalchemy_utils.database_exists(engine.url):
            LOGGER.info("Database created. {:}".format(engine.url))
            sqlalchemy_utils.create_database(engine.url)

    @staticmethod
    def drop_database(engine : sqlalchemy.engine.Engine) -> None:
        if sqlalchemy_utils.database_exists(engine.url):
            sqlalchemy_utils.drop_database(engine.url)

        return engine
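The engine above resolves its CockroachDB URI from environment settings. A minimal wiring sketch (the variable names come from the code above; the values are illustrative, matching the defaults the removed hard-coded block used):

import os

# Illustrative values only; real deployments inject these via the Kubernetes manifests
os.environ['CRDB_NAMESPACE'] = 'crdb'
os.environ['CRDB_SQL_PORT']  = '26257'
os.environ['CRDB_USERNAME']  = 'tfs'
os.environ['CRDB_PASSWORD']  = 'tfs123'
os.environ['CRDB_SSLMODE']   = 'require'
# Alternatively, set CRDB_URI directly to bypass the per-field template:
# os.environ['CRDB_URI'] = 'cockroachdb://tfs:tfs123@127.0.0.1:26257/tfs-telemetry?sslmode=require'

from telemetry.database.TelemetryEngine import TelemetryEngine
engine = TelemetryEngine.get_engine()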
@@ -14,32 +14,60 @@
import logging
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Float, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import Column, String, Float
from sqlalchemy.orm import registry
from common.proto import telemetry_frontend_pb2

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

# Create a base class for declarative models
Base = registry().generate_base()
# Base = declarative_base()

class Collector(Base):
    __tablename__ = 'collector'

    collector_id         = Column(UUID(as_uuid=False), primary_key=True)
    kpi_id               = Column(UUID(as_uuid=False))
    collector_decription = Column(String)
    sampling_duration_s  = Column(Float)
    sampling_interval_s  = Column(Float)
    start_timestamp      = Column(Float)
    end_timestamp        = Column(Float)
    kpi_id               = Column(UUID(as_uuid=False), nullable=False)
    sampling_duration_s  = Column(Float, nullable=False)
    sampling_interval_s  = Column(Float, nullable=False)
    start_timestamp      = Column(Float, nullable=False)
    end_timestamp        = Column(Float, nullable=False)

    # helps in logging the information
    def __repr__(self):
        return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', "
                f"collector='{self.collector_decription}', sampling_duration_s='{self.sampling_duration_s}', "
                f"sampling_interval_s='{self.sampling_interval_s}', start_timestamp='{self.start_timestamp}', "
                f"end_timestamp='{self.end_timestamp}')>")
\ No newline at end of file
        return (f"<Collector(collector_id='{self.collector_id}', kpi_id='{self.kpi_id}', "
                f"sampling_duration_s='{self.sampling_duration_s}', sampling_interval_s='{self.sampling_interval_s}', "
                f"start_timestamp='{self.start_timestamp}', end_timestamp='{self.end_timestamp}')>")

    @classmethod
    def ConvertCollectorToRow(cls, request):
        """
        Create an instance of a Collector table row from a request object.
        Args:    request: The request object containing the collector gRPC message.
        Returns: A row (an instance of the Collector table) initialized with the content of the request.
        """
        return cls(
            collector_id        = request.collector_id.collector_id.uuid,
            kpi_id              = request.kpi_id.kpi_id.uuid,
            sampling_duration_s = request.duration_s,
            sampling_interval_s = request.interval_s,
            start_timestamp     = request.start_time.timestamp,
            end_timestamp       = request.end_time.timestamp
        )

    @classmethod
    def ConvertRowToCollector(cls, row):
        """
        Create and return a collector gRPC message from a Collector table instance.
        Args:    row: The Collector table instance (row) containing the data.
        Returns: A collector gRPC message initialized with the content of the row.
        """
        response = telemetry_frontend_pb2.Collector()
        response.collector_id.collector_id.uuid = row.collector_id
        response.kpi_id.kpi_id.uuid             = row.kpi_id
        response.duration_s                     = row.sampling_duration_s
        response.interval_s                     = row.sampling_interval_s
        response.start_time.timestamp           = row.start_timestamp
        response.end_time.timestamp             = row.end_timestamp
        return response
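A round-trip sketch for the two converters above (field names are taken from the code; the UUIDs and timestamps are illustrative):

from common.proto import telemetry_frontend_pb2
from telemetry.database.TelemetryModel import Collector

request = telemetry_frontend_pb2.Collector()
request.collector_id.collector_id.uuid = '71d58648-bf47-49ac-996f-e63a9fbfead4'  # illustrative
request.kpi_id.kpi_id.uuid             = '6e22f180-ba28-4641-b190-2287bf448888'  # illustrative
request.duration_s                     = 30.0
request.interval_s                     = 1.0
request.start_time.timestamp           = 1718000000.0
request.end_time.timestamp             = 1718000030.0

row      = Collector.ConvertCollectorToRow(request)   # gRPC message -> ORM row
response = Collector.ConvertRowToCollector(row)       # ORM row -> gRPC message
assert response.collector_id.collector_id.uuid == request.collector_id.collector_id.uuid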
@@ -12,21 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, time
import sqlalchemy
import logging
import sqlalchemy_utils
from sqlalchemy import inspect
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.TelemetryEngine import TelemetryEngine
from telemetry.database.TelemetryModel import Base
from common.method_wrappers.ServiceExceptions import (
    OperationFailedException, AlreadyExistsException)

LOGGER = logging.getLogger(__name__)
DB_NAME = "telemetryfrontend"
DB_NAME = "tfs_telemetry"

# # Create a base class for declarative models
# Base = declarative_base()

class managementDB:
class TelemetryDB:
    def __init__(self):
        self.db_engine = TelemetryEngine.get_engine()
        if self.db_engine is None:
@@ -35,54 +33,49 @@ class managementDB:
        self.db_name = DB_NAME
        self.Session = sessionmaker(bind=self.db_engine)

    @staticmethod
    def create_database(engine : sqlalchemy.engine.Engine) -> None:
        if not sqlalchemy_utils.database_exists(engine.url):
            LOGGER.info("Database created. {:}".format(engine.url))
            sqlalchemy_utils.create_database(engine.url)

    def create_database(self):
        if not sqlalchemy_utils.database_exists(self.db_engine.url):
            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
            sqlalchemy_utils.create_database(self.db_engine.url)

    @staticmethod
    def drop_database(engine : sqlalchemy.engine.Engine) -> None:
        if sqlalchemy_utils.database_exists(engine.url):
            sqlalchemy_utils.drop_database(engine.url)

    def drop_database(self) -> None:
        if sqlalchemy_utils.database_exists(self.db_engine.url):
            sqlalchemy_utils.drop_database(self.db_engine.url)

    # def create_database(self):
    #     try:
    #         with self.db_engine.connect() as connection:
    #             connection.execute(f"CREATE DATABASE {self.db_name};")
    #         LOGGER.info('managementDB initializes database. Name: {self.db_name}')
    #         return True
    #     except:
    #         LOGGER.exception('Failed to check/create the database: {:s}'.format(str(self.db_engine.url)))
    #         return False

    @staticmethod
    def create_tables(engine : sqlalchemy.engine.Engine):
    def create_tables(self):
        try:
            Base.metadata.create_all(engine) # type: ignore
            LOGGER.info("Tables created in the DB Name: {:}".format(DB_NAME))
            CollectorModel.metadata.create_all(self.db_engine) # type: ignore
            LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
        except Exception as e:
            LOGGER.info("Tables cannot be created in the TelemetryFrontend database. {:s}".format(str(e)))
            LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
            raise OperationFailedException("Tables can't be created", extra_details=["unable to create table {:}".format(e)])

    def verify_tables(self):
        try:
            with self.db_engine.connect() as connection:
                result = connection.execute("SHOW TABLES;")
                tables = result.fetchall() # type: ignore
                LOGGER.info("Tables verified: {:}".format(tables))
            inspect_object = inspect(self.db_engine)
            if inspect_object.has_table('collector', None):
                LOGGER.info("Table exists in DB: {:}".format(self.db_name))
        except Exception as e:
            LOGGER.info("Unable to fetch table names. {:s}".format(str(e)))

    @staticmethod
    # ----------------- CRUD METHODs ---------------------
    def add_row_to_db(self, row):
        session = self.Session()
        try:
            session.add(row)
            session.commit()
            LOGGER.info(f"Row inserted into {row.__class__.__name__} table.")
            LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
            return True
        except Exception as e:
            session.rollback()
            LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
            if "psycopg2.errors.UniqueViolation" in str(e):
                LOGGER.error(f"Unique key violation: {row.__class__.__name__} table. {str(e)}")
                raise AlreadyExistsException(row.__class__.__name__, row,
                    extra_details=["Unique key violation: {:}".format(e)])
            else:
                LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
                raise OperationFailedException("Insertion by column id", extra_details=["unable to insert row {:}".format(e)])
        finally:
            session.close()
@@ -91,15 +84,16 @@ class managementDB:
        try:
            entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
            if entity:
                LOGGER.info(f"{model.__name__} ID found: {str(entity)}")
                # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
                return entity
            else:
                LOGGER.warning(f"{model.__name__} ID not found: {str(id_to_search)}")
                LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}")
                print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
                return None
        except Exception as e:
            session.rollback()
            LOGGER.info(f"Failed to retrieve {model.__name__} ID. {str(e)}")
            raise
            LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
            raise OperationFailedException("search by column id", extra_details=["unable to search row {:}".format(e)])
        finally:
            session.close()
@@ -110,29 +104,34 @@ class managementDB:
            if record:
                session.delete(record)
                session.commit()
                LOGGER.info("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
                LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
            else:
                LOGGER.warning("%s with %s %s not found", model.__name__, col_name, id_to_search)
                LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
                return None
        except Exception as e:
            session.rollback()
            LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
            raise OperationFailedException("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
        finally:
            session.close()

    def select_with_filter(self, model, **filters):
    def select_with_filter(self, model, filter_object):
        session = self.Session()
        try:
            query = session.query(model)
            for column, value in filters.items():
                query = query.filter(getattr(model, column) == value) # type: ignore
            query = session.query(CollectorModel)
            # Apply filters based on the filter_object
            if filter_object.kpi_id:
                query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
            result = query.all()
            # TODO: extend the query to return all rows when no filter is set
            if result:
                LOGGER.info(f"Fetched filtered rows from {model.__name__} table with filters: {filters}") # - Results: {result}
                LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
            else:
                LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filters}")
                LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
            return result
        except Exception as e:
            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filters} ::: {e}")
            return []
            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
            raise OperationFailedException("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
        finally:
            session.close()
\ No newline at end of file
            session.close()
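A compact sketch of how the generic helpers above compose (TelemetryDB and CollectorModel names come from the code; the import path for TelemetryDB and the UUIDs are assumptions):

from telemetry.database.TelemetryModel import Collector as CollectorModel
from common.method_wrappers.ServiceExceptions import AlreadyExistsException

db = TelemetryDB()  # assumes TelemetryDB is importable from its defining module
db.create_database()
db.create_tables()

row = CollectorModel(
    collector_id        = '71d58648-bf47-49ac-996f-e63a9fbfead4',  # illustrative UUID
    kpi_id              = '6e22f180-ba28-4641-b190-2287bf448888',  # illustrative UUID
    sampling_duration_s = 30.0,
    sampling_interval_s = 1.0,
    start_timestamp     = 1718000000.0,
    end_timestamp       = 1718000030.0,
)
try:
    db.add_row_to_db(row)
except AlreadyExistsException:
    pass  # duplicate primary key: the helper maps UniqueViolation onto this exception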
[2 collapsed file diffs not expanded]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
    apt-get --yes --quiet --quiet install wget g++ git && \
    rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
    wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
    chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/telemetry/frontend
WORKDIR /var/teraflow/telemetry/frontend
COPY src/telemetry/frontend/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/telemetry/__init__.py telemetry/__init__.py
COPY src/telemetry/frontend/. telemetry/frontend/
COPY src/telemetry/database/. telemetry/database/
# Start the service
ENTRYPOINT ["python", "-m", "telemetry.frontend.service"]
[4 collapsed file diffs not expanded]