Skip to content
Snippets Groups Projects
Commit 990395f4 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Changes in Analytics DB.

- parameters col added in DB
- parameters field added in Analytics.proto
- AnalyzerModel class methods changes
- changes in messages file
parent df864e66
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!261(CTTC) New Analytics Component
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import logging import logging
import enum import enum
from sqlalchemy import Column, String, Float, Enum from sqlalchemy import Column, String, Float, Enum, BigInteger, JSON
from sqlalchemy.orm import registry from sqlalchemy.orm import registry
from common.proto import analytics_frontend_pb2 from common.proto import analytics_frontend_pb2
from common.proto import kpi_manager_pb2 from common.proto import kpi_manager_pb2
...@@ -36,23 +36,25 @@ class AnalyzerOperationMode (enum.Enum): ...@@ -36,23 +36,25 @@ class AnalyzerOperationMode (enum.Enum):
class Analyzer(Base): class Analyzer(Base):
__tablename__ = 'analyzer' __tablename__ = 'analyzer'
analyzer_id = Column(UUID(as_uuid=False) , primary_key=True) analyzer_id = Column( UUID(as_uuid=False) , primary_key=True)
algorithm_name = Column(String , nullable=False) algorithm_name = Column( String , nullable=False )
input_kpi_ids = Column(ARRAY(UUID(as_uuid=False)) , nullable=False) input_kpi_ids = Column( ARRAY(UUID(as_uuid=False)) , nullable=False )
output_kpi_ids = Column(ARRAY(UUID(as_uuid=False)) , nullable=False) output_kpi_ids = Column( ARRAY(UUID(as_uuid=False)) , nullable=False )
operation_mode = Column(Enum(AnalyzerOperationMode), nullable=False) operation_mode = Column( Enum(AnalyzerOperationMode), nullable=False )
batch_min_duration_s = Column(Float , nullable=False) parameters = Column( JSON , nullable=True )
batch_max_duration_s = Column(Float , nullable=False) batch_min_duration_s = Column( Float , nullable=False )
bacth_min_size = Column(Float , nullable=False) batch_max_duration_s = Column( Float , nullable=False )
bacth_max_size = Column(Float , nullable=False) batch_min_size = Column( BigInteger , nullable=False )
batch_max_size = Column( BigInteger , nullable=False )
# helps in logging the information # helps in logging the information
def __repr__(self): def __repr__(self):
return (f"<Analyzer(analyzer_id='{self.analyzer_id}', algorithm_name='{self.algorithm_name}', " return (f"<Analyzer(analyzer_id='{self.analyzer_id}' , algorithm_name='{self.algorithm_name}', "
f"input_kpi_ids={self.input_kpi_ids}, output_kpi_ids={self.output_kpi_ids}, " f"input_kpi_ids={self.input_kpi_ids} , output_kpi_ids={self.output_kpi_ids}, "
f"operation_mode='{self.operation_mode}', batch_min_duration_s={self.batch_min_duration_s}, " f"operation_mode='{self.operation_mode}' , parameters={self.parameters}, "
f"batch_max_duration_s={self.batch_max_duration_s}, bacth_min_size={self.bacth_min_size}, " f"batch_min_duration_s={self.batch_min_duration_s} , batch_max_duration_s={self.batch_max_duration_s}, "
f"bacth_max_size={self.bacth_max_size})>") f"batch_min_size={self.batch_min_size} , batch_max_size={self.batch_max_size})>")
@classmethod @classmethod
def ConvertAnalyzerToRow(cls, request): def ConvertAnalyzerToRow(cls, request):
...@@ -67,10 +69,11 @@ class Analyzer(Base): ...@@ -67,10 +69,11 @@ class Analyzer(Base):
input_kpi_ids = [k.kpi_id.uuid for k in request.input_kpi_ids], input_kpi_ids = [k.kpi_id.uuid for k in request.input_kpi_ids],
output_kpi_ids = [k.kpi_id.uuid for k in request.output_kpi_ids], output_kpi_ids = [k.kpi_id.uuid for k in request.output_kpi_ids],
operation_mode = AnalyzerOperationMode(request.operation_mode), # converts integer to coresponding Enum class member operation_mode = AnalyzerOperationMode(request.operation_mode), # converts integer to coresponding Enum class member
parameters = dict(request.parameters),
batch_min_duration_s = request.batch_min_duration_s, batch_min_duration_s = request.batch_min_duration_s,
batch_max_duration_s = request.batch_max_duration_s, batch_max_duration_s = request.batch_max_duration_s,
bacth_min_size = request.batch_min_size, batch_min_size = request.batch_min_size,
bacth_max_size = request.batch_max_size batch_max_size = request.batch_max_size
) )
@classmethod @classmethod
...@@ -85,17 +88,19 @@ class Analyzer(Base): ...@@ -85,17 +88,19 @@ class Analyzer(Base):
response.analyzer_id.analyzer_id.uuid = row.analyzer_id response.analyzer_id.analyzer_id.uuid = row.analyzer_id
response.algorithm_name = row.algorithm_name response.algorithm_name = row.algorithm_name
response.operation_mode = row.operation_mode response.operation_mode = row.operation_mode
response.parameters.update(row.parameters)
_kpi_id = kpi_manager_pb2.KpiId()
for input_kpi_id in row.input_kpi_ids: for input_kpi_id in row.input_kpi_ids:
_kpi_id = kpi_manager_pb2.KpiId()
_kpi_id.kpi_id.uuid = input_kpi_id _kpi_id.kpi_id.uuid = input_kpi_id
response.input_kpi_ids.append(_kpi_id) response.input_kpi_ids.append(_kpi_id)
for output_kpi_id in row.output_kpi_ids: for output_kpi_id in row.output_kpi_ids:
_kpi_id = kpi_manager_pb2.KpiId()
_kpi_id.kpi_id.uuid = output_kpi_id _kpi_id.kpi_id.uuid = output_kpi_id
response.output_kpi_ids.append(_kpi_id) response.output_kpi_ids.append(_kpi_id)
response.batch_min_duration_s = row.batch_min_duration_s response.batch_min_duration_s = row.batch_min_duration_s
response.batch_max_duration_s = row.batch_max_duration_s response.batch_max_duration_s = row.batch_max_duration_s
response.batch_min_size = row.bacth_min_size response.batch_min_size = row.batch_min_size
response.batch_max_size = row.bacth_max_size response.batch_max_size = row.batch_max_size
return response return response
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import uuid import uuid
import json
from common.proto.kpi_manager_pb2 import KpiId from common.proto.kpi_manager_pb2 import KpiId
from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, AnalyzerId, from common.proto.analytics_frontend_pb2 import ( AnalyzerOperationMode, AnalyzerId,
Analyzer, AnalyzerFilter ) Analyzer, AnalyzerFilter )
...@@ -26,7 +27,7 @@ def create_analyzer_id(): ...@@ -26,7 +27,7 @@ def create_analyzer_id():
def create_analyzer(): def create_analyzer():
_create_analyzer = Analyzer() _create_analyzer = Analyzer()
_create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4()) _create_analyzer.analyzer_id.analyzer_id.uuid = str(uuid.uuid4())
_create_analyzer.algorithm_name = "some_algo_name" _create_analyzer.algorithm_name = "Test_Aggergate_and_Threshold"
_create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING _create_analyzer.operation_mode = AnalyzerOperationMode.ANALYZEROPERATIONMODE_STREAMING
_kpi_id = KpiId() _kpi_id = KpiId()
...@@ -44,6 +45,14 @@ def create_analyzer(): ...@@ -44,6 +45,14 @@ def create_analyzer():
_create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.output_kpi_ids.append(_kpi_id)
_kpi_id.kpi_id.uuid = str(uuid.uuid4()) _kpi_id.kpi_id.uuid = str(uuid.uuid4())
_create_analyzer.output_kpi_ids.append(_kpi_id) _create_analyzer.output_kpi_ids.append(_kpi_id)
# parameter
_threshold_dict = {
'avg_value' :(20, 30), 'min_value' :(00, 10), 'max_value' :(45, 50),
'first_value' :(00, 10), 'last_value' :(40, 50), 'stddev_value':(00, 10)}
_create_analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
_create_analyzer.parameters['window_size'] = "60 seconds" # Such as "10 seconds", "2 minutes", "3 hours", "4 days" or "5 weeks"
_create_analyzer.parameters['window_slider'] = "30 seconds" # should be less than window size
_create_analyzer.parameters['store_aggregate'] = str(False) # TRUE to store. No implemented yet
return _create_analyzer return _create_analyzer
......
...@@ -76,7 +76,7 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic ...@@ -76,7 +76,7 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
########################### ###########################
# ----- core funtionality test ----- # ----- core funtionality test -----
def test_StartAnalytic(analyticsFrontend_client): def test_StartAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalytic START: <<< ') LOGGER.info(' >>> test_StartAnalytic START: <<< ')
response = analyticsFrontend_client.StartAnalyzer(create_analyzer()) response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(response)) LOGGER.debug(str(response))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment