diff --git a/scripts/run_tests_locally-analytics-DB.sh b/scripts/run_tests_locally-analytics-DB.sh
new file mode 100755
index 0000000000000000000000000000000000000000..9df5068d6bde361a4a1e73b96990c0d407c88cb4
--- /dev/null
+++ b/scripts/run_tests_locally-analytics-DB.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+PROJECTDIR=`pwd`
+
+cd $PROJECTDIR/src
+RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
+python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
+    analytics/tests/test_analytics_db.py
diff --git a/scripts/run_tests_locally-analytics-frontend.sh b/scripts/run_tests_locally-analytics-frontend.sh
index 58fd062f278f2b43498da1d80105bd577d190657..e30d30da623b2d0eee3d925d69a846b4b1f516a3 100755
--- a/scripts/run_tests_locally-analytics-frontend.sh
+++ b/scripts/run_tests_locally-analytics-frontend.sh
@@ -17,7 +17,8 @@
 PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
-
 RCFILE=$PROJECTDIR/coverage/.coveragerc
+CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
+export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_kpi_mgmt?sslmode=require"
 python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \
     analytics/frontend/tests/test_frontend.py
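Both scripts assume the `cockroachdb-public` service is reachable and export `CRDB_URI` for the tests. A pre-flight check along these lines (a sketch, not part of the patch; it assumes the `sslmode=require` URI built above and the `sqlalchemy-cockroachdb` dialect from `requirements.in`) can save a failed pytest run:

```python
# Pre-flight sketch: confirm the exported CRDB_URI actually accepts connections.
# Assumes the cockroachdb:// dialect (sqlalchemy-cockroachdb) is installed.
import os
import sqlalchemy

def crdb_is_reachable() -> bool:
    crdb_uri = os.environ.get('CRDB_URI')
    if crdb_uri is None:
        print('CRDB_URI is not set; export it as the scripts above do')
        return False
    try:
        engine = sqlalchemy.create_engine(crdb_uri)
        with engine.connect():
            return True
    except Exception as e:
        print('CockroachDB is not reachable: {:}'.format(e))
        return False
```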
diff --git a/src/analytics/database/AnalyzerEngine.py b/src/analytics/database/AnalyzerEngine.py
new file mode 100644
index 0000000000000000000000000000000000000000..4bed9f93a7446800f9b3fa35a48223360fad6c13
--- /dev/null
+++ b/src/analytics/database/AnalyzerEngine.py
@@ -0,0 +1,40 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging, sqlalchemy
+from common.Settings import get_setting
+
+LOGGER = logging.getLogger(__name__)
+CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
+
+class AnalyzerEngine:
+    @staticmethod
+    def get_engine() -> sqlalchemy.engine.Engine:
+        crdb_uri = get_setting('CRDB_URI', default=None)
+        if crdb_uri is None:
+            CRDB_NAMESPACE = "crdb"
+            CRDB_SQL_PORT  = "26257"
+            CRDB_DATABASE  = "tfs-analyzer"
+            CRDB_USERNAME  = "tfs"
+            CRDB_PASSWORD  = "tfs123"
+            CRDB_SSLMODE   = "require"
+            crdb_uri = CRDB_URI_TEMPLATE.format(
+                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
+        try:
+            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
+            LOGGER.info('AnalyzerDB initialized with DB URL: {:}'.format(crdb_uri))
+        except Exception:
+            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
+            return None
+        return engine
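`get_engine()` honours a `CRDB_URI` setting first and only then falls back to the in-cluster defaults; note that it returns `None` on failure instead of raising. A minimal caller sketch (commentary, not part of the patch):

```python
# Usage sketch for AnalyzerEngine.get_engine(); the caller must handle the
# None return, since connection errors are logged rather than raised.
from analytics.database.AnalyzerEngine import AnalyzerEngine

engine = AnalyzerEngine.get_engine()
if engine is None:
    raise RuntimeError('Unable to create SQLAlchemy engine for CockroachDB')
with engine.connect() as connection:
    result = connection.execute('SELECT 1')   # SQLAlchemy 1.4 still accepts plain SQL strings
    print(result.scalar())
```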
diff --git a/src/analytics/database/AnalyzerModel.py b/src/analytics/database/AnalyzerModel.py
new file mode 100644
index 0000000000000000000000000000000000000000..0b66e04d016a5759d9beb812b4338e10e884323b
--- /dev/null
+++ b/src/analytics/database/AnalyzerModel.py
@@ -0,0 +1,101 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import enum
+
+from sqlalchemy import Column, String, Float, Enum
+from sqlalchemy.orm import registry
+from common.proto import analytics_frontend_pb2
+from common.proto import kpi_manager_pb2
+
+from sqlalchemy.dialects.postgresql import UUID, ARRAY
+
+
+logging.basicConfig(level=logging.INFO)
+LOGGER = logging.getLogger(__name__)
+
+# Create a base class for declarative models
+Base = registry().generate_base()
+
+class AnalyzerOperationMode (enum.Enum):
+    BATCH     = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_BATCH
+    STREAMING = analytics_frontend_pb2.AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
+
+class Analyzer(Base):
+    __tablename__ = 'analyzer'
+
+    analyzer_id          = Column(UUID(as_uuid=False)        , primary_key=True)
+    algorithm_name       = Column(String                     , nullable=False)
+    input_kpi_ids        = Column(ARRAY(UUID(as_uuid=False)) , nullable=False)
+    output_kpi_ids       = Column(ARRAY(UUID(as_uuid=False)) , nullable=False)
+    operation_mode       = Column(Enum(AnalyzerOperationMode), nullable=False)
+    batch_min_duration_s = Column(Float                      , nullable=False)
+    batch_max_duration_s = Column(Float                      , nullable=False)
+    batch_min_size       = Column(Float                      , nullable=False)
+    batch_max_size       = Column(Float                      , nullable=False)
+
+    # helps in logging the information
+    def __repr__(self):
+        return (f"<Analyzer(analyzer_id='{self.analyzer_id}', algorithm_name='{self.algorithm_name}', "
+                f"input_kpi_ids={self.input_kpi_ids}, output_kpi_ids={self.output_kpi_ids}, "
+                f"operation_mode='{self.operation_mode}', batch_min_duration_s={self.batch_min_duration_s}, "
+                f"batch_max_duration_s={self.batch_max_duration_s}, batch_min_size={self.batch_min_size}, "
+                f"batch_max_size={self.batch_max_size})>")
+
+    @classmethod
+    def ConvertAnalyzerToRow(cls, request):
+        """
+        Create an instance of an Analyzer table row from a request object.
+        Args:    request: The request object containing the Analyzer gRPC message.
+        Returns: A row (an instance of the Analyzer table) initialized with the content of the request.
+        """
+        return cls(
+            analyzer_id          = request.analyzer_id.analyzer_id.uuid,
+            algorithm_name       = request.algorithm_name,
+            input_kpi_ids        = [k.kpi_id.uuid for k in request.input_kpi_ids],
+            output_kpi_ids       = [k.kpi_id.uuid for k in request.output_kpi_ids],
+            operation_mode       = AnalyzerOperationMode(request.operation_mode),   # converts the integer to the corresponding Enum member
+            batch_min_duration_s = request.batch_min_duration_s,
+            batch_max_duration_s = request.batch_max_duration_s,
+            batch_min_size       = request.batch_min_size,
+            batch_max_size       = request.batch_max_size
+        )
+
+    @classmethod
+    def ConvertRowToAnalyzer(cls, row):
+        """
+        Create and return an Analyzer gRPC message initialized with the content of a row.
+        Args:    row: The Analyzer table instance (row) containing the data.
+        Returns: An Analyzer gRPC message initialized with the content of the row.
+        """
+        # Create an instance of the Analyzer message
+        response                              = analytics_frontend_pb2.Analyzer()
+        response.analyzer_id.analyzer_id.uuid = row.analyzer_id
+        response.algorithm_name               = row.algorithm_name
+        response.operation_mode               = row.operation_mode.value   # converts the Enum member back to the proto integer
+
+        _kpi_id = kpi_manager_pb2.KpiId()
+        for input_kpi_id in row.input_kpi_ids:
+            _kpi_id.kpi_id.uuid = input_kpi_id
+            response.input_kpi_ids.append(_kpi_id)
+        for output_kpi_id in row.output_kpi_ids:
+            _kpi_id.kpi_id.uuid = output_kpi_id
+            response.output_kpi_ids.append(_kpi_id)
+
+        response.batch_min_duration_s = row.batch_min_duration_s
+        response.batch_max_duration_s = row.batch_max_duration_s
+        response.batch_min_size       = row.batch_min_size
+        response.batch_max_size       = row.batch_max_size
+        return response
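The two classmethods are intended to be inverses of each other. A round-trip sketch (commentary, not part of the patch; the algorithm name and ids are made-up test values):

```python
# Round-trip sketch: gRPC Analyzer message -> ORM row -> gRPC Analyzer message.
import uuid
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerOperationMode
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel

request = Analyzer()
request.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
request.algorithm_name               = "some_algo_name"
request.operation_mode               = AnalyzerOperationMode.ANALYZEROPERATIONMODE_BATCH

row      = AnalyzerModel.ConvertAnalyzerToRow(request)
restored = AnalyzerModel.ConvertRowToAnalyzer(row)
assert restored.algorithm_name == request.algorithm_name
assert restored.operation_mode == request.operation_mode
```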
+ """ + # Create an instance of the Analyzer message + response = analytics_frontend_pb2.Analyzer() + response.analyzer_id.analyzer_id.uuid = row.analyzer_id + response.algorithm_name = row.algorithm_name + response.operation_mode = row.operation_mode + + _kpi_id = kpi_manager_pb2.KpiId() + for input_kpi_id in row.input_kpi_ids: + _kpi_id.kpi_id.uuid = input_kpi_id + response.input_kpi_ids.append(_kpi_id) + for output_kpi_id in row.output_kpi_ids: + _kpi_id.kpi_id.uuid = output_kpi_id + response.output_kpi_ids.append(_kpi_id) + + response.batch_min_duration_s = row.batch_min_duration_s + response.batch_max_duration_s = row.batch_max_duration_s + response.batch_min_size = row.bacth_min_size + response.batch_max_size = row.bacth_max_size + return response diff --git a/src/analytics/database/Analyzer_DB.py b/src/analytics/database/Analyzer_DB.py new file mode 100644 index 0000000000000000000000000000000000000000..896ba110027b6e8db6bcb3db6add334f1eb017dd --- /dev/null +++ b/src/analytics/database/Analyzer_DB.py @@ -0,0 +1,137 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sqlalchemy_utils + +from sqlalchemy import inspect +from sqlalchemy.orm import sessionmaker + +from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel +from analytics.database.AnalyzerEngine import AnalyzerEngine +from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException) + +LOGGER = logging.getLogger(__name__) +DB_NAME = "tfs_analyzer" # TODO: export name from enviornment variable + +class AnalyzerDB: + def __init__(self): + self.db_engine = AnalyzerEngine.get_engine() + if self.db_engine is None: + LOGGER.error('Unable to get SQLAlchemy DB Engine...') + return False + self.db_name = DB_NAME + self.Session = sessionmaker(bind=self.db_engine) + + def create_database(self): + if not sqlalchemy_utils.database_exists(self.db_engine.url): + LOGGER.debug("Database created. {:}".format(self.db_engine.url)) + sqlalchemy_utils.create_database(self.db_engine.url) + + def drop_database(self) -> None: + if sqlalchemy_utils.database_exists(self.db_engine.url): + sqlalchemy_utils.drop_database(self.db_engine.url) + + def create_tables(self): + try: + AnalyzerModel.metadata.create_all(self.db_engine) # type: ignore + LOGGER.debug("Tables created in the database: {:}".format(self.db_name)) + except Exception as e: + LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e))) + raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)]) + + def verify_tables(self): + try: + inspect_object = inspect(self.db_engine) + if(inspect_object.has_table('analyzer', None)): + LOGGER.info("Table exists in DB: {:}".format(self.db_name)) + except Exception as e: + LOGGER.info("Unable to fetch Table names. 
{:s}".format(str(e))) + +# ----------------- CURD OPERATIONS --------------------- + + def add_row_to_db(self, row): + session = self.Session() + try: + session.add(row) + session.commit() + LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.") + return True + except Exception as e: + session.rollback() + if "psycopg2.errors.UniqueViolation" in str(e): + LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}") + raise AlreadyExistsException(row.__class__.__name__, row, + extra_details=["Unique key voilation: {:}".format(e)] ) + else: + LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}") + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def search_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + entity = session.query(model).filter_by(**{col_name: id_to_search}).first() + if entity: + # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}") + return entity + else: + LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}") + print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search)) + return None + except Exception as e: + session.rollback() + LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}") + raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)]) + finally: + session.close() + + def delete_db_row_by_id(self, model, col_name, id_to_search): + session = self.Session() + try: + record = session.query(model).filter_by(**{col_name: id_to_search}).first() + if record: + session.delete(record) + session.commit() + LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) + else: + LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search) + return None + except Exception as e: + session.rollback() + LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) + raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)]) + finally: + session.close() + + def select_with_filter(self, model, filter_object): + session = self.Session() + try: + query = session.query(AnalyzerModel) + # Apply filters based on the filter_object + if filter_object.kpi_id: + query = query.filter(AnalyzerModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id])) # Need to be updated + result = query.all() + # query should be added to return all rows + if result: + LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result} + else: + LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}") + return result + except Exception as e: + LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}") + raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)]) + finally: + session.close() diff --git a/src/analytics/database/__init__.py b/src/analytics/database/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ee6f7071f145e06c3aeaefc09a43ccd88e619e3 --- /dev/null +++ b/src/analytics/database/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 
(the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py index 071890105411f18960c39ecd426af3c8e96f4f37..ccbef3599d13d703d225aaf7e7b60a3419bcea47 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py +++ b/src/analytics/frontend/service/AnalyticsFrontendServiceServicerImpl.py @@ -25,6 +25,8 @@ from common.proto.context_pb2 import Empty from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer +from analytics.database.Analyzer_DB import AnalyzerDB +from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel LOGGER = logging.getLogger(__name__) @@ -34,6 +36,7 @@ ACTIVE_ANALYZERS = [] # In case of sevice restarts, the list can be populated class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): def __init__(self): LOGGER.info('Init AnalyticsFrontendService') + self.db_obj = AnalyzerDB() self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()}) self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'analytics-frontend', @@ -46,7 +49,13 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): ) -> AnalyzerId: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) response = AnalyzerId() + + self.db_obj.add_row_to_db( + AnalyzerModel.ConvertAnalyzerToRow(request) + ) self.PublishStartRequestOnKafka(request) + + response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid return response def PublishStartRequestOnKafka(self, analyzer_obj): @@ -76,9 +85,16 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer): request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("At Service gRPC message: {:}".format(request)) + try: + analyzer_id_to_delete = request.analyzer_id.uuid + self.db_obj.delete_db_row_by_id( + AnalyzerModel, "analyzer_id", analyzer_id_to_delete + ) + except Exception as e: + LOGGER.warning('Unable to delete analyzer. Error: {:}'.format(e)) self.PublishStopRequestOnKafka(request) return Empty() - + def PublishStopRequestOnKafka(self, analyzer_id): """ Method to generate stop analyzer request on Kafka. 
diff --git a/src/analytics/frontend/tests/messages.py b/src/analytics/frontend/tests/messages.py
index 04653857de39e42c996a2fa63783b3f8db97eacf..4c826e5c336d05e57e183c538ca50bbcdeef164e 100644
--- a/src/analytics/frontend/tests/messages.py
+++ b/src/analytics/frontend/tests/messages.py
@@ -19,13 +19,15 @@ from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, Analyze
 
 def create_analyzer_id():
     _create_analyzer_id = AnalyzerId()
-    _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    # _create_analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    _create_analyzer_id.analyzer_id.uuid = "9baa306d-d91c-48c2-a92f-76a21bab35b6"
     return _create_analyzer_id
 
 def create_analyzer():
-    _create_analyzer = Analyzer()
-
-    _create_analyzer.algorithm_name = "some_algo_name"
+    _create_analyzer = Analyzer()
+    _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
+    _create_analyzer.algorithm_name = "some_algo_name"
+    _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
 
     _kpi_id = KpiId()
     # input IDs to analyze
@@ -39,8 +41,6 @@ def create_analyzer():
     _kpi_id.kpi_id.uuid = str(uuid.uuid4())
     _create_analyzer.output_kpi_ids.append(_kpi_id)
 
-    _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
-
     return _create_analyzer
 
 def create_analyzer_filter():
diff --git a/src/analytics/requirements.in b/src/analytics/requirements.in
new file mode 100644
index 0000000000000000000000000000000000000000..98cf9671034575f6c854eedfd39bf172631544e6
--- /dev/null
+++ b/src/analytics/requirements.in
@@ -0,0 +1,21 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# NOTE: pyspark needs a Java 11 runtime; "java" is not a pip-installable package, install it via the OS package manager.
+pyspark==3.5.2
+confluent-kafka==2.3.*
+psycopg2-binary==2.9.*
+SQLAlchemy==1.4.*
+sqlalchemy-cockroachdb==1.4.*
+SQLAlchemy-Utils==0.38.*
\ No newline at end of file
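Note that `create_analyzer_id()` now pins a fixed UUID, so repeated inserts keyed on it will collide on the primary key, which `add_row_to_db` surfaces as `AlreadyExistsException`. A sketch of that failure path (commentary; assumes a reachable CockroachDB and pytest installed):

```python
# Duplicate-insert sketch: a second row with the same analyzer_id must hit
# the UniqueViolation branch of AnalyzerDB.add_row_to_db.
import pytest
from analytics.database.Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from analytics.frontend.tests.messages import create_analyzer
from common.method_wrappers.ServiceExceptions import AlreadyExistsException

db = AnalyzerDB()
first  = AnalyzerModel.ConvertAnalyzerToRow(create_analyzer())
second = AnalyzerModel.ConvertAnalyzerToRow(create_analyzer())
second.analyzer_id = first.analyzer_id   # force a primary-key collision

db.add_row_to_db(first)
with pytest.raises(AlreadyExistsException):
    db.add_row_to_db(second)
```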
diff --git a/src/analytics/tests/test_analytics_db.py b/src/analytics/tests/test_analytics_db.py
new file mode 100644
index 0000000000000000000000000000000000000000..58e7d0167044bb461e66b053dcb3999641ea8419
--- /dev/null
+++ b/src/analytics/tests/test_analytics_db.py
@@ -0,0 +1,28 @@
+# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+from analytics.database.Analyzer_DB import AnalyzerDB
+
+LOGGER = logging.getLogger(__name__)
+
+def test_verify_databases_and_tables():
+    LOGGER.info('>>> test_verify_databases_and_tables : START <<< ')
+    AnalyzerDBobj = AnalyzerDB()
+    # AnalyzerDBobj.drop_database()
+    # AnalyzerDBobj.verify_tables()
+    AnalyzerDBobj.create_database()
+    AnalyzerDBobj.create_tables()
+    AnalyzerDBobj.verify_tables()
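The test above only logs whether the `analyzer` table exists; a stricter follow-up could also assert the expected columns. A sketch using SQLAlchemy's inspector (commentary, not part of the patch):

```python
# Companion sketch: verify the created 'analyzer' table carries the model's columns.
import sqlalchemy
from analytics.database.AnalyzerEngine import AnalyzerEngine

def test_analyzer_table_columns():
    engine = AnalyzerEngine.get_engine()
    assert engine is not None
    inspector = sqlalchemy.inspect(engine)
    columns = {col['name'] for col in inspector.get_columns('analyzer')}
    assert {'analyzer_id', 'algorithm_name', 'operation_mode'} <= columns
```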