Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: tfs/controller
Showing 540 additions and 280 deletions
......@@ -36,7 +36,7 @@ class AnalyticsBackendService(GenericGrpcService):
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSBACKEND)
super().__init__(port, cls_name=cls_name)
self.running_threads = {} # To keep track of all running analyzers
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : '10.152.183.186:9092',
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
......
......@@ -43,6 +43,7 @@ def GetAggregationMappings(thresholds):
agg_dict[threshold_key] = ('kpi_value', aggregation)
return agg_dict
def ApplyThresholds(aggregated_df, thresholds):
"""
Apply thresholds (TH-Fall and TH-Raise) based on the thresholds dictionary
......@@ -53,12 +54,14 @@ def ApplyThresholds(aggregated_df, thresholds):
"""
for threshold_key, threshold_values in thresholds.items():
if threshold_key not in aggregated_df.columns:
LOGGER.warning(f"Threshold key '{threshold_key}' does not correspond to any aggregation result. Skipping threshold application.")
continue
if isinstance(threshold_values, (list, tuple)) and len(threshold_values) == 2:
fail_th, raise_th = threshold_values
aggregated_df[f"{threshold_key}_THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
aggregated_df[f"{threshold_key}_THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
aggregated_df["THRESHOLD_FALL"] = aggregated_df[threshold_key] < fail_th
aggregated_df["THRESHOLD_RAISE"] = aggregated_df[threshold_key] > raise_th
aggregated_df["value"] = aggregated_df[threshold_key]
else:
LOGGER.warning(f"Threshold values for '{threshold_key}' are not a list or tuple of length 2. Skipping threshold application.")
return aggregated_df
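For reference, a minimal standalone sketch of what ApplyThresholds adds for a well-formed (TH-Fall, TH-Raise) pair; the KPI column name and values below are invented for illustration, and the snippet restates the logic above rather than importing the module:

import pandas as pd

# One aggregated window with an illustrative KPI column (names/values are assumptions).
aggregated = pd.DataFrame({'window_start': ['2024-01-01T00:00:00Z'], 'min_latency_E2E': [120.0]})
thresholds = {'min_latency_E2E': (0, 100)}  # (TH-Fall, TH-Raise)

# Equivalent of what the function adds for a valid 2-tuple threshold:
fall_th, raise_th = thresholds['min_latency_E2E']
aggregated['THRESHOLD_FALL']  = aggregated['min_latency_E2E'] < fall_th    # 120 < 0   -> False
aggregated['THRESHOLD_RAISE'] = aggregated['min_latency_E2E'] > raise_th   # 120 > 100 -> True
aggregated['value']           = aggregated['min_latency_E2E']
print(aggregated)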
......@@ -96,7 +99,7 @@ def process_batch(batch, agg_mappings, thresholds, key):
df = pd.DataFrame(batch)
LOGGER.info(f"df {df} ")
df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce',unit='s')
df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
df.dropna(subset=['time_stamp'], inplace=True)
LOGGER.info(f"df {df} ")
required_columns = {'time_stamp', 'kpi_id', 'kpi_value'}
......@@ -110,19 +113,44 @@ def process_batch(batch, agg_mappings, thresholds, key):
# Perform aggregations using named aggregation
try:
agg_dict = {key: value for key, value in agg_mappings.items()}
df_agg = df.groupby(['window_start']).agg(**agg_dict).reset_index()
df_agg_ = df.groupby(['window_start']).agg(**agg_dict).reset_index()
#example: agg_dict = {'min_latency_E2E': ('kpi_value', 'min')
#given that threshold has 1 value
second_value_tuple = next(iter(agg_dict.values()))[1]
#in case we have multiple thresholds!
#second_values_tuples = [value[1] for value in agg_dict.values()]
if second_value_tuple=="min":
df_agg = df_agg_.min(numeric_only=True).to_frame().T
elif second_value_tuple == "max":
df_agg = df_agg_.max(numeric_only=True).to_frame().T
elif second_value_tuple == "std":
df_agg = df_agg_.std(numeric_only=True).to_frame().T
else:
df_agg = df_agg_.mean(numeric_only=True).to_frame().T
# Assign the first value of window_start from the original aggregated data
df_agg['window_start'] = df_agg_['window_start'].iloc[0]
# Reorder columns to place 'window_start' first if needed
cols = ['window_start'] + [col for col in df_agg.columns if col != 'window_start']
df_agg = df_agg[cols]
except Exception as e:
LOGGER.error(f"Aggregation error: {e}")
return []
# Apply thresholds
df_thresholded = ApplyThresholds(df_agg, thresholds)
df_thresholded['kpi_id'] = key
df_thresholded['window_start'] = df_thresholded['window_start'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# Convert aggregated DataFrame to list of dicts
result = df_thresholded.to_dict(orient='records')
LOGGER.info(f"Processed batch with {len(result)} records after aggregation and thresholding.")
return result
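To make the two-stage aggregation above easier to follow, here is a small self-contained pandas sketch (column names and values are illustrative only): the named aggregation produced by GetAggregationMappings is applied per window, and the per-window results are then collapsed into a single row using the same aggregation name.

import pandas as pd

df = pd.DataFrame({
    'window_start': ['w1', 'w1', 'w2'],
    'kpi_value'   : [10.0, 30.0, 20.0],
})
agg_dict = {'min_latency_E2E': ('kpi_value', 'min')}   # as built by GetAggregationMappings

# Stage 1: named aggregation per window.
per_window = df.groupby(['window_start']).agg(**agg_dict).reset_index()
#   window_start  min_latency_E2E
# 0           w1             10.0
# 1           w2             20.0

# Stage 2: collapse all windows using the same aggregation name ('min' here, 'mean' as fallback).
collapsed = per_window.min(numeric_only=True).to_frame().T
collapsed['window_start'] = per_window['window_start'].iloc[0]
print(collapsed)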
def produce_result(result, producer, destination_topic):
......@@ -197,7 +225,7 @@ def DaskStreamer(key, kpi_list, thresholds, stop_event,
continue
try:
message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce',unit='s')
message_timestamp = pd.to_datetime(message_value[time_stamp_col], errors='coerce')
LOGGER.warning(f"message_timestamp: {message_timestamp}")
if pd.isna(message_timestamp):
......
......@@ -13,138 +13,44 @@
# limitations under the License.
import logging
import sqlalchemy_utils
from common.method_wrappers.Decorator import MetricsPool
from common.tools.database.GenericDatabase import Database
from common.method_wrappers.ServiceExceptions import OperationFailedException
from sqlalchemy import inspect, or_
from sqlalchemy.orm import sessionmaker
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiManager', 'Database')
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from analytics.database.AnalyzerEngine import AnalyzerEngine
from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
class AnalyzerDB(Database):
def __init__(self, model) -> None:
LOGGER.info('Init AnalyzerDB')
super().__init__(model)
LOGGER = logging.getLogger(__name__)
DB_NAME = "tfs_analyzer" # TODO: export name from environment variable
class AnalyzerDB:
def __init__(self):
self.db_engine = AnalyzerEngine.get_engine()
if self.db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return False
self.db_name = DB_NAME
self.Session = sessionmaker(bind=self.db_engine)
def create_database(self):
if not sqlalchemy_utils.database_exists(self.db_engine.url):
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
try:
AnalyzerModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
except Exception as e:
LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
def verify_tables(self):
try:
inspect_object = inspect(self.db_engine)
if(inspect_object.has_table('analyzer', None)):
LOGGER.info("Table exists in DB: {:}".format(self.db_name))
except Exception as e:
LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
# ----------------- CRUD OPERATIONS ---------------------
def add_row_to_db(self, row):
session = self.Session()
try:
session.add(row)
session.commit()
LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
return True
except Exception as e:
session.rollback()
if "psycopg2.errors.UniqueViolation" in str(e):
LOGGER.error(f"Unique key violation: {row.__class__.__name__} table. {str(e)}")
raise AlreadyExistsException(row.__class__.__name__, row,
extra_details=["Unique key violation: {:}".format(e)] )
else:
LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
raise OperationFailedException ("Row insertion failed", extra_details=["unable to insert row {:}".format(e)])
finally:
session.close()
def search_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
if entity:
# LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
return entity
else:
LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}")
print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
return None
except Exception as e:
session.rollback()
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
session.close()
def delete_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
record = session.query(model).filter_by(**{col_name: id_to_search}).first()
if record:
session.delete(record)
session.commit()
LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
else:
LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
return None
except Exception as e:
session.rollback()
LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
finally:
session.close()
def select_with_filter(self, model, filter_object):
"""
Generic method to create filters dynamically based on filter_object attributes.
params: model: SQLAlchemy model class to query.
filter_object: Object that contains filtering criteria as attributes.
return: SQLAlchemy session, query and Model
"""
session = self.Session()
try:
query = session.query(AnalyzerModel)
query = session.query(model)
# Apply filters based on the filter_object
if filter_object.analyzer_id:
query = query.filter(AnalyzerModel.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
query = query.filter(model.analyzer_id.in_([a.analyzer_id.uuid for a in filter_object.analyzer_id]))
if filter_object.algorithm_names:
query = query.filter(AnalyzerModel.algorithm_name.in_(filter_object.algorithm_names))
query = query.filter(model.algorithm_name.in_(filter_object.algorithm_names))
if filter_object.input_kpi_ids:
input_kpi_uuids = [k.kpi_id.uuid for k in filter_object.input_kpi_ids]
query = query.filter(AnalyzerModel.input_kpi_ids.op('&&')(input_kpi_uuids))
query = query.filter(model.input_kpi_ids.op('&&')(input_kpi_uuids))
if filter_object.output_kpi_ids:
output_kpi_uuids = [k.kpi_id.uuid for k in filter_object.output_kpi_ids]
query = query.filter(AnalyzerModel.output_kpi_ids.op('&&')(output_kpi_uuids))
result = query.all()
# query should be added to return all rows
if result:
LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
else:
LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
return result
query = query.filter(model.output_kpi_ids.op('&&')(output_kpi_uuids))
except Exception as e:
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
finally:
session.close()
LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
raise OperationFailedException ("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
return super().select_with_filter(query, session, model)
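The refactor above keeps only the query construction in AnalyzerDB and delegates execution to the shared GenericDatabase helper. A standalone SQLAlchemy sketch of that split, using a toy model and an in-memory SQLite engine (not the TFS classes):

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class ToyAnalyzer(Base):
    __tablename__ = 'analyzer'
    analyzer_id    = Column(String, primary_key=True)
    algorithm_name = Column(String)

def generic_select_with_filter(query, session, model):
    # Generic executor, mirroring GenericDatabase.select_with_filter:
    # run the pre-built query, return the rows, always close the session.
    try:
        return query.all()
    finally:
        session.close()

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

session = Session()
session.add(ToyAnalyzer(analyzer_id='a1', algorithm_name='Test_Aggregate_and_Threshold'))
session.commit()

# "Subclass" side: build the model-specific, filtered query, then delegate execution.
query = session.query(ToyAnalyzer).filter(
    ToyAnalyzer.algorithm_name.in_(['Test_Aggregate_and_Threshold']))
print(generic_select_with_filter(query, session, ToyAnalyzer))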
......@@ -12,10 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, json
import logging, grpc, json, queue
from typing import Dict
from confluent_kafka import Consumer as KafkaConsumer
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import KafkaError
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.proto.context_pb2 import Empty
......@@ -24,7 +27,8 @@ from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, Analy
from common.proto.analytics_frontend_pb2_grpc import AnalyticsFrontendServiceServicer
from analytics.database.Analyzer_DB import AnalyzerDB
from analytics.database.AnalyzerModel import Analyzer as AnalyzerModel
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
......@@ -32,8 +36,14 @@ METRICS_POOL = MetricsPool('AnalyticsFrontend', 'NBIgRPC')
class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
def __init__(self):
LOGGER.info('Init AnalyticsFrontendService')
self.db_obj = AnalyzerDB()
self.listener_topic = KafkaTopic.ANALYTICS_RESPONSE.value
self.db_obj = AnalyzerDB(AnalyzerModel)
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'analytics-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StartAnalyzer(self,
......@@ -46,6 +56,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
AnalyzerModel.ConvertAnalyzerToRow(request)
)
self.PublishStartRequestOnKafka(request)
response.analyzer_id.uuid = request.analyzer_id.analyzer_id.uuid
return response
......@@ -73,6 +84,62 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info("Analyzer Start Request Generated: Analyzer Id: {:}, Value: {:}".format(analyzer_uuid, analyzer_to_generate))
self.kafka_producer.flush()
def StartResponseListener(self, filter_key=None):
"""
Start the Kafka response listener with APScheduler and return key-value pairs periodically.
"""
LOGGER.info("Starting StartResponseListener")
# Schedule the ResponseListener at fixed intervals
self.scheduler.add_job(
self.response_listener,
trigger=IntervalTrigger(seconds=5),
args=[filter_key],
id=f"response_listener_{self.listener_topic}",
replace_existing=True
)
self.scheduler.start()
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except KeyboardInterrupt:
LOGGER.warning("Listener stopped manually.")
finally:
self.StopListener()
def response_listener(self, filter_key=None):
"""
Poll Kafka messages and put key-value pairs into the queue.
"""
LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
msg = consumer.poll(2.0)
if msg is None:
return
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
return
try:
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.info(f"Skipping message with unmatched key: {key}")
# value = json.loads(msg.value().decode('utf-8')) # Added for debugging
# self.result_queue.put((filter_key, value)) # Added for debugging
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
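The listener couples three pieces: an APScheduler job that polls Kafka, a thread-safe queue, and a generator that blocks on the queue. A toy, Kafka-free version of that pattern (names and the 2-second interval are illustrative only):

import queue
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

results = queue.Queue()
state = {'count': 0}

def poll_once():
    # Stand-in for response_listener(): in the real service this polls Kafka and
    # enqueues (key, value) pairs that match the filter key.
    state['count'] += 1
    results.put(('demo-key', state['count']))

def listen():
    scheduler = BackgroundScheduler()
    scheduler.add_job(poll_once, trigger=IntervalTrigger(seconds=2),
                      id='demo_listener', replace_existing=True)
    scheduler.start()
    try:
        while True:
            yield results.get()     # blocks until the scheduled job has produced something
    finally:
        scheduler.shutdown()        # mirrors StopListener()

for key, value in listen():
    print(key, value)
    if value >= 3:
        break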
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StopAnalyzer(self,
request : AnalyzerId, grpc_context: grpc.ServicerContext # type: ignore
......@@ -107,6 +174,15 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
)
LOGGER.info("Analyzer Stop Request Generated: Analyzer Id: {:}".format(analyzer_uuid))
self.kafka_producer.flush()
self.StopListener()
def StopListener(self):
"""
Gracefully stop the Kafka listener and the scheduler.
"""
LOGGER.info("Stopping Kafka listener...")
self.scheduler.shutdown()
LOGGER.info("Kafka listener stopped.")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectAnalyzers(self,
......@@ -126,6 +202,7 @@ class AnalyticsFrontendServiceServicerImpl(AnalyticsFrontendServiceServicer):
LOGGER.info('Unable to process filter response {:}'.format(e))
except Exception as e:
LOGGER.error('Unable to apply filter on table {:}. ERROR: {:}'.format(AnalyzerModel.__name__, e))
def delivery_callback(self, err, msg):
if err:
......
......@@ -16,9 +16,11 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from .AnalyticsFrontendService import AnalyticsFrontendService
from analytics.database.AnalyzerModel import Analyzer as Model
from common.tools.database.GenericDatabase import Database
terminate = threading.Event()
LOGGER = None
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
......@@ -36,6 +38,11 @@ def main():
LOGGER.info('Starting...')
# To create DB
kpiDBobj = Database(Model)
kpiDBobj.create_database()
kpiDBobj.create_tables()
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
......
......@@ -25,7 +25,7 @@ from common.Settings import ( get_service_port_grpc, get_env_var_name,
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC )
from common.tools.kafka.Variables import KafkaTopic
from common.proto.kpi_value_api_pb2 import KpiValue
from common.proto.analytics_frontend_pb2 import AnalyzerId, AnalyzerList
from analytics.frontend.client.AnalyticsFrontendClient import AnalyticsFrontendClient
from analytics.frontend.service.AnalyticsFrontendService import AnalyticsFrontendService
from analytics.frontend.tests.messages import ( create_analyzer_id, create_analyzer,
......@@ -33,7 +33,7 @@ from analytics.frontend.tests.messages import ( create_analyze
from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import AnalyticsFrontendServiceServicerImpl
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from common.proto.analytics_frontend_pb2 import Analyzer, AnalyzerId, AnalyzerFilter, AnalyzerList
###########################
# Tests Setup
......@@ -84,23 +84,45 @@ def analyticsFrontend_client(analyticsFrontend_service : AnalyticsFrontendServic
###########################
# --- "test_validate_kafka_topics" should be executed before the functionality tests ---
# def test_validate_kafka_topics():
# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool)
# ----- core functionality test -----
# def test_StartAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_StartAnalytic START: <<< ')
# response = analyticsFrontend_client.StartAnalyzer(create_analyzer())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerId)
# To test start and stop listener together
def test_StartStopAnalyzers(analyticsFrontend_client):
LOGGER.info(' >>> test_StartAnalyzers START: <<< ')
LOGGER.info(' >>> test_StartStopAnalyzers START: <<< ')
LOGGER.info('--> StartAnalyzer')
added_analyzer_id = analyticsFrontend_client.StartAnalyzer(create_analyzer())
LOGGER.debug(str(added_analyzer_id))
assert isinstance(added_analyzer_id, AnalyzerId)
def test_StopAnalytic(analyticsFrontend_client):
LOGGER.info(' >>> test_StopAnalytic START: <<< ')
response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
LOGGER.info(' --> Calling StartResponseListener... ')
class_obj = AnalyticsFrontendServiceServicerImpl()
response = class_obj.StartResponseListener(added_analyzer_id.analyzer_id.uuid)
LOGGER.debug(response)
LOGGER.info("waiting for timer to complete ...")
time.sleep(3)
LOGGER.info('--> StopAnalyzer')
response = analyticsFrontend_client.StopAnalyzer(added_analyzer_id)
LOGGER.debug(str(response))
assert isinstance(response, Empty)
def test_SelectAnalytics(analyticsFrontend_client):
LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
LOGGER.debug(str(response))
assert isinstance(response, AnalyzerList)
# def test_SelectAnalytics(analyticsFrontend_client):
# LOGGER.info(' >>> test_SelectAnalytics START: <<< ')
# response = analyticsFrontend_client.SelectAnalyzers(create_analyzer_filter())
# LOGGER.debug(str(response))
# assert isinstance(response, AnalyzerList)
# def test_StopAnalytic(analyticsFrontend_client):
# LOGGER.info(' >>> test_StopAnalytic START: <<< ')
# response = analyticsFrontend_client.StopAnalyzer(create_analyzer_id())
# LOGGER.debug(str(response))
# assert isinstance(response, Empty)
# def test_ResponseListener():
# LOGGER.info(' >>> test_ResponseListener START <<< ')
......@@ -109,4 +131,4 @@ def test_SelectAnalytics(analyticsFrontend_client):
# class_obj = AnalyticsFrontendServiceServicerImpl()
# for response in class_obj.StartResponseListener(analyzer_id.analyzer_id.uuid):
# LOGGER.debug(response)
# assert isinstance(response, tuple)
# assert isinstance(response, tuple)
\ No newline at end of file
......@@ -11,10 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apscheduler==3.10.4
confluent-kafka==2.3.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
......@@ -58,47 +58,57 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
policy_client = PolicyClient()
telemetry_frontend_client = TelemetryFrontendClient()
analytics_frontend_client = AnalyticsFrontendClient()
analytic_frontend_service = AnalyticsFrontendServiceServicerImpl()
LOGGER.info('Trying to get the service ')
LOGGER.info('request.serviceId.service_uuid.uuid({:s})'.format(str(request.serviceId.service_uuid.uuid)))
LOGGER.info('request.serviceId.service_uuid({:s})'.format(str(request.serviceId.service_uuid)))
LOGGER.info('request.serviceId({:s})'.format(str(request.serviceId)))
LOGGER.info('Request({:s})'.format(str(request)))
try:
# TODO: Remove static variables(get them from ZSMCreateRequest)
# TODO: Refactor policy component (remove unnecessary variables)
####### GET Context #######################
LOGGER.info('Get the service from Context: ')
service: Service = context_client.GetService(request.serviceId)
LOGGER.info('service({:s})'.format(str(service)))
LOGGER.info('Service ({:s}) :'.format(str(service)))
###########################################
####### SET Kpi Descriptor LAT ################
LOGGER.info('Set Kpi Descriptor LAT: ')
if(len(service.service_constraints) == 0):
raise InvalidArgumentException("service_constraints" , "empty", []);
# if(len(service.service_constraints) == 0):
# raise InvalidArgumentException("argument_name" , "argument_value", []);
if(len(service.service_constraints) > 1):
raise InvalidArgumentException("service_constraints" , ">1", []);
# KPI Descriptor
if(service.service_constraints[0].sla_latency is None ):
raise InvalidArgumentException("sla_latency", "empty", []);
## Static Implementation Applied only in case of SLA Latency Constraint ##
# KPI Descriptor
kpi_descriptor_lat = KpiDescriptor()
kpi_descriptor_lat.kpi_sample_type = 701 #'KPISAMPLETYPE_SERVICE_LATENCY_MS' #static service.service_constraints[].sla_latency.e2e_latency_ms
kpi_descriptor_lat.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_lat.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_lat: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_lat)
LOGGER.info('kpi_id_lat({:s})'.format(str(kpi_id_lat)))
LOGGER.info('The kpi_id_lat({:s})'.format(str(kpi_id_lat)))
###########################################
####### SET Kpi Descriptor TX ################
LOGGER.info('Set Kpi Descriptor TX: ')
kpi_descriptor_tx = KpiDescriptor()
kpi_descriptor_tx.kpi_sample_type = 101 # static KPISAMPLETYPE_PACKETS_TRANSMITTED
kpi_descriptor_tx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
kpi_descriptor_tx.kpi_id.kpi_id.uuid = str(uuid4())
kpi_id_tx: KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor_tx)
LOGGER.info('kpi_id_tx({:s})'.format(str(kpi_id_tx)))
LOGGER.info('The kpi_id_tx({:s})'.format(str(kpi_id_tx)))
###########################################
####### SET Kpi Descriptor RX ################
LOGGER.info('Set Kpi Descriptor RX: ')
kpi_descriptor_rx = KpiDescriptor()
kpi_descriptor_rx.kpi_sample_type = 102 # static KPISAMPLETYPE_PACKETS_RECEIVED
kpi_descriptor_rx.service_id.service_uuid.uuid = request.serviceId.service_uuid.uuid
......@@ -114,7 +124,7 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
collect_tx = Collector()
collect_tx.collector_id.collector_id.uuid = str(uuid4())
collect_tx.kpi_id.kpi_id.uuid = kpi_id_tx.kpi_id.uuid
collect_tx.duration_s = 2000 # static
collect_tx.duration_s = 20000 # static
collect_tx.interval_s = 1 # static
LOGGER.info('Start Collector TX({:s})'.format(str(collect_tx)))
......@@ -126,7 +136,7 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
collect_rx = Collector()
collect_rx.collector_id.collector_id.uuid = str(uuid4())
collect_rx.kpi_id.kpi_id.uuid = kpi_id_rx.kpi_id.uuid
collect_rx.duration_s = 2000 # static
collect_rx.duration_s = 20000 # static
collect_rx.interval_s = 1 # static
LOGGER.info('Start Collector RX({:s})'.format(str(collect_rx)))
......@@ -134,6 +144,24 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
LOGGER.info('collect_id_rx({:s})'.format(str(collect_id_rx)))
###############################################
####### START Analyzer LAT ################
analyzer = Analyzer()
analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
analyzer.algorithm_name = 'Test_Aggregate_and_Threshold' # static
analyzer.operation_mode = 2
analyzer.input_kpi_ids.append(kpi_id_rx)
analyzer.input_kpi_ids.append(kpi_id_tx)
analyzer.output_kpi_ids.append(kpi_id_lat)
_threshold_dict = {'min_latency_E2E': (0, service.service_constraints[0].sla_latency.e2e_latency_ms)}
analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
analyzer.parameters['window_size'] = "60s"
analyzer.parameters['window_slider'] = "30s"
analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
###########################################################
####### SET Policy LAT ################
policy_lat = PolicyRuleService()
policy_lat.serviceId.service_uuid.uuid = request.serviceId.service_uuid.uuid
......@@ -160,45 +188,13 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
policyRuleCondition = PolicyRuleCondition()
policyRuleCondition.kpiId.kpi_id.uuid = kpi_id_lat.kpi_id.uuid
policyRuleCondition.numericalOperator = 5
policyRuleCondition.kpiValue.floatVal = 300 #constraint.sla_latency.e2e_latency_ms
policyRuleCondition.kpiValue.floatVal = 300
policy_lat.policyRuleBasic.conditionList.append(policyRuleCondition)
policy_rule_state: PolicyRuleState = policy_client.PolicyAddService(policy_lat)
LOGGER.info('policy_rule_state({:s})'.format(str(policy_rule_state)))
####### START Analyzer LAT ################
analyzer = Analyzer()
analyzer.analyzer_id.analyzer_id.uuid = str(uuid4())
analyzer.algorithm_name = 'Test_Aggergate_and_Threshold' # static
analyzer.operation_mode = 2
analyzer.input_kpi_ids.append(kpi_id_rx)
analyzer.input_kpi_ids.append(kpi_id_tx)
analyzer.output_kpi_ids.append(kpi_id_lat)
_threshold_dict = {'min_latency_E2E': (2, 105)}
analyzer.parameters['thresholds'] = json.dumps(_threshold_dict)
analyzer.parameters['window_size'] = "60s"
analyzer.parameters['window_slider'] = "30s"
analyzer_id_lat: AnalyzerId = analytics_frontend_client.StartAnalyzer(analyzer)
LOGGER.info('analyzer_id_lat({:s})'.format(str(analyzer_id_lat)))
kpi_value_api_client = KpiValueApiClient()
stream: KpiAlarms = kpi_value_api_client.GetKpiAlarms(kpi_id_lat.kpi_id.uuid)
for response in stream:
if response is None:
LOGGER.debug('NO message')
else:
LOGGER.debug(str(response))
###########################################
# for response in analytic_frontend_service.StartResponseListener( analyzer_id_lat.analyzer_id.uuid):
# LOGGER.info("response.value {:s}",response)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member
LOGGER.exception('Unable to get Service({:s})'.format(str(request)))
......@@ -208,8 +204,6 @@ class AutomationServiceServicerImpl(AutomationServiceServicer):
telemetry_frontend_client.close()
return None
LOGGER.info('Here is the service')
context_client.close()
kpi_manager_client.close()
policy_client.close()
......
......@@ -62,6 +62,7 @@ class ServiceNameEnum(Enum):
E2EORCHESTRATOR = 'e2e-orchestrator'
OPTICALCONTROLLER = 'opticalcontroller'
BGPLS = 'bgpls-speaker'
QKD_APP = 'qkd_app'
KPIMANAGER = 'kpi-manager'
KPIVALUEAPI = 'kpi-value-api'
KPIVALUEWRITER = 'kpi-value-writer'
......@@ -98,6 +99,7 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.FORECASTER .value : 10040,
ServiceNameEnum.E2EORCHESTRATOR .value : 10050,
ServiceNameEnum.OPTICALCONTROLLER .value : 10060,
ServiceNameEnum.QKD_APP .value : 10070,
ServiceNameEnum.BGPLS .value : 20030,
ServiceNameEnum.KPIMANAGER .value : 30010,
ServiceNameEnum.KPIVALUEAPI .value : 30020,
......@@ -117,10 +119,12 @@ DEFAULT_SERVICE_HTTP_PORTS = {
ServiceNameEnum.CONTEXT .value : 8080,
ServiceNameEnum.NBI .value : 8080,
ServiceNameEnum.WEBUI .value : 8004,
ServiceNameEnum.QKD_APP .value : 8005,
}
# Default HTTP/REST-API service base URLs
DEFAULT_SERVICE_HTTP_BASEURLS = {
ServiceNameEnum.NBI .value : None,
ServiceNameEnum.WEBUI .value : None,
ServiceNameEnum.QKD_APP .value : None,
}
......@@ -79,12 +79,12 @@ def get_service_host(service_name : ServiceNameEnum):
def get_service_port_grpc(service_name : ServiceNameEnum):
envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
default_value = DEFAULT_SERVICE_GRPC_PORTS.get(service_name.value)
return get_setting(envvar_name, default=default_value)
return int(get_setting(envvar_name, default=default_value))
def get_service_port_http(service_name : ServiceNameEnum):
envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_HTTP)
default_value = DEFAULT_SERVICE_HTTP_PORTS.get(service_name.value)
return get_setting(envvar_name, default=default_value)
return int(get_setting(envvar_name, default=default_value))
def get_service_baseurl_http(service_name : ServiceNameEnum):
envvar_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_BASEURL_HTTP)
......@@ -95,16 +95,34 @@ def get_log_level():
return get_setting(ENVVAR_LOG_LEVEL, default=DEFAULT_LOG_LEVEL)
def get_metrics_port():
return get_setting(ENVVAR_METRICS_PORT, default=DEFAULT_METRICS_PORT)
return int(get_setting(ENVVAR_METRICS_PORT, default=DEFAULT_METRICS_PORT))
def get_grpc_bind_address():
return get_setting(ENVVAR_GRPC_BIND_ADDRESS, default=DEFAULT_GRPC_BIND_ADDRESS)
def get_grpc_max_workers():
return get_setting(ENVVAR_GRPC_MAX_WORKERS, default=DEFAULT_GRPC_MAX_WORKERS)
return int(get_setting(ENVVAR_GRPC_MAX_WORKERS, default=DEFAULT_GRPC_MAX_WORKERS))
def get_grpc_grace_period():
return get_setting(ENVVAR_GRPC_GRACE_PERIOD, default=DEFAULT_GRPC_GRACE_PERIOD)
return int(get_setting(ENVVAR_GRPC_GRACE_PERIOD, default=DEFAULT_GRPC_GRACE_PERIOD))
def get_http_bind_address():
return get_setting(ENVVAR_HTTP_BIND_ADDRESS, default=DEFAULT_HTTP_BIND_ADDRESS)
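The int(...) casts added above matter because settings resolved from environment variables always arrive as strings; a short illustration (the variable name and value are examples only):

import os

os.environ['METRICS_PORT'] = '9192'          # env vars are always strings
raw = os.environ['METRICS_PORT']
assert isinstance(raw, str) and isinstance(int(raw), int)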
##### ----- Detect deployed microservices ----- #####
def is_microservice_deployed(service_name : ServiceNameEnum) -> bool:
host_env_var_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_HOST )
port_env_var_name = get_env_var_name(service_name, ENVVAR_SUFIX_SERVICE_PORT_GRPC)
return (host_env_var_name in os.environ) and (port_env_var_name in os.environ)
def is_deployed_bgpls () -> bool: return is_microservice_deployed(ServiceNameEnum.BGPLS )
def is_deployed_e2e_orch () -> bool: return is_microservice_deployed(ServiceNameEnum.E2EORCHESTRATOR )
def is_deployed_forecaster() -> bool: return is_microservice_deployed(ServiceNameEnum.FORECASTER )
def is_deployed_load_gen () -> bool: return is_microservice_deployed(ServiceNameEnum.LOAD_GENERATOR )
def is_deployed_optical () -> bool: return is_microservice_deployed(ServiceNameEnum.OPTICALCONTROLLER)
def is_deployed_policy () -> bool: return is_microservice_deployed(ServiceNameEnum.POLICY )
def is_deployed_qkd_app () -> bool: return is_microservice_deployed(ServiceNameEnum.QKD_APP )
def is_deployed_slice () -> bool: return is_microservice_deployed(ServiceNameEnum.SLICE )
def is_deployed_te () -> bool: return is_microservice_deployed(ServiceNameEnum.TE )
......@@ -12,52 +12,54 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sqlalchemy_utils
from .GenericEngine import Engine
from sqlalchemy import inspect
from sqlalchemy.orm import sessionmaker
from kpi_manager.database.KpiEngine import KpiEngine
from kpi_manager.database.KpiModel import Kpi as KpiModel
from common.method_wrappers.ServiceExceptions import (
AlreadyExistsException, OperationFailedException , NotFoundException)
from common.Settings import get_setting
from common.method_wrappers.ServiceExceptions import (OperationFailedException, AlreadyExistsException)
LOGGER = logging.getLogger(__name__)
DB_NAME = "tfs_kpi_mgmt"
class KpiDB:
def __init__(self):
self.db_engine = KpiEngine.get_engine()
class Database:
def __init__(self, model):
self.db_engine = Engine.get_engine()
if self.db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return False
self.db_name = DB_NAME
self.Session = sessionmaker(bind=self.db_engine)
def create_database(self) -> None:
raise Exception('Failed to initialize the database engine.')
self.db_model = model
self.db_table = model.__name__
self.Session = sessionmaker(bind=self.db_engine)
def create_database(self):
if not sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.create_database(self.db_engine.url)
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
# TODO: use "get_tables(declarative class obj)" method of "sqlalchemy_utils" to verify tables.
try:
KpiModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name))
self.db_model.metadata.create_all(self.db_engine)
LOGGER.debug("Tables created in the database: {:}".format(self.db_table))
except Exception as e:
LOGGER.debug("Tables cannot be created in the kpi database. {:s}".format(str(e)))
LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
def verify_tables(self):
try:
with self.db_engine.connect() as connection:
result = connection.execute("SHOW TABLES;")
tables = result.fetchall() # type: ignore
LOGGER.debug("Tables verified: {:}".format(tables))
inspect_object = inspect(self.db_engine)
if(inspect_object.has_table(self.db_table , None)):
LOGGER.info("Table exists in DB: {:}".format(self.db_name))
except Exception as e:
LOGGER.debug("Unable to fetch Table names. {:s}".format(str(e)))
LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
# ----------------- DB OPERATIONS ---------------------
def add_row_to_db(self, row):
session = self.Session()
......@@ -70,7 +72,8 @@ class KpiDB:
session.rollback()
if "psycopg2.errors.UniqueViolation" in str(e):
LOGGER.error(f"Unique key violation: {row.__class__.__name__} table. {str(e)}")
raise AlreadyExistsException(row.__class__.__name__, row, extra_details=["Unique key violation: {:}".format(e)] )
raise AlreadyExistsException(row.__class__.__name__, row,
extra_details=["Unique key violation: {:}".format(e)] )
else:
LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
raise OperationFailedException ("Row insertion failed", extra_details=["unable to insert row {:}".format(e)])
......@@ -89,6 +92,7 @@ class KpiDB:
print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
return None
except Exception as e:
session.rollback()
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
......@@ -112,43 +116,24 @@ class KpiDB:
finally:
session.close()
def select_with_filter(self, model, filter_object):
session = self.Session()
def select_with_filter(self, query_object, session, model):
"""
Generic method to apply filters dynamically based on filter.
params: model_name: SQLAlchemy model class name.
query_object : Object that contains query with applied filters.
session: session of the query.
return: List of filtered records.
"""
try:
query = session.query(KpiModel)
# Apply filters based on the filter_object
if filter_object.kpi_id:
query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
if filter_object.kpi_sample_type:
query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type))
if filter_object.device_id:
query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id]))
if filter_object.endpoint_id:
query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id]))
if filter_object.service_id:
query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id]))
if filter_object.slice_id:
query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id]))
if filter_object.connection_id:
query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id]))
if filter_object.link_id:
query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id]))
result = query.all()
result = query_object.all()
# Log result and handle empty case
if result:
LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
LOGGER.debug(f"Fetched filtered rows from {model.__name__} with filters: {query_object}")
else:
LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}")
LOGGER.warning(f"No matching rows found in {model.__name__} with filters: {query_object}")
return result
except Exception as e:
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
LOGGER.error(f"Error fetching filtered rows from {model.__name__} with filters {query_object} ::: {e}")
raise OperationFailedException("Select by filter", extra_details=[f"Unable to apply the filter: {e}"])
finally:
session.close()
......@@ -18,14 +18,14 @@ from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
class AnalyzerEngine:
class Engine:
@staticmethod
def get_engine() -> sqlalchemy.engine.Engine:
crdb_uri = get_setting('CRDB_URI', default=None)
if crdb_uri is None:
CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT')
CRDB_DATABASE = "tfs-analyzer" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
CRDB_DATABASE = get_setting('CRDB_DATABASE')
CRDB_USERNAME = get_setting('CRDB_USERNAME')
CRDB_PASSWORD = get_setting('CRDB_PASSWORD')
CRDB_SSLMODE = get_setting('CRDB_SSLMODE')
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -33,20 +33,25 @@
# # do test ...
# descriptor_loader.unload()
import concurrent.futures, json, logging, operator
import concurrent.futures, copy, json, logging, operator
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import (
Connection, Context, ContextId, Device, DeviceId, Empty, Link, LinkId, Service, ServiceId, Slice, SliceId,
Topology, TopologyId)
Connection, Context, ContextId, Device, DeviceId, Empty,
Link, LinkId, Service, ServiceId, Slice, SliceId,
Topology, TopologyId
)
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from .Tools import (
format_device_custom_config_rules, format_service_custom_config_rules, format_slice_custom_config_rules,
get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_controllers_and_network_devices, split_devices_by_rules)
format_device_custom_config_rules, format_service_custom_config_rules,
format_slice_custom_config_rules, get_descriptors_add_contexts,
get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_controllers_and_network_devices,
split_devices_by_rules
)
LOGGER = logging.getLogger(__name__)
LOGGERS = {
......@@ -78,6 +83,30 @@ TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_o
TypeNotification = Tuple[str, str] # message, level
TypeNotificationList = List[TypeNotification]
SLICE_TEMPLATE = {
"slice_id": {
"context_id": {"context_uuid": {"uuid": "admin"}},
"slice_uuid": {"uuid": None}
},
"name": {},
"slice_config": {"config_rules": [
{"action": 1, "custom": {"resource_key": "/settings", "resource_value": {
"address_families": ["IPV4"], "bgp_as": 65000,
"bgp_route_target": "65000:333", "mtu": 1512
}}}
]},
"slice_constraints": [
{"sla_capacity": {"capacity_gbps": 20.0}},
{"sla_availability": {"availability": 20.0, "num_disjoint_paths": 1, "all_active": True}},
{"sla_isolation": {"isolation_level": [0]}}
],
"slice_endpoint_ids": [
],
"slice_status": {"slice_status": 1}
}
class DescriptorLoader:
def __init__(
self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None,
......@@ -106,8 +135,53 @@ class DescriptorLoader:
self.__links = self.__descriptors.get('links' , [])
self.__services = self.__descriptors.get('services' , [])
self.__slices = self.__descriptors.get('slices' , [])
self.__ietf_slices = self.__descriptors.get('ietf-network-slice-service:network-slice-services', {})
self.__connections = self.__descriptors.get('connections', [])
if len(self.__ietf_slices) > 0:
for slice_service in self.__ietf_slices["slice-service"]:
tfs_slice = copy.deepcopy(SLICE_TEMPLATE)
tfs_slice["slice_id"]["slice_uuid"]["uuid"] = slice_service["id"]
tfs_slice["name"] = slice_service["description"]
for sdp in slice_service["sdps"]["sdp"]:
sdp_id = sdp["id"]
for attcircuit in sdp["attachment-circuits"]["attachment-circuit"]:
att_cir_tp_id = attcircuit["ac-tp-id"]
RESOURCE_KEY = "/device[{:s}]/endpoint[{:s}]/settings"
resource_key = RESOURCE_KEY.format(str(sdp_id), str(att_cir_tp_id))
for tag in attcircuit['ac-tags']['ac-tag']:
if tag.get('tag-type') == 'ietf-nss:vlan-id':
vlan_id = tag.get('value')
else:
vlan_id = 0
tfs_slice["slice_config"]["config_rules"].append({
"action": 1, "custom": {
"resource_key": resource_key, "resource_value": {
"router_id": sdp.get("node-id",[]),
"sub_interface_index": 0,
"vlan_id": vlan_id
}
}
})
tfs_slice["slice_endpoint_ids"].append({
"device_id": {"device_uuid": {"uuid": sdp_id}},
"endpoint_uuid": {"uuid": att_cir_tp_id},
"topology_id": {"context_id": {"context_uuid": {"uuid": "admin"}},
"topology_uuid": {"uuid": "admin"}}
})
#tfs_slice["slice_constraints"].append({
# "endpoint_location": {
# "endpoint_id": {
# "device_id": {"device_uuid": {"uuid": sdp["id"]}},
# "endpoint_uuid": {"uuid": attcircuit["ac-tp-id"]}
# },
# "location": {"region": "4"}
# }
#})
self.__slices.append(tfs_slice)
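For context, a hypothetical minimal descriptor that the conversion above consumes; only the keys the loop actually reads are included, and every value is invented for illustration:

ietf_slice_descriptor = {
    "ietf-network-slice-service:network-slice-services": {
        "slice-service": [{
            "id"         : "slice-1",
            "description": "example slice",
            "sdps": {"sdp": [{
                "id"     : "R1",
                "node-id": "10.0.0.1",
                "attachment-circuits": {"attachment-circuit": [{
                    "ac-tp-id": "1/1",
                    "ac-tags" : {"ac-tag": [
                        {"tag-type": "ietf-nss:vlan-id", "value": 100}
                    ]}
                }]}
            }]}
        }]
    }
}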
self.__contexts_add = None
self.__topologies_add = None
self.__devices_add = None
......@@ -232,7 +306,9 @@ class DescriptorLoader:
def _load_dummy_mode(self) -> None:
# Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks.
controllers, network_devices = split_controllers_and_network_devices(self.__devices)
self.__ctx_cli.connect()
self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add )
self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
......
......@@ -19,8 +19,7 @@ from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
# KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
KFK_SERVER_ADDRESS_TEMPLATE = '10.152.183.186'
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
class KafkaConfig(Enum):
......@@ -30,10 +29,7 @@ class KafkaConfig(Enum):
if kafka_server_address is None:
KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
KFK_PORT = get_setting('KFK_SERVER_PORT')
kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE+':'+KFK_PORT
#print("XXXXXXXXXXXXXXXXXXXXXXXXX")
print(kafka_server_address)
#kafka_server_address = "1"
kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
return kafka_server_address
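A quick illustration of the restored template-based address composition (the namespace and port values are examples only):

KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
print(KFK_SERVER_ADDRESS_TEMPLATE.format('tfs', '9092'))
# -> kafka-service.tfs.svc.cluster.local:9092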
@staticmethod
......@@ -82,8 +78,8 @@ class KafkaTopic(Enum):
# LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic
# print("Topic {:} does not exist. Creating...".format(topic))
# LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
print("Topic {:} does not exist. Creating...".format(topic))
LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
KafkaConfig.get_admin_client().create_topics([new_topic])
else:
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from typing import Dict, List, Optional
from common.Constants import DEFAULT_CONTEXT_NAME
from common.tools.object_factory.Context import json_context_id
def json_app_id(app_uuid : str, context_id : Optional[Dict] = None) -> Dict:
result = {'app_uuid': {'uuid': app_uuid}}
if context_id is not None: result['context_id'] = copy.deepcopy(context_id)
return result
......@@ -42,6 +42,16 @@ def json_service(
'service_config' : {'config_rules': copy.deepcopy(config_rules)},
}
def json_service_qkd_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
config_rules : List[Dict] = [], context_uuid : str = DEFAULT_CONTEXT_NAME
):
return json_service(
service_uuid, ServiceTypeEnum.SERVICETYPE_QKD, context_id=json_context_id(context_uuid),
status=ServiceStatusEnum.SERVICESTATUS_PLANNED, endpoint_ids=endpoint_ids, constraints=constraints,
config_rules=config_rules)
def json_service_l2nm_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
config_rules : List[Dict] = [], context_uuid : str = DEFAULT_CONTEXT_NAME
......
......@@ -38,6 +38,30 @@ build device:
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
## Start Mock QKD Nodes before unit testing
#start_mock_nodes:
# stage: deploy
# script:
# - bash src/tests/tools/mock_qkd_nodes/start.sh &
# - sleep 10 # wait for nodes to spin up
# artifacts:
# paths:
# - mock_nodes.log
# rules:
# - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
# - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
## Prepare Scenario (Start NBI, mock services)
#prepare_scenario:
# stage: deploy
# script:
# - pytest src/tests/qkd/unit/PrepareScenario.py
# needs:
# - start_mock_nodes
# rules:
# - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
# - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
# Apply unit test to the component
unit_test device:
variables:
......@@ -46,6 +70,8 @@ unit_test device:
stage: unit_test
needs:
- build device
#- start_mock_nodes
#- prepare_scenario
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- >
......@@ -68,6 +94,7 @@ unit_test device:
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_emulated.py --junitxml=/opt/results/${IMAGE_NAME}_report_emulated.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary_ietf_actn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_actn.xml"
#- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/qkd/unit/test_*.py"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
......
{
"contexts": [
{"context_id": {"context_uuid": {"uuid": "admin"}}}
],
"topologies": [
{"topology_id": {"topology_uuid": {"uuid": "admin"}, "context_id": {"context_uuid": {"uuid": "admin"}}}}
],
"devices": [
{
"device_id": {"device_uuid": {"uuid": "QKD1"}}, "device_type": "qkd-node",
"device_operational_status": 0, "device_drivers": [12], "device_endpoints": [],
"device_config": {"config_rules": [
{"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "10.0.2.10"}},
{"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "11111"}},
{"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {
"scheme": "http"
}}}
]}
},
{
"device_id": {"device_uuid": {"uuid": "QKD2"}}, "device_type": "qkd-node",
"device_operational_status": 0, "device_drivers": [12], "device_endpoints": [],
"device_config": {"config_rules": [
{"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "10.0.2.10"}},
{"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "22222"}},
{"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {
"scheme": "http"
}}}
]}
},
{
"device_id": {"device_uuid": {"uuid": "QKD3"}}, "device_type": "qkd-node",
"device_operational_status": 0, "device_drivers": [12], "device_endpoints": [],
"device_config": {"config_rules": [
{"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "10.0.2.10"}},
{"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "33333"}},
{"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {
"scheme": "http"
}}}
]}
}
],
"links": [
{
"link_id": {"link_uuid": {"uuid": "QKD1/10.0.2.10:1001==QKD2/10.0.2.10:2001"}},
"link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "QKD1"}}, "endpoint_uuid": {"uuid": "10.0.2.10:1001"}},
{"device_id": {"device_uuid": {"uuid": "QKD2"}}, "endpoint_uuid": {"uuid": "10.0.2.10:2001"}}
]
},
{
"link_id": {"link_uuid": {"uuid": "QKD2/10.0.2.10:2001==QKD1/10.0.2.10:1001"}},
"link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "QKD2"}}, "endpoint_uuid": {"uuid": "10.0.2.10:2001"}},
{"device_id": {"device_uuid": {"uuid": "QKD1"}}, "endpoint_uuid": {"uuid": "10.0.2.10:1001"}}
]
},
{
"link_id": {"link_uuid": {"uuid": "QKD2/10.0.2.10:2002==QKD3/10.0.2.10:3001"}},
"link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "QKD2"}}, "endpoint_uuid": {"uuid": "10.0.2.10:2002"}},
{"device_id": {"device_uuid": {"uuid": "QKD3"}}, "endpoint_uuid": {"uuid": "10.0.2.10:3001"}}
]
},
{
"link_id": {"link_uuid": {"uuid": "QKD3/10.0.2.10:3001==QKD2/10.0.2.10:2002"}},
"link_endpoint_ids": [
{"device_id": {"device_uuid": {"uuid": "QKD3"}}, "endpoint_uuid": {"uuid": "10.0.2.10:3001"}},
{"device_id": {"device_uuid": {"uuid": "QKD2"}}, "endpoint_uuid": {"uuid": "10.0.2.10:2002"}}
]
}
]
}
......@@ -14,9 +14,11 @@
import pytest
import json
import os
os.environ['DEVICE_EMULATED_ONLY'] = 'YES'
from device.service.drivers.qkd.QKDDriver2 import QKDDriver
MOCK_QKD_ADDRRESS = '127.0.0.1'
MOCK_QKD_ADDRRESS = '10.0.2.10'
MOCK_PORT = 11111
@pytest.fixture
......