Commit 28bf80d3 authored by Waleed Akbar

Changes in Analytics DB and Frontend

- Created the Analytics Engine, Model, and DB files.
- Added the DB connection.
- Added `add_row_to_db` in `StartAnalyzer`.
- Added `delete_db_row_by_id` in `StopAnalyzer`.
- Improved message formatting.
- Added a DB test file.
parent e0a77d5f
2 merge requests: !294 Release TeraFlowSDN 4.0, !261 (CTTC) New Analytics Component
#!/bin/bash
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
analytics/tests/test_analytics_db.py
@@ -17,7 +17,8 @@
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
    analytics/frontend/tests/test_frontend.py
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
class AnalyzerEngine:
@staticmethod
def get_engine() -> sqlalchemy.engine.Engine:
crdb_uri = get_setting('CRDB_URI', default=None)
if crdb_uri is None:
CRDB_NAMESPACE = "crdb"
CRDB_SQL_PORT = "26257"
CRDB_DATABASE = "tfs-analyzer"
CRDB_USERNAME = "tfs"
CRDB_PASSWORD = "tfs123"
CRDB_SSLMODE = "require"
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try:
engine = sqlalchemy.create_engine(crdb_uri, echo=False)
            LOGGER.info('AnalyzerDB initialized with DB URL: {:}'.format(crdb_uri))
        except Exception:
            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
            return None
return engine
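A minimal usage sketch for the engine above (not part of this commit): it assumes either an exported CRDB_URI or reachable in-cluster defaults, and only verifies connectivity.

# Sketch: obtain the shared engine and run a trivial connectivity check.
# Assumes CRDB_URI is set, or the in-cluster defaults above are reachable.
import sqlalchemy
from sqlalchemy.orm import sessionmaker
from analytics.database.AnalyzerEngine import AnalyzerEngine

engine = AnalyzerEngine.get_engine()
if engine is not None:
    Session = sessionmaker(bind=engine)
    with Session() as session:
        print(session.execute(sqlalchemy.text("SELECT 1")).scalar())   # expected: 1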
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import enum
from sqlalchemy import Column, String, Float, Enum
from sqlalchemy.orm import registry
from common.proto import analytics_frontend_pb2
from common.proto import kpi_manager_pb2
from sqlalchemy.dialects.postgresql import UUID, ARRAY
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
# Create a base class for declarative models
Base = registry().generate_base()
class AnalyzerOperationMode (enum.Enum):
BATCH = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_BATCH
STREAMING = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
class Analyzer(Base):
__tablename__ = 'analyzer'
analyzer_id = Column(UUID(as_uuid=False) , primary_key=True)
algorithm_name = Column(String , nullable=False)
input_kpi_ids = Column(ARRAY(UUID(as_uuid=False)) , nullable=False)
output_kpi_ids = Column(ARRAY(UUID(as_uuid=False)) , nullable=False)
operation_mode = Column(Enum(AnalyzerOperationMode), nullable=False)
batch_min_duration_s = Column(Float , nullable=False)
batch_max_duration_s = Column(Float , nullable=False)
    batch_min_size       = Column(Float                       , nullable=False)
    batch_max_size       = Column(Float                       , nullable=False)
# helps in logging the information
def __repr__(self):
        return (f"<Analyzer(analyzer_id='{self.analyzer_id}', algorithm_name='{self.algorithm_name}', "
                f"input_kpi_ids={self.input_kpi_ids}, output_kpi_ids={self.output_kpi_ids}, "
                f"operation_mode='{self.operation_mode}', batch_min_duration_s={self.batch_min_duration_s}, "
                f"batch_max_duration_s={self.batch_max_duration_s}, batch_min_size={self.batch_min_size}, "
                f"batch_max_size={self.batch_max_size})>")
@classmethod
def ConvertAnalyzerToRow(cls, request):
"""
Create an instance of Analyzer table rows from a request object.
Args: request: The request object containing analyzer gRPC message.
Returns: A row (an instance of Analyzer table) initialized with content of the request.
"""
return cls(
analyzer_id = request.analyzer_id.analyzer_id.uuid,
algorithm_name = request.algorithm_name,
input_kpi_ids = [k.kpi_id.uuid for k in request.input_kpi_ids],
output_kpi_ids = [k.kpi_id.uuid for k in request.output_kpi_ids],
            operation_mode       = AnalyzerOperationMode(request.operation_mode),   # converts integer to corresponding Enum class member
batch_min_duration_s = request.batch_min_duration_s,
batch_max_duration_s = request.batch_max_duration_s,
            batch_min_size       = request.batch_min_size,
            batch_max_size       = request.batch_max_size
)
@classmethod
def ConvertRowToAnalyzer(cls, row):
"""
Create and return an Analyzer gRPC message initialized with the content of a row.
Args: row: The Analyzer table instance (row) containing the data.
Returns: An Analyzer gRPC message initialized with the content of the row.
"""
# Create an instance of the Analyzer message
response = analytics_frontend_pb2.Analyzer()
response.analyzer_id.analyzer_id.uuid = row.analyzer_id
response.algorithm_name = row.algorithm_name
        response.operation_mode = row.operation_mode.value   # Enum member -> proto enum value
_kpi_id = kpi_manager_pb2.KpiId()
for input_kpi_id in row.input_kpi_ids:
_kpi_id.kpi_id.uuid = input_kpi_id
response.input_kpi_ids.append(_kpi_id)
for output_kpi_id in row.output_kpi_ids:
_kpi_id.kpi_id.uuid = output_kpi_id
response.output_kpi_ids.append(_kpi_id)
response.batch_min_duration_s = row.batch_min_duration_s
response.batch_max_duration_s = row.batch_max_duration_s
        response.batch_min_size = row.batch_min_size
        response.batch_max_size = row.batch_max_size
return response
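A small round-trip sketch of the two converters above (not part of this commit; all field values are hypothetical):

# Sketch: gRPC message -> ORM row -> gRPC message, using only fields defined in this model.
import uuid
from common.proto import analytics_frontend_pb2
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel

request = analytics_frontend_pb2.Analyzer()
request.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
request.algorithm_name       = "some_algo_name"
request.operation_mode       = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
request.batch_min_duration_s = 10.0
request.batch_max_duration_s = 60.0
request.batch_min_size       = 5
request.batch_max_size       = 100

row      = AnalyzerModel.ConvertAnalyzerToRow(request)   # message -> table row
restored = AnalyzerModel.ConvertRowToAnalyzer(row)       # table row -> message
assert restored.algorithm_name == request.algorithm_name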
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sqlalchemy_utils
from sqlalchemy import inspect
from sqlalchemy.orm import sessionmaker
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from analytics.database.AnalyzerEngine import AnalyzerEngine
from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
LOGGER = logging.getLogger(__name__)
DB_NAME = "tfs_analyzer"    # TODO: read the database name from an environment variable
class AnalyzerDB:
def __init__(self):
self.db_engine = AnalyzerEngine.get_engine()
        if self.db_engine is None:
            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
            raise OperationFailedException("AnalyzerDB initialization", extra_details=["Unable to get SQLAlchemy DB Engine"])
self.db_name = DB_NAME
self.Session = sessionmaker(bind=self.db_engine)
def create_database(self):
        if not sqlalchemy_utils.database_exists(self.db_engine.url):
            sqlalchemy_utils.create_database(self.db_engine.url)
            LOGGER.debug("Database created: {:}".format(self.db_engine.url))
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
try:
AnalyzerModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
except Exception as e:
LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
def verify_tables(self):
try:
inspect_object = inspect(self.db_engine)
if(inspect_object.has_table('analyzer', None)):
LOGGER.info("Table exists in DB: {:}".format(self.db_name))
except Exception as e:
LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
    # ----------------- CRUD OPERATIONS ---------------------
def add_row_to_db(self, row):
session = self.Session()
try:
session.add(row)
session.commit()
LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
return True
except Exception as e:
session.rollback()
if "psycopg2.errors.UniqueViolation" in str(e):
                LOGGER.error(f"Unique key violation: {row.__class__.__name__} table. {str(e)}")
                raise AlreadyExistsException(row.__class__.__name__, row,
                                             extra_details=["Unique key violation: {:}".format(e)])
            else:
                LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
                raise OperationFailedException("Insertion of new row", extra_details=["unable to insert row {:}".format(e)])
finally:
session.close()
def search_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
if entity:
# LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
return entity
else:
                LOGGER.debug(f"{model.__name__} ID not found, no matching row: {str(id_to_search)}")
                return None
except Exception as e:
session.rollback()
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
session.close()
def delete_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
record = session.query(model).filter_by(**{col_name: id_to_search}).first()
if record:
session.delete(record)
session.commit()
LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
else:
LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
return None
except Exception as e:
session.rollback()
LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
finally:
session.close()
def select_with_filter(self, model, filter_object):
session = self.Session()
try:
            query = session.query(model)
            # Apply filters based on the filter_object
            if filter_object.kpi_id:
                query = query.filter(AnalyzerModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))   # TODO: needs updating; AnalyzerModel does not define a kpi_id column yet
result = query.all()
# query should be added to return all rows
if result:
LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
else:
LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
return result
except Exception as e:
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
finally:
session.close()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -25,6 +25,8 @@ from common.proto.context_pb2 import Empty
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
from analytics.database.Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel

LOGGER = logging.getLogger(__name__)
@@ -34,6 +36,7 @@ ACTIVE_ANALYZERS = [] # In case of sevice restarts, the list can be populated
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
    def __init__(self):
        LOGGER.info('Init AnalyticsFrontendService')
        self.db_obj = AnalyzerDB()
        self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
        self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                             'group.id'          : 'analytics-frontend',
@@ -46,7 +49,13 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
    ) -> AnalyzerId: # type: ignore
        LOGGER.info ("At Service gRPC message: {:}".format(request))
        response = AnalyzerId()
        self.db_obj.add_row_to_db(
            AnalyzerModel.ConvertAnalyzerToRow(request)
        )
        self.PublishStartRequestOnKafka(request)
        response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
        return response

    def PublishStartRequestOnKafka(self, analyzer_obj):
@@ -76,9 +85,16 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
        request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
    ) -> Empty: # type: ignore
        LOGGER.info ("At Service gRPC message: {:}".format(request))
        try:
            analyzer_id_to_delete = request.analyzer_id.uuid
            self.db_obj.delete_db_row_by_id(
                AnalyzerModel, "analyzer_id", analyzer_id_to_delete
            )
        except Exception as e:
            LOGGER.warning('Unable to delete analyzer. Error: {:}'.format(e))
        self.PublishStopRequestOnKafka(request)
        return Empty()

    def PublishStopRequestOnKafka(self, analyzer_id):
        """
        Method to generate stop analyzer request on Kafka.
...
@@ -19,13 +19,15 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
def create_analyzer_id():
    _create_analyzer_id = AnalyzerId()
    # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
    _create_analyzer_id.analyzer_id.uuid = "9baa306d-d91c-48c2-a92f-76a21bab35b6"
    return _create_analyzer_id

def create_analyzer():
    _create_analyzer = Analyzer()
    _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
    _create_analyzer.algorithm_name = "some_algo_name"
    _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING

    _kpi_id = KpiId()
    # input IDs to analyze
@@ -39,8 +41,6 @@ def create_analyzer():
    _kpi_id.kpi_id.uuid = str(uuid.uuid4())
    _create_analyzer.output_kpi_ids.append(_kpi_id)
    _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
    return _create_analyzer

def create_analyzer_filter():
...
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
java==11.0.*
pyspark==3.5.2
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
\ No newline at end of file
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from analytics.database.Analyzer_DB import AnalyzerDB
LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_tables():
LOGGER.info('>>> test_verify_databases_and_tables : START <<< ')
AnalyzerDBobj = AnalyzerDB()
# AnalyzerDBobj.drop_database()
# AnalyzerDBobj.verify_tables()
AnalyzerDBobj.create_database()
AnalyzerDBobj.create_tables()
AnalyzerDBobj.verify_tables()
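A possible follow-up test (not part of this commit) exercising the row-level helpers; the values are hypothetical and assume the Analyzer model fields defined above.

import uuid
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel, AnalyzerOperationMode

def test_analyzer_row_crud():
    LOGGER.info('>>> test_analyzer_row_crud : START <<< ')
    AnalyzerDBobj = AnalyzerDB()
    analyzer_uuid = str(uuid.uuid4())   # hypothetical id
    row = AnalyzerModel(
        analyzer_id          = analyzer_uuid,
        algorithm_name       = "some_algo_name",
        input_kpi_ids        = [str(uuid.uuid4())],
        output_kpi_ids       = [str(uuid.uuid4())],
        operation_mode       = AnalyzerOperationMode.STREAMING,
        batch_min_duration_s = 10.0,
        batch_max_duration_s = 60.0,
        batch_min_size       = 5,
        batch_max_size       = 100,
    )
    assert AnalyzerDBobj.add_row_to_db(row)
    assert AnalyzerDBobj.search_db_row_by_id(AnalyzerModel, "analyzer_id", analyzer_uuid) is not None
    AnalyzerDBobj.delete_db_row_by_id(AnalyzerModel, "analyzer_id", analyzer_uuid)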