diff --git a/src/telemetry/backend/collector_api/_Collector.py b/src/telemetry/backend/collector_api/_Collector.py index ec4ba943c90de8a8d683d1e7a9dd9d48865b5edf..d6e711d65c763cb3cefd36f1798f5c97656f0a92 100644 --- a/src/telemetry/backend/collector_api/_Collector.py +++ b/src/telemetry/backend/collector_api/_Collector.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading +import queue from typing import Any, Iterator, List, Optional, Tuple, Union # Special resource names to request to the collector to retrieve the specified @@ -135,31 +135,25 @@ class _Collector: """ raise NotImplementedError() - def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> \ + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> \ + bool: + """ Subscribe to state information of the entire device or selected resources. + Subscriptions are incremental, and the collector should keep track of requested resources. + List of tuples, each containing: + - resource_id (str): Identifier pointing to the resource to be subscribed. + - resource_dict (dict): Dictionary containing resource name, KPI to be subscribed, and type. + - sampling_duration (float): Duration (in seconds) for how long monitoring should last. + - sampling_interval (float): Desired monitoring interval (in seconds) for the specified resource. + List of results for the requested resource key subscriptions. + The return values are in the same order as the requested resource keys. + - True if a resource is successfully subscribed. + - Exception if an error occurs during the subscription process. List[Union[bool, Exception]]: - """ Subscribe to state information of entire device or - selected resources. Subscriptions are incremental. - Collector should keep track of requested resources. - Parameters: - subscriptions : List[Tuple[str, float, float]] - List of tuples, each containing a resource_key pointing the - resource to be subscribed, a sampling_duration, and a - sampling_interval (both in seconds with float - representation) defining, respectively, for how long - monitoring should last, and the desired monitoring interval - for the resource specified. - Returns: - results : List[Union[bool, Exception]] - List of results for resource key subscriptions requested. - Return values must be in the same order as the resource keys - requested. If a resource is properly subscribed, - True must be retrieved; otherwise, the Exception that is - raised during the processing must be retrieved. - """ + """ raise NotImplementedError() - def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) \ - -> List[Union[bool, Exception]]: + def UnsubscribeState(self, resource_key: str) \ + -> bool: """ Unsubscribe from state information of entire device or selected resources. Subscriptions are incremental. Collector should keep track of requested resources. @@ -182,7 +176,7 @@ class _Collector: raise NotImplementedError() def GetState( - self, blocking=False, terminate : Optional[threading.Event] = None + self, duration : int, blocking=False, terminate: Optional[queue.Queue] = None ) -> Iterator[Tuple[float, str, Any]]: """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be called once and will diff --git a/src/telemetry/backend/collectors/emulated/EmulatedCollector.py b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py index 90be013368c5aa80dcb52c2394e8b74f9d74b6f4..48102a943f54eead9b0119b2839faaa123e1cb51 100644 --- a/src/telemetry/backend/collectors/emulated/EmulatedCollector.py +++ b/src/telemetry/backend/collectors/emulated/EmulatedCollector.py @@ -15,10 +15,7 @@ import pytz import queue import logging -import uuid -import json from anytree import Node, Resolver -from apscheduler.events import EVENT_JOB_ADDED, EVENT_JOB_REMOVED from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor @@ -36,10 +33,6 @@ class EmulatedCollector(_Collector): """ def __init__(self, address: str, port: int, **settings): super().__init__('emulated_collector', address, port, **settings) - self._initial_config = Node('root') # Tree structure for initial config - self._running_config = Node('root') # Tree structure for running config - self._subscriptions = Node('subscriptions') # Tree for state subscriptions - self._resolver = Resolver() # For path resolution in tree structures self._out_samples = queue.Queue() # Queue to hold synthetic state samples self._synthetic_data = SyntheticMetricsGenerator(metric_queue=self._out_samples) # Placeholder for synthetic data generator self._scheduler = BackgroundScheduler(daemon=True) @@ -48,8 +41,8 @@ class EmulatedCollector(_Collector): executors = {'default': ThreadPoolExecutor(max_workers=1)}, timezone = pytz.utc ) - self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) - self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) + # self._scheduler.add_listener(self._listener_job_added_to_subscription_tree, EVENT_JOB_ADDED) + # self._scheduler.add_listener(self._listener_job_removed_from_subscription_tree, EVENT_JOB_REMOVED) self._helper_methods = EmulatedCollectorHelper() self.logger = logging.getLogger(__name__) @@ -77,73 +70,56 @@ class EmulatedCollector(_Collector): if not self.connected: raise RuntimeError("Collector is not connected. Please connect before performing operations.") - def SubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + def SubscribeState(self, subscriptions: List[Tuple[str, dict, float, float]]) -> bool: self._require_connection() - results = [] - for resource_key, duration, interval in subscriptions: - resource_key = self._helper_methods.validate_resource_key(resource_key) # Validate the endpoint name - self.logger.info(f"1. Subscribing to {resource_key} with duration {duration}s and interval {interval}s") + try: + job_id, endpoint, duration, interval = subscriptions + except: + self.logger.exception(f"Invalid subscription format: {subscriptions}") + return False + if endpoint: + self.logger.info(f"Subscribing to {endpoint} with duration {duration}s and interval {interval}s") try: - self._resolver.get(self._running_config, resource_key) # Verify if the resource key exists in the running configuration - self.logger.info(f"Resource key {resource_key} exists in the configuration.") - resource_value = json.loads(self._resolver.get(self._running_config, resource_key).value) - if resource_value is not None: - sample_type_ids = resource_value['sample_types'] - self.logger.info(f"Sample type IDs for {resource_key}: {sample_type_ids}") - if len(sample_type_ids) == 0: - self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") - results.append(False) - continue - else: - self.logger.warning(f"No sample types found for {resource_key}. Skipping subscription.") - results.append(False) - continue + sample_type_ids = endpoint['sample_types'] # type: ignore + resource_name = endpoint['name'] # type: ignore # Add the job to the scheduler - job_id = f"{resource_key}-{uuid.uuid4()}" self._scheduler.add_job( self._generate_sample, 'interval', seconds=interval, - args=[resource_key, sample_type_ids], - id=job_id, + args=[resource_name, sample_type_ids], + id=f"{job_id}", replace_existing=True, end_date=datetime.now(pytz.utc) + timedelta(seconds=duration) ) - self.logger.info(f"Job added to scheduler for resource key {resource_key} with duration {duration}s and interval {interval}s") - results.append(True) - except Exception as e: - self.logger.error(f"Failed to verify resource key or add job: {e}") - results.append(e) - return results + self.logger.info(f"Job added to scheduler for resource key {resource_name} with duration {duration}s and interval {interval}s") + return True + except: + self.logger.exception(f"Failed to verify resource key or add job:") + return False + else: + self.logger.warning(f"No sample types found for {endpoint}. Skipping subscription.") + return False - def UnsubscribeState(self, subscriptions: List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: + def UnsubscribeState(self, resource_key: str) -> bool: self._require_connection() - results = [] - for resource_key, _, _ in subscriptions: - resource_key = self._helper_methods.validate_resource_key(resource_key) - try: - # Check if job exists - job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] - if not job_ids: - self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.") - results.append(False) - continue - # Remove jobs - for job_id in job_ids: - self._scheduler.remove_job(job_id) - - self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") - results.append(True) - except Exception as e: - self.logger.exception(f"Failed to unsubscribe from {resource_key}") - results.append(e) - return results + try: + # Check if job exists + job_ids = [job.id for job in self._scheduler.get_jobs() if resource_key in job.id] + if not job_ids: + self.logger.warning(f"No active jobs found for {resource_key}. It might have already terminated.") + return False + for job_id in job_ids: + self._scheduler.remove_job(job_id) + self.logger.info(f"Unsubscribed from {resource_key} with job IDs: {job_ids}") + return True + except: + self.logger.exception(f"Failed to unsubscribe from {resource_key}") + return False - def GetState(self, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: + def GetState(self, duration : int, blocking: bool = False, terminate: Optional[queue.Queue] = None) -> Iterator[Tuple[float, str, Any]]: self._require_connection() start_time = datetime.now(pytz.utc) - duration = 10 # Duration of the subscription in seconds (as an example) - while True: try: if terminate and not terminate.empty(): @@ -168,283 +144,3 @@ class EmulatedCollector(_Collector): self.logger.debug(f"Executing _generate_sample for resource: {resource_key}") sample = self._synthetic_data.generate_synthetic_data_point(resource_key, sample_type_ids) self._out_samples.put(sample) - -# ------------- Event Listeners (START)----------------- - - def _listener_job_removed_from_subscription_tree(self, event): - if event.job_id: - # Extract the resource key from the job ID - resource_key = event.job_id.split('-')[0] - resource_key = self._helper_methods.validate_resource_key(resource_key) - - # Remove the subscription from the tree - try: - subscription_path = resource_key.split('/') - parent = self._subscriptions - for part in subscription_path: - parent = next((child for child in parent.children if child.name == part), None) - if not parent: - raise ValueError(f"Subscription path '{resource_key}' not found in tree.") - if parent: - parent.parent.children = tuple(child for child in parent.parent.children if child != parent) - self.logger.warning(f"Automatically removed subscription from subscription_tree for {resource_key} after job termination by listener. Maybe due to timeout.") - except Exception as e: - self.logger.warning(f"Failed to remove subscription for {resource_key}: {e}") - - def _listener_job_added_to_subscription_tree(self, event): - try: - job_id = event.job_id - if job_id: - resource_key = job_id.split('-')[0] # Extract resource key from job ID - resource_key = self._helper_methods.validate_resource_key(resource_key) - subscription_path = resource_key.split('/') - parent = self._subscriptions - for part in subscription_path: - node = next((child for child in parent.children if child.name == part), None) - if not node: - node = Node(part, parent=parent) - parent = node - parent.value = { - "job_id": job_id - } - self.logger.info(f"Automatically added subscription for {resource_key} to the subscription_tree by listener.") - except Exception as e: - self.logger.exception("Failed to add subscription to the tree") - -# ------------- Event Listeners (END)----------------- - -#------------------------------------------------------------------------------------- -# ------- The below methods are kept for debugging purposes (test-case) only --------- -#------------------------------------------------------------------------------------- - -# This method can be commented but this will arise an error in the test-case (@pytest.fixture --> connected_configured_collector()). - def SetConfig(self, resources: dict) -> List[Union[bool, Exception]]: # For debugging purposes. - self._require_connection() - results = [] - - # if not isinstance(resources, dict): - # self.logger.error("Invalid configuration format: resources must be a dictionary.") - # raise ValueError("Invalid configuration format. Must be a dictionary.") - if 'config_rules' not in resources or not isinstance(resources['config_rules'], list): - self.logger.error("Invalid configuration format: 'config_rules' key missing or not a list.") - raise ValueError("Invalid configuration format. Must contain a 'config_rules' key with a list of rules.") - - for rule in resources['config_rules']: - try: - if 'action' not in rule or 'custom' not in rule: - raise ValueError(f"Invalid rule format: {rule}") - - action = rule['action'] - custom = rule['custom'] - resource_key = custom.get('resource_key') - resource_value = custom.get('resource_value') - - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - if not resource_key: - raise ValueError(f"Resource key is missing in rule: {rule}") - - if action == 1: # Set action - resource_path = self._helper_methods._parse_resource_key(resource_key) - # self.logger.info(f"1. Setting configuration for resource key {resource_key} and resource_path: {resource_path}") - parent = self._running_config - - for part in resource_path[:-1]: - if '[' in part and ']' in part: - base, index = part.split('[', 1) - index = index.rstrip(']') - parent = self._helper_methods._find_or_create_node(index, self._helper_methods._find_or_create_node(base, parent)) - # self.logger.info(f"2a. Creating node: {base}, {index}, {parent}") - elif resource_path[-1] != 'settings': - # self.logger.info(f"2b. Creating node: {part}") - parent = self._helper_methods._find_or_create_node(part, parent) - - final_part = resource_path[-1] - if final_part in ['address', 'port']: - self._helper_methods._create_or_update_node(final_part, parent, resource_value) - self.logger.info(f"Configured: {resource_key} = {resource_value}") - - if resource_key.startswith("_connect/settings"): - parent = self._helper_methods._find_or_create_node("_connect", self._running_config) - settings_node = self._helper_methods._find_or_create_node("settings", parent) - settings_node.value = None # Ensure settings node has None value - endpoints_node = self._helper_methods._find_or_create_node("endpoints", settings_node) - - for endpoint in resource_value.get("endpoints", []): - uuid = endpoint.get("uuid") - uuid = uuid.replace('/', '_') if uuid else None - if uuid: - # self.logger.info(f"3. Creating endpoint: {uuid}, {endpoint}, {endpoints_node}") - self._helper_methods._create_or_update_node(uuid, endpoints_node, endpoint) - self.logger.info(f"Configured endpoint: {uuid} : {endpoint}") - - elif resource_key.startswith("/interface"): - interface_parent = self._helper_methods._find_or_create_node("interface", self._running_config) - name = resource_value.get("name") - name = name.replace('/', '_') if name else None - if name: - self._helper_methods._create_or_update_node(name, interface_parent, resource_value) - self.logger.info(f"Configured interface: {name} : {resource_value}") - # self.logger.info(f"4. Configured interface: {name}") - - results.append(True) - else: - raise ValueError(f"Unsupported action '{action}' in rule: {rule}") - - if resource_value is None: - raise ValueError(f"Resource value is None for key: {resource_key}") - - except Exception as e: - self.logger.exception(f"Failed to apply rule: {rule}") - results.append(e) - - return results - -#----------------------------------- -# ------- EXTRA Methods ------------ -#----------------------------------- - - # def log_active_jobs(self): # For debugging purposes. - # """ - # Logs the IDs of all active jobs. - # This method retrieves the list of active jobs from the scheduler and logs their IDs using the logger. - # """ - # self._require_connection() - # jobs = self._scheduler.get_jobs() - # self.logger.info(f"Active jobs: {[job.id for job in jobs]}") - - # def print_config_tree(self): # For debugging purposes. - # """ - # Reads the configuration using GetConfig and prints it as a hierarchical tree structure. - # """ - # self._require_connection() - - # def print_tree(node, indent=""): - # """ - # Recursively prints the configuration tree. - - # Args: - # node (Node): The current node to print. - # indent (str): The current indentation level. - # """ - # if node.name != "root": # Skip the root node's name - # value = getattr(node, "value", None) - # print(f"{indent}- {node.name}: {json.loads(value) if value else ''}") - - # for child in node.children: - # print_tree(child, indent + " ") - - # print("Configuration Tree:") - # print_tree(self._running_config) - - - # def GetInitialConfig(self) -> List[Tuple[str, Any]]: # comment - # self._require_connection() - # results = [] - # for node in self._initial_config.descendants: - # value = getattr(node, "value", None) - # results.append((node.name, json.loads(value) if value else None)) - # self.logger.info("Retrieved initial configurations") - # return results - - # def GetConfig(self, resource_keys: List[str] = []) -> List[Tuple[str, Union[Any, dict, Exception]]]: # comment - # """ - # Retrieves the configuration for the specified resource keys. - # If no keys are provided, returns the full configuration tree. - - # Args: - # resource_keys (List[str]): A list of keys specifying the configuration to retrieve. - - # Returns: - # List[Tuple[str, Union[Any, dict, Exception]]]: A list of tuples with the resource key and its value, - # subtree, or an exception. - # """ - # self._require_connection() - # results = [] - - # try: - # if not resource_keys: - # # If no specific keys are provided, return the full configuration tree - - # full_tree = self._helper_methods._generate_subtree(self._running_config) - # # full_tree = self._generate_subtree(self._running_config) - # return [("full_configuration", full_tree)] - - # for key in resource_keys: - # try: - # # Parse the resource key - # resource_path = self._helper_methods.(key) - # self.logger.info(f"1. Retrieving configuration for resource path : {resource_path}") - - # # Navigate to the node corresponding to the key - # parent = self._running_config - # for part in resource_path: - # parent = self._find_or_raise_node(part, parent) - - # # Check if the node has a value - # value = getattr(parent, "value", None) - # if value: - # # If a value exists, return it - # results.append((key, json.loads(value))) - # else: - # # If no value, return the subtree of this node - # subtree = self._helper_methods._generate_subtree(parent) - # # subtree = self._generate_subtree(parent) - # results.append((key, subtree)) - - # except Exception as e: - # self.logger.exception(f"Failed to retrieve configuration for key: {key}") - # results.append((key, e)) - - # except Exception as e: - # self.logger.exception("Failed to retrieve configurations") - # results.append(("Error", e)) - - # return results - - # def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: # comment - # self._require_connection() - # results = [] - - # for key in resources: - # try: - # # Parse resource key into parts, handling brackets correctly - # resource_path = self._helper_methods.(key) - - # parent = self._running_config - # for part in resource_path: - # parent = self._find_or_raise_node(part, parent) - - # # Delete the final node - # node_to_delete = parent - # parent = node_to_delete.parent - # parent.children = tuple(child for child in parent.children if child != node_to_delete) - # self.logger.info(f"Deleted configuration for key: {key}") - - # # Handle endpoints structure - # if "interface" in key and "settings" in key: - # interface_name = key.split('[')[-1].split(']')[0] - # endpoints_parent = self._find_or_raise_node("_connect", self._running_config) - # endpoints_node = self._find_or_raise_node("endpoints", endpoints_parent) - # endpoint_to_delete = next((child for child in endpoints_node.children if child.name == interface_name), None) - # if endpoint_to_delete: - # endpoints_node.children = tuple(child for child in endpoints_node.children if child != endpoint_to_delete) - # self.logger.info(f"Removed endpoint entry for interface '{interface_name}'") - - # # Check if parent has no more children and is not the root - # while parent and parent.name != "root" and not parent.children: - # node_to_delete = parent - # parent = node_to_delete.parent - # parent.children = tuple(child for child in parent.children if child != node_to_delete) - # self.logger.info(f"Deleted empty parent node: {node_to_delete.name}") - - # results.append(True) - # except Exception as e: - # self.logger.exception(f"Failed to delete configuration for key: {key}") - # results.append(e) - - # return results - diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 0c515768e9f668aff964cf6dc3f25d5ea84baa4a..c392efd1dbbaa7a31e3bcffd57b93fbd008b198e 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -44,6 +44,7 @@ class TelemetryBackendService(GenericGrpcService): self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(), 'group.id' : 'backend', 'auto.offset.reset' : 'latest'}) + self.collector = EmulatedCollector(address="127.0.0.1", port=8000) self.active_jobs = {} def install_servicers(self): @@ -65,7 +66,7 @@ class TelemetryBackendService(GenericGrpcService): if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue elif receive_msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART: - LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist. May be topic does not have any messages.") + LOGGER.warning(f"Subscribed topic {receive_msg.topic()} does not exist or topic does not have any messages.") continue else: LOGGER.error("Consumer error: {}".format(receive_msg.error())) @@ -77,11 +78,11 @@ class TelemetryBackendService(GenericGrpcService): collector_id = receive_msg.key().decode('utf-8') LOGGER.debug('Recevied Collector: {:} - {:}'.format(collector_id, collector)) - duration = collector.get('duration', -1) + duration = collector.get('duration', 0) if duration == -1 and collector['interval'] == -1: self.TerminateCollector(collector_id) else: - LOGGER.info("Collector ID: {:} - Scheduling...".format(collector_id)) + LOGGER.info("Received Collector ID: {:} - Scheduling...".format(collector_id)) if collector_id not in self.active_jobs: stop_event = threading.Event() self.active_jobs[collector_id] = stop_event @@ -95,13 +96,15 @@ class TelemetryBackendService(GenericGrpcService): )).start() # Stop the Collector after the given duration if duration > 0: - def stop_after_duration(): - time.sleep(duration) - LOGGER.warning(f"Execution duration ({duration}) completed of Collector: {collector_id}") - self.TerminateCollector(collector_id) + def stop_after_duration(completion_time, stop_event): + time.sleep(completion_time) + if not stop_event.is_set(): + LOGGER.warning(f"Execution duration ({completion_time}) completed of Collector: {collector_id}") + self.TerminateCollector(collector_id) duration_thread = threading.Thread( - target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}" + target=stop_after_duration, daemon=True, name=f"stop_after_duration_{collector_id}", + args=(duration, stop_event) ) duration_thread.start() else: @@ -113,7 +116,7 @@ class TelemetryBackendService(GenericGrpcService): """ Method to handle collector request. """ - end_points : list = self.get_endpoints_from_kpi_id(kpi_id) + end_points : dict = self.get_endpoints_from_kpi_id(kpi_id) if not end_points: LOGGER.warning("KPI ID: {:} - Endpoints not found. Skipping...".format(kpi_id)) @@ -125,21 +128,24 @@ class TelemetryBackendService(GenericGrpcService): if device_type == "EMU-Device": LOGGER.info("KPI ID: {:} - Device Type: {:} - Endpoints: {:}".format(kpi_id, device_type, end_points)) subscription = [collector_id, end_points, duration, interval] - self.EmulatedCollectorHandler(subscription, kpi_id, stop_event) + self.EmulatedCollectorHandler(subscription, duration, collector_id, kpi_id, stop_event) else: LOGGER.warning("KPI ID: {:} - Device Type: {:} - Not Supported".format(kpi_id, device_type)) - - def EmulatedCollectorHandler(self, subscription, kpi_id, stop_event): + def EmulatedCollectorHandler(self, subscription, duration, collector_id, kpi_id, stop_event): # EmulatedCollector - collector = EmulatedCollector(address="127.0.0.1", port=8000) - collector.Connect() - while not stop_event.is_set(): - # samples = collector.SubscribeState(subscription) - # LOGGER.debug("KPI: {:} - Value: {:}".format(kpi_id, samples)) - # self.GenerateKpiValue(job_id, kpi_id, samples) - LOGGER.info("Generating KPI Values ...") - time.sleep(1) + + self.collector.Connect() + if not self.collector.SubscribeState(subscription): + LOGGER.warning("KPI ID: {:} - Subscription failed. Skipping...".format(kpi_id)) + else: + while not stop_event.is_set(): + samples = list(self.collector.GetState(duration=duration, blocking=True)) + LOGGER.info("KPI: {:} - Value: {:}".format(kpi_id, samples)) + self.GenerateKpiValue(collector_id, kpi_id, samples) + time.sleep(1) + self.collector.Disconnect() + # self.TerminateCollector(collector_id) # No need to terminate, automatically terminated after duration. def GenerateKpiValue(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): """ @@ -171,12 +177,17 @@ class TelemetryBackendService(GenericGrpcService): if stop_event: stop_event.set() LOGGER.info(f"Job {job_id} terminated.") + if self.collector.UnsubscribeState(job_id): + LOGGER.info(f"Unsubscribed from collector: {job_id}") + else: + LOGGER.warning(f"Failed to unsubscribe from collector: {job_id}") else: LOGGER.warning(f"Job {job_id} not found in active jobs.") except: LOGGER.exception("Error terminating job: {:}".format(job_id)) - def get_endpoints_from_kpi_id(self, kpi_id: str) -> list: +# --- Mock Methods --- + def get_endpoints_from_kpi_id(self, kpi_id: str) -> dict: """ Method to get endpoints based on kpi_id. """ @@ -185,7 +196,7 @@ class TelemetryBackendService(GenericGrpcService): '123e4567-e89b-12d3-a456-426614174001': {"uuid": "123e4567-e89b-12d3-a456-42661417ed07", "name": "eth1", "type": "ethernet", "sample_types": []}, '123e4567-e89b-12d3-a456-426614174002': {"uuid": "123e4567-e89b-12d3-a456-42661417ed08", "name": "13/1/2", "type": "copper", "sample_types": [101, 102, 201, 202]}, } - return [kpi_endpoints.get(kpi_id, {})] if kpi_id in kpi_endpoints else [] + return kpi_endpoints.get(kpi_id, {}) if kpi_id in kpi_endpoints else {} def get_device_type_from_kpi_id(self, kpi_id: str) -> str: """ @@ -198,35 +209,6 @@ class TelemetryBackendService(GenericGrpcService): } return kpi_device_types.get(kpi_id, {}).get('device_type', "Unknown") - - # def TerminateCollectorBackend(self, collector_id): - # LOGGER.debug("Terminating collector backend...") - # if collector_id in self.running_threads: - # thread = self.running_threads[collector_id] - # thread.stop() - # del self.running_threads[collector_id] - # LOGGER.debug("Collector backend terminated. Collector ID: {:}".format(collector_id)) - # self.GenerateCollectorTerminationSignal(collector_id, "-1", -1) # Termination confirmation to frontend. - # else: - # LOGGER.warning('Backend collector {:} not found'.format(collector_id)) - - # def GenerateCollectorTerminationSignal(self, collector_id: str, kpi_id: str, measured_kpi_value: Any): - # """ - # Method to write kpi Termination signat on TELEMETRY_RESPONSE Kafka topic - # """ - # producer = self.kafka_producer - # kpi_value : Dict = { - # "kpi_id" : kpi_id, - # "kpi_value" : measured_kpi_value, - # } - # producer.produce( - # KafkaTopic.TELEMETRY_RESPONSE.value, - # key = collector_id, - # value = json.dumps(kpi_value), - # callback = self.delivery_callback - # ) - # producer.flush() - def delivery_callback(self, err, msg): if err: LOGGER.error('Message delivery failed: {:s}'.format(str(err))) diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index 067925a285f6e6d69b89b518e6a96c1ed495b7e0..6c6107152f950cbe565f109b1757843a5c6165e8 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -15,6 +15,7 @@ import os import pytest import logging +import time from common.Constants import ServiceNameEnum from common.proto.telemetry_frontend_pb2 import CollectorId, CollectorList @@ -42,6 +43,16 @@ os.environ[get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT LOGGER = logging.getLogger(__name__) +@pytest.fixture(autouse=True) +def log_all_methods(request): + ''' + This fixture logs messages before and after each test function runs, indicating the start and end of the test. + The autouse=True parameter ensures that this logging happens automatically for all tests in the module. + ''' + LOGGER.info(f" >>>>> Starting test: {request.node.name} ") + yield + LOGGER.info(f" <<<<< Finished test: {request.node.name} ") + @pytest.fixture(scope='session') def telemetryFrontend_service(): LOGGER.info('Initializing TelemetryFrontendService...') @@ -82,33 +93,29 @@ def telemetryFrontend_client( # ------- Re-structuring Test --------- # --- "test_validate_kafka_topics" should be run before the functionality tests --- def test_validate_kafka_topics(): - LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") + # LOGGER.debug(" >>> test_validate_kafka_topics: START <<< ") response = KafkaTopic.create_all_topics() assert isinstance(response, bool) # ----- core funtionality test ----- def test_StartCollector(telemetryFrontend_client): - LOGGER.info(' >>> test_StartCollector START: <<< ') + # LOGGER.info(' >>> test_StartCollector START: <<< ') response = telemetryFrontend_client.StartCollector(create_collector_request()) LOGGER.debug(str(response)) assert isinstance(response, CollectorId) + def test_StopCollector(telemetryFrontend_client): - LOGGER.info(' >>> test_StopCollector START: <<< ') + # LOGGER.info(' >>> test_StopCollector START: <<< ') + LOGGER.info("Waiting before termination...") + time.sleep(30) response = telemetryFrontend_client.StopCollector(create_collector_id()) LOGGER.debug(str(response)) assert isinstance(response, Empty) -def test_SelectCollectors(telemetryFrontend_client): - LOGGER.info(' >>> test_SelectCollectors START: <<< ') - response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) - LOGGER.debug(str(response)) - assert isinstance(response, CollectorList) - -# # ----- Non-gRPC method tests ----- -# def test_RunResponseListener(): -# LOGGER.info(' >>> test_RunResponseListener START: <<< ') -# TelemetryFrontendServiceObj = TelemetryFrontendServiceServicerImpl() -# response = TelemetryFrontendServiceObj.RunResponseListener() # becasue Method "run_kafka_listener" is not define in frontend.proto +# def test_SelectCollectors(telemetryFrontend_client): +# LOGGER.info(' >>> test_SelectCollectors START: <<< ') +# response = telemetryFrontend_client.SelectCollectors(create_collector_filter()) # LOGGER.debug(str(response)) -# assert isinstance(response, bool) +# assert isinstance(response, CollectorList) +