From 1be101f8a11bc7cabc327b17952d692ea669f75f Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 26 Jan 2025 07:58:02 +0000 Subject: [PATCH 01/12] Updated Telemetry Backend. - Refactor Telemetry backend service methods - Enhance test logging for better clarity. --- .../service/TelemetryBackendService.py | 183 ++++++++++++------ src/telemetry/backend/tests/test_backend.py | 67 ++++--- .../TelemetryFrontendServiceServicerImpl.py | 8 +- 3 files changed, 172 insertions(+), 86 deletions(-) diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index a1f17df3c..0c515768e 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import queue import json import time import logging @@ -27,6 +26,7 @@ from common.Settings import get_service_port_grpc from common.method_wrappers.Decorator import MetricsPool from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService +from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') @@ -44,9 +44,7 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) - self.running_threads = {} - self.emulatorCollector = None - self.metric_queue = queue.Queue() + self.active_jobs = {} def install_servicers(self): threading.Thread(target=self.RequestListener).start() @@ -60,49 +58,88 @@ class TelemetryBackendService(GenericGrpcService): consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.TELEMETRY_REQUEST.value]) while True: - receive_msg = consumer.poll(2.0) + receive_msg = consumer.poll(1.0) if receive_msg is None: continue elif receive_msg.error(): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue + elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: + LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. 
May be topic does not have any messages.") + continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) break try: - collector = json.loads(receive_msg.value().decode('utf-8')) + collector = json.loads( + receive_msg.value().decode('utf-8') + ) collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - if collector['duration'] == -1 and collector['interval'] == -1: - self.TerminateCollectorBackend(collector_id) + duration = collector.get('duration', -1) + if duration == -1 and collector['interval'] == -1: + self.TerminateCollector(collector_id) else: - threading.Thread(target=self.InitiateCollectorBackend, - args=(collector_id, collector)).start() + LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id)) + if collector_id not in self.active_jobs: + stop_event = threading.Event() + self.active_jobs[collector_id] = stop_event + threading.Thread(target = self.CollectorHandler, + args=( + collector_id, + collector['kpi_id'], + duration, + collector['interval'], + stop_event + )).start() + # Stop the Collector after the given duration + if duration > 0: + def stop_after_duration(): + time.sleep(duration) + LOGGER.warning(f"Execution duration ({duration}) completed of Collector: {collector_id}") + self.TerminateCollector(collector_id) + + duration_thread = threading.Thread( + target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}" + ) + duration_thread.start() + else: + LOGGER.warning("Collector ID: {:} - Already scheduled or running".format(collector_id)) except Exception as e: LOGGER.warning("Unable to consumer message from topic: {:}. ERROR: {:}".format(KafkaTopic.TELEMETRY_REQUEST.value, e)) - def InitiateCollectorBackend(self, collector_id, collector): + def CollectorHandler(self, collector_id, kpi_id, duration, interval, stop_event): """ - Method receives collector request and initiates collecter backend. + Method to handle collector request. """ - LOGGER.info("Initiating backend for collector: (Not Implemented... In progress ) {:s}".format(str(collector_id))) - # start_time = time.time() - # self.emulatorCollector = NetworkMetricsEmulator( - # duration = collector['duration'], - # interval = collector['interval'], - # metric_queue = self.metric_queue - # ) - # self.emulatorCollector.start() - # self.running_threads[collector_id] = self.emulatorCollector - - # while self.emulatorCollector.is_alive(): - # if not self.metric_queue.empty(): - # metric_value = self.metric_queue.get() - # LOGGER.debug("Metric: {:} - Value : {:}".format(collector['kpi_id'], metric_value)) - # self.GenerateKpiValue(collector_id, collector['kpi_id'] , metric_value) - # time.sleep(1) - # self.TerminateCollectorBackend(collector_id) + end_points : list = self.get_endpoints_from_kpi_id(kpi_id) + if not end_points: + LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) + + device_type : str = self.get_device_type_from_kpi_id(kpi_id) + + if device_type == "Unknown": + LOGGER.warning("KPI ID: {:} - Device Type not found. 
Skipping...".format(kpi_id)) + + if device_type == "EMU-Device": + LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) + subscription = [collector_id, end_points, duration, interval] + self.EmulatedCollectorHandler(subscription, kpi_id, stop_event) + else: + LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) + + + def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event): + # EmulatedCollector + collector = EmulatedCollector(address="127.0.0.1", port=8000) + collector.Connect() + while not stop_event.is_set(): + # samples = collector.SubscribeState(subscription) + # LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples)) + # self.GenerateKpiValue(job_id, kpi_id, samples) + LOGGER.info("Generating KPI Values ...") + time.sleep(1) def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ @@ -122,38 +159,74 @@ class TelemetryBackendService(GenericGrpcService): ) producer.flush() - def TerminateCollectorBackend(self, collector_id): + def TerminateCollector(self, job_id): LOGGER.debug("Terminating collector backend...") - if collector_id in self.running_threads: - thread = self.running_threads[collector_id] - thread.stop() - del self.running_threads[collector_id] - LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) - self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. - else: - LOGGER.warning('Backend collector {:} not found'.format(collector_id)) + try: + if job_id not in self.active_jobs: # not job_ids: + # self.logger.warning(f"Active jobs: {self.active_jobs}") + self.logger.warning(f"No active jobs found for {job_id}. It might have already terminated.") + else: + LOGGER.info(f"Terminating job: {job_id}") + stop_event = self.active_jobs.pop(job_id, None) + if stop_event: + stop_event.set() + LOGGER.info(f"Job {job_id} terminated.") + else: + LOGGER.warning(f"Job {job_id} not found in active jobs.") + except: + LOGGER.exception("Error terminating job: {:}".format(job_id)) - def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + def get_endpoints_from_kpi_id(self, kpi_id: str) -> list: """ - Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic + Method to get endpoints based on kpi_id. """ - producer = self.kafka_producer - kpi_value : Dict = { - "kpi_id" : kpi_id, - "kpi_value" : measured_kpi_value, + kpi_endpoints = { + '6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0", "type": "ethernet", "sample_types": [101, 102]}, + '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []}, + '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}, } - producer.produce( - KafkaTopic.TELEMETRY_RESPONSE.value, - key = collector_id, - value = json.dumps(kpi_value), - callback = self.delivery_callback - ) - producer.flush() + return [kpi_endpoints.get(kpi_id, {})] if kpi_id in kpi_endpoints else [] + + def get_device_type_from_kpi_id(self, kpi_id: str) -> str: + """ + Method to get device type based on kpi_id. 
+ """ + kpi_device_types = { + "123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"}, + "123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"}, + "6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"}, + } + return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown") + + + # def TerminateCollectorBackend(self, collector_id): + # LOGGER.debug("Terminating collector backend...") + # if collector_id in self.running_threads: + # thread = self.running_threads[collector_id] + # thread.stop() + # del self.running_threads[collector_id] + # LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) + # self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. + # else: + # LOGGER.warning('Backend collector {:} not found'.format(collector_id)) + + # def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): + # """ + # Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic + # """ + # producer = self.kafka_producer + # kpi_value : Dict = { + # "kpi_id" : kpi_id, + # "kpi_value" : measured_kpi_value, + # } + # producer.produce( + # KafkaTopic.TELEMETRY_RESPONSE.value, + # key = collector_id, + # value = json.dumps(kpi_value), + # callback = self.delivery_callback + # ) + # producer.flush() def delivery_callback(self, err, msg): if err: LOGGER.error('Message delivery failed: {:s}'.format(str(err))) - # print(f'Message delivery failed: {err}') - # else: - # LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) - # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index e75b33ca5..28b92fb29 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -12,12 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest import logging import time -from typing import Dict -from common.tools.kafka.Variables import KafkaTopic from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService from .messages import create_collector_request +from .Fixtures import context_client, device_client +from .add_devices import load_topology LOGGER = logging.getLogger(__name__) @@ -26,28 +27,42 @@ LOGGER = logging.getLogger(__name__) # Tests Implementation of Telemetry Backend ########################### +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + +@pytest.fixture +def telemetryBackend_service(): + LOGGER.info('Initializing TelemetryBackendService...') + + _service = TelemetryBackendService() + _service.start() + + LOGGER.info('Yielding TelemetryBackendService...') + yield _service + + LOGGER.info('Terminating TelemetryBackendService...') + _service.stop() + LOGGER.info('Terminated TelemetryBackendService...') + + +def test_InitiateCollectorBackend(telemetryBackend_service): + LOGGER.info(" Backend Initiated Successfully. 
Waiting for timer to finish ...") + time.sleep(300) + LOGGER.info(" Backend Timer Finished Successfully. ") + # --- "test_validate_kafka_topics" should be run before the functionality tests --- -def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") - response = KafkaTopic.create_all_topics() - assert isinstance(response, bool) - -# def test_RunRequestListener(): -# LOGGER.info('test_RunRequestListener') -# TelemetryBackendServiceObj = TelemetryBackendService() -# threading.Thread(target=TelemetryBackendServiceObj.RequestListener).start() - -def test_RunInitiateCollectorBackend(): - LOGGER.debug(">>> RunInitiateCollectorBackend <<<") - collector_obj = create_collector_request() - collector_id = collector_obj.collector_id.collector_id.uuid - collector_dict : Dict = { - "kpi_id" : collector_obj.kpi_id.kpi_id.uuid, - "duration": collector_obj.duration_s, - "interval": collector_obj.interval_s - } - TeleObj = TelemetryBackendService() - TeleObj.InitiateCollectorBackend(collector_id, collector_dict) - time.sleep(20) - - LOGGER.debug("--- Execution Finished Sucessfully---") +# def test_validate_kafka_topics(): +# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") +# response = KafkaTopic.create_all_topics() +# assert isinstance(response, bool) + +# # Call load_topology from the add_devices.py file +# def test_load_topology(context_client, device_client): +# load_topology(context_client, device_client) diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index f74e97ffd..1ef8ed46b 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -13,7 +13,6 @@ # limitations under the License. 
import json -import threading from typing import Any, Dict import grpc import logging @@ -29,7 +28,6 @@ from telemetry.database.Telemetry_DB import TelemetryDB from confluent_kafka import Consumer as KafkaConsumer from confluent_kafka import Producer as KafkaProducer -from confluent_kafka import KafkaError LOGGER = logging.getLogger(__name__) @@ -49,7 +47,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StartCollector(self, - request : Collector, grpc_context: grpc.ServicerContext # type: ignore + request : Collector, context: grpc.ServicerContext # type: ignore ) -> CollectorId: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) response = CollectorId() @@ -86,7 +84,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def StopCollector(self, - request : CollectorId, grpc_context: grpc.ServicerContext # type: ignore + request : CollectorId, context: grpc.ServicerContext # type: ignore ) -> Empty: # type: ignore LOGGER.info ("gRPC message: {:}".format(request)) try: @@ -125,7 +123,7 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectCollectors(self, - request : CollectorFilter, contextgrpc_context: grpc.ServicerContext # type: ignore + request : CollectorFilter, context: grpc.ServicerContext # type: ignore ) -> CollectorList: # type: ignore LOGGER.info("gRPC message: {:}".format(request)) response = CollectorList() -- GitLab From af6688ba9ad5972ed2f5334b2f1925f5c2a0265b Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 26 Jan 2025 18:00:10 +0000 Subject: [PATCH 02/12] Updated Telemetry Backend - Enhance telemetry tests with logging - Update collector API methods - Updated Telemetry backend collector management --- .../backend/collector_api/_Collector.py | 42 +- .../collectors/emulated/EmulatedCollector.py | 378 ++---------------- .../service/TelemetryBackendService.py | 84 ++-- src/telemetry/frontend/tests/test_frontend.py | 37 +- 4 files changed, 110 insertions(+), 431 deletions(-) diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py index ec4ba943c..d6e711d65 100644 --- a/src/telemetry/backend/collector_api/_Collector.py +++ b/src/telemetry/backend/collector_api/_Collector.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading +import queue from typing import Any, Iterator, List, Optional, Tuple, Union # Special resource names to request to the collector to retrieve the specified @@ -135,31 +135,25 @@ class _Collector: """ raise NotImplementedError() - def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \ + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> \ + bool: + """ Subscribe to state information of the entire device or selected resources. + Subscriptions are incremental, and the collector should keep track of requested resources. + List of tuples, each containing: + - resource_id (str): Identifier pointing to the resource to be subscribed. + - resource_dict (dict): Dictionary containing resource name, KPI to be subscribed, and type. + - sampling_duration (float): Duration (in seconds) for how long monitoring should last. 
+ - sampling_interval (float): Desired monitoring interval (in seconds) for the specified resource. + List of results for the requested resource key subscriptions. + The return values are in the same order as the requested resource keys. + - True if a resource is successfully subscribed. + - Exception if an error occurs during the subscription process. List[Union[bool, Exception]]: - """ Subscribe to state information of entire device or - selected resources. Subscriptions are incremental. - Collector should keep track of requested resources. - Parameters: - subscriptions : List[Tuple[str, float, float]] - List of tuples, each containing a resource_key pointing the - resource to be subscribed, a sampling_duration, and a - sampling_interval (both in seconds with float - representation) defining, respectively, for how long - monitoring should last, and the desired monitoring interval - for the resource specified. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key subscriptions requested. - Return values must be in the same order as the resource keys - requested. If a resource is properly subscribed, - True must be retrieved; otherwise, the Exception that is - raised during the processing must be retrieved. - """ + """ raise NotImplementedError() - def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) \ - -> List[Union[bool, Exception]]: + def UnsubscribeState(self, resource_key: str) \ + -> bool: """ Unsubscribe from state information of entire device or selected resources. Subscriptions are incremental. Collector should keep track of requested resources. @@ -182,7 +176,7 @@ class _Collector: raise NotImplementedError() def GetState( - self, blocking=False, terminate : Optional[threading.Event] = None + self, duration : int, blocking=False, terminate: Optional[queue.Queue] = None ) -> Iterator[Tuple[float, str, Any]]: """ Retrieve last collected values for subscribed resources. 
Operates as a generator, so this method should be called once and will diff --git a/src/telemetry/backend/collectors/emulated/EmulatedCollector.py b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py index 90be01336..48102a943 100644 --- a/src/telemetry/backend/collectors/emulated/EmulatedCollector.py +++ b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py @@ -15,10 +15,7 @@ import pytz import queue import logging -import uuid -import json from anytree import Node, Resolver -from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor @@ -36,10 +33,6 @@ class EmulatedCollector(_Collector): """ def __init__(self, address: str, port: int, **settings): super().__init__('emulated_collector', address, port, **settings) - self._initial_config = Node('root') # Tree structure for initial config - self._running_config = Node('root') # Tree structure for running config - self._subscriptions = Node('subscriptions') # Tree for state subscriptions - self._resolver = Resolver() # For path resolution in tree structures self._out_samples = queue.Queue() # Queue to hold synthetic state samples self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator self._scheduler = BackgroundScheduler(daemon=True) @@ -48,8 +41,8 @@ class EmulatedCollector(_Collector): executors = {'default': ThreadPoolExecutor(max_workers=1)}, timezone = pytz.utc ) - self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) - self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) + # self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) + # self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) self._helper_methods = EmulatedCollectorHelper() self.logger = logging.getLogger(__name__) @@ -77,73 +70,56 @@ class EmulatedCollector(_Collector): if not self.connected: raise RuntimeError("Collector is not connected. Please connect before performing operations.") - def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> bool: self._require_connection() - results = [] - for resource_key, duration, interval in subscriptions: - resource_key = self._helper_methods.validate_resource_key(resource_key) # Validate the endpoint name - self.logger.info(f"1. 
Subscribing to {resource_key} with duration {duration}s and interval {interval}s") + try: + job_id, endpoint, duration, interval = subscriptions + except: + self.logger.exception(f"Invalid subscription format: {subscriptions}") + return False + if endpoint: + self.logger.info(f"Subscribing to {endpoint} with duration {duration}s and interval {interval}s") try: - self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration - self.logger.info(f"Resource key {resource_key} exists in the configuration.") - resource_value = json.loads(self._resolver.get(self._running_config, resource_key).value) - if resource_value is not None: - sample_type_ids = resource_value['sample_types'] - self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}") - if len(sample_type_ids) == 0: - self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") - results.append(False) - continue - else: - self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") - results.append(False) - continue + sample_type_ids = endpoint['sample_types'] # type: ignore + resource_name = endpoint['name'] # type: ignore # Add the job to the scheduler - job_id = f"{resource_key}-{uuid.uuid4()}" self._scheduler.add_job( self._generate_sample, 'interval', seconds=interval, - args=[resource_key, sample_type_ids], - id=job_id, + args=[resource_name, sample_type_ids], + id=f"{job_id}", replace_existing=True, end_date=datetime.now(pytz.utc) + timedelta(seconds=duration) ) - self.logger.info(f"Job added to scheduler for resource key {resource_key} with duration {duration}s and interval {interval}s") - results.append(True) - except Exception as e: - self.logger.error(f"Failed to verify resource key or add job: {e}") - results.append(e) - return results + self.logger.info(f"Job added to scheduler for resource key {resource_name} with duration {duration}s and interval {interval}s") + return True + except: + self.logger.exception(f"Failed to verify resource key or add job:") + return False + else: + self.logger.warning(f"No sample types found for {endpoint}. Skipping subscription.") + return False - def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + def UnsubscribeState(self, resource_key: str) -> bool: self._require_connection() - results = [] - for resource_key, _, _ in subscriptions: - resource_key = self._helper_methods.validate_resource_key(resource_key) - try: - # Check if job exists - job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] - if not job_ids: - self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.") - results.append(False) - continue - # Remove jobs - for job_id in job_ids: - self._scheduler.remove_job(job_id) - - self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") - results.append(True) - except Exception as e: - self.logger.exception(f"Failed to unsubscribe from {resource_key}") - results.append(e) - return results + try: + # Check if job exists + job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] + if not job_ids: + self.logger.warning(f"No active jobs found for {resource_key}. 
It might have already terminated.") + return False + for job_id in job_ids: + self._scheduler.remove_job(job_id) + self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") + return True + except: + self.logger.exception(f"Failed to unsubscribe from {resource_key}") + return False - def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: + def GetState(self, duration : int, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: self._require_connection() start_time = datetime.now(pytz.utc) - duration = 10 # Duration of the subscription in seconds (as an example) - while True: try: if terminate and not terminate.empty(): @@ -168,283 +144,3 @@ class EmulatedCollector(_Collector): self.logger.debug(f"Executing _generate_sample for resource: {resource_key}") sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids) self._out_samples.put(sample) - -# ------------- Event Listeners (START)----------------- - - def _listener_job_removed_from_subscription_tree(self, event): - if event.job_id: - # Extract the resource key from the job ID - resource_key = event.job_id.split('-')[0] - resource_key = self._helper_methods.validate_resource_key(resource_key) - - # Remove the subscription from the tree - try: - subscription_path = resource_key.split('/') - parent = self._subscriptions - for part in subscription_path: - parent = next((child for child in parent.children if child.name == part), None) - if not parent: - raise ValueError(f"Subscription path '{resource_key}' not found in tree.") - if parent: - parent.parent.children = tuple(child for child in parent.parent.children if child != parent) - self.logger.warning(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener. Maybe due to timeout.") - except Exception as e: - self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}") - - def _listener_job_added_to_subscription_tree(self, event): - try: - job_id = event.job_id - if job_id: - resource_key = job_id.split('-')[0] # Extract resource key from job ID - resource_key = self._helper_methods.validate_resource_key(resource_key) - subscription_path = resource_key.split('/') - parent = self._subscriptions - for part in subscription_path: - node = next((child for child in parent.children if child.name == part), None) - if not node: - node = Node(part, parent=parent) - parent = node - parent.value = { - "job_id": job_id - } - self.logger.info(f"Automatically added subscription for {resource_key} to the subscription_tree by listener.") - except Exception as e: - self.logger.exception("Failed to add subscription to the tree") - -# ------------- Event Listeners (END)----------------- - -#------------------------------------------------------------------------------------- -# ------- The below methods are kept for debugging purposes (test-case) only --------- -#------------------------------------------------------------------------------------- - -# This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_collector()). - def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes. 
- self._require_connection() - results = [] - - # if not isinstance(resources, dict): - # self.logger.error("Invalid configuration format: resources must be a dictionary.") - # raise ValueError("Invalid configuration format. Must be a dictionary.") - if 'config_rules' not in resources or not isinstance(resources['config_rules'], list): - self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.") - raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.") - - for rule in resources['config_rules']: - try: - if 'action' not in rule or 'custom' not in rule: - raise ValueError(f"Invalid rule format: {rule}") - - action = rule['action'] - custom = rule['custom'] - resource_key = custom.get('resource_key') - resource_value = custom.get('resource_value') - - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if action == 1: # Set action - resource_path = self._helper_methods._parse_resource_key(resource_key) - # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}") - parent = self._running_config - - for part in resource_path[:-1]: - if '[' in part and ']' in part: - base, index = part.split('[', 1) - index = index.rstrip(']') - parent = self._helper_methods._find_or_create_node(index, self._helper_methods._find_or_create_node(base, parent)) - # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}") - elif resource_path[-1] != 'settings': - # self.logger.info(f"2b. Creating node: {part}") - parent = self._helper_methods._find_or_create_node(part, parent) - - final_part = resource_path[-1] - if final_part in ['address', 'port']: - self._helper_methods._create_or_update_node(final_part, parent, resource_value) - self.logger.info(f"Configured: {resource_key} = {resource_value}") - - if resource_key.startswith("_connect/settings"): - parent = self._helper_methods._find_or_create_node("_connect", self._running_config) - settings_node = self._helper_methods._find_or_create_node("settings", parent) - settings_node.value = None # Ensure settings node has None value - endpoints_node = self._helper_methods._find_or_create_node("endpoints", settings_node) - - for endpoint in resource_value.get("endpoints", []): - uuid = endpoint.get("uuid") - uuid = uuid.replace('/', '_') if uuid else None - if uuid: - # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}") - self._helper_methods._create_or_update_node(uuid, endpoints_node, endpoint) - self.logger.info(f"Configured endpoint: {uuid} : {endpoint}") - - elif resource_key.startswith("/interface"): - interface_parent = self._helper_methods._find_or_create_node("interface", self._running_config) - name = resource_value.get("name") - name = name.replace('/', '_') if name else None - if name: - self._helper_methods._create_or_update_node(name, interface_parent, resource_value) - self.logger.info(f"Configured interface: {name} : {resource_value}") - # self.logger.info(f"4. 
Configured interface: {name}") - - results.append(True) - else: - raise ValueError(f"Unsupported action '{action}' in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - - except Exception as e: - self.logger.exception(f"Failed to apply rule: {rule}") - results.append(e) - - return results - -#----------------------------------- -# ------- EXTRA Methods ------------ -#----------------------------------- - - # def log_active_jobs(self): # For debugging purposes. - # """ - # Logs the IDs of all active jobs. - # This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger. - # """ - # self._require_connection() - # jobs = self._scheduler.get_jobs() - # self.logger.info(f"Active jobs: {[job.id for job in jobs]}") - - # def print_config_tree(self): # For debugging purposes. - # """ - # Reads the configuration using GetConfig and prints it as a hierarchical tree structure. - # """ - # self._require_connection() - - # def print_tree(node, indent=""): - # """ - # Recursively prints the configuration tree. - - # Args: - # node (Node): The current node to print. - # indent (str): The current indentation level. - # """ - # if node.name != "root": # Skip the root node's name - # value = getattr(node, "value", None) - # print(f"{indent}- {node.name}: {json.loads(value) if value else ''}") - - # for child in node.children: - # print_tree(child, indent + " ") - - # print("Configuration Tree:") - # print_tree(self._running_config) - - - # def GetInitialConfig(self) -> List[Tuple[str, Any]]: # comment - # self._require_connection() - # results = [] - # for node in self._initial_config.descendants: - # value = getattr(node, "value", None) - # results.append((node.name, json.loads(value) if value else None)) - # self.logger.info("Retrieved initial configurations") - # return results - - # def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: # comment - # """ - # Retrieves the configuration for the specified resource keys. - # If no keys are provided, returns the full configuration tree. - - # Args: - # resource_keys (List[str]): A list of keys specifying the configuration to retrieve. - - # Returns: - # List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, - # subtree, or an exception. - # """ - # self._require_connection() - # results = [] - - # try: - # if not resource_keys: - # # If no specific keys are provided, return the full configuration tree - - # full_tree = self._helper_methods._generate_subtree(self._running_config) - # # full_tree = self._generate_subtree(self._running_config) - # return [("full_configuration", full_tree)] - - # for key in resource_keys: - # try: - # # Parse the resource key - # resource_path = self._helper_methods.(key) - # self.logger.info(f"1. 
Retrieving configuration for resource path : {resource_path}") - - # # Navigate to the node corresponding to the key - # parent = self._running_config - # for part in resource_path: - # parent = self._find_or_raise_node(part, parent) - - # # Check if the node has a value - # value = getattr(parent, "value", None) - # if value: - # # If a value exists, return it - # results.append((key, json.loads(value))) - # else: - # # If no value, return the subtree of this node - # subtree = self._helper_methods._generate_subtree(parent) - # # subtree = self._generate_subtree(parent) - # results.append((key, subtree)) - - # except Exception as e: - # self.logger.exception(f"Failed to retrieve configuration for key: {key}") - # results.append((key, e)) - - # except Exception as e: - # self.logger.exception("Failed to retrieve configurations") - # results.append(("Error", e)) - - # return results - - # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: # comment - # self._require_connection() - # results = [] - - # for key in resources: - # try: - # # Parse resource key into parts, handling brackets correctly - # resource_path = self._helper_methods.(key) - - # parent = self._running_config - # for part in resource_path: - # parent = self._find_or_raise_node(part, parent) - - # # Delete the final node - # node_to_delete = parent - # parent = node_to_delete.parent - # parent.children = tuple(child for child in parent.children if child != node_to_delete) - # self.logger.info(f"Deleted configuration for key: {key}") - - # # Handle endpoints structure - # if "interface" in key and "settings" in key: - # interface_name = key.split('[')[-1].split(']')[0] - # endpoints_parent = self._find_or_raise_node("_connect", self._running_config) - # endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent) - # endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None) - # if endpoint_to_delete: - # endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete) - # self.logger.info(f"Removed endpoint entry for interface '{interface_name}'") - - # # Check if parent has no more children and is not the root - # while parent and parent.name != "root" and not parent.children: - # node_to_delete = parent - # parent = node_to_delete.parent - # parent.children = tuple(child for child in parent.children if child != node_to_delete) - # self.logger.info(f"Deleted empty parent node: {node_to_delete.name}") - - # results.append(True) - # except Exception as e: - # self.logger.exception(f"Failed to delete configuration for key: {key}") - # results.append(e) - - # return results - diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 0c515768e..c392efd1d 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -44,6 +44,7 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) self.active_jobs = {} def install_servicers(self): @@ -65,7 +66,7 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue elif receive_msg.error().code() == 
KafkaError.UNKNOWN_TOPIC_OR_PART: - LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. May be topic does not have any messages.") + LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) @@ -77,11 +78,11 @@ class TelemetryBackendService(GenericGrpcService): collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - duration = collector.get('duration', -1) + duration = collector.get('duration', 0) if duration == -1 and collector['interval'] == -1: self.TerminateCollector(collector_id) else: - LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id)) + LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id)) if collector_id not in self.active_jobs: stop_event = threading.Event() self.active_jobs[collector_id] = stop_event @@ -95,13 +96,15 @@ class TelemetryBackendService(GenericGrpcService): )).start() # Stop the Collector after the given duration if duration > 0: - def stop_after_duration(): - time.sleep(duration) - LOGGER.warning(f"Execution duration ({duration}) completed of Collector: {collector_id}") - self.TerminateCollector(collector_id) + def stop_after_duration(completion_time, stop_event): + time.sleep(completion_time) + if not stop_event.is_set(): + LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}") + self.TerminateCollector(collector_id) duration_thread = threading.Thread( - target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}" + target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}", + args=(duration, stop_event) ) duration_thread.start() else: @@ -113,7 +116,7 @@ class TelemetryBackendService(GenericGrpcService): """ Method to handle collector request. """ - end_points : list = self.get_endpoints_from_kpi_id(kpi_id) + end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) if not end_points: LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) @@ -125,21 +128,24 @@ class TelemetryBackendService(GenericGrpcService): if device_type == "EMU-Device": LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] - self.EmulatedCollectorHandler(subscription, kpi_id, stop_event) + self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) else: LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) - - def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event): + def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event): # EmulatedCollector - collector = EmulatedCollector(address="127.0.0.1", port=8000) - collector.Connect() - while not stop_event.is_set(): - # samples = collector.SubscribeState(subscription) - # LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples)) - # self.GenerateKpiValue(job_id, kpi_id, samples) - LOGGER.info("Generating KPI Values ...") - time.sleep(1) + + self.collector.Connect() + if not self.collector.SubscribeState(subscription): + LOGGER.warning("KPI ID: {:} - Subscription failed. 
Skipping...".format(kpi_id)) + else: + while not stop_event.is_set(): + samples = list(self.collector.GetState(duration=duration, blocking=True)) + LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) + self.GenerateKpiValue(collector_id, kpi_id, samples) + time.sleep(1) + self.collector.Disconnect() + # self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration. def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ @@ -171,12 +177,17 @@ class TelemetryBackendService(GenericGrpcService): if stop_event: stop_event.set() LOGGER.info(f"Job {job_id} terminated.") + if self.collector.UnsubscribeState(job_id): + LOGGER.info(f"Unsubscribed from collector: {job_id}") + else: + LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}") else: LOGGER.warning(f"Job {job_id} not found in active jobs.") except: LOGGER.exception("Error terminating job: {:}".format(job_id)) - def get_endpoints_from_kpi_id(self, kpi_id: str) -> list: +# --- Mock Methods --- + def get_endpoints_from_kpi_id(self, kpi_id: str) -> dict: """ Method to get endpoints based on kpi_id. """ @@ -185,7 +196,7 @@ class TelemetryBackendService(GenericGrpcService): '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []}, '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}, } - return [kpi_endpoints.get(kpi_id, {})] if kpi_id in kpi_endpoints else [] + return kpi_endpoints.get(kpi_id, {}) if kpi_id in kpi_endpoints else {} def get_device_type_from_kpi_id(self, kpi_id: str) -> str: """ @@ -198,35 +209,6 @@ class TelemetryBackendService(GenericGrpcService): } return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown") - - # def TerminateCollectorBackend(self, collector_id): - # LOGGER.debug("Terminating collector backend...") - # if collector_id in self.running_threads: - # thread = self.running_threads[collector_id] - # thread.stop() - # del self.running_threads[collector_id] - # LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) - # self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. 
- # else: - # LOGGER.warning('Backend collector {:} not found'.format(collector_id)) - - # def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): - # """ - # Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic - # """ - # producer = self.kafka_producer - # kpi_value : Dict = { - # "kpi_id" : kpi_id, - # "kpi_value" : measured_kpi_value, - # } - # producer.produce( - # KafkaTopic.TELEMETRY_RESPONSE.value, - # key = collector_id, - # value = json.dumps(kpi_value), - # callback = self.delivery_callback - # ) - # producer.flush() - def delivery_callback(self, err, msg): if err: LOGGER.error('Message delivery failed: {:s}'.format(str(err))) diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 067925a28..6c6107152 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -15,6 +15,7 @@ import os import pytest import logging +import time from common.Constants import ServiceNameEnum from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList @@ -42,6 +43,16 @@ os.environ[get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT LOGGER = logging.getLogger(__name__) +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + @pytest.fixture(scope='session') def telemetryFrontend_service(): LOGGER.info('Initializing TelemetryFrontendService...') @@ -82,33 +93,29 @@ def telemetryFrontend_client( # ------- Re-structuring Test --------- # --- "test_validate_kafka_topics" should be run before the functionality tests --- def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") response = KafkaTopic.create_all_topics() assert isinstance(response, bool) # ----- core funtionality test ----- def test_StartCollector(telemetryFrontend_client): - LOGGER.info(' >>> test_StartCollector START: <<< ') + # LOGGER.info(' >>> test_StartCollector START: <<< ') response = telemetryFrontend_client.StartCollector(create_collector_request()) LOGGER.debug(str(response)) assert isinstance(response, CollectorId) + def test_StopCollector(telemetryFrontend_client): - LOGGER.info(' >>> test_StopCollector START: <<< ') + # LOGGER.info(' >>> test_StopCollector START: <<< ') + LOGGER.info("Waiting before termination...") + time.sleep(30) response = telemetryFrontend_client.StopCollector(create_collector_id()) LOGGER.debug(str(response)) assert isinstance(response, Empty) -def test_SelectCollectors(telemetryFrontend_client): - LOGGER.info(' >>> test_SelectCollectors START: <<< ') - response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) - LOGGER.debug(str(response)) - assert isinstance(response, CollectorList) - -# # ----- Non-gRPC method tests ----- -# def test_RunResponseListener(): -# LOGGER.info(' >>> test_RunResponseListener START: <<< ') -# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() -# response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in 
frontend.proto +# def test_SelectCollectors(telemetryFrontend_client): +# LOGGER.info(' >>> test_SelectCollectors START: <<< ') +# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) # LOGGER.debug(str(response)) -# assert isinstance(response, bool) +# assert isinstance(response, CollectorList) + -- GitLab From 56089f19701eaad593aa573c88ec33e691993890 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sat, 15 Feb 2025 14:34:46 +0000 Subject: [PATCH 03/12] Changes in Kpi_value_api and added new monitoring namespace - Added Prometheus monitoring deployment and update test configurations - Updated KPI VALUE API (writer and promWriter) --- deploy/all.sh | 3 ++ deploy/monitoring.sh | 53 +++++++++++++++++++ manifests/kpi_value_writerservice.yaml | 2 + manifests/prometheus/prometheus.yaml | 52 ++++++++++++++++++ scripts/run_tests_locally-kpi-value-writer.sh | 1 + .../run_tests_locally-telemetry-backend.sh | 2 +- .../run_tests_locally-telemetry-frontend.sh | 5 +- src/common/tools/client/RetryDecorator.py | 2 +- src/kpi_manager/tests/test_messages.py | 2 +- .../service/KpiValueWriter.py | 35 ++++-------- .../service/MetricWriterToPrometheus.py | 42 +++++++++------ src/kpi_value_writer/service/__main__.py | 3 -- .../tests/test_kpi_value_writer.py | 39 ++++++++++++-- src/kpi_value_writer/tests/test_messages.py | 3 +- 14 files changed, 190 insertions(+), 54 deletions(-) create mode 100644 deploy/monitoring.sh create mode 100644 manifests/prometheus/prometheus.yaml diff --git a/deploy/all.sh b/deploy/all.sh index 97f4db37d..f3075949e 100755 --- a/deploy/all.sh +++ b/deploy/all.sh @@ -215,6 +215,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} # Deploy Apache Kafka ./deploy/kafka.sh +#Deploy Monitoring (Prometheus, Mimir, Grafana) +./deploy/monitoring.sh + # Expose Dashboard ./deploy/expose_dashboard.sh diff --git a/deploy/monitoring.sh b/deploy/monitoring.sh new file mode 100644 index 000000000..18992501a --- /dev/null +++ b/deploy/monitoring.sh @@ -0,0 +1,53 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +RELEASE_NAME="mon-prometheus" +NAMESPACE="monitoring" +CHART_REPO_NAME="prometheus-community" +CHART_REPO_URL="https://prometheus-community.github.io/helm-charts" +CHART_NAME="prometheus" # Chart name within the repo +VALUES_FILE="manifests/prometheus/prometheus.yaml" + +echo ">>> Deploying Prometheus with the following configuration:" +echo "Adding/updating Helm repo: $CHART_REPO_NAME -> $CHART_REPO_URL" +helm repo add "$CHART_REPO_NAME" "$CHART_REPO_URL" || true +helm repo update + +echo "Creating namespace '$NAMESPACE' if it doesn't exist..." +kubectl get namespace "$NAMESPACE" >/dev/null 2>&1 || kubectl create namespace "$NAMESPACE" + +#------------------------------------------------------------------------------ +# 3. Install or upgrade the Prometheus chart +# - If 'VALUES_FILE' is set, it will use it for custom configuration. 
+# - Otherwise, it will deploy with the chart defaults. +#------------------------------------------------------------------------------ +if [ -n "$VALUES_FILE" ] && [ -f "$VALUES_FILE" ]; then + echo "Installing/Upgrading Prometheus with custom values from $VALUES_FILE..." + helm upgrade --install "$RELEASE_NAME" "$CHART_REPO_NAME/$CHART_NAME" \ + --namespace "$NAMESPACE" \ + --values "$VALUES_FILE" +else + echo "Installing/Upgrading Prometheus with default chart values..." + helm upgrade --install "$RELEASE_NAME" "$CHART_REPO_NAME/$CHART_NAME" \ + --namespace "$NAMESPACE" +fi + +echo "Waiting for Prometheus pods to be ready..." +kubectl rollout status deployment/"$RELEASE_NAME"-server -n "$NAMESPACE" || true + +# echo "Listing deployed resources in namespace '$NAMESPACE':" +# kubectl get all -n "$NAMESPACE" + +echo "<<< Prometheus deployment completed successfully!" diff --git a/manifests/kpi_value_writerservice.yaml b/manifests/kpi_value_writerservice.yaml index f98be4629..27c61c933 100644 --- a/manifests/kpi_value_writerservice.yaml +++ b/manifests/kpi_value_writerservice.yaml @@ -39,6 +39,8 @@ spec: env: - name: LOG_LEVEL value: "INFO" + - name: PUSHGATEWAY_URL + value: "http://mon-prometheus-prometheus-pushgateway.monitoring.svc.cluster.local:9091" envFrom: - secretRef: name: kfk-kpi-data diff --git a/manifests/prometheus/prometheus.yaml b/manifests/prometheus/prometheus.yaml new file mode 100644 index 000000000..fabc97c4a --- /dev/null +++ b/manifests/prometheus/prometheus.yaml @@ -0,0 +1,52 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Configuration for Prometheus components and server settings +# Global Prometheus configuration +alertmanager: + enabled: false # Default is true +kube-state-metrics: + enabled: false # Default is true +prometheus-node-exporter: + enabled: false # Default is true +prometheus-pushgateway: + enabled: true # Default is true + +# Prometheus server-specific configuration +server: + retention: "30d" + logLevel: "debug" + resources: + requests: + cpu: "250m" + memory: "256Mi" + limits: + cpu: "1" + memory: "1Gi" + + # Expose the Prometheus server via a Kubernetes service + service: + type: NodePort + nodePort: 30090 + + extraScrapeConfigs: + - job_name: 'pushgateway' + static_configs: + - targets: + - 'prometheus-pushgateway.monitoring.svc.cluster.local:9091' # Push Gateway endpoint + + # Global Prometheus settings: + global: + scrape_interval: 10s + evaluation_interval: 10s diff --git a/scripts/run_tests_locally-kpi-value-writer.sh b/scripts/run_tests_locally-kpi-value-writer.sh index cbeed3b78..e3d9c7c6a 100755 --- a/scripts/run_tests_locally-kpi-value-writer.sh +++ b/scripts/run_tests_locally-kpi-value-writer.sh @@ -19,6 +19,7 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src export KFK_SERVER_ADDRESS='127.0.0.1:9092' + RCFILE=$PROJECTDIR/coverage/.coveragerc python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ kpi_value_writer/tests/test_kpi_value_writer.py diff --git a/scripts/run_tests_locally-telemetry-backend.sh b/scripts/run_tests_locally-telemetry-backend.sh index f648a6252..1b4915d74 100755 --- a/scripts/run_tests_locally-telemetry-backend.sh +++ b/scripts/run_tests_locally-telemetry-backend.sh @@ -25,5 +25,5 @@ export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetr RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=debug --log-cli-level=debug --verbose \ +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ telemetry/backend/tests/test_backend.py diff --git a/scripts/run_tests_locally-telemetry-frontend.sh b/scripts/run_tests_locally-telemetry-frontend.sh index 38822330e..e70818377 100755 --- a/scripts/run_tests_locally-telemetry-frontend.sh +++ b/scripts/run_tests_locally-telemetry-frontend.sh @@ -18,10 +18,11 @@ PROJECTDIR=`pwd` cd $PROJECTDIR/src -# python3 kpi_manager/tests/test_unitary.py export KFK_SERVER_ADDRESS='127.0.0.1:9092' CRDB_SQL_ADDRESS=$(kubectl get service cockroachdb-public --namespace crdb -o jsonpath='{.spec.clusterIP}') + export CRDB_URI="cockroachdb://tfs:tfs123@${CRDB_SQL_ADDRESS}:26257/tfs_telemetry?sslmode=require" RCFILE=$PROJECTDIR/coverage/.coveragerc -python3 -m pytest --log-level=DEBUG --log-cli-level=DEBUG --verbose \ + +python3 -m pytest --log-level=INFO --log-cli-level=INFO --verbose \ telemetry/frontend/tests/test_frontend.py diff --git a/src/common/tools/client/RetryDecorator.py b/src/common/tools/client/RetryDecorator.py index 4750ff73a..efc8b5234 100644 --- a/src/common/tools/client/RetryDecorator.py +++ b/src/common/tools/client/RetryDecorator.py @@ -56,7 +56,7 @@ def delay_linear(initial=0, increment=0, maximum=None): return delay return compute -def delay_exponential(initial=1, increment=1, maximum=None): +def delay_exponential(initial=1.0, increment=1.0, maximum=None): def compute(num_try): delay = initial * pow(increment, (num_try - 1)) if maximum is not None: diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index 5f55c2cfc..ebe13b661 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py 
@@ -77,4 +77,4 @@ def create_kpi_filter_request(): _create_kpi_filter_request.connection_id.append(connection_id_obj) _create_kpi_filter_request.link_id.append(link_id_obj) - return _create_kpi_filter_request \ No newline at end of file + return _create_kpi_filter_request diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 0bc95355e..25b8ca2e8 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -15,25 +15,21 @@ import json import logging import threading + +from confluent_kafka import KafkaError +from confluent_kafka import Consumer as KafkaConsumer + from common.tools.kafka.Variables import KafkaConfig, KafkaTopic -from common.proto.kpi_value_api_pb2 import KpiValue from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId from common.Settings import get_service_port_grpc from common.Constants import ServiceNameEnum from common.tools.service.GenericGrpcService import GenericGrpcService - -from confluent_kafka import KafkaError -from confluent_kafka import Consumer as KafkaConsumer - from kpi_manager.client.KpiManagerClient import KpiManagerClient -# -- test import -- -# from kpi_value_writer.tests.test_messages import create_kpi_descriptor_request from .MetricWriterToPrometheus import MetricWriterToPrometheus -LOGGER = logging.getLogger(__name__) -ACTIVE_CONSUMERS = [] +LOGGER = logging.getLogger(__name__) class KpiValueWriter(GenericGrpcService): def __init__(self, cls_name : str = __name__) -> None: @@ -43,9 +39,8 @@ class KpiValueWriter(GenericGrpcService): 'group.id' : 'KpiValueWriter', 'auto.offset.reset' : 'latest'}) - def RunKafkaConsumer(self): + def install_servicers(self): thread = threading.Thread(target=self.KafkaKpiConsumer, args=()) - ACTIVE_CONSUMERS.append(thread) thread.start() def KafkaKpiConsumer(self): @@ -55,7 +50,6 @@ class KpiValueWriter(GenericGrpcService): consumer = self.kafka_consumer consumer.subscribe([KafkaTopic.VALUE.value]) LOGGER.debug("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) - print("Kafka Consumer start listenng on topic: {:}".format(KafkaTopic.VALUE.value)) while True: raw_kpi = consumer.poll(1.0) if raw_kpi is None: @@ -69,30 +63,21 @@ class KpiValueWriter(GenericGrpcService): try: kpi_value = json.loads(raw_kpi.value().decode('utf-8')) LOGGER.info("Received KPI : {:}".format(kpi_value)) - print("Received KPI : {:}".format(kpi_value)) self.get_kpi_descriptor(kpi_value, kpi_manager_client, metric_writer) - except Exception as e: - print("Error detail: {:}".format(e)) + except: + LOGGER.exception("Error detail: ") continue def get_kpi_descriptor(self, kpi_value: str, kpi_manager_client, metric_writer): - print("--- START -----") - kpi_id = KpiId() - kpi_id.kpi_id.uuid = kpi_value['kpi_uuid'] - print("KpiId generated: {:}".format(kpi_id)) - # print("Kpi manger client created: {:}".format(kpi_manager_client)) + kpi_id.kpi_id.uuid = kpi_value['kpi_id'] # type: ignore try: kpi_descriptor_object = KpiDescriptor() kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id) - # TODO: why kpi_descriptor_object recevies a KpiDescriptor type object not Empty type object??? 
if kpi_descriptor_object.kpi_id.kpi_id.uuid == kpi_id.kpi_id.uuid: LOGGER.info("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) - print("Extracted KpiDescriptor: {:}".format(kpi_descriptor_object)) metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) else: - LOGGER.info("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) - print("No KPI Descriptor found in DB for Kpi ID: {:}".format(kpi_id)) + LOGGER.info("No KPI Descriptor found in Database for Kpi ID: {:}".format(kpi_id)) except Exception as e: LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) - print ("Unable to get KpiDescriptor. Error: {:}".format(e)) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index bfbb6e3ba..3238516c9 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -14,15 +14,20 @@ # read Kafka stream from Kafka topic +import os import logging -from prometheus_client import Gauge -from common.proto.kpi_sample_types_pb2 import KpiSampleType -from common.proto.kpi_value_api_pb2 import KpiValue -from common.proto.kpi_manager_pb2 import KpiDescriptor +from prometheus_client import Gauge +from prometheus_client.exposition import push_to_gateway +from prometheus_client.registry import CollectorRegistry + +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_value_api_pb2 import KpiValue +from common.proto.kpi_manager_pb2 import KpiDescriptor -LOGGER = logging.getLogger(__name__) -PROM_METRICS = {} +LOGGER = logging.getLogger(__name__) +PROM_METRICS = {} +GATEWAY_URL = os.getenv('PUSHGATEWAY_URL', 'prometheus-pushgateway.monitoring.svc.cluster.local:9091') class MetricWriterToPrometheus: ''' @@ -30,7 +35,9 @@ class MetricWriterToPrometheus: cooked KPI value = KpiDescriptor (gRPC message) + KpiValue (gRPC message) ''' def __init__(self): - pass + self.job_name = 'kpivaluewriter' + self.registry = CollectorRegistry() + self.gateway_url = GATEWAY_URL def merge_kpi_descriptor_and_kpi_value(self, kpi_descriptor, kpi_value): # Creating a dictionary from the kpi_descriptor's attributes @@ -44,26 +51,27 @@ class MetricWriterToPrometheus: 'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid, 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, - 'time_stamp' : kpi_value.timestamp.timestamp, - 'kpi_value' : kpi_value.kpi_value_type.floatVal + 'time_stamp' : kpi_value["time_stamp"], + 'kpi_value' : kpi_value["kpi_value"] } LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) return cooked_kpi def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue): # merge both gRPC messages into single varible. 
- cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) + cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value) tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'} - metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags - metric_name = cooked_kpi['kpi_sample_type'] + metric_tags = [tag for tag in cooked_kpi.keys() if tag not in tags_to_exclude] # These values will be used as metric tags + metric_name = cooked_kpi['kpi_sample_type'] try: if metric_name not in PROM_METRICS: # Only register the metric, when it doesn't exists PROM_METRICS[metric_name] = Gauge ( metric_name, cooked_kpi['kpi_description'], - metric_tags + metric_tags, + registry=self.registry ) - LOGGER.debug("Metric is created with labels: {:}".format(metric_tags)) + LOGGER.debug("Metric is created with labels: {:}".format(metric_tags)) PROM_METRICS[metric_name].labels( kpi_id = cooked_kpi['kpi_id'], device_id = cooked_kpi['device_id'], @@ -74,7 +82,11 @@ class MetricWriterToPrometheus: link_id = cooked_kpi['link_id'], time_stamp = cooked_kpi['time_stamp'], ).set(float(cooked_kpi['kpi_value'])) - LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name])) + LOGGER.debug("Metric is being pushed to the Gateway ... : {:}".format(PROM_METRICS[metric_name])) + + # Push to the Prometheus Gateway, Prometheus is preconfigured to scrap the metrics from the gateway + push_to_gateway(self.gateway_url, job=self.job_name, registry=self.registry) + LOGGER.debug("Metric pushed to Prometheus Gateway.") except ValueError as e: if 'Duplicated timeseries' in str(e): diff --git a/src/kpi_value_writer/service/__main__.py b/src/kpi_value_writer/service/__main__.py index 28ba2ac90..56fc6100d 100644 --- a/src/kpi_value_writer/service/__main__.py +++ b/src/kpi_value_writer/service/__main__.py @@ -13,7 +13,6 @@ # limitations under the License. import logging, signal, sys, threading -from prometheus_client import start_http_server from kpi_value_writer.service.KpiValueWriter import KpiValueWriter from common.Settings import get_log_level @@ -39,8 +38,6 @@ def main(): grpc_service = KpiValueWriter() grpc_service.start() - start_http_server(10808) - LOGGER.debug("Prometheus client is started on port 10808") # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass diff --git a/src/kpi_value_writer/tests/test_kpi_value_writer.py b/src/kpi_value_writer/tests/test_kpi_value_writer.py index 0d3f9e683..29e81d28a 100755 --- a/src/kpi_value_writer/tests/test_kpi_value_writer.py +++ b/src/kpi_value_writer/tests/test_kpi_value_writer.py @@ -12,14 +12,35 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest +import time import logging from kpi_value_writer.service.KpiValueWriter import KpiValueWriter +from kpi_manager.client.KpiManagerClient import KpiManagerClient from common.tools.kafka.Variables import KafkaTopic +from test_messages import create_kpi_descriptor_request +LOGGER = logging.getLogger(__name__) +# -------- Fixtures ---------------- + +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. 
+ ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + +# @pytest.fixture(scope='module') +# def kpi_manager_client(): +# LOGGER.debug("Yielding KpiManagerClient ...") +# yield KpiManagerClient(host="10.152.183.203") +# LOGGER.debug("KpiManagerClient is terminated.") -LOGGER = logging.getLogger(__name__) # -------- Initial Test ---------------- def test_validate_kafka_topics(): @@ -27,7 +48,15 @@ def test_validate_kafka_topics(): response = KafkaTopic.create_all_topics() assert isinstance(response, bool) -def test_KafkaConsumer(): - LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ") - # kpi_value_writer = KpiValueWriter() - # kpi_value_writer.RunKafkaConsumer() +# -------------- +# NOT FOR GITHUB PIPELINE (Local testing only) +# -------------- +# def test_KafkaConsumer(kpi_manager_client): + +# # kpidescriptor = create_kpi_descriptor_request() +# # kpi_manager_client.SetKpiDescriptor(kpidescriptor) + +# kpi_value_writer = KpiValueWriter() +# kpi_value_writer.KafkaKpiConsumer() +# LOGGER.debug(" waiting for timer to finish ") +# time.sleep(300) diff --git a/src/kpi_value_writer/tests/test_messages.py b/src/kpi_value_writer/tests/test_messages.py index ffc6b398c..4cd901b2c 100755 --- a/src/kpi_value_writer/tests/test_messages.py +++ b/src/kpi_value_writer/tests/test_messages.py @@ -25,7 +25,8 @@ def create_kpi_id_request(): def create_kpi_descriptor_request(description: str = "Test Description"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() - _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_request.kpi_id.kpi_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" _create_kpi_request.kpi_description = description _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = 'DEV4' -- GitLab From cdf65bba37a2c374e05119cccd62ab059bcfc004 Mon Sep 17 00:00:00 2001 From: Waleed Akbar Date: Sun, 2 Mar 2025 07:30:37 +0000 Subject: [PATCH 04/12] Changes in Analytics, DB, KPI Enums, KPI Manager and Telemetery - Create Kafka topic added in main method of services - KPI sample types added in ENUM class - Methods are commented in _Collector class. - In Telemetry backend, logic is added get device ID, enpoint ID and devicec type from Context and KPI DB. 
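For reference, the lookup flow added to the Telemetry backend can be summarised with the minimal sketch below. It assumes the KpiManagerClient, ContextClient and get_device interfaces used later in this patch; the standalone helper name and inline client construction are illustrative only (the real logic lives in TelemetryBackendService.get_endpoint_detail further down):

    # Illustrative only: resolve a KPI ID to (device_type, endpoint detail) via the KPI and Context DBs.
    from common.proto.kpi_manager_pb2 import KpiId
    from common.tools.context_queries.Device import get_device
    from context.client.ContextClient import ContextClient
    from kpi_manager.client.KpiManagerClient import KpiManagerClient

    def resolve_endpoint(kpi_uuid : str):
        kpi_id = KpiId()
        kpi_id.kpi_id.uuid = kpi_uuid
        descriptor = KpiManagerClient().GetKpiDescriptor(kpi_id)            # KPI DB lookup
        if not descriptor:
            return (None, None)
        device = get_device(
            context_client       = ContextClient(),                         # Context DB lookup
            device_uuid          = descriptor.device_id.device_uuid.uuid,
            include_config_rules = False, include_components = False)
        if not device:
            return (None, None)
        for endpoint in device.device_endpoints:
            if endpoint.endpoint_id.endpoint_uuid.uuid == descriptor.endpoint_id.endpoint_uuid.uuid:
                return (device.device_type, {
                    'uuid'         : endpoint.endpoint_id.endpoint_uuid.uuid,
                    'name'         : endpoint.name,
                    'type'         : endpoint.endpoint_type,
                    'sample_types' : list(endpoint.kpi_sample_types),
                })
        return (device.device_type, None)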
--- src/analytics/backend/service/__main__.py | 5 + src/analytics/frontend/service/__main__.py | 5 + src/common/tools/database/GenericEngine.py | 4 +- .../database/models/enums/KpiSampleType.py | 17 +- src/kpi_manager/tests/test_messages.py | 10 +- src/telemetry/backend/Dockerfile | 4 + .../backend/collector_api/_Collector.py | 126 +++++++-------- .../service/TelemetryBackendService.py | 80 ++++++---- src/telemetry/backend/service/__main__.py | 3 + src/telemetry/backend/tests/Fixtures.py | 58 +++++++ src/telemetry/backend/tests/add_devices.py | 78 +++++++++ src/telemetry/backend/tests/messages.py | 23 ++- src/telemetry/backend/tests/test_backend.py | 137 ++++++++++++++-- src/telemetry/backend/tests/topology.json | 148 ++++++++++++++++++ .../TelemetryFrontendServiceServicerImpl.py | 54 +------ src/telemetry/frontend/service/__main__.py | 4 + src/telemetry/frontend/tests/Messages.py | 9 +- src/telemetry/frontend/tests/test_frontend.py | 22 +-- 18 files changed, 591 insertions(+), 196 deletions(-) create mode 100644 src/telemetry/backend/tests/Fixtures.py create mode 100644 src/telemetry/backend/tests/add_devices.py create mode 100644 src/telemetry/backend/tests/topology.json diff --git a/src/analytics/backend/service/__main__.py b/src/analytics/backend/service/__main__.py index 533761bab..55bcb53e4 100644 --- a/src/analytics/backend/service/__main__.py +++ b/src/analytics/backend/service/__main__.py @@ -16,8 +16,11 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .AnalyticsBackendService import AnalyticsBackendService +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name @@ -36,6 +39,8 @@ def main(): LOGGER.info('Starting...') + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/analytics/frontend/service/__main__.py b/src/analytics/frontend/service/__main__.py index edf94c4fd..a79b2bbc6 100644 --- a/src/analytics/frontend/service/__main__.py +++ b/src/analytics/frontend/service/__main__.py @@ -18,8 +18,11 @@ from common.Settings import get_log_level, get_metrics_port from .AnalyticsFrontendService import AnalyticsFrontendService from analytics.database.AnalyzerModel import Analyzer as Model from common.tools.database.GenericDatabase import Database +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') LOGGER = None def signal_handler(signal, frame): # pylint: disable=redefined-outer-name @@ -43,6 +46,8 @@ def main(): kpiDBobj.create_database() kpiDBobj.create_tables() + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/common/tools/database/GenericEngine.py b/src/common/tools/database/GenericEngine.py index 1d38a1f44..89b6c2b6d 100644 --- a/src/common/tools/database/GenericEngine.py +++ b/src/common/tools/database/GenericEngine.py @@ -33,8 +33,8 @@ class Engine: CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE) try: engine = sqlalchemy.create_engine(crdb_uri, echo=False) - LOGGER.info(' AnalyzerDB initalized with DB URL: {:}'.format(crdb_uri)) 
+ LOGGER.info(' Database initalized with DB URL: {:}'.format(crdb_uri)) + return engine except: # pylint: disable=bare-except # pragma: no cover LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri))) return None # type: ignore - return engine diff --git a/src/context/service/database/models/enums/KpiSampleType.py b/src/context/service/database/models/enums/KpiSampleType.py index 77b568dcf..66afdb710 100644 --- a/src/context/service/database/models/enums/KpiSampleType.py +++ b/src/context/service/database/models/enums/KpiSampleType.py @@ -22,13 +22,16 @@ from ._GrpcToEnum import grpc_to_enum # BYTES_RECEIVED. If item name does not match, automatic mapping of # proto enums to database enums will fail. class ORM_KpiSampleTypeEnum(enum.Enum): - UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN - PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED - PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED - BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED - LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS - LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS + UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN # 0 + PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED # 101 + PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED # 102 + PACKETS_DROPPED = KpiSampleType.KPISAMPLETYPE_PACKETS_DROPPED # 103 + BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED # 201 + BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED # 202 + BYTES_DROPPED = KpiSampleType.KPISAMPLETYPE_BYTES_DROPPED # 203 + LINK_TOTAL_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_TOTAL_CAPACITY_GBPS # 301 + LINK_USED_CAPACITY_GBPS = KpiSampleType.KPISAMPLETYPE_LINK_USED_CAPACITY_GBPS # 302 + grpc_to_enum__kpi_sample_type = functools.partial( grpc_to_enum, KpiSampleType, ORM_KpiSampleTypeEnum) diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index ebe13b661..811661a4e 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -26,15 +26,15 @@ def create_kpi_id_request(): def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() - _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" - # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666" + _create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99" _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED - _create_kpi_request.device_id.device_uuid.uuid = 'DEV2' + _create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4()) _create_kpi_request.service_id.service_uuid.uuid = 'SERV2' - _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' - _create_kpi_request.endpoint_id.endpoint_uuid.uuid = 'END1' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC1' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = str(uuid.uuid4()) _create_kpi_request.connection_id.connection_uuid.uuid = 'CON1' _create_kpi_request.link_id.link_uuid.uuid = 'LNK1' return _create_kpi_request diff --git a/src/telemetry/backend/Dockerfile 
b/src/telemetry/backend/Dockerfile index 4bc5605d5..7448f1ebc 100644 --- a/src/telemetry/backend/Dockerfile +++ b/src/telemetry/backend/Dockerfile @@ -62,6 +62,10 @@ RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow +COPY src/context/__init__.py context/__init__.py +COPY src/context/client/. context/client/ +COPY src/kpi_manager/client/. kpi_manager/client/ +COPY src/kpi_manager/__init__.py kpi_manager/__init__.py COPY src/telemetry/__init__.py telemetry/__init__.py COPY src/telemetry/backend/. telemetry/backend/ diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py index d6e711d65..a4bd7f17f 100644 --- a/src/telemetry/backend/collector_api/_Collector.py +++ b/src/telemetry/backend/collector_api/_Collector.py @@ -71,69 +71,69 @@ class _Collector: """ raise NotImplementedError() - def GetInitialConfig(self) -> List[Tuple[str, Any]]: - """ Retrieve initial configuration of entire device. - Returns: - values : List[Tuple[str, Any]] - List of tuples (resource key, resource value) for - resource keys. - """ - raise NotImplementedError() - - def GetConfig(self, resource_keys: List[str] = []) -> \ - List[Tuple[str, Union[Any, None, Exception]]]: - """ Retrieve running configuration of entire device or - selected resource keys. - Parameters: - resource_keys : List[str] - List of keys pointing to the resources to be retrieved. - Returns: - values : List[Tuple[str, Union[Any, None, Exception]]] - List of tuples (resource key, resource value) for - resource keys requested. If a resource is found, - the appropriate value type must be retrieved. - If a resource is not found, None must be retrieved as - value for that resource. In case of Exception, - the Exception must be retrieved as value. - """ - raise NotImplementedError() - - def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ - List[Union[bool, Exception]]: - """ Create/Update configuration for a list of resources. - Parameters: - resources : List[Tuple[str, Any]] - List of tuples, each containing a resource_key pointing the - resource to be modified, and a resource_value containing - the new value to be set. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key changes requested. - Return values must be in the same order as the - resource keys requested. If a resource is properly set, - True must be retrieved; otherwise, the Exception that is - raised during the processing must be retrieved. - """ - raise NotImplementedError() - - def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ - List[Union[bool, Exception]]: - """ Delete configuration for a list of resources. - Parameters: - resources : List[Tuple[str, Any]] - List of tuples, each containing a resource_key pointing the - resource to be modified, and a resource_value containing - possible additionally required values to locate - the value to be removed. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key deletions requested. - Return values must be in the same order as the resource keys - requested. If a resource is properly deleted, True must be - retrieved; otherwise, the Exception that is raised during - the processing must be retrieved. - """ - raise NotImplementedError() + # def GetInitialConfig(self) -> List[Tuple[str, Any]]: + # """ Retrieve initial configuration of entire device. 
+ # Returns: + # values : List[Tuple[str, Any]] + # List of tuples (resource key, resource value) for + # resource keys. + # """ + # raise NotImplementedError() + + # def GetConfig(self, resource_keys: List[str] = []) -> \ + # List[Tuple[str, Union[Any, None, Exception]]]: + # """ Retrieve running configuration of entire device or + # selected resource keys. + # Parameters: + # resource_keys : List[str] + # List of keys pointing to the resources to be retrieved. + # Returns: + # values : List[Tuple[str, Union[Any, None, Exception]]] + # List of tuples (resource key, resource value) for + # resource keys requested. If a resource is found, + # the appropriate value type must be retrieved. + # If a resource is not found, None must be retrieved as + # value for that resource. In case of Exception, + # the Exception must be retrieved as value. + # """ + # raise NotImplementedError() + + # def SetConfig(self, resources: List[Tuple[str, Any]]) -> \ + # List[Union[bool, Exception]]: + # """ Create/Update configuration for a list of resources. + # Parameters: + # resources : List[Tuple[str, Any]] + # List of tuples, each containing a resource_key pointing the + # resource to be modified, and a resource_value containing + # the new value to be set. + # Returns: + # results : List[Union[bool, Exception]] + # List of results for resource key changes requested. + # Return values must be in the same order as the + # resource keys requested. If a resource is properly set, + # True must be retrieved; otherwise, the Exception that is + # raised during the processing must be retrieved. + # """ + # raise NotImplementedError() + + # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> \ + # List[Union[bool, Exception]]: + # """ Delete configuration for a list of resources. + # Parameters: + # resources : List[Tuple[str, Any]] + # List of tuples, each containing a resource_key pointing the + # resource to be modified, and a resource_value containing + # possible additionally required values to locate + # the value to be removed. + # Returns: + # results : List[Union[bool, Exception]] + # List of results for resource key deletions requested. + # Return values must be in the same order as the resource keys + # requested. If a resource is properly deleted, True must be + # retrieved; otherwise, the Exception that is raised during + # the processing must be retrieved. 
+ # """ + # raise NotImplementedError() def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> \ bool: diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index c392efd1d..40cd1443a 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -16,7 +16,7 @@ import json import time import logging import threading -from typing import Any, Dict +from typing import Any, Dict, Tuple from datetime import datetime, timezone from confluent_kafka import Producer as KafkaProducer from confluent_kafka import Consumer as KafkaConsumer @@ -26,10 +26,15 @@ from common.Settings import get_service_port_grpc from common.method_wrappers.Decorator import MetricsPool from common.tools.kafka.Variables import KafkaConfig, KafkaTopic from common.tools.service.GenericGrpcService import GenericGrpcService +from common.tools.context_queries.Device import get_device +from common.proto.kpi_manager_pb2 import KpiId + +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from context.client.ContextClient import ContextClient from telemetry.backend.collectors.emulated.EmulatedCollector import EmulatedCollector -LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') +LOGGER = logging.getLogger(__name__) +METRICS_POOL = MetricsPool('TelemetryBackend', 'backendService') class TelemetryBackendService(GenericGrpcService): """ @@ -44,7 +49,9 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) - self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.context_client = ContextClient() + self.kpi_manager_client = KpiManagerClient() self.active_jobs = {} def install_servicers(self): @@ -116,16 +123,13 @@ class TelemetryBackendService(GenericGrpcService): """ Method to handle collector request. """ - end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) - if not end_points: + device_type, end_points = self.get_endpoint_detail(kpi_id) + # end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) + if end_points is None: LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) - - device_type : str = self.get_device_type_from_kpi_id(kpi_id) - - if device_type == "Unknown": - LOGGER.warning("KPI ID: {:} - Device Type not found. Skipping...".format(kpi_id)) - - if device_type == "EMU-Device": + return + # device_type : str = self.get_device_type_from_kpi_id(kpi_id) + if device_type and "emu" in device_type: LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) @@ -186,28 +190,40 @@ class TelemetryBackendService(GenericGrpcService): except: LOGGER.exception("Error terminating job: {:}".format(job_id)) -# --- Mock Methods --- - def get_endpoints_from_kpi_id(self, kpi_id: str) -> dict: + def get_endpoint_detail(self, kpi_id: str): """ - Method to get endpoints based on kpi_id. + Method to get device_type and endpoint detail based on device_uuid. 
""" - kpi_endpoints = { - '6e22f180-ba28-4641-b190-2287bf448888': {"uuid": "123e4567-e89b-12d3-a456-42661417ed06", "name": "eth0", "type": "ethernet", "sample_types": [101, 102]}, - '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []}, - '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}, - } - return kpi_endpoints.get(kpi_id, {}) if kpi_id in kpi_endpoints else {} + kpi_id_obj = KpiId() + kpi_id_obj.kpi_id.uuid = kpi_id + kpi_descriptor = self.kpi_manager_client.GetKpiDescriptor(kpi_id_obj) + if not kpi_descriptor: + LOGGER.warning(f"KPI ID: {kpi_id} - Descriptor not found. Skipping...") + return (None, None) - def get_device_type_from_kpi_id(self, kpi_id: str) -> str: - """ - Method to get device type based on kpi_id. - """ - kpi_device_types = { - "123e4567-e89b-12d3-a456-42661type003" : {'device_type': "PKT-Device"}, - "123e4567-e89b-12d3-a456-42661type004" : {'device_type': "OPT-Device"}, - "6e22f180-ba28-4641-b190-2287bf448888" : {'device_type': "EMU-Device"}, - } - return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown") + device_id = kpi_descriptor.device_id.device_uuid.uuid + endpoint_id = kpi_descriptor.endpoint_id.endpoint_uuid.uuid + device = get_device( context_client = self.context_client, + device_uuid = device_id, + include_config_rules = False, + include_components = False, + ) + if device: + for endpoint in device.device_endpoints: + if endpoint.endpoint_id.endpoint_uuid.uuid == endpoint_id: + endpoint_dict = {} + kpi_sample_types = [] + endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid + endpoint_dict["name"] = endpoint.name + endpoint_dict["type"] = endpoint.endpoint_type + for sample_type in endpoint.kpi_sample_types: + kpi_sample_types.append(sample_type) + endpoint_dict["sample_types"] = kpi_sample_types + + return (device.device_type, endpoint_dict) + + LOGGER.warning(f"Device ID: {device_id} - Endpoint ID: {endpoint_id} - Not Found") + return (None, None) def delivery_callback(self, err, msg): if err: diff --git a/src/telemetry/backend/service/__main__.py b/src/telemetry/backend/service/__main__.py index 61ff39721..6e77d5d6c 100644 --- a/src/telemetry/backend/service/__main__.py +++ b/src/telemetry/backend/service/__main__.py @@ -16,6 +16,7 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_log_level, get_metrics_port from .TelemetryBackendService import TelemetryBackendService +from common.tools.kafka.Variables import KafkaTopic terminate = threading.Event() LOGGER = None @@ -34,6 +35,8 @@ def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) + KafkaTopic.create_all_topics() + LOGGER.info('Starting...') # Start metrics server diff --git a/src/telemetry/backend/tests/Fixtures.py b/src/telemetry/backend/tests/Fixtures.py new file mode 100644 index 000000000..59f1b761c --- /dev/null +++ b/src/telemetry/backend/tests/Fixtures.py @@ -0,0 +1,58 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import logging + +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from service.client.ServiceClient import ServiceClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient + + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +@pytest.fixture(scope='session') +def context_client(): + _client = ContextClient(host="10.152.183.234") + _client.connect() + LOGGER.info('Yielding Connected ContextClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def device_client(): + _client = DeviceClient(host="10.152.183.95") + _client.connect() + LOGGER.info('Yielding Connected DeviceClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def service_client(): + _client = ServiceClient(host="10.152.183.47") + _client.connect() + LOGGER.info('Yielding Connected DeviceClient...') + yield _client + _client.close() + +@pytest.fixture(scope='session') +def kpi_manager_client(): + _client = KpiManagerClient(host="10.152.183.118") + LOGGER.info('Yielding Connected KpiManagerClient...') + yield _client + _client.close() + LOGGER.info('Closed KpiManagerClient...') diff --git a/src/telemetry/backend/tests/add_devices.py b/src/telemetry/backend/tests/add_devices.py new file mode 100644 index 000000000..9fe02a953 --- /dev/null +++ b/src/telemetry/backend/tests/add_devices.py @@ -0,0 +1,78 @@ +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging, os, time +from common.Constants import DEFAULT_CONTEXT_NAME +from common.proto.context_pb2 import ContextId, DeviceOperationalStatusEnum, Empty +from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario +from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string +from common.tools.object_factory.Context import json_context_id +from context.client.ContextClient import ContextClient +from device.client.DeviceClient import DeviceClient +from .Fixtures import context_client, device_client # pylint: disable=unused-import + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +DESCRIPTOR_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'topology.json') +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) + +def load_topology( + context_client : ContextClient, # pylint: disable=redefined-outer-name + device_client : DeviceClient, # pylint: disable=redefined-outer-name +) -> None: + LOGGER.info('Loading Topology...') + validate_empty_scenario(context_client) + descriptor_loader = DescriptorLoader( + descriptors_file=DESCRIPTOR_FILE, context_client=context_client, device_client=device_client) + LOGGER.info('Descriptor Loader Created') + results = descriptor_loader.process() + # LOGGER.info('Descriptor Load Results: {:s}'.format(str(results))) + check_descriptor_load_results(results, descriptor_loader) + # descriptor_loader.validate() + + # Verify the scenario has no services/slices + response = context_client.GetContext(ADMIN_CONTEXT_ID) + assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 + +def test_scenario_devices_enabled( + context_client : ContextClient, # pylint: disable=redefined-outer-name +) -> None: + """ + This test validates that the devices are enabled. 
+ """ + DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED + + disabled_devices = list() + response = None + num_devices = -1 + num_devices_enabled, num_retry = 0, 0 + while (num_devices != num_devices_enabled) and (num_retry < 10): + time.sleep(1.0) + response = context_client.ListDevices(Empty()) + num_devices = len(response.devices) + num_devices_enabled = 0 + disabled_devices = list() + for device in response.devices: + if device.device_operational_status == DEVICE_OP_STATUS_ENABLED: + num_devices_enabled += 1 + else: + disabled_devices.append(grpc_message_to_json(device)) + LOGGER.info('Num Devices enabled: {:d}/{:d}'.format(num_devices_enabled, num_devices)) + num_retry += 1 + if num_devices_enabled != num_devices: + LOGGER.info('Disabled Devices: {:s}'.format(str(disabled_devices))) + LOGGER.info('Devices: {:s}'.format(grpc_message_to_json_string(response))) + assert num_devices_enabled == num_devices diff --git a/src/telemetry/backend/tests/messages.py b/src/telemetry/backend/tests/messages.py index f6a2bb247..0d31cd15f 100644 --- a/src/telemetry/backend/tests/messages.py +++ b/src/telemetry/backend/tests/messages.py @@ -15,8 +15,8 @@ import uuid import random from common.proto import telemetry_frontend_pb2 -# from common.proto.kpi_sample_types_pb2 import KpiSampleType -# from common.proto.kpi_manager_pb2 import KpiId +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId def create_collector_request(): _create_collector_request = telemetry_frontend_pb2.Collector() @@ -24,8 +24,25 @@ def create_collector_request(): # _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" - _create_collector_request.duration_s = float(random.randint(8, 16)) + _create_collector_request.duration_s = float(random.randint(30, 50)) # _create_collector_request.duration_s = -1 _create_collector_request.interval_s = float(random.randint(2, 4)) return _create_collector_request +def _create_kpi_descriptor(device_id : str = ""): + _create_kpi_request = KpiDescriptor() + _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_request.kpi_description = "Test Description" + _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED + _create_kpi_request.device_id.device_uuid.uuid = device_id + _create_kpi_request.service_id.service_uuid.uuid = 'SERV3' + _create_kpi_request.slice_id.slice_uuid.uuid = 'SLC3' + _create_kpi_request.endpoint_id.endpoint_uuid.uuid = '36571df2-bac1-5909-a27d-5f42491d2ff0' + _create_kpi_request.connection_id.connection_uuid.uuid = 'CON2' + _create_kpi_request.link_id.link_uuid.uuid = 'LNK2' + return _create_kpi_request + +def _create_kpi_id(kpi_id : str = "fc046641-0c9a-4750-b4d9-9f98401714e2"): + _create_kpi_id_request = KpiId() + _create_kpi_id_request.kpi_id.uuid = kpi_id + return _create_kpi_id_request diff --git a/src/telemetry/backend/tests/test_backend.py b/src/telemetry/backend/tests/test_backend.py index 28b92fb29..1329aa969 100644 --- a/src/telemetry/backend/tests/test_backend.py +++ b/src/telemetry/backend/tests/test_backend.py @@ -16,11 +16,19 @@ import pytest import logging import time from telemetry.backend.service.TelemetryBackendService import TelemetryBackendService -from .messages import create_collector_request -from .Fixtures import 
context_client, device_client +from .messages import create_collector_request, _create_kpi_descriptor, _create_kpi_id +from .Fixtures import context_client, device_client, service_client, kpi_manager_client from .add_devices import load_topology +from common.tools.context_queries.Topology import get_topology +from common.Constants import DEFAULT_CONTEXT_NAME +from common.tools.context_queries.Device import get_device, add_device_to_topology +# from common.tools.context_queries.EndPoint import get_endpoint_names +from .EndPoint import get_endpoint_names # modofied version of get_endpoint_names +from common.proto.context_pb2 import EndPointId, DeviceId, TopologyId, ContextId , Empty +from common.proto.kpi_manager_pb2 import KpiId LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) ########################### @@ -37,6 +45,120 @@ def log_all_methods(request): yield LOGGER.info(f" <<<<< Finished test: {request.node.name} ") +# # ----- Add Topology ----- +# def test_add_to_topology(context_client, device_client, service_client): +# load_topology(context_client, device_client) + +# # ----- Add Device to Topology ------ +# def test_add_device_to_topology(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "43813baf-195e-5da6-af20-b3d0922e71a7" +# topology_uuid = "c76135e3-24a8-5e92-9bed-c3c9139359c8" +# device_uuid = "69a3a3f0-5237-5f9e-bc96-d450d0c6c03a" +# response = add_device_to_topology( context_client = context_client, +# context_id = context_id, +# topology_uuid = topology_uuid, +# device_uuid = device_uuid +# ) +# LOGGER.info(f"Device added to topology: {response}") +# assert response is True + +# # ----- Get Topology ----- +# def test_get_topology(context_client, device_client): +# response = get_topology(context_client = context_client, topology_uuid = "test1", context_uuid = "test1") +# LOGGER.info(f"Topology: {response}") +# assert response is not None + +# def test_set_kpi_descriptor_and_get_device_id(kpi_manager_client): +# kpi_descriptor = _create_kpi_descriptor("1290fb71-bf15-5528-8b69-2d2fabe1fa18") +# kpi_id = kpi_manager_client.SetKpiDescriptor(kpi_descriptor) +# LOGGER.info(f"KPI Descriptor set: {kpi_id}") +# assert kpi_id is not None + +# response = kpi_manager_client.GetKpiDescriptor(kpi_id) +# # response = kpi_manager_client.GetKpiDescriptor(_create_kpi_id()) + +# assert response is not None +# LOGGER.info(f"KPI Descriptor: {response}") +# LOGGER.info(f"Device Id: {response.device_id.device_uuid.uuid}") +# LOGGER.info(f"Endpoint Id: {response.endpoint_id.endpoint_uuid.uuid}") + +# # ----- Get endpoint detail using device ID ----- +# def test_get_device_details(context_client): +# response = get_device(context_client = context_client, device_uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18", include_config_rules = False, include_components = False) +# if response: +# LOGGER.info(f"Device type: {response.device_type}") +# for endpoint in response.device_endpoints: +# if endpoint.endpoint_id.endpoint_uuid.uuid == '36571df2-bac1-5909-a27d-5f42491d2ff0': +# endpoint_dict = {} +# kpi_sample_types = [] +# # LOGGER.info(f"Endpoint: {endpoint}") +# # LOGGER.info(f"Enpoint_uuid: {endpoint.endpoint_id.endpoint_uuid.uuid}") +# endpoint_dict["uuid"] = endpoint.endpoint_id.endpoint_uuid.uuid +# # LOGGER.info(f"Enpoint_name: {endpoint.name}") +# endpoint_dict["name"] = endpoint.name +# # LOGGER.info(f"Enpoint_type: {endpoint.endpoint_type}") +# endpoint_dict["type"] = endpoint.endpoint_type +# for sample_type in endpoint.kpi_sample_types: +# # 
LOGGER.info(f"Enpoint_sample_types: {sample_type}") +# kpi_sample_types.append(sample_type) +# endpoint_dict["sample_types"] = kpi_sample_types +# LOGGER.info(f"Extracted endpoint dict: {endpoint_dict}") +# else: +# LOGGER.info(f"Endpoint not matched") +# LOGGER.info(f"Device Type: {type(response)}") +# assert response is not None + +# # ----- List Conetxts ----- +# def test_list_contextIds(context_client): +# empty = Empty() +# response = context_client.ListContexts(empty) +# LOGGER.info(f"Contexts: {response}") +# assert response + +# # ----- List Devices ----- +# def test_list_devices(context_client): +# empty = Empty() +# response = context_client.ListDeviceIds(empty) +# LOGGER.info(f"Devices: {response}") +# assert response + +# ----- Get Endpoints ----- TODO: get_endpoint_names method doesn't return KPI samples types +# def test_get_endpoints(context_client): +# device_id = DeviceId() +# device_id.device_uuid.uuid = "1290fb71-bf15-5528-8b69-2d2fabe1fa18" +# endpoint_id = EndPointId() +# endpoint_id.endpoint_uuid.uuid = "43b817fa-246f-5e0a-a2e3-2aad0b3e16ca" +# endpoint_id.device_id.CopyFrom(device_id) +# response = get_endpoint_names(context_client = context_client, endpoint_ids = [endpoint_id]) +# LOGGER.info(f"Endpoints: {response}") +# assert response is not None + +# # ----- List Topologies ----- +# def test_list_topologies(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# respone = context_client.ListTopologies(context_id) +# LOGGER.info(f"Topologies: {respone}") + +# # ----- Remove Topology ----- +# def test_remove_topology(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# topology_id = TopologyId() +# topology_id.topology_uuid.uuid = "9ef0118c-4bca-5e81-808b-dc8f60e2cda4" +# topology_id.context_id.CopyFrom(context_id) + +# response = context_client.RemoveTopology(topology_id) +# LOGGER.info(f"Topology removed: {response}") + +# # ----- Remove context ----- +# def test_remove_context(context_client): +# context_id = ContextId() +# context_id.context_uuid.uuid = "e7d46baa-d38d-5b72-a082-f344274b63ef" +# response = context_client.RemoveContext(context_id) +# LOGGER.info(f"Context removed: {response}") + @pytest.fixture def telemetryBackend_service(): LOGGER.info('Initializing TelemetryBackendService...') @@ -54,15 +176,6 @@ def telemetryBackend_service(): def test_InitiateCollectorBackend(telemetryBackend_service): LOGGER.info(" Backend Initiated Successfully. Waiting for timer to finish ...") - time.sleep(300) + time.sleep(30) LOGGER.info(" Backend Timer Finished Successfully. 
") -# --- "test_validate_kafka_topics" should be run before the functionality tests --- -# def test_validate_kafka_topics(): -# LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") -# response = KafkaTopic.create_all_topics() -# assert isinstance(response, bool) - -# # Call load_topology from the add_devices.py file -# def test_load_topology(context_client, device_client): -# load_topology(context_client, device_client) diff --git a/src/telemetry/backend/tests/topology.json b/src/telemetry/backend/tests/topology.json new file mode 100644 index 000000000..6416130b9 --- /dev/null +++ b/src/telemetry/backend/tests/topology.json @@ -0,0 +1,148 @@ +{ + "contexts": [ + {"context_id": {"context_uuid": {"uuid": "admin"}}} + ], + "topologies": [ + {"topology_id": {"context_id": {"context_uuid": {"uuid": "admin"}}, "topology_uuid": {"uuid": "admin"}}} + ], + "devices": [ + { + "device_id": {"device_uuid": {"uuid": "DE1"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [101, 102], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [103, 102], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [201, 202], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [202, 203], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [201, 203], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [101, 103], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE2"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [101, 103], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [103, 101], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [202, 201], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [203, 201], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [203, 202], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [102 ], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE3"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [], "type": 
"copper/internal", "uuid": "2/3"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + }, + { + "device_id": {"device_uuid": {"uuid": "DE4"}}, "device_type": "emu-packet-router", "device_drivers": [0], + "device_endpoints": [], "device_operational_status": 0, "device_config": {"config_rules": [ + {"action": 1, "custom": {"resource_key": "_connect/address", "resource_value": "127.0.0.1"}}, + {"action": 1, "custom": {"resource_key": "_connect/port", "resource_value": "0"}}, + {"action": 1, "custom": {"resource_key": "_connect/settings", "resource_value": {"endpoints": [ + {"sample_types": [], "type": "copper/internal", "uuid": "1/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "1/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/1"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/2"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/3"}, + {"sample_types": [], "type": "copper/internal", "uuid": "2/4"} + ]}}} + ]} + } + ], + "links": [ + + { + "link_id": {"link_uuid": {"uuid": "DE1/2/2==DE2/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE1/2/3==DE3/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE1/2/4==DE4/2/1"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/4"}}, + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/1"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE2/2/1==DE1/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE2/2/3==DE3/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE2/2/4==DE4/2/2"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}}, + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE3/2/1==DE1/2/3"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/3"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE3/2/2==DE2/2/3"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/3"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/2==DE2/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + + { + "link_id": {"link_uuid": {"uuid": "DE4/2/1==DE1/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, 
"endpoint_uuid": {"uuid": "2/1"}}, + {"device_id": {"device_uuid": {"uuid": "DE1"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/2==DE2/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/2"}}, + {"device_id": {"device_uuid": {"uuid": "DE2"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + }, + { + "link_id": {"link_uuid": {"uuid": "DE4/2/3==DE3/2/4"}}, "link_endpoint_ids": [ + {"device_id": {"device_uuid": {"uuid": "DE4"}}, "endpoint_uuid": {"uuid": "2/3"}}, + {"device_id": {"device_uuid": {"uuid": "DE3"}}, "endpoint_uuid": {"uuid": "2/4"}} + ] + } + ] +} diff --git a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py index 1ef8ed46b..955036495 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py +++ b/src/telemetry/frontend/service/TelemetryFrontendServiceServicerImpl.py @@ -143,58 +143,6 @@ class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def delivery_callback(self, err, msg): - """ - Callback function to handle message delivery status. - Args: - err (KafkaError): Kafka error object. - msg (Message): Kafka message object. - """ if err: LOGGER.debug('Message delivery failed: {:}'.format(err)) - # print('Message delivery failed: {:}'.format(err)) - # else: - # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) - # print('Message delivered to topic {:}'.format(msg.topic())) - - # ---------- Independent Method --------------- - # Listener method is independent of any method (same lifetime as service) - # continously listens for responses - def install_servicers(self): - threading.Thread(target=self.ResponseListener).start() - - def ResponseListener(self): - """ - listener for response on Kafka topic. 
- """ - self.kafka_consumer.subscribe([KafkaTopic.TELEMETRY_RESPONSE.value]) - while True: - receive_msg = self.kafka_consumer.poll(2.0) - if receive_msg is None: - continue - elif receive_msg.error(): - if receive_msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - # print("Consumer error: {:}".format(receive_msg.error())) - LOGGER.error("Consumer error: {:}".format(receive_msg.error())) - break - try: - collector_id = receive_msg.key().decode('utf-8') - if collector_id in ACTIVE_COLLECTORS: - kpi_value = json.loads(receive_msg.value().decode('utf-8')) - self.process_response(collector_id, kpi_value['kpi_id'], kpi_value['kpi_value']) - else: - # print(f"collector id does not match.\nRespone ID: '{collector_id}' --- Active IDs: '{ACTIVE_COLLECTORS}' ") - LOGGER.info("collector id does not match.\nRespone ID: {:} --- Active IDs: {:}".format(collector_id, ACTIVE_COLLECTORS)) - except Exception as e: - # print(f"Error extarcting msg key or value: {str(e)}") - LOGGER.info("Error extarcting msg key or value: {:}".format(e)) - continue - - def process_response(self, collector_id: str, kpi_id: str, kpi_value: Any): - if kpi_id == "-1" and kpi_value == -1: - # print ("Backend termination confirmation for collector id: ", collector_id) - LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id)) - else: - LOGGER.info("Backend termination confirmation for collector id: {:}".format(collector_id)) - # print ("KPI Value: Collector Id:", collector_id, ", Kpi Id:", kpi_id, ", Value:", kpi_value) + diff --git a/src/telemetry/frontend/service/__main__.py b/src/telemetry/frontend/service/__main__.py index e1b9dba4e..874b34b8c 100644 --- a/src/telemetry/frontend/service/__main__.py +++ b/src/telemetry/frontend/service/__main__.py @@ -18,6 +18,8 @@ from common.Settings import get_log_level, get_metrics_port from .TelemetryFrontendService import TelemetryFrontendService from telemetry.database.TelemetryModel import Collector as Model from common.tools.database.GenericDatabase import Database +from common.tools.kafka.Variables import KafkaTopic + terminate = threading.Event() LOGGER = None @@ -43,6 +45,8 @@ def main(): kpiDBobj.create_database() kpiDBobj.create_tables() + KafkaTopic.create_all_topics() + # Start metrics server metrics_port = get_metrics_port() start_http_server(metrics_port) diff --git a/src/telemetry/frontend/tests/Messages.py b/src/telemetry/frontend/tests/Messages.py index 177bcc0b7..d766f68fa 100644 --- a/src/telemetry/frontend/tests/Messages.py +++ b/src/telemetry/frontend/tests/Messages.py @@ -30,16 +30,17 @@ def create_collector_request(): # _create_collector_request.collector_id.collector_id.uuid = str(uuid.uuid4()) _create_collector_request.collector_id.collector_id.uuid = "efef4d95-1cf1-43c4-9742-95c283dddddd" # _create_collector_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) - _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + # _create_collector_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" + _create_collector_request.kpi_id.kpi_id.uuid = "8c5ca114-cdc7-4081-b128-b667fd159832" # _create_collector_request.duration_s = float(random.randint(8, 16)) - _create_collector_request.duration_s = -1 - _create_collector_request.interval_s = float(random.randint(3, 5)) + _create_collector_request.duration_s = float(random.randint(40, 60)) + _create_collector_request.interval_s = float(random.randint(5, 7)) return _create_collector_request def create_collector_filter(): 
     _create_collector_filter = telemetry_frontend_pb2.CollectorFilter()
     kpi_id_obj = KpiId()
     # kpi_id_obj.kpi_id.uuid = str(uuid.uuid4())
-    kpi_id_obj.kpi_id.uuid = "a7237fa3-caf4-479d-84b6-4d9f9738fb7f"
+    kpi_id_obj.kpi_id.uuid = "8c5ca114-cdc7-4081-b128-b667fd159832"
     _create_collector_filter.kpi_id.append(kpi_id_obj)
     return _create_collector_filter

diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py
index 6c6107152..767a1f73f 100644
--- a/src/telemetry/frontend/tests/test_frontend.py
+++ b/src/telemetry/frontend/tests/test_frontend.py
@@ -90,13 +90,6 @@ def telemetryFrontend_client(
 # Tests Implementation of Telemetry Frontend
 ###########################

-# ------- Re-structuring Test ---------
-# --- "test_validate_kafka_topics" should be run before the functionality tests ---
-def test_validate_kafka_topics():
-    # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ")
-    response = KafkaTopic.create_all_topics()
-    assert isinstance(response, bool)
-
 # ----- core funtionality test -----
 def test_StartCollector(telemetryFrontend_client):
     # LOGGER.info(' >>> test_StartCollector START: <<< ')
@@ -104,18 +97,17 @@ def test_StartCollector(telemetryFrontend_client):
     LOGGER.debug(str(response))
     assert isinstance(response, CollectorId)

+def test_SelectCollectors(telemetryFrontend_client):
+    LOGGER.info(' >>> test_SelectCollectors START: <<< ')
+    response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
+    LOGGER.debug(str(response))
+    assert isinstance(response, CollectorList)

 def test_StopCollector(telemetryFrontend_client):
     # LOGGER.info(' >>> test_StopCollector START: <<< ')
-    LOGGER.info("Waiting before termination...")
-    time.sleep(30)
+    # LOGGER.info("Waiting before termination...")
+    # time.sleep(30)
     response = telemetryFrontend_client.StopCollector(create_collector_id())
     LOGGER.debug(str(response))
     assert isinstance(response, Empty)

-# def test_SelectCollectors(telemetryFrontend_client):
-#     LOGGER.info(' >>> test_SelectCollectors START: <<< ')
-#     response = telemetryFrontend_client.SelectCollectors(create_collector_filter())
-#     LOGGER.debug(str(response))
-#     assert isinstance(response, CollectorList)
-
--
GitLab


From 87685c9c58a468f4da960e80148158d8c390528a Mon Sep 17 00:00:00 2001
From: Waleed Akbar
Date: Fri, 7 Mar 2025 05:47:47 +0000
Subject: [PATCH 05/12] Updated Telemetry Backend and added monitoring deployment script

- Minor changes in the Telemetry backend service
- New script added to deploy Prom. Gateway, Prometheus, and Mimir (example invocation below)
- Configuration file for Prometheus to scrape the Gateway
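For reference, the new script takes no arguments; a possible invocation from the repository root (assuming Helm 3 is installed and the kubectl context points at the target cluster) is sketched below:

    # Deploy the monitoring stack into the "monitoring" namespace
    ./deploy/new_monitoring.sh

    # Then verify that the Prometheus server rolled out
    kubectl get pods -n monitoring
    helm list -n monitoring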
--- .gitignore | 2 - deploy/monitoring.sh | 53 ---- deploy/new_monitoring.sh | 132 ++++++++++ manifests/monitoring/grafana_values.yaml | 235 ++++++++++++++++++ .../prometheus_values.yaml} | 0 .../emulated/SyntheticMetricsGenerator.py | 2 +- .../service/TelemetryBackendService.py | 7 +- 7 files changed, 372 insertions(+), 59 deletions(-) delete mode 100644 deploy/monitoring.sh create mode 100755 deploy/new_monitoring.sh create mode 100644 manifests/monitoring/grafana_values.yaml rename manifests/{prometheus/prometheus.yaml => monitoring/prometheus_values.yaml} (100%) diff --git a/.gitignore b/.gitignore index db47387c8..235d7768a 100644 --- a/.gitignore +++ b/.gitignore @@ -179,5 +179,3 @@ libyang/ # Other logs **/logs/*.log.* -# PySpark checkpoints -src/analytics/.spark/* diff --git a/deploy/monitoring.sh b/deploy/monitoring.sh deleted file mode 100644 index 18992501a..000000000 --- a/deploy/monitoring.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -RELEASE_NAME="mon-prometheus" -NAMESPACE="monitoring" -CHART_REPO_NAME="prometheus-community" -CHART_REPO_URL="https://prometheus-community.github.io/helm-charts" -CHART_NAME="prometheus" # Chart name within the repo -VALUES_FILE="manifests/prometheus/prometheus.yaml" - -echo ">>> Deploying Prometheus with the following configuration:" -echo "Adding/updating Helm repo: $CHART_REPO_NAME -> $CHART_REPO_URL" -helm repo add "$CHART_REPO_NAME" "$CHART_REPO_URL" || true -helm repo update - -echo "Creating namespace '$NAMESPACE' if it doesn't exist..." -kubectl get namespace "$NAMESPACE" >/dev/null 2>&1 || kubectl create namespace "$NAMESPACE" - -#------------------------------------------------------------------------------ -# 3. Install or upgrade the Prometheus chart -# - If 'VALUES_FILE' is set, it will use it for custom configuration. -# - Otherwise, it will deploy with the chart defaults. -#------------------------------------------------------------------------------ -if [ -n "$VALUES_FILE" ] && [ -f "$VALUES_FILE" ]; then - echo "Installing/Upgrading Prometheus with custom values from $VALUES_FILE..." - helm upgrade --install "$RELEASE_NAME" "$CHART_REPO_NAME/$CHART_NAME" \ - --namespace "$NAMESPACE" \ - --values "$VALUES_FILE" -else - echo "Installing/Upgrading Prometheus with default chart values..." - helm upgrade --install "$RELEASE_NAME" "$CHART_REPO_NAME/$CHART_NAME" \ - --namespace "$NAMESPACE" -fi - -echo "Waiting for Prometheus pods to be ready..." -kubectl rollout status deployment/"$RELEASE_NAME"-server -n "$NAMESPACE" || true - -# echo "Listing deployed resources in namespace '$NAMESPACE':" -# kubectl get all -n "$NAMESPACE" - -echo "<<< Prometheus deployment completed successfully!" 
diff --git a/deploy/new_monitoring.sh b/deploy/new_monitoring.sh new file mode 100755 index 000000000..ac1f46723 --- /dev/null +++ b/deploy/new_monitoring.sh @@ -0,0 +1,132 @@ +#!/bin/bash +# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +# ----------------------------------------------------------- +# Global namespace for all deployments +# ----------------------------------------------------------- +NAMESPACE="monitoring" +VALUES_FILE_PATH="manifests/monitoring" + +# ----------------------------------------------------------- +# Prometheus Configuration +# ----------------------------------------------------------- +RELEASE_NAME_PROM="mon-prometheus" +CHART_REPO_NAME_PROM="prometheus-community" +CHART_REPO_URL_PROM="https://prometheus-community.github.io/helm-charts" +CHART_NAME_PROM="prometheus" +VALUES_FILE_PROM="$VALUES_FILE_PATH/prometheus_values.yaml" + +# ----------------------------------------------------------- +# Mimir Configuration +# ----------------------------------------------------------- +RELEASE_NAME_MIMIR="mon-mimir" +CHART_REPO_NAME_MIMIR="grafana" +CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts" +CHART_NAME_MIMIR="mimir-distributed" +VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml" + +# ----------------------------------------------------------- +# Grafana Configuration +# ----------------------------------------------------------- +# RELEASE_NAME_GRAFANA="mon-grafana" +# CHART_REPO_NAME_GRAFANA="grafana" +# CHART_REPO_URL_GRAFANA="https://grafana.github.io/helm-charts" +# CHART_NAME_GRAFANA="grafana" +# VALUES_FILE_GRAFANA="$VALUES_FILE_PATH/grafana_values.yaml" + + +# ----------------------------------------------------------- +# Function to deploy or upgrade a Helm chart +# ----------------------------------------------------------- +deploy_chart() { + local release_name="$1" + local chart_repo_name="$2" + local chart_repo_url="$3" + local chart_name="$4" + local values_file="$5" + local namespace="$6" + + echo ">>> Deploying [${release_name}] from repo [${chart_repo_name}]..." + + # Add or update the Helm repo + echo "Adding/updating Helm repo: $chart_repo_name -> $chart_repo_url" + helm repo add "$chart_repo_name" "$chart_repo_url" || true + helm repo update + + # Create namespace if needed + echo "Creating namespace '$namespace' if it doesn't exist..." + kubectl get namespace "$namespace" >/dev/null 2>&1 || kubectl create namespace "$namespace" + + # Install or upgrade the chart + if [ -n "$values_file" ] && [ -f "$values_file" ]; then + echo "Installing/Upgrading $release_name using custom values from $values_file..." + helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \ + --namespace "$namespace" \ + --values "$values_file" + else + echo "Installing/Upgrading $release_name with default chart values..." 
+ helm upgrade --install "$release_name" "$chart_repo_name/$chart_name" \ + --namespace "$namespace" + fi + + echo "<<< Deployment initiated for [$release_name]." + echo +} + + +# ----------------------------------------------------------- +# Actual Deployments +# ----------------------------------------------------------- + +# 1) Deploy Prometheus +deploy_chart "$RELEASE_NAME_PROM" \ + "$CHART_REPO_NAME_PROM" \ + "$CHART_REPO_URL_PROM" \ + "$CHART_NAME_PROM" \ + "$VALUES_FILE_PROM" \ + "$NAMESPACE" + +# Optionally wait for Prometheus server pod to become ready +kubectl rollout status deployment/"$RELEASE_NAME_PROM-server" -n "$NAMESPACE" || true + + +# 2) Deploy Mimir +deploy_chart "$RELEASE_NAME_MIMIR" \ + "$CHART_REPO_NAME_MIMIR" \ + "$CHART_REPO_URL_MIMIR" \ + "$CHART_NAME_MIMIR" \ + "$VALUES_FILE_MIMIR" \ + "$NAMESPACE" + +# Depending on how Mimir runs (StatefulSets, Deployments), you can wait for +# the correct resource to be ready. For example: +# kubectl rollout status statefulset/"$RELEASE_NAME_MIMIR-distributor" -n "$NAMESPACE" || true + + +# 3) Deploy Grafana +# deploy_chart "$RELEASE_NAME_GRAFANA" \ +# "$CHART_REPO_NAME_GRAFANA" \ +# "$CHART_REPO_URL_GRAFANA" \ +# "$CHART_NAME_GRAFANA" \ +# "$VALUES_FILE_GRAFANA" \ +# "$NAMESPACE" + +# kubectl rollout status deployment/"$RELEASE_NAME_GRAFANA" -n "$NAMESPACE" || true + +# ----------------------------------------------------------- +echo "All deployments completed!" + diff --git a/manifests/monitoring/grafana_values.yaml b/manifests/monitoring/grafana_values.yaml new file mode 100644 index 000000000..a2dbd7971 --- /dev/null +++ b/manifests/monitoring/grafana_values.yaml @@ -0,0 +1,235 @@ +rbac: + create: true + ## Use an existing ClusterRole/Role (depending on rbac.namespaced false/true) + # useExistingRole: name-of-some-role + # useExistingClusterRole: name-of-some-clusterRole + pspEnabled: false + pspUseAppArmor: false + namespaced: false + +serviceAccount: + create: true + name: + nameTest: + ## ServiceAccount labels. + automountServiceAccountToken: false + +replicas: 1 + +## Create a headless service for the deployment +headlessService: false + +## Should the service account be auto mounted on the pod +automountServiceAccountToken: true + +## Create HorizontalPodAutoscaler object for deployment type +# +autoscaling: + enabled: false + minReplicas: 1 + maxReplicas: 3 + targetCPU: "60" + targetMemory: "" + behavior: {} + +deploymentStrategy: + type: RollingUpdate + +readinessProbe: + httpGet: + path: /api/health + port: 3000 + +livenessProbe: + httpGet: + path: /api/health + port: 3000 + initialDelaySeconds: 60 + timeoutSeconds: 30 + failureThreshold: 10 + +image: + registry: docker.io + repository: grafana/grafana + # Overrides the Grafana image tag whose default is the chart appVersion + tag: "" + sha: "" + pullPolicy: IfNotPresent + + ## Optionally specify an array of imagePullSecrets. + ## Secrets must be manually created in the namespace. + ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ + ## Can be templated. + ## + pullSecrets: [] + # - myRegistrKeySecretName + +testFramework: + enabled: true + ## The type of Helm hook used to run this test. Defaults to test. 
+ ## ref: https://helm.sh/docs/topics/charts_hooks/#the-available-hooks + ## + # hookType: test + image: + # -- The Docker registry + registry: docker.io + repository: bats/bats + tag: "v1.4.1" + imagePullPolicy: IfNotPresent + +# dns configuration for pod +dnsPolicy: ~ +dnsConfig: {} + # nameservers: + # - 8.8.8.8 + # options: + # - name: ndots + # value: "2" + # - name: edns0 + +securityContext: + runAsNonRoot: true + runAsUser: 472 + runAsGroup: 472 + fsGroup: 472 + +containerSecurityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + seccompProfile: + type: RuntimeDefault + +# Enable creating the grafana configmap +createConfigmap: true + +downloadDashboardsImage: + registry: docker.io + repository: curlimages/curl + tag: 8.9.1 + sha: "" + pullPolicy: IfNotPresent + +downloadDashboards: + env: {} + envFromSecret: "" + resources: {} + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + seccompProfile: + type: RuntimeDefault + envValueFrom: {} + # ENV_NAME: + # configMapKeyRef: + # name: configmap-name + # key: value_key + +## Pod Annotations +# podAnnotations: {} + +## ConfigMap Annotations +# configMapAnnotations: {} + # argocd.argoproj.io/sync-options: Replace=true + +## Pod Labels +# podLabels: {} + +podPortName: grafana +gossipPortName: gossip +## Deployment annotations +# annotations: {} + +service: + enabled: true + type: NodePort + port: 80 + targetPort: 3000 + nodePort: 30080 + portName: service + +## Enable persistence using Persistent Volume Claims +## ref: https://kubernetes.io/docs/user-guide/persistent-volumes/ +## +persistence: + type: pvc + enabled: true + # storageClassName: default + accessModes: + - ReadWriteOnce + size: 10Gi + # annotations: {} + finalizers: + - kubernetes.io/pvc-protection + + disableWarning: false + + ## If 'lookupVolumeName' is set to true, Helm will attempt to retrieve + ## the current value of 'spec.volumeName' and incorporate it into the template. + lookupVolumeName: true + +# Administrator credentials when not using an existing secret (see below) +adminUser: admin +# adminPassword: strongpassword + +# Use an existing secret for the admin user. +admin: + ## Name of the secret. Can be templated. + existingSecret: "" + userKey: admin-user + passwordKey: admin-password + +## Configure grafana datasources +## ref: http://docs.grafana.org/administration/provisioning/#datasources +## +datasources: + datasources.yaml: + apiVersion: 1 + datasources: + - name: Prometheus + type: prometheus + url: http://mon-prometheus-server.monitoring.svc.cluster.local + access: proxy + isDefault: true + - name: Mimir + type: prometheus + url: http://mimir-nginx.mon-mimir.svc:80/prometheus + access: proxy + isDefault: false + +## Grafana's primary configuration +## NOTE: values in map will be converted to ini format +## ref: http://docs.grafana.org/installation/configuration/ +## +grafana.ini: + paths: + data: /var/lib/grafana/ + logs: /var/log/grafana + plugins: /var/lib/grafana/plugins + provisioning: /etc/grafana/provisioning + analytics: + check_for_updates: true + log: + mode: console + grafana_net: + url: https://grafana.net + server: + domain: "{{ if (and .Values.ingress.enabled .Values.ingress.hosts) }}{{ tpl (.Values.ingress.hosts | first) . }}{{ else }}''{{ end }}" + +## Number of old ReplicaSets to retain +## +revisionHistoryLimit: 5 + +# assertNoLeakedSecrets is a helper function defined in _helpers.tpl that checks if secret +# values are not exposed in the rendered grafana.ini configmap. 
It is enabled by default. +# +# To pass values into grafana.ini without exposing them in a configmap, use variable expansion: +# https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#variable-expansion +# +# Alternatively, if you wish to allow secret values to be exposed in the rendered grafana.ini configmap, +# you can disable this check by setting assertNoLeakedSecrets to false. +assertNoLeakedSecrets: true + diff --git a/manifests/prometheus/prometheus.yaml b/manifests/monitoring/prometheus_values.yaml similarity index 100% rename from manifests/prometheus/prometheus.yaml rename to manifests/monitoring/prometheus_values.yaml diff --git a/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py b/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py index a01e2c0e6..77d998432 100644 --- a/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py +++ b/src/telemetry/backend/collectors/emulated/SyntheticMetricsGenerator.py @@ -98,7 +98,7 @@ class SyntheticMetricsGenerator(): return (time.time(), resource_key, requested_metrics) - def metric_id_mapper(self, sample_type_ids, metric_dict): + def metric_id_mapper(self, sample_type_ids, metric_dict): # TODO: Add a dynamic mappper from kpi_sample_type ID to name... """ Maps the sample type IDs to the corresponding metric names. diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 40cd1443a..3aeee8238 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -49,7 +49,7 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) - self.collector = EmulatedCollector(address="127.0.0.1", port=8000) + self.collector = None self.context_client = ContextClient() self.kpi_manager_client = KpiManagerClient() self.active_jobs = {} @@ -124,11 +124,11 @@ class TelemetryBackendService(GenericGrpcService): Method to handle collector request. """ device_type, end_points = self.get_endpoint_detail(kpi_id) - # end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) + if end_points is None: LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) return - # device_type : str = self.get_device_type_from_kpi_id(kpi_id) + if device_type and "emu" in device_type: LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] @@ -139,6 +139,7 @@ class TelemetryBackendService(GenericGrpcService): def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event): # EmulatedCollector + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) self.collector.Connect() if not self.collector.SubscribeState(subscription): LOGGER.warning("KPI ID: {:} - Subscription failed. 
Skipping...".format(kpi_id)) -- GitLab From c688687d990fad4bfb8364af58ad05669d00b66c Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 10 Mar 2025 13:25:40 +0000 Subject: [PATCH 06/12] Minor bug fixes --- src/kpi_value_writer/service/KpiValueWriter.py | 10 ++++------ src/telemetry/backend/Dockerfile | 2 ++ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/kpi_value_writer/service/KpiValueWriter.py b/src/kpi_value_writer/service/KpiValueWriter.py index 25b8ca2e8..74291bba3 100644 --- a/src/kpi_value_writer/service/KpiValueWriter.py +++ b/src/kpi_value_writer/service/KpiValueWriter.py @@ -55,11 +55,9 @@ class KpiValueWriter(GenericGrpcService): if raw_kpi is None: continue elif raw_kpi.error(): - if raw_kpi.error().code() == KafkaError._PARTITION_EOF: - continue - else: + if raw_kpi.error().code() != KafkaError._PARTITION_EOF: print("Consumer error: {}".format(raw_kpi.error())) - continue + continue try: kpi_value = json.loads(raw_kpi.value().decode('utf-8')) LOGGER.info("Received KPI : {:}".format(kpi_value)) @@ -79,5 +77,5 @@ class KpiValueWriter(GenericGrpcService): metric_writer.create_and_expose_cooked_kpi(kpi_descriptor_object, kpi_value) else: LOGGER.info("No KPI Descriptor found in Database for Kpi ID: {:}".format(kpi_id)) - except Exception as e: - LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e)) + except: + LOGGER.exception("Unable to get KpiDescriptor") diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile index 7448f1ebc..81f8bba4b 100644 --- a/src/telemetry/backend/Dockerfile +++ b/src/telemetry/backend/Dockerfile @@ -64,6 +64,8 @@ RUN python3 -m pip install -r requirements.txt WORKDIR /var/teraflow COPY src/context/__init__.py context/__init__.py COPY src/context/client/. context/client/ +COPY src/device/__init__.py device/__init__.py +COPY src/device/client/. device/client/ COPY src/kpi_manager/client/. 
kpi_manager/client/ COPY src/kpi_manager/__init__.py kpi_manager/__init__.py COPY src/telemetry/__init__.py telemetry/__init__.py -- GitLab From 2d4b9d8c1ef7978aeb69f287987b2a93f67665bd Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 10 Mar 2025 13:27:15 +0000 Subject: [PATCH 07/12] Renamed new_monitoring.sh to monitoring.sh --- deploy/{new_monitoring.sh => monitoring.sh} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename deploy/{new_monitoring.sh => monitoring.sh} (100%) diff --git a/deploy/new_monitoring.sh b/deploy/monitoring.sh similarity index 100% rename from deploy/new_monitoring.sh rename to deploy/monitoring.sh -- GitLab From cef03c41983bee4d572d018c949ade6bb14886c7 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 10 Mar 2025 13:29:35 +0000 Subject: [PATCH 08/12] Disable for now Mimir --- deploy/monitoring.sh | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/deploy/monitoring.sh b/deploy/monitoring.sh index ac1f46723..6fa633a37 100755 --- a/deploy/monitoring.sh +++ b/deploy/monitoring.sh @@ -33,11 +33,11 @@ VALUES_FILE_PROM="$VALUES_FILE_PATH/prometheus_values.yaml" # ----------------------------------------------------------- # Mimir Configuration # ----------------------------------------------------------- -RELEASE_NAME_MIMIR="mon-mimir" -CHART_REPO_NAME_MIMIR="grafana" -CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts" -CHART_NAME_MIMIR="mimir-distributed" -VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml" +# RELEASE_NAME_MIMIR="mon-mimir" +# CHART_REPO_NAME_MIMIR="grafana" +# CHART_REPO_URL_MIMIR="https://grafana.github.io/helm-charts" +# CHART_NAME_MIMIR="mimir-distributed" +# VALUES_FILE_MIMIR="$VALUES_FILE_PATH/mimir_values.yaml" # ----------------------------------------------------------- # Grafana Configuration @@ -105,12 +105,12 @@ kubectl rollout status deployment/"$RELEASE_NAME_PROM-server" -n "$NAMESPACE" || # 2) Deploy Mimir -deploy_chart "$RELEASE_NAME_MIMIR" \ - "$CHART_REPO_NAME_MIMIR" \ - "$CHART_REPO_URL_MIMIR" \ - "$CHART_NAME_MIMIR" \ - "$VALUES_FILE_MIMIR" \ - "$NAMESPACE" +# deploy_chart "$RELEASE_NAME_MIMIR" \ +# "$CHART_REPO_NAME_MIMIR" \ +# "$CHART_REPO_URL_MIMIR" \ +# "$CHART_NAME_MIMIR" \ +# "$VALUES_FILE_MIMIR" \ +# "$NAMESPACE" # Depending on how Mimir runs (StatefulSets, Deployments), you can wait for # the correct resource to be ready. 
For example: -- GitLab From 5c64c58bc6d89d767ed5c894e6ab0585e6beb1f9 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 10 Mar 2025 13:40:46 +0000 Subject: [PATCH 09/12] KPI Manager: - Corrected exception handling --- .../service/KpiManagerServiceServicerImpl.py | 85 ++++++++----------- 1 file changed, 34 insertions(+), 51 deletions(-) diff --git a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py index 1dd214506..38e6a1fe1 100644 --- a/src/kpi_manager/service/KpiManagerServiceServicerImpl.py +++ b/src/kpi_manager/service/KpiManagerServiceServicerImpl.py @@ -18,7 +18,7 @@ from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_m from common.proto.context_pb2 import Empty from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList -# from kpi_manager.database.Kpi_DB import KpiDB +from common.method_wrappers.ServiceExceptions import NotFoundException from kpi_manager.database.KpiDB import KpiDB from kpi_manager.database.KpiModel import Kpi as KpiModel @@ -31,65 +31,48 @@ class KpiManagerServiceServicerImpl(KpiManagerServiceServicer): self.kpi_db_obj = KpiDB(KpiModel) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore - ) -> KpiId: # type: ignore + def SetKpiDescriptor( + self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore + ) -> KpiId: # type: ignore response = KpiId() LOGGER.info("Received gRPC message object: {:}".format(request)) - try: - kpi_to_insert = KpiModel.convert_KpiDescriptor_to_row(request) - if(self.kpi_db_obj.add_row_to_db(kpi_to_insert)): - response.kpi_id.uuid = request.kpi_id.kpi_id.uuid - # LOGGER.info("Added Row: {:}".format(response)) - return response - except Exception as e: - LOGGER.info("Unable to create KpiModel class object. {:}".format(e)) - + kpi_to_insert = KpiModel.convert_KpiDescriptor_to_row(request) + if self.kpi_db_obj.add_row_to_db(kpi_to_insert): + response.kpi_id.uuid = request.kpi_id.kpi_id.uuid + # LOGGER.info("Added Row: {:}".format(response)) + return response + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore - ) -> KpiDescriptor: # type: ignore + def GetKpiDescriptor( + self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore + ) -> KpiDescriptor: # type: ignore response = KpiDescriptor() - print("--> Received gRPC message object: {:}".format(request)) LOGGER.info("Received gRPC message object: {:}".format(request)) - try: - kpi_id_to_search = request.kpi_id.uuid - row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) - if row is None: - print ('No matching row found for kpi id: {:}'.format(kpi_id_to_search)) - LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search)) - return Empty() - else: - response = KpiModel.convert_row_to_KpiDescriptor(row) - return response - except Exception as e: - print ('Unable to search kpi id. {:}'.format(e)) - LOGGER.info('Unable to search kpi id. 
{:}'.format(e)) - raise e + kpi_id_to_search = request.kpi_id.uuid + row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) + if row is None: + LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search)) + raise NotFoundException('KpiDescriptor', kpi_id_to_search) + response = KpiModel.convert_row_to_KpiDescriptor(row) + return response @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def DeleteKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore - ) -> Empty: # type: ignore + def DeleteKpiDescriptor( + self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore + ) -> Empty: # type: ignore LOGGER.info("Received gRPC message object: {:}".format(request)) - try: - kpi_id_to_search = request.kpi_id.uuid - self.kpi_db_obj.delete_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) - except Exception as e: - LOGGER.info('Unable to search kpi id. {:}'.format(e)) - finally: - return Empty() + kpi_id_to_search = request.kpi_id.uuid + self.kpi_db_obj.delete_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search) + return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def SelectKpiDescriptor(self, filter: KpiDescriptorFilter, grpc_context: grpc.ServicerContext # type: ignore - ) -> KpiDescriptorList: # type: ignore + def SelectKpiDescriptor( + self, filter: KpiDescriptorFilter, grpc_context: grpc.ServicerContext # type: ignore + ) -> KpiDescriptorList: # type: ignore LOGGER.info("Received gRPC message object: {:}".format(filter)) response = KpiDescriptorList() - try: - rows = self.kpi_db_obj.select_with_filter(KpiModel, filter) - except Exception as e: - LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e)) - try: - for row in rows: - kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row) - response.kpi_descriptor_list.append(kpiDescriptor_obj) - return response - except Exception as e: - LOGGER.info('Unable to process filter response {:}'.format(e)) + rows = self.kpi_db_obj.select_with_filter(KpiModel, filter) + for row in rows: + kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row) + response.kpi_descriptor_list.append(kpiDescriptor_obj) + return response -- GitLab From f0a69d8f14e1012065ea0d842b71edb9269ecc0a Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 10 Mar 2025 13:42:39 +0000 Subject: [PATCH 10/12] Telemetry Backend: - Added missing Dockerfile dependencies --- src/telemetry/backend/Dockerfile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile index 81f8bba4b..a0fa2f803 100644 --- a/src/telemetry/backend/Dockerfile +++ b/src/telemetry/backend/Dockerfile @@ -68,6 +68,8 @@ COPY src/device/__init__.py device/__init__.py COPY src/device/client/. device/client/ COPY src/kpi_manager/client/. kpi_manager/client/ COPY src/kpi_manager/__init__.py kpi_manager/__init__.py +COPY src/service/__init__.py service/__init__.py +COPY src/service/client/. service/client/ COPY src/telemetry/__init__.py telemetry/__init__.py COPY src/telemetry/backend/. 
telemetry/backend/ -- GitLab From 469032c3e71d7765201155e918fc6ee0d3d65d86 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 10 Mar 2025 13:58:15 +0000 Subject: [PATCH 11/12] CI/CD pipeline code fixes --- src/kpi_manager/tests/test_messages.py | 4 ++-- src/kpi_value_writer/service/MetricWriterToPrometheus.py | 6 ++++-- src/telemetry/backend/Dockerfile | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/kpi_manager/tests/test_messages.py b/src/kpi_manager/tests/test_messages.py index 811661a4e..094c56df8 100644 --- a/src/kpi_manager/tests/test_messages.py +++ b/src/kpi_manager/tests/test_messages.py @@ -26,9 +26,9 @@ def create_kpi_id_request(): def create_kpi_descriptor_request(descriptor_name: str = "Test_name"): _create_kpi_request = kpi_manager_pb2.KpiDescriptor() - # _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4()) # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888" - _create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99" + # _create_kpi_request.kpi_id.kpi_id.uuid = "f974b6cc-095f-4767-b8c1-3457b383fb99" _create_kpi_request.kpi_description = descriptor_name _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED _create_kpi_request.device_id.device_uuid.uuid = str(uuid.uuid4()) diff --git a/src/kpi_value_writer/service/MetricWriterToPrometheus.py b/src/kpi_value_writer/service/MetricWriterToPrometheus.py index 3238516c9..595d025b3 100644 --- a/src/kpi_value_writer/service/MetricWriterToPrometheus.py +++ b/src/kpi_value_writer/service/MetricWriterToPrometheus.py @@ -51,8 +51,10 @@ class MetricWriterToPrometheus: 'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid, 'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid, 'link_id' : kpi_descriptor.link_id.link_uuid.uuid, - 'time_stamp' : kpi_value["time_stamp"], - 'kpi_value' : kpi_value["kpi_value"] + 'time_stamp' : kpi_value.timestamp.timestamp, + #'time_stamp' : kpi_value["time_stamp"], + 'kpi_value' : kpi_value.kpi_value_type.floatVal + #'kpi_value' : kpi_value["kpi_value"] } LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi)) return cooked_kpi diff --git a/src/telemetry/backend/Dockerfile b/src/telemetry/backend/Dockerfile index a0fa2f803..07459986d 100644 --- a/src/telemetry/backend/Dockerfile +++ b/src/telemetry/backend/Dockerfile @@ -70,6 +70,8 @@ COPY src/kpi_manager/client/. kpi_manager/client/ COPY src/kpi_manager/__init__.py kpi_manager/__init__.py COPY src/service/__init__.py service/__init__.py COPY src/service/client/. service/client/ +COPY src/slice/__init__.py slice/__init__.py +COPY src/slice/client/. slice/client/ COPY src/telemetry/__init__.py telemetry/__init__.py COPY src/telemetry/backend/. telemetry/backend/ -- GitLab From 17228dd56138c9bdd77c58cd772229315e12b7e0 Mon Sep 17 00:00:00 2001 From: gifrerenom Date: Mon, 10 Mar 2025 14:16:03 +0000 Subject: [PATCH 12/12] CI/CD pipeline code fixes --- src/kpi_manager/tests/test_kpi_manager.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/kpi_manager/tests/test_kpi_manager.py b/src/kpi_manager/tests/test_kpi_manager.py index fedc3f94c..17a1c8d77 100755 --- a/src/kpi_manager/tests/test_kpi_manager.py +++ b/src/kpi_manager/tests/test_kpi_manager.py @@ -13,6 +13,7 @@ # limitations under the License. 
+import grpc import os, pytest import logging from typing import Union @@ -109,13 +110,19 @@ def test_DeleteKpiDescriptor(kpi_manager_client): LOGGER.info(" >>> test_DeleteKpiDescriptor: START <<< ") # adding KPI response_id = kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request()) + # deleting KPI del_response = kpi_manager_client.DeleteKpiDescriptor(response_id) - # select KPI - kpi_manager_client.GetKpiDescriptor(response_id) LOGGER.info("Response of delete method gRPC message object: {:}".format(del_response)) assert isinstance(del_response, Empty) + # select KPI and check it does not exist + with pytest.raises(grpc.RpcError) as e: + kpi_manager_client.GetKpiDescriptor(response_id) + assert e.value.code() == grpc.StatusCode.NOT_FOUND + MSG = 'KpiDescriptor({:s}) not found' + assert e.value.details() == MSG.format(response_id.kpi_id.uuid) + def test_GetKpiDescriptor(kpi_manager_client): LOGGER.info(" >>> test_GetKpiDescriptor: START <<< ") # adding KPI @@ -123,11 +130,6 @@ def test_GetKpiDescriptor(kpi_manager_client): # get KPI response = kpi_manager_client.GetKpiDescriptor(response_id) LOGGER.info("Response gRPC message object: {:}".format(response)) - - LOGGER.info(" >>> calling GetKpiDescriptor with random ID") - rand_response = kpi_manager_client.GetKpiDescriptor(create_kpi_id_request()) - LOGGER.info("Response gRPC message object: {:}".format(rand_response)) - assert isinstance(response, KpiDescriptor) def test_SelectKpiDescriptor(kpi_manager_client): -- GitLab
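
Usage note for the KPI Manager changes above: GetKpiDescriptor now raises NOT_FOUND instead of returning an empty message, so callers should handle the gRPC status explicitly. A minimal client-side sketch, with the helper name and logger being illustrative and not part of this patch series:

    import grpc
    import logging

    LOGGER = logging.getLogger(__name__)

    def get_kpi_descriptor_or_none(kpi_manager_client, kpi_id):
        # Translate the NOT_FOUND status into a None result for callers.
        try:
            return kpi_manager_client.GetKpiDescriptor(kpi_id)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                LOGGER.info("KpiDescriptor(%s) not found", kpi_id.kpi_id.uuid)
                return None
            raise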