Loading my_deploy.sh +1 −1 Original line number Diff line number Diff line Loading @@ -93,7 +93,7 @@ export CRDB_DATABASE="tfs" export CRDB_DEPLOY_MODE="single" # Disable flag for dropping database, if it exists. export CRDB_DROP_DATABASE_IF_EXISTS="YES" export CRDB_DROP_DATABASE_IF_EXISTS="NO" # Disable flag for re-deploying CockroachDB from scratch. export CRDB_REDEPLOY="" Loading src/kpi_manager/service/KpiValueComposer.py +68 −13 Original line number Diff line number Diff line Loading @@ -24,8 +24,8 @@ from kpi_manager.service.database.Kpi_DB import Kpi_DB from kpi_manager.service.database.KpiModel import Kpi as KpiModel LOGGER = logging.getLogger(__name__) KAFKA_SERVER_IP = '10.152.183.175:9092' # KAFKA_SERVER_IP = '127.0.0.1:9092' # KAFKA_SERVER_IP = '10.152.183.175:30092' KAFKA_SERVER_IP = '127.0.0.1:9092' # ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} Loading @@ -33,7 +33,11 @@ PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, 'group.id' : 'kpi_composer', 'auto.offset.reset' : 'latest'} KPIs_TO_SEARCH = ["node_timex_status", "node_timex_sync_status", "node_udp_queues"] KPIs_TO_SEARCH = ["node_network_receive_packets_total", "node_network_receive_bytes_total", "node_network_transmit_bytes_total", "process_open_fds"] DB_TABLE_NAME = KpiModel class KpiValueComposer: def __init__(self) -> None: Loading Loading @@ -78,19 +82,70 @@ class KpiValueComposer: def extract_kpi_values(event): pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) lines = event.split('\n') matching_rows = [] # matching_rows = [] sub_names = kpi_value = "" for line in lines: try: if pattern.search(line) and not line.startswith("# HELP") and not line.startswith("# TYPE"): matching_rows.append(tuple(line.split(" "))) print("Extracted Rows that match the KPIs {:}".format(matching_rows)) # LOGGER.info("Extracted Rows that match the KPIs {:}".format(matching_rows)) return matching_rows (kpi_name, kpi_value) = line.split(" ") if kpi_name.endswith('}'): (kpi_name, sub_names) = kpi_name.replace('}','').split('{') print("Extracted row that match the KPI {:}".format((kpi_name, sub_names, kpi_value))) kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db() if kpi_descriptor is not None: kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) producerObj = KafkaProducer(PRODUCER_CONFIG) producerObj.produce(KAFKA_TOPICS['labeled'], key="labeled", value= str(kpi_to_produce), callback=KpiValueComposer.delivery_callback) producerObj.flush() except Exception as e: print("Unable to extract kpi name and value from raw data: ERROR Info: {:}".format(e)) @staticmethod def request_kpi_descriptor_from_db(): def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): col_name = "kpi_description" kpi_name = KPIs_TO_SEARCH[0] kpiDBobj = Kpi_DB() row = kpiDBobj.search_db_row_by_id(KpiModel, col_name, kpi_name) row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) if row is not None: LOGGER.info("Extracted Row: {:}".format(row)) return row else: return None @staticmethod def merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value): # Creating a dictionary from the kpi_descriptor's attributes kpi_dict = { 'kpi_id' : kpi_descriptor.kpi_id, 'kpi_description': kpi_descriptor.kpi_description, 'kpi_sample_type': kpi_descriptor.kpi_sample_type, 'device_id' : kpi_descriptor.device_id, 'endpoint_id' : kpi_descriptor.endpoint_id, 'service_id' : kpi_descriptor.service_id, 'slice_id' : kpi_descriptor.slice_id, 'connection_id' : kpi_descriptor.connection_id, 'link_id' : kpi_descriptor.link_id, 'kpi_value' : kpi_value } return kpi_dict @staticmethod def delete_kpi_by_id(): col_name = "link_id" kpi_name = None kpiDBobj = Kpi_DB() row = kpiDBobj.delete_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) if row is not None: LOGGER.info("Deleted Row: {:}".format(row)) @staticmethod def delivery_callback( err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ if err: print(f'Message delivery failed: {err}') else: print(f'Message delivered to topic {msg.topic()}') No newline at end of file src/kpi_manager/service/database/KpiModel.py +1 −3 Original line number Diff line number Diff line Loading @@ -31,7 +31,7 @@ class Kpi(Base): __tablename__ = 'kpi' kpi_id = Column(UUID(as_uuid=False), primary_key=True) kpi_description = Column(Text) kpi_description = Column(Text, unique=True) kpi_sample_type = Column(Integer) device_id = Column(String) endpoint_id = Column(String) Loading @@ -39,8 +39,6 @@ class Kpi(Base): slice_id = Column(String) connection_id = Column(String) link_id = Column(String) # monitor_flag = Column(String) # helps in logging the information def __repr__(self): Loading src/kpi_manager/service/database/Kpi_DB.py +2 −1 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class Kpi_DB: try: entity = session.query(model).filter_by(**{col_name: id_to_search}).first() if entity: LOGGER.info(f"{model.__name__} ID found: {str(entity)}") # LOGGER.info(f"{model.__name__} ID found: {str(entity)}") return entity else: LOGGER.warning(f"{model.__name__} ID not found: {str(id_to_search)}") Loading @@ -102,6 +102,7 @@ class Kpi_DB: LOGGER.info("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) else: LOGGER.warning("%s with %s %s not found", model.__name__, col_name, id_to_search) return None except Exception as e: session.rollback() LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) Loading src/kpi_manager/tests/test_kpi_composer.py +8 −4 Original line number Diff line number Diff line Loading @@ -18,10 +18,14 @@ from kpi_manager.service.KpiValueComposer import KpiValueComposer LOGGER = logging.getLogger(__name__) # def test_compose_kpi(): # LOGGER.info(' >>> test_compose_kpi START <<< ') # KpiValueComposer.compose_kpi() def test_compose_kpi(): LOGGER.info(' >>> test_compose_kpi START <<< ') KpiValueComposer.compose_kpi() def test_request_kpi_descriptor_from_db(): LOGGER.info(' >>> test_request_kpi_descriptor_from_db START <<< ') KpiValueComposer.request_kpi_descriptor_from_db() # def test_delete_kpi_by_id(): # LOGGER.info(' >>> test_request_kpi_descriptor_from_db START <<< ') # KpiValueComposer.delete_kpi_by_id() No newline at end of file Loading
my_deploy.sh +1 −1 Original line number Diff line number Diff line Loading @@ -93,7 +93,7 @@ export CRDB_DATABASE="tfs" export CRDB_DEPLOY_MODE="single" # Disable flag for dropping database, if it exists. export CRDB_DROP_DATABASE_IF_EXISTS="YES" export CRDB_DROP_DATABASE_IF_EXISTS="NO" # Disable flag for re-deploying CockroachDB from scratch. export CRDB_REDEPLOY="" Loading
src/kpi_manager/service/KpiValueComposer.py +68 −13 Original line number Diff line number Diff line Loading @@ -24,8 +24,8 @@ from kpi_manager.service.database.Kpi_DB import Kpi_DB from kpi_manager.service.database.KpiModel import Kpi as KpiModel LOGGER = logging.getLogger(__name__) KAFKA_SERVER_IP = '10.152.183.175:9092' # KAFKA_SERVER_IP = '127.0.0.1:9092' # KAFKA_SERVER_IP = '10.152.183.175:30092' KAFKA_SERVER_IP = '127.0.0.1:9092' # ADMIN_KAFKA_CLIENT = AdminClient({'bootstrap.servers': KAFKA_SERVER_IP}) KAFKA_TOPICS = {'request' : 'topic_request', 'response': 'topic_response', 'raw' : 'topic_raw' , 'labeled' : 'topic_labled'} Loading @@ -33,7 +33,11 @@ PRODUCER_CONFIG = {'bootstrap.servers': KAFKA_SERVER_IP,} CONSUMER_CONFIG = {'bootstrap.servers' : KAFKA_SERVER_IP, 'group.id' : 'kpi_composer', 'auto.offset.reset' : 'latest'} KPIs_TO_SEARCH = ["node_timex_status", "node_timex_sync_status", "node_udp_queues"] KPIs_TO_SEARCH = ["node_network_receive_packets_total", "node_network_receive_bytes_total", "node_network_transmit_bytes_total", "process_open_fds"] DB_TABLE_NAME = KpiModel class KpiValueComposer: def __init__(self) -> None: Loading Loading @@ -78,19 +82,70 @@ class KpiValueComposer: def extract_kpi_values(event): pattern = re.compile("|".join(map(re.escape, KPIs_TO_SEARCH))) lines = event.split('\n') matching_rows = [] # matching_rows = [] sub_names = kpi_value = "" for line in lines: try: if pattern.search(line) and not line.startswith("# HELP") and not line.startswith("# TYPE"): matching_rows.append(tuple(line.split(" "))) print("Extracted Rows that match the KPIs {:}".format(matching_rows)) # LOGGER.info("Extracted Rows that match the KPIs {:}".format(matching_rows)) return matching_rows (kpi_name, kpi_value) = line.split(" ") if kpi_name.endswith('}'): (kpi_name, sub_names) = kpi_name.replace('}','').split('{') print("Extracted row that match the KPI {:}".format((kpi_name, sub_names, kpi_value))) kpi_descriptor = KpiValueComposer.request_kpi_descriptor_from_db() if kpi_descriptor is not None: kpi_to_produce = KpiValueComposer.merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value) producerObj = KafkaProducer(PRODUCER_CONFIG) producerObj.produce(KAFKA_TOPICS['labeled'], key="labeled", value= str(kpi_to_produce), callback=KpiValueComposer.delivery_callback) producerObj.flush() except Exception as e: print("Unable to extract kpi name and value from raw data: ERROR Info: {:}".format(e)) @staticmethod def request_kpi_descriptor_from_db(): def request_kpi_descriptor_from_db(kpi_name: str = KPIs_TO_SEARCH[0]): col_name = "kpi_description" kpi_name = KPIs_TO_SEARCH[0] kpiDBobj = Kpi_DB() row = kpiDBobj.search_db_row_by_id(KpiModel, col_name, kpi_name) row = kpiDBobj.search_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) if row is not None: LOGGER.info("Extracted Row: {:}".format(row)) return row else: return None @staticmethod def merge_kpi_descriptor_and_value(kpi_descriptor, kpi_value): # Creating a dictionary from the kpi_descriptor's attributes kpi_dict = { 'kpi_id' : kpi_descriptor.kpi_id, 'kpi_description': kpi_descriptor.kpi_description, 'kpi_sample_type': kpi_descriptor.kpi_sample_type, 'device_id' : kpi_descriptor.device_id, 'endpoint_id' : kpi_descriptor.endpoint_id, 'service_id' : kpi_descriptor.service_id, 'slice_id' : kpi_descriptor.slice_id, 'connection_id' : kpi_descriptor.connection_id, 'link_id' : kpi_descriptor.link_id, 'kpi_value' : kpi_value } return kpi_dict @staticmethod def delete_kpi_by_id(): col_name = "link_id" kpi_name = None kpiDBobj = Kpi_DB() row = kpiDBobj.delete_db_row_by_id(DB_TABLE_NAME, col_name, kpi_name) if row is not None: LOGGER.info("Deleted Row: {:}".format(row)) @staticmethod def delivery_callback( err, msg): """ Callback function to handle message delivery status. Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ if err: print(f'Message delivery failed: {err}') else: print(f'Message delivered to topic {msg.topic()}') No newline at end of file
src/kpi_manager/service/database/KpiModel.py +1 −3 Original line number Diff line number Diff line Loading @@ -31,7 +31,7 @@ class Kpi(Base): __tablename__ = 'kpi' kpi_id = Column(UUID(as_uuid=False), primary_key=True) kpi_description = Column(Text) kpi_description = Column(Text, unique=True) kpi_sample_type = Column(Integer) device_id = Column(String) endpoint_id = Column(String) Loading @@ -39,8 +39,6 @@ class Kpi(Base): slice_id = Column(String) connection_id = Column(String) link_id = Column(String) # monitor_flag = Column(String) # helps in logging the information def __repr__(self): Loading
src/kpi_manager/service/database/Kpi_DB.py +2 −1 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class Kpi_DB: try: entity = session.query(model).filter_by(**{col_name: id_to_search}).first() if entity: LOGGER.info(f"{model.__name__} ID found: {str(entity)}") # LOGGER.info(f"{model.__name__} ID found: {str(entity)}") return entity else: LOGGER.warning(f"{model.__name__} ID not found: {str(id_to_search)}") Loading @@ -102,6 +102,7 @@ class Kpi_DB: LOGGER.info("Deleted %s with %s: %s", model.__name__, col_name, id_to_search) else: LOGGER.warning("%s with %s %s not found", model.__name__, col_name, id_to_search) return None except Exception as e: session.rollback() LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e) Loading
src/kpi_manager/tests/test_kpi_composer.py +8 −4 Original line number Diff line number Diff line Loading @@ -18,10 +18,14 @@ from kpi_manager.service.KpiValueComposer import KpiValueComposer LOGGER = logging.getLogger(__name__) # def test_compose_kpi(): # LOGGER.info(' >>> test_compose_kpi START <<< ') # KpiValueComposer.compose_kpi() def test_compose_kpi(): LOGGER.info(' >>> test_compose_kpi START <<< ') KpiValueComposer.compose_kpi() def test_request_kpi_descriptor_from_db(): LOGGER.info(' >>> test_request_kpi_descriptor_from_db START <<< ') KpiValueComposer.request_kpi_descriptor_from_db() # def test_delete_kpi_by_id(): # LOGGER.info(' >>> test_request_kpi_descriptor_from_db START <<< ') # KpiValueComposer.delete_kpi_by_id() No newline at end of file