From 56f63933fa18dfaea2186cb5565ae0bd768770b5 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Sat, 12 Oct 2024 15:05:02 +0000
Subject: [PATCH 1/5] Additions to Backend Telemetry:

- Added Emulated Metric Collector.
- Implemented a few other changes.
---
 .../run_tests_locally-telemetry-backend.sh    |   7 +-
 .../backend/service/EmulatedCollector.py      |  75 ++++++++++
 .../service/TelemetryBackendService.py        | 125 ++++++++----------
 src/telemetry/backend/tests/messages.py       |  16 +++
 src/telemetry/backend/tests/test_backend.py   |  22 ++-
 .../TelemetryFrontendServiceServicerImpl.py   |   4 +-
 6 files changed, 171 insertions(+), 78 deletions(-)
 create mode 100644 src/telemetry/backend/service/EmulatedCollector.py

diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh
index f71128240..89f22611f 100755
--- a/scripts/run_tests_locally-telemetry-backend.sh
+++ b/scripts/run_tests_locally-telemetry-backend.sh
@@ -18,15 +18,12 @@ PROJECTDIR=`pwd`
 
 cd $PROJECTDIR/src
 
 # RCFILE=$PROJECTDIR/coverage/.coveragerc
-# coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
-#     kpi_manager/tests/test_unitary.py
-# python3 kpi_manager/tests/test_unitary.py
 
 export KFK_SERVER_ADDRESS='127.0.0.1:9092'
 CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}')
 export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require"
 RCFILE=$PROJECTDIR/coverage/.coveragerc
 
-python3 -m pytest --log-level=INFO --log-cli-level=debug --verbose \
-    telemetry/backend/tests/test_TelemetryBackend.py
+python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \
+    telemetry/backend/tests/test_backend.py
diff --git a/src/telemetry/backend/service/EmulatedCollector.py b/src/telemetry/backend/service/EmulatedCollector.py
new file mode 100644
index 000000000..716292ae0
--- /dev/null
+++ b/src/telemetry/backend/service/EmulatedCollector.py
@@ -0,0 +1,75 @@
+import numpy as np
+import random
+import threading
+import time
+import logging
+import queue
+
+LOGGER = logging.getLogger(__name__)
+
+class NetworkMetricsEmulator(threading.Thread):
+    def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"):
+        LOGGER.info("Initiating Emulator")
+        super().__init__()
+        self.interval            = interval
+        self.duration            = duration
+        self.metric_queue        = metric_queue if metric_queue is not None else queue.Queue()
+        self.network_state       = network_state
+        self.running             = True
+        self.base_utilization    = None
+        self.states              = None
+        self.state_probabilities = None
+        self.set_initial_parameter_values()
+
+    def set_initial_parameter_values(self):
+        self.states = ["good", "moderate", "poor"]
+        self.state_probabilities = {
+            "good"    : [0.9, 0.1, 0.0],
+            "moderate": [0.2, 0.7, 0.1],
+            "poor"    : [0.0, 0.3, 0.7]
+        }
+        if self.network_state == "good":
+            self.base_utilization = random.uniform(700, 900)
+        elif self.network_state == "moderate":
+            self.base_utilization = random.uniform(300, 700)
+        else:
+            self.base_utilization = random.uniform(100, 300)
+
+    def generate_synthetic_data_point(self):
+        if self.network_state == "good":
+            variance = random.uniform(-5, 5)
+        elif self.network_state == "moderate":
+            variance = random.uniform(-50, 50)
+        elif self.network_state == "poor":
+            variance = random.uniform(-100, 100)
+        else:
+            raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.")
+        self.base_utilization += variance
+
+        period       = 60 * 60 * random.uniform(10, 100)
+        amplitude    = random.uniform(50, 100)
+        sin_wave     = amplitude * np.sin(2 * np.pi * 100 / period) + self.base_utilization
+        random_noise = random.uniform(-10, 10)
+        utilization  = sin_wave + random_noise
+
+        state_prob = self.state_probabilities[self.network_state]
+        self.network_state = random.choices(self.states, state_prob)[0]
+
+        return utilization
+
+    def run(self):
+        while self.running and (self.duration == -1 or self.duration > 0):
+            utilization = self.generate_synthetic_data_point()
+            self.metric_queue.put(round(utilization, 3))
+            time.sleep(self.interval)
+            if self.duration > 0:
+                self.duration -= self.interval
+        if self.duration == -1:
+            self.duration = 0
+        LOGGER.debug("Emulator collector is stopped.")
+        self.stop()
+
+    def stop(self):
+        self.running = False
+        if not self.is_alive():
+            LOGGER.debug("Emulator Collector is Terminated.")
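
A minimal sketch of how the emulator above is meant to be driven (illustrative, not part of the patch; only the class name and constructor signature come from the new file):

```python
# Illustrative driver for the emulator added above (not part of the patch).
import queue
from telemetry.backend.service.EmulatedCollector import NetworkMetricsEmulator

metric_queue = queue.Queue()
emulator = NetworkMetricsEmulator(interval=1, duration=5,
                                  metric_queue=metric_queue, network_state="moderate")
emulator.start()
emulator.join()                 # run() counts duration down in interval-sized steps
while not metric_queue.empty():
    print(metric_queue.get())   # one utilization sample, rounded to 3 decimals, per interval
```

Passing duration=-1 keeps the thread producing until stop() is called, which is how the backend service below drives it.
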
diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index 81ef24481..2d7333715 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -12,22 +12,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import queue
 import json
 import time
-import random
 import logging
 import threading
-from typing import Any, Dict
-from datetime import datetime, timezone
-# from common.proto.context_pb2 import Empty
-from confluent_kafka import Producer as KafkaProducer
-from confluent_kafka import Consumer as KafkaConsumer
-from confluent_kafka import KafkaError
+from typing   import Any, Dict
+from datetime import datetime, timezone
+from confluent_kafka import Producer as KafkaProducer
+from confluent_kafka import Consumer as KafkaConsumer
+from confluent_kafka import KafkaError
+from numpy import info
 from common.Constants import ServiceNameEnum
-from common.Settings import get_service_port_grpc
-from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
-from common.method_wrappers.Decorator import MetricsPool
+from common.Settings import get_service_port_grpc
+from common.method_wrappers.Decorator import MetricsPool
+from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
 from common.tools.service.GenericGrpcService import GenericGrpcService
+from telemetry.backend.service.EmulatedCollector import NetworkMetricsEmulator
 
 LOGGER = logging.getLogger(__name__)
 METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
@@ -45,7 +46,9 @@ class TelemetryBackendService(GenericGrpcService):
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'backend',
                                              'auto.offset.reset' : 'latest'})
-        self.running_threads = {}
+        self.running_threads   = {}
+        self.emulatorCollector = None
+        self.metric_queue      = queue.Queue()
 
     def install_servicers(self):
         threading.Thread(target=self.RequestListener).start()
@@ -66,93 +69,84 @@ class TelemetryBackendService(GenericGrpcService):
             if receive_msg.error().code() == KafkaError._PARTITION_EOF:
                 continue
             else:
-                # print("Consumer error: {}".format(receive_msg.error()))
+                LOGGER.error("Consumer error: {}".format(receive_msg.error()))
                 break
             try:
                 collector = json.loads(receive_msg.value().decode('utf-8'))
                 collector_id = receive_msg.key().decode('utf-8')
                 LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector))
-                # print('Recevied Collector: {:} - {:}'.format(collector_id, collector))
 
                 if collector['duration'] == -1 and collector['interval'] == -1:
                     self.TerminateCollectorBackend(collector_id)
                 else:
-                    self.RunInitiateCollectorBackend(collector_id, collector)
+                    threading.Thread(target=self.InitiateCollectorBackend,
+                                     args=(collector_id, collector)).start()
             except Exception as e:
                 LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
-                # print ("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
 
-    def TerminateCollectorBackend(self, collector_id):
-        if collector_id in self.running_threads:
-            thread, stop_event = self.running_threads[collector_id]
-            stop_event.set()
-            thread.join()
-            # print ("Terminating backend (by StopCollector): Collector Id: ", collector_id)
-            del self.running_threads[collector_id]
-            self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
-        else:
-            # print ('Backend collector {:} not found'.format(collector_id))
-            LOGGER.warning('Backend collector {:} not found'.format(collector_id))
-
-    def RunInitiateCollectorBackend(self, collector_id: str, collector: str):
-        stop_event = threading.Event()
-        thread = threading.Thread(target=self.InitiateCollectorBackend,
-                                  args=(collector_id, collector, stop_event))
-        self.running_threads[collector_id] = (thread, stop_event)
-        thread.start()
-
-    def InitiateCollectorBackend(self, collector_id, collector, stop_event):
+    def InitiateCollectorBackend(self, collector_id, collector):
         """
         Method receives collector request and initiates collecter backend.
         """
         # print("Initiating backend for collector: ", collector_id)
         LOGGER.info("Initiating backend for collector: {:s}".format(str(collector_id)))
         start_time = time.time()
-        while not stop_event.is_set():
-            if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend
-                print("Execuation duration completed: Terminating backend: Collector Id: ", collector_id, " - ", time.time() - start_time)
-                self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
-                break
-            self.ExtractKpiValue(collector_id, collector['kpi_id'])
-            time.sleep(collector['interval'])
+        self.emulatorCollector = NetworkMetricsEmulator(
+            duration     = collector['duration'],
+            interval     = collector['interval'],
+            metric_queue = self.metric_queue
+        )
+        self.emulatorCollector.start()
+        self.running_threads[collector_id] = self.emulatorCollector
 
-    def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
+        while self.emulatorCollector.is_alive():
+            if not self.metric_queue.empty():
+                metric_value = self.metric_queue.get()
+                LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value))
+                self.GenerateCollectorResponse(collector_id, collector['kpi_id'] , metric_value)
+            time.sleep(1)
+        self.TerminateCollectorBackend(collector_id)
+
+    def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
         """
-        Method to write kpi Termination signal on RESPONSE Kafka topic
+        Method to write kpi value on RESPONSE Kafka topic
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
+            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
             "kpi_id"    : kpi_id,
-            "kpi_value" : measured_kpi_value,
+            "kpi_value" : measured_kpi_value
        }
         producer.produce(
-            KafkaTopic.RESPONSE.value, # TODO: to the topic ...
+            KafkaTopic.VALUE.value, # TODO: to the topic ...
             key      = collector_id,
             value    = json.dumps(kpi_value),
             callback = self.delivery_callback
         )
         producer.flush()
 
-    def ExtractKpiValue(self, collector_id: str, kpi_id: str):
-        """
-        Method to extract kpi value.
-        """
-        measured_kpi_value = random.randint(1,100) # TODO: To be extracted from a device
-        # print ("Measured Kpi value: {:}".format(measured_kpi_value))
-        self.GenerateCollectorResponse(collector_id, kpi_id , measured_kpi_value)
+    def TerminateCollectorBackend(self, collector_id):
+        LOGGER.debug("Terminating collector backend...")
+        if collector_id in self.running_threads:
+            thread = self.running_threads[collector_id]
+            thread.stop()
+            del self.running_threads[collector_id]
+            LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id))
+            self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend.
+        else:
+            LOGGER.warning('Backend collector {:} not found'.format(collector_id))
 
-    def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
+    def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
         """
-        Method to write kpi value on RESPONSE Kafka topic
+        Method to write kpi Termination signal on RESPONSE Kafka topic
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
-            "time_stamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
             "kpi_id"    : kpi_id,
-            "kpi_value" : measured_kpi_value
+            "kpi_value" : measured_kpi_value,
         }
         producer.produce(
-            KafkaTopic.VALUE.value, # TODO: to the topic ...
+            KafkaTopic.RESPONSE.value, # TODO: to the topic ...
             key      = collector_id,
             value    = json.dumps(kpi_value),
             callback = self.delivery_callback
         )
         producer.flush()
 
     def delivery_callback(self, err, msg):
-        """
-        Callback function to handle message delivery status.
-        Args: err (KafkaError): Kafka error object.
-              msg (Message): Kafka message object.
-        """
-        if err:
-            LOGGER.error('Message delivery failed: {:}'.format(err))
+        if err:
+            LOGGER.error('Message delivery failed: {:s}'.format(str(err)))
             # print(f'Message delivery failed: {err}')
-        #else:
-        #    LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
-        #    # print(f'Message delivered to topic {msg.topic()}')
+        # else:
+        #     LOGGER.info('Message delivered to topic {:}'.format(msg.topic()))
+        #     print(f'Message delivered to topic {msg.topic()}')
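
For reference, the two payload shapes the reworked backend emits, as implied by the dicts assembled in GenerateCollectorResponse and GenerateCollectorTerminationSignal above (all field values below are illustrative):

```python
# KPI sample, produced to KafkaTopic.VALUE, keyed by the collector UUID (illustrative values):
kpi_sample = {
    "time_stamp": "2024-10-12T15:05:02Z",                  # strftime("%Y-%m-%dT%H:%M:%SZ"), UTC
    "kpi_id"    : "6e22f180-ba28-4641-b190-2287bf448888",  # hypothetical KPI UUID
    "kpi_value" : 512.734,
}
# Termination confirmation, produced to KafkaTopic.RESPONSE, using the sentinel pair:
termination = {"kpi_id": "-1", "kpi_value": -1}
```

The frontend's process_response, changed at the end of this patch, keys on exactly that sentinel pair to tell confirmations apart from values.
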
diff --git a/src/telemetry/backend/tests/messages.py b/src/telemetry/backend/tests/messages.py
index 5cf553eaa..e6c9b9e16 100644
--- a/src/telemetry/backend/tests/messages.py
+++ b/src/telemetry/backend/tests/messages.py
@@ -12,4 +12,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import uuid
+import random
+from common.proto import telemetry_frontend_pb2
+# from common.proto.kpi_sample_types_pb2 import KpiSampleType
+# from common.proto.kpi_manager_pb2 import KpiId
+
+def create_collector_request():
+    _create_collector_request = telemetry_frontend_pb2.Collector()
+    _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4())
+    # _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd"
+    _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
+    # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
+    _create_collector_request.duration_s = float(random.randint(8, 16))
+    # _create_collector_request.duration_s = -1
+    _create_collector_request.interval_s = float(random.randint(2, 4))
+    return _create_collector_request
diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py
index 8bbde9769..3ddbedf93 100644
--- a/src/telemetry/backend/tests/test_backend.py
+++ b/src/telemetry/backend/tests/test_backend.py
@@ -13,10 +13,11 @@
 # limitations under the License.
 
 import logging
-import threading
+from typing import Dict
+from .messages import create_collector_request
 from common.tools.kafka.Variables import KafkaTopic
 from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
-
+import time
 
 LOGGER = logging.getLogger(__name__)
 
@@ -34,4 +35,19 @@ def test_validate_kafka_topics():
 # def test_RunRequestListener():
 #     LOGGER.info('test_RunRequestListener')
 #     TelemetryBackendServiceObj = TelemetryBackendService()
-#     threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
\ No newline at end of file
+#     threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start()
+
+def test_RunInitiateCollectorBackend():
+    LOGGER.debug(">>> RunInitiateCollectorBackend <<<")
+    collector_obj = create_collector_request()
+    collector_id  = collector_obj.collector_id.collector_id.uuid
+    collector_dict : Dict = {
+        "kpi_id"  : collector_obj.kpi_id.kpi_id.uuid,
+        "duration": collector_obj.duration_s,
+        "interval": collector_obj.interval_s
+    }
+    TeleObj = TelemetryBackendService()
+    TeleObj.InitiateCollectorBackend(collector_id, collector_dict)
+    time.sleep(20)
+
+    LOGGER.debug("--- Execution Finished Successfully ---")
diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index 5c569e2dd..f2540fd79 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -196,7 +196,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any):
         if kpi_id == "-1" and kpi_value == -1:
             # print ("Backend termination confirmation for collector id: ", collector_id)
-            LOGGER.info("Backend termination confirmation for collector id: ", collector_id)
+            LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id))
         else:
-            LOGGER.info("Backend termination confirmation for collector id: ", collector_id)
+            LOGGER.info("KPI value received for collector id: {:}".format(collector_id))
             # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value)
-- 
GitLab
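
Taken end to end, this first patch reduces a collector request to a JSON dict keyed by a collector UUID. A sketch of submitting one by hand, mirroring the frontend servicer and the new test (illustrative; the topic member is renamed in PATCH 2/5 below):

```python
# Hand-built collector request, mirroring TelemetryFrontendServiceServicerImpl (illustrative).
import json, uuid
from confluent_kafka import Producer
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic

producer     = Producer({'bootstrap.servers': KafkaConfig.get_kafka_address()})
collector_id = str(uuid.uuid4())
request      = {"kpi_id": str(uuid.uuid4()), "duration": 10, "interval": 2}  # seconds
producer.produce(KafkaTopic.REQUEST.value, key=collector_id, value=json.dumps(request))
producer.flush()
# Re-sending under the same key with duration == -1 and interval == -1 asks the
# backend's RequestListener to terminate that collector.
```
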
From 3a9369744e250d5a7714513857c69de6ce907493 Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Sat, 12 Oct 2024 15:18:59 +0000
Subject: [PATCH 2/5] Updated Kafka topic names for Telemetry service.

---
 src/common/tools/kafka/Variables.py                  |  8 ++++----
 .../backend/service/TelemetryBackendService.py       | 12 ++++++------
 .../service/TelemetryFrontendServiceServicerImpl.py  |  6 +++---
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/src/common/tools/kafka/Variables.py b/src/common/tools/kafka/Variables.py
index 8ff6447f7..9a1aec273 100644
--- a/src/common/tools/kafka/Variables.py
+++ b/src/common/tools/kafka/Variables.py
@@ -41,14 +41,14 @@ class KafkaConfig(Enum):
 
 class KafkaTopic(Enum):
     # TODO: Later to be populated from ENV variable.
-    REQUEST            = 'topic_request'
-    RESPONSE           = 'topic_response'
+    TELEMETRY_REQUEST  = 'topic_telemetry_request'
+    TELEMETRY_RESPONSE = 'topic_telemetry_response'
     RAW                = 'topic_raw'
     LABELED            = 'topic_labeled'
     VALUE              = 'topic_value'
     ALARMS             = 'topic_alarms'
-    ANALYTICS_REQUEST  = 'topic_request_analytics'
-    ANALYTICS_RESPONSE = 'topic_response_analytics'
+    ANALYTICS_REQUEST  = 'topic_analytics_request'
+    ANALYTICS_RESPONSE = 'topic_analytics_response'
 
     @staticmethod
     def create_all_topics() -> bool:
diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index 2d7333715..8cef95292 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -36,7 +36,7 @@ METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService')
 class TelemetryBackendService(GenericGrpcService):
     """
     Class listens for request on Kafka topic, fetches requested metrics from device.
-    Produces metrics on both RESPONSE and VALUE kafka topics.
+    Produces metrics on both TELEMETRY_RESPONSE and VALUE kafka topics.
     """
     def __init__(self, cls_name : str = __name__) -> None:
         LOGGER.info('Init TelemetryBackendService')
@@ -60,7 +60,7 @@ class TelemetryBackendService(GenericGrpcService):
         LOGGER.info('Telemetry backend request listener is running ...')
         # print ('Telemetry backend request listener is running ...')
         consumer = self.kafka_consumer
-        consumer.subscribe([KafkaTopic.REQUEST.value])
+        consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value])
         while True:
             receive_msg = consumer.poll(2.0)
             if receive_msg is None:
@@ -82,7 +82,7 @@ class TelemetryBackendService(GenericGrpcService):
                     threading.Thread(target=self.InitiateCollectorBackend,
                                      args=(collector_id, collector)).start()
             except Exception as e:
-                LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.REQUEST.value, e))
+                LOGGER.warning("Unable to consume message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e))
 
     def InitiateCollectorBackend(self, collector_id, collector):
         """
@@ -109,7 +109,7 @@ class TelemetryBackendService(GenericGrpcService):
 
     def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
         """
-        Method to write kpi value on RESPONSE Kafka topic
+        Method to write kpi value on TELEMETRY_RESPONSE Kafka topic
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
@@ -138,7 +138,7 @@ class TelemetryBackendService(GenericGrpcService):
 
     def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
         """
-        Method to write kpi Termination signal on RESPONSE Kafka topic
+        Method to write kpi Termination signal on TELEMETRY_RESPONSE Kafka topic
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
@@ -146,7 +146,7 @@ class TelemetryBackendService(GenericGrpcService):
             "kpi_value" : measured_kpi_value,
         }
         producer.produce(
-            KafkaTopic.RESPONSE.value, # TODO: to the topic ...
+            KafkaTopic.TELEMETRY_RESPONSE.value, # TODO: to the topic ...
             key      = collector_id,
             value    = json.dumps(kpi_value),
             callback = self.delivery_callback
diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
index f2540fd79..b7dcdbb6b 100644
--- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
+++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py
@@ -74,7 +74,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
             "interval": collector_obj.interval_s
         }
         self.kafka_producer.produce(
-            KafkaTopic.REQUEST.value,
+            KafkaTopic.TELEMETRY_REQUEST.value,
             key      = collector_uuid,
             value    = json.dumps(collector_to_generate),
             callback = self.delivery_callback
@@ -110,7 +110,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
             "interval": -1
         }
         self.kafka_producer.produce(
-            KafkaTopic.REQUEST.value,
+            KafkaTopic.TELEMETRY_REQUEST.value,
             key      = collector_uuid,
             value    = json.dumps(collector_to_stop),
             callback = self.delivery_callback
@@ -168,7 +168,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
         """
         listener for response on Kafka topic.
         """
-        self.kafka_consumer.subscribe([KafkaTopic.RESPONSE.value])
+        self.kafka_consumer.subscribe([KafkaTopic.TELEMETRY_RESPONSE.value])
         while True:
             receive_msg = self.kafka_consumer.poll(2.0)
             if receive_msg is None:
-- 
GitLab
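
Any external producer or consumer pinned to the old names has to follow this rename. A quick sanity check against the patched enum (a sketch, not part of the patch):

```python
from common.tools.kafka.Variables import KafkaTopic

# The telemetry pair is now namespaced the same way as the analytics pair:
assert KafkaTopic.TELEMETRY_REQUEST.value  == 'topic_telemetry_request'
assert KafkaTopic.TELEMETRY_RESPONSE.value == 'topic_telemetry_response'
KafkaTopic.create_all_topics()   # declared in Variables.py above; provisions every enum member
```
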
From 29e64a62290b3100b27e86aa03ab7f7da799479f Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Sat, 12 Oct 2024 15:26:51 +0000
Subject: [PATCH 3/5] Minor changes in Telemetry backend

---
 .../backend/service/TelemetryBackendService.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py
index 8cef95292..ab4991690 100755
--- a/src/telemetry/backend/service/TelemetryBackendService.py
+++ b/src/telemetry/backend/service/TelemetryBackendService.py
@@ -88,7 +88,6 @@ class TelemetryBackendService(GenericGrpcService):
         """
         Method receives collector request and initiates collecter backend.
         """
-        # print("Initiating backend for collector: ", collector_id)
         LOGGER.info("Initiating backend for collector: {:s}".format(str(collector_id)))
         start_time = time.time()
         self.emulatorCollector = NetworkMetricsEmulator(
@@ -103,13 +102,13 @@ class TelemetryBackendService(GenericGrpcService):
             if not self.metric_queue.empty():
                 metric_value = self.metric_queue.get()
                 LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value))
-                self.GenerateCollectorResponse(collector_id, collector['kpi_id'] , metric_value)
+                self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value)
             time.sleep(1)
         self.TerminateCollectorBackend(collector_id)
 
-    def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
+    def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any):
         """
-        Method to write kpi value on TELEMETRY_RESPONSE Kafka topic
+        Method to write kpi value on VALUE Kafka topic
         """
         producer = self.kafka_producer
         kpi_value : Dict = {
@@ -118,7 +117,7 @@ class TelemetryBackendService(GenericGrpcService):
             "kpi_value" : measured_kpi_value
         }
         producer.produce(
-            KafkaTopic.VALUE.value, # TODO: to the topic ...
+            KafkaTopic.VALUE.value,
             key      = collector_id,
             value    = json.dumps(kpi_value),
             callback = self.delivery_callback
@@ -146,7 +145,7 @@ class TelemetryBackendService(GenericGrpcService):
             "kpi_value" : measured_kpi_value,
         }
         producer.produce(
-            KafkaTopic.TELEMETRY_RESPONSE.value, # TODO: to the topic ...
+            KafkaTopic.TELEMETRY_RESPONSE.value,
             key      = collector_id,
             value    = json.dumps(kpi_value),
             callback = self.delivery_callback
-- 
GitLab
From 4467726216ac7f455a487e960040ecfd4c52f03e Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Sun, 13 Oct 2024 15:26:53 +0000
Subject: [PATCH 4/5] Emulated Collector is added in Telemetry backend to
 produce multiple metrics at a single timestamp.

---
 .../backend/service/EmulatedCollector.py     |  3 +
 .../service/EmulatedCollectorMultiple.py     | 83 +++++++++++++++
 2 files changed, 86 insertions(+)
 create mode 100644 src/telemetry/backend/service/EmulatedCollectorMultiple.py

diff --git a/src/telemetry/backend/service/EmulatedCollector.py b/src/telemetry/backend/service/EmulatedCollector.py
index 716292ae0..5f546d043 100644
--- a/src/telemetry/backend/service/EmulatedCollector.py
+++ b/src/telemetry/backend/service/EmulatedCollector.py
@@ -8,6 +8,9 @@ import queue
 
 LOGGER = logging.getLogger(__name__)
 
 class NetworkMetricsEmulator(threading.Thread):
+    """
+    This collector class will generate a single emulated metric value.
+    """
     def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"):
diff --git a/src/telemetry/backend/service/EmulatedCollectorMultiple.py b/src/telemetry/backend/service/EmulatedCollectorMultiple.py
new file mode 100644
index 000000000..54886e5db
--- /dev/null
+++ b/src/telemetry/backend/service/EmulatedCollectorMultiple.py
@@ -0,0 +1,83 @@
+import numpy as np
+import random
+import threading
+import time
+import logging
+import queue
+
+LOGGER = logging.getLogger(__name__)
+
+class NetworkMetricsEmulator(threading.Thread):
+    """
+    This collector class will generate the emulated metrics for PKT_IN, PKT_OUT, BYTES_IN, BYTES_OUT and PKT_DROP as a list.
+    """
+    def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"):
+        LOGGER.info("Initiating Emulator")
+        super().__init__()
+        self.interval      = interval
+        self.duration      = duration
+        self.metric_queue  = metric_queue if metric_queue is not None else queue.Queue()
+        self.network_state = network_state
+        self.running       = True
+        self.set_initial_parameter_values()
+
+    def set_initial_parameter_values(self):
+        self.bytes_per_pkt = random.uniform(65, 150)
+        self.states = ["good", "moderate", "poor"]
+        self.state_probabilities = {
+            "good"    : [0.8, 0.2, 0.0],
+            "moderate": [0.2, 0.6, 0.2],
+            "poor"    : [0.0, 0.4, 0.6]
+        }
+        if self.network_state == "good":
+            self.packet_in = random.uniform(700, 900)
+        elif self.network_state == "moderate":
+            self.packet_in = random.uniform(300, 700)
+        else:
+            self.packet_in = random.uniform(100, 300)
+
+    def generate_synthetic_data_point(self):
+        if self.network_state == "good":
+            packet_loss  = random.uniform(0.01, 0.1)
+            random_noise = random.uniform(1, 10)
+        elif self.network_state == "moderate":
+            packet_loss  = random.uniform(0.1, 1)
+            random_noise = random.uniform(10, 40)
+        elif self.network_state == "poor":
+            packet_loss  = random.uniform(1, 3)
+            random_noise = random.uniform(40, 100)
+        else:
+            raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.")
+        # self.packet_in += random_noise
+
+        period     = 60 * 60 * random.uniform(10, 100)
+        amplitude  = random.uniform(50, 100)
+        sin_wave   = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in
+        packet_in  = sin_wave + ((sin_wave / 100) * random_noise)
+        packet_out = packet_in - ((packet_in / 100) * packet_loss)
+        bytes_in   = packet_in * self.bytes_per_pkt
+        bytes_out  = packet_out * self.bytes_per_pkt
+
+        state_prob = self.state_probabilities[self.network_state]
+        self.network_state = random.choices(self.states, state_prob)[0]
+        LOGGER.debug("Network state: {:}".format(self.network_state))
+
+        return [float(packet_in), float(packet_out), float(bytes_in), float(bytes_out), float(packet_loss)]
+        # return packet_in
+
+    def run(self):
+        while self.running and (self.duration == -1 or self.duration > 0):
+            packet_in = self.generate_synthetic_data_point()
+            self.metric_queue.put(packet_in)
+            time.sleep(self.interval)
+            if self.duration > 0:
+                self.duration -= self.interval
+        if self.duration == -1:
+            self.duration = 0
+        LOGGER.debug("Emulator collector is stopped.")
+        self.stop()
+
+    def stop(self):
+        self.running = False
+        if not self.is_alive():
+            LOGGER.debug("Emulator Collector is Terminated.")
-- 
GitLab
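
The five values returned by generate_synthetic_data_point above stay mutually consistent by construction. The relationships, restated with worked numbers (illustrative; packet_loss is a percentage and bytes_per_pkt is fixed per run):

```python
# Relationships encoded in EmulatedCollectorMultiple above, with worked numbers.
packet_in, packet_loss, bytes_per_pkt = 500.0, 0.5, 100.0
packet_out = packet_in - (packet_in / 100) * packet_loss   # drop packet_loss % of packets
bytes_in   = packet_in  * bytes_per_pkt                    # fixed bytes-per-packet model
bytes_out  = packet_out * bytes_per_pkt
assert (packet_out, bytes_in, bytes_out) == (497.5, 50000.0, 49750.0)
```
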
""" - # print("Initiating backend for collector: ", collector_id) LOGGER.info("Initiating backend for collector: {:s}".format(str(collector_id))) start_time = time.time() self.emulatorCollector = NetworkMetricsEmulator( @@ -103,13 +102,13 @@ class TelemetryBackendService(GenericGrpcService): if not self.metric_queue.empty(): metric_value = self.metric_queue.get() LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value)) - self.GenerateCollectorResponse(collector_id, collector['kpi_id'] , metric_value) + self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value) time.sleep(1) self.TerminateCollectorBackend(collector_id) - def GenerateCollectorResponse(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ - Method to write kpi value on TELEMETRY_RESPONSE Kafka topic + Method to write kpi value on VALUE Kafka topic """ producer = self.kafka_producer kpi_value : Dict = { @@ -118,7 +117,7 @@ class TelemetryBackendService(GenericGrpcService): "kpi_value" : measured_kpi_value } producer.produce( - KafkaTopic.VALUE.value, # TODO: to the topic ... + KafkaTopic.VALUE.value, key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback @@ -146,7 +145,7 @@ class TelemetryBackendService(GenericGrpcService): "kpi_value" : measured_kpi_value, } producer.produce( - KafkaTopic.TELEMETRY_RESPONSE.value, # TODO: to the topic ... + KafkaTopic.TELEMETRY_RESPONSE.value, key = collector_id, value = json.dumps(kpi_value), callback = self.delivery_callback -- GitLab From 4467726216ac7f455a487e960040ecfd4c52f03e Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 13 Oct 2024 15:26:53 +0000 Subject: [PATCH 4/5] Emulated Collector is added in Telemetry backend to produce multiple metrics at single timestamp. --- .../backend/service/EmulatedCollector.py | 3 + .../service/EmulatedCollectorMultiple.py | 83 +++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 src/telemetry/backend/service/EmulatedCollectorMultiple.py diff --git a/src/telemetry/backend/service/EmulatedCollector.py b/src/telemetry/backend/service/EmulatedCollector.py index 716292ae0..5f546d043 100644 --- a/src/telemetry/backend/service/EmulatedCollector.py +++ b/src/telemetry/backend/service/EmulatedCollector.py @@ -8,6 +8,9 @@ import queue LOGGER = logging.getLogger(__name__) class NetworkMetricsEmulator(threading.Thread): + """ + This collector class will generate a single emulated metric value. + """ def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"): LOGGER.info("Initiaitng Emulator") super().__init__() diff --git a/src/telemetry/backend/service/EmulatedCollectorMultiple.py b/src/telemetry/backend/service/EmulatedCollectorMultiple.py new file mode 100644 index 000000000..54886e5db --- /dev/null +++ b/src/telemetry/backend/service/EmulatedCollectorMultiple.py @@ -0,0 +1,83 @@ +import numpy as np +import random +import threading +import time +import logging +import queue + +LOGGER = logging.getLogger(__name__) + +class NetworkMetricsEmulator(threading.Thread): + """ + This collector class will generate the emulated metrics for PKT_IN, PKT_OUT, BYTES_IN, BYTES_OUT and PKT_DROP as a list. 
+ """ + def __init__(self, interval=1, duration=10, metric_queue=None, network_state="moderate"): + LOGGER.info("Initiaitng Emulator") + super().__init__() + self.interval = interval + self.duration = duration + self.metric_queue = metric_queue if metric_queue is not None else queue.Queue() + self.network_state = network_state + self.running = True + self.set_inital_parameter_values() + + def set_inital_parameter_values(self): + self.bytes_per_pkt = random.uniform(65, 150) + self.states = ["good", "moderate", "poor"] + self.state_probabilities = { + "good" : [0.8, 0.2, 0.0], + "moderate": [0.2, 0.6, 0.2], + "poor" : [0.0, 0.4, 0.6] + } + if self.network_state == "good": + self.packet_in = random.uniform(700, 900) + elif self.network_state == "moderate": + self.packet_in = random.uniform(300, 700) + else: + self.packet_in = random.uniform(100, 300) + + def generate_synthetic_data_point(self): + if self.network_state == "good": + packet_loss = random.uniform(0.01, 0.1) + random_noise = random.uniform(1,10) + elif self.network_state == "moderate": + packet_loss = random.uniform(0.1, 1) + random_noise = random.uniform(10, 40) + elif self.network_state == "poor": + packet_loss = random.uniform(1, 3) + random_noise = random.uniform(40, 100) + else: + raise ValueError("Invalid network state. Must be 'good', 'moderate', or 'poor'.") + # self.packet_in += random_noise + + period = 60 * 60 * random.uniform(10, 100) + amplitude = random.uniform(50, 100) + sin_wave = amplitude * np.sin(2 * np.pi * 100 / period) + self.packet_in + packet_in = sin_wave + ((sin_wave/100) * random_noise) + packet_out = packet_in - ((packet_in / 100) * packet_loss) + bytes_in = packet_in * self.bytes_per_pkt + bytes_out = packet_out * self.bytes_per_pkt + + state_prob = self.state_probabilities[self.network_state] + self.network_state = random.choices(self.states, state_prob)[0] + print (self.network_state) + + return [float(packet_in), float(packet_out), float(bytes_in), float(bytes_out), float(packet_loss)] + # return packet_in + + def run(self): + while self.running and (self.duration == -1 or self.duration > 0): + packet_in = self.generate_synthetic_data_point() + self.metric_queue.put(packet_in) + time.sleep(self.interval) + if self.duration > 0: + self.duration -= self.interval + if self.duration == -1: + self.duration = 0 + LOGGER.debug("Emulator collector is stopped.") + self.stop() + + def stop(self): + self.running = False + if not self.is_alive(): + print("Thread is terminated.") -- GitLab From ca614228b6c065f995a0b0b3addeb8acb8c7e1b0 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Wed, 16 Oct 2024 07:47:44 +0000 Subject: [PATCH 5/5] Pre-merge code cleanup --- src/telemetry/backend/service/EmulatedCollector.py | 14 ++++++++++++++ .../backend/service/EmulatedCollectorMultiple.py | 14 ++++++++++++++ src/telemetry/backend/tests/test_backend.py | 4 ++-- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/telemetry/backend/service/EmulatedCollector.py b/src/telemetry/backend/service/EmulatedCollector.py index 5f546d043..cffff516f 100644 --- a/src/telemetry/backend/service/EmulatedCollector.py +++ b/src/telemetry/backend/service/EmulatedCollector.py @@ -1,3 +1,17 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py
index 3ddbedf93..a8d47da45 100644
--- a/src/telemetry/backend/tests/test_backend.py
+++ b/src/telemetry/backend/tests/test_backend.py
@@ -13,11 +13,11 @@
 # limitations under the License.
 
 import logging
+import time
 from typing import Dict
-from .messages import create_collector_request
 from common.tools.kafka.Variables import KafkaTopic
 from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService
-import time
+from .messages import create_collector_request
 
 LOGGER = logging.getLogger(__name__)
-- 
GitLab