Skip to content
Snippets Groups Projects
Commit 4040e63f authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/71-cttc-separation-of-monitoring' of...

Merge branch 'feat/71-cttc-separation-of-monitoring' of ssh://gifrerenom_labs.etsi.org/tfs/controller into feat/71-cttc-separation-of-monitoring
parents 9a86eaed 5f4a7d31
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!207Resolve "(CTTC) Separation of Monitoring"
...@@ -36,9 +36,6 @@ class KafkaTopic(Enum): ...@@ -36,9 +36,6 @@ class KafkaTopic(Enum):
""" """
Method to create Kafka topics defined as class members Method to create Kafka topics defined as class members
""" """
# LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.values()))
# LOGGER.debug("Topics to be created: {:}".format(KafkaTopic.__members__.keys()))
# LOGGER.debug("Topics to be created: {:}".format([member.value for member in KafkaTopic]))
all_topics = [member.value for member in KafkaTopic] all_topics = [member.value for member in KafkaTopic]
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )): if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics created sucsessfully") LOGGER.debug("All topics created sucsessfully")
...@@ -54,16 +51,20 @@ class KafkaTopic(Enum): ...@@ -54,16 +51,20 @@ class KafkaTopic(Enum):
Args: Args:
list of topic: containing the topic name(s) to be created on Kafka list of topic: containing the topic name(s) to be created on Kafka
""" """
LOGGER.debug("Recevied topic List: {:}".format(new_topics)) LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
for topic in new_topics: for topic in new_topics:
try: try:
topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5) topic_metadata = KafkaConfig.ADMIN_CLIENT.value.list_topics(timeout=5)
# LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
if topic not in topic_metadata.topics: if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic # If the topic does not exist, create a new topic
print(f"Topic '{topic}' does not exist. Creating...") print("Topic {:} does not exist. Creating...".format(topic))
LOGGER.debug("Topic {:} does not exist. Creating...".format(topic)) LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1) new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic]) KafkaConfig.ADMIN_CLIENT.value.create_topics([new_topic])
else:
print("Topic name already exists: {:}".format(topic))
LOGGER.debug("Topic name already exists: {:}".format(topic))
except Exception as e: except Exception as e:
LOGGER.debug("Failed to create topic: {:}".format(e)) LOGGER.debug("Failed to create topic: {:}".format(e))
return False return False
......
...@@ -29,35 +29,6 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name ...@@ -29,35 +29,6 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received') LOGGER.warning('Terminate signal received')
terminate.set() terminate.set()
# def start_kpi_manager(name_mapping : NameMapping):
# LOGGER.info('Start Kpi Manager...',)
# events_collector = EventsDeviceCollector(name_mapping)
# events_collector.start()
# # TODO: redesign this method to be more clear and clean
# # Iterate while terminate is not set
# while not terminate.is_set():
# list_new_kpi_ids = events_collector.listen_events()
# # Monitor Kpis
# if bool(list_new_kpi_ids):
# for kpi_id in list_new_kpi_ids:
# # Create Monitor Kpi Requests
# monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
# monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
# monitor_kpi_request.monitoring_window_s = 86400
# monitor_kpi_request.sampling_rate_s = 10
# events_collector._monitoring_client.MonitorKpi(monitor_kpi_request)
# time.sleep(0.5) # let other tasks run; do not overload CPU
# else:
# # Terminate is set, looping terminates
# LOGGER.warning("Stopping execution...")
# events_collector.start()
def main(): def main():
global LOGGER # pylint: disable=global-statement global LOGGER # pylint: disable=global-statement
...@@ -77,21 +48,11 @@ def main(): ...@@ -77,21 +48,11 @@ def main():
LOGGER.debug('Starting...') LOGGER.debug('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
name_mapping = NameMapping() name_mapping = NameMapping()
# Starting monitoring service
# grpc_service = MonitoringService(name_mapping)
# grpc_service.start()
# start_monitoring(name_mapping)
grpc_service = KpiManagerService(name_mapping) grpc_service = KpiManagerService(name_mapping)
grpc_service.start() grpc_service.start()
# start_kpi_manager(name_mapping)
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass while not terminate.wait(timeout=1.0): pass
......
confluent-kafka==2.3.0
requests==2.27.1
\ No newline at end of file
...@@ -105,19 +105,16 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer): ...@@ -105,19 +105,16 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
return KpiValueType(int64Val=int64_value) return KpiValueType(int64Val=int64_value)
except ValueError: except ValueError:
pass pass
# Check if the value is a float # Check if the value is a float
try: try:
float_value = float(value) float_value = float(value)
return KpiValueType(floatVal=float_value) return KpiValueType(floatVal=float_value)
except ValueError: except ValueError:
pass pass
# Check if the value is a boolean # Check if the value is a boolean
if value.lower() in ['true', 'false']: if value.lower() in ['true', 'false']:
bool_value = value.lower() == 'true' bool_value = value.lower() == 'true'
return KpiValueType(boolVal=bool_value) return KpiValueType(boolVal=bool_value)
# If none of the above, treat it as a string # If none of the above, treat it as a string
return KpiValueType(stringVal=value) return KpiValueType(stringVal=value)
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading, time
from prometheus_client import start_http_server
from common.Settings import get_log_level
from .NameMapping import NameMapping # import updated
from .KpiValueApiService import KpiValueApiService
# Event flipped by signal_handler to request a graceful shutdown of main().
terminate = threading.Event()
# Module logger; configured inside main() after log level is known.
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    """Handle SIGINT/SIGTERM: log the event and ask the main loop to stop."""
    LOGGER.warning('Terminate signal received')
    terminate.set()
def main():
    """Run the KPI Value API gRPC service until a termination signal arrives.

    Configures logging, installs signal handlers, starts the service, and
    blocks on the module-level `terminate` event. Returns 0 on clean exit.
    """
    global LOGGER # pylint: disable=global-statement

    logging.basicConfig(level=get_log_level())
    LOGGER = logging.getLogger(__name__)

    # Stop gracefully on Ctrl+C or an external termination request.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal_handler)

    LOGGER.debug('Starting...')

    # NOTE(review): start_http_server is imported but the Prometheus metrics
    # endpoint is not started here — confirm whether that is intentional.
    grpc_service = KpiValueApiService(NameMapping())
    grpc_service.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=1.0): pass

    LOGGER.debug('Terminating...')
    grpc_service.stop()

    LOGGER.debug('Bye')
    return 0
# Script entry point: exit with main()'s return code (0 on clean shutdown).
if __name__ == '__main__':
    sys.exit(main())
...@@ -90,3 +90,4 @@ def test_store_kpi_values(kpi_value_api_client): ...@@ -90,3 +90,4 @@ def test_store_kpi_values(kpi_value_api_client):
LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ") LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list()) response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
assert isinstance(response, Empty) assert isinstance(response, Empty)
confluent-kafka==2.3.0
requests==2.27.1
\ No newline at end of file
...@@ -12,16 +12,11 @@ ...@@ -12,16 +12,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging, signal, sys, threading, time import logging, signal, sys, threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum from .NameMapping import NameMapping # import updated
from common.Settings import ( from .KpiValueWriter import KpiValueWriter
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, from common.Settings import get_log_level, get_metrics_port
wait_for_environment_variables)
from common.proto import monitoring_pb2
from monitoring.service.EventTools import EventsDeviceCollector # import updated
from monitoring.service.NameMapping import NameMapping # import updated
from .KpiManagerService import KpiManagerService
terminate = threading.Event() terminate = threading.Event()
LOGGER = None LOGGER = None
...@@ -30,35 +25,6 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name ...@@ -30,35 +25,6 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received') LOGGER.warning('Terminate signal received')
terminate.set() terminate.set()
def start_kpi_manager(name_mapping : NameMapping):
LOGGER.info('Start Monitoring...',)
events_collector = EventsDeviceCollector(name_mapping)
events_collector.start()
# TODO: redesign this method to be more clear and clean
# Iterate while terminate is not set
while not terminate.is_set():
list_new_kpi_ids = events_collector.listen_events()
# Monitor Kpis
if bool(list_new_kpi_ids):
for kpi_id in list_new_kpi_ids:
# Create Monitor Kpi Requests
monitor_kpi_request = monitoring_pb2.MonitorKpiRequest()
monitor_kpi_request.kpi_id.CopyFrom(kpi_id)
monitor_kpi_request.monitoring_window_s = 86400
monitor_kpi_request.sampling_rate_s = 10
events_collector._monitoring_client.MonitorKpi(monitor_kpi_request)
time.sleep(0.5) # let other tasks run; do not overload CPU
else:
# Terminate is set, looping terminates
LOGGER.warning("Stopping execution...")
events_collector.start()
def main(): def main():
global LOGGER # pylint: disable=global-statement global LOGGER # pylint: disable=global-statement
...@@ -66,40 +32,25 @@ def main(): ...@@ -66,40 +32,25 @@ def main():
logging.basicConfig(level=log_level) logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...') LOGGER.debug('Starting...')
# Start metrics server start_http_server(get_metrics_port) # add Prometheus client port
metrics_port = get_metrics_port()
start_http_server(metrics_port)
name_mapping = NameMapping() name_mapping = NameMapping()
# Starting monitoring service
# grpc_service = MonitoringService(name_mapping)
# grpc_service.start()
# start_monitoring(name_mapping)
grpc_service = KpiManagerService(name_mapping) grpc_service = KpiValueWriter(name_mapping)
grpc_service.start() grpc_service.start()
start_kpi_manager(name_mapping)
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...') LOGGER.debug('Terminating...')
grpc_service.stop() grpc_service.stop()
LOGGER.info('Bye') LOGGER.debug('Bye')
return 0 return 0
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -22,31 +22,31 @@ from kpi_value_writer.tests.test_messages import create_kpi_id_request ...@@ -22,31 +22,31 @@ from kpi_value_writer.tests.test_messages import create_kpi_id_request
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
def test_GetKpiDescriptor(): # def test_GetKpiDescriptor():
LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") # LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ")
kpi_manager_client = KpiManagerClient() # kpi_manager_client = KpiManagerClient()
# adding KPI # # adding KPI
LOGGER.info(" --->>> calling SetKpiDescriptor ") # LOGGER.info(" --->>> calling SetKpiDescriptor ")
response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) # response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request())
# get KPI # # get KPI
LOGGER.info(" --->>> calling GetKpiDescriptor with response ID") # LOGGER.info(" --->>> calling GetKpiDescriptor with response ID")
response = kpi_manager_client.GetKpiDescriptor(response_id) # response = kpi_manager_client.GetKpiDescriptor(response_id)
LOGGER.info("Response gRPC message object: {:}".format(response)) # LOGGER.info("Response gRPC message object: {:}".format(response))
LOGGER.info(" --->>> calling GetKpiDescriptor with random ID") # LOGGER.info(" --->>> calling GetKpiDescriptor with random ID")
rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request()) # rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request())
LOGGER.info("Response gRPC message object: {:}".format(rand_response)) # LOGGER.info("Response gRPC message object: {:}".format(rand_response))
LOGGER.info("\n------------------ TEST FINISHED ---------------------\n") # LOGGER.info("\n------------------ TEST FINISHED ---------------------\n")
assert isinstance(response, KpiDescriptor) # assert isinstance(response, KpiDescriptor)
# -------- Initial Test ---------------- # -------- Initial Test ----------------
# def test_validate_kafka_topics(): def test_validate_kafka_topics():
# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
# response = KafkaTopic.create_all_topics() response = KafkaTopic.create_all_topics()
# assert isinstance(response, bool) assert isinstance(response, bool)
# def test_KafkaConsumer(): def test_KafkaConsumer():
# LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
# KpiValueWriter.RunKafkaConsumer() KpiValueWriter.RunKafkaConsumer()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment