Loading src/tests/mwc26-f5ga/AI_analytics_engine/clients/influxdb_fetcher.py +16 −26 Original line number Diff line number Diff line Loading @@ -25,7 +25,7 @@ from datetime import datetime, timezone from typing import Any, Dict from common.tools.client.RetryDecorator import delay_exponential, retry from influxdb_client_3 import InfluxDBClient3 from influxdb_client_3 import InfluxDBClient3, Point from ..ai_model.sla_policy import SLAPolicyConfig LOGGER = logging.getLogger(__name__) Loading Loading @@ -65,14 +65,10 @@ class InfluxDBFetcher: self.influxdb_port = influxdb_port self.influxdb_token = influxdb_token self.influxdb_database = influxdb_database # Construct full URL with port for InfluxDB v3 self.influxdb_url = f"http://{influxdb_host}:{influxdb_port}" LOGGER.info( f"InfluxDBFetcher initialized for database '{influxdb_database}' " f"at {self.influxdb_url}" ) LOGGER.info( f"InfluxDBFetcher initialized for database '{influxdb_database}' " f"at {self.influxdb_url}") self._client = InfluxDBClient3( host = self.influxdb_url, token = self.influxdb_token, Loading Loading @@ -274,17 +270,11 @@ class InfluxDBFetcher: ) # TODO: Implement actual InfluxDB write # Example implementation: # from influxdb_client_3 import InfluxDBClient3, Point # client = InfluxDBClient3( # host=self.influxdb_host, # token=self.influxdb_token, # database=self.influxdb_database # ) # point = Point("telemetry_notifications") \ # .tag("status", status) \ # .field("timestamp", timestamp) # client.write(point) point = Point("telemetry_notifications") \ .tag("status", status) \ .field("timestamp", timestamp) self._client.write(point) LOGGER.info("Telemetry notification stored successfully in InfluxDB") return True Loading
src/tests/mwc26-f5ga/AI_analytics_engine/clients/influxdb_fetcher.py +16 −26 Original line number Diff line number Diff line Loading @@ -25,7 +25,7 @@ from datetime import datetime, timezone from typing import Any, Dict from common.tools.client.RetryDecorator import delay_exponential, retry from influxdb_client_3 import InfluxDBClient3 from influxdb_client_3 import InfluxDBClient3, Point from ..ai_model.sla_policy import SLAPolicyConfig LOGGER = logging.getLogger(__name__) Loading Loading @@ -65,14 +65,10 @@ class InfluxDBFetcher: self.influxdb_port = influxdb_port self.influxdb_token = influxdb_token self.influxdb_database = influxdb_database # Construct full URL with port for InfluxDB v3 self.influxdb_url = f"http://{influxdb_host}:{influxdb_port}" LOGGER.info( f"InfluxDBFetcher initialized for database '{influxdb_database}' " f"at {self.influxdb_url}" ) LOGGER.info( f"InfluxDBFetcher initialized for database '{influxdb_database}' " f"at {self.influxdb_url}") self._client = InfluxDBClient3( host = self.influxdb_url, token = self.influxdb_token, Loading Loading @@ -274,17 +270,11 @@ class InfluxDBFetcher: ) # TODO: Implement actual InfluxDB write # Example implementation: # from influxdb_client_3 import InfluxDBClient3, Point # client = InfluxDBClient3( # host=self.influxdb_host, # token=self.influxdb_token, # database=self.influxdb_database # ) # point = Point("telemetry_notifications") \ # .tag("status", status) \ # .field("timestamp", timestamp) # client.write(point) point = Point("telemetry_notifications") \ .tag("status", status) \ .field("timestamp", timestamp) self._client.write(point) LOGGER.info("Telemetry notification stored successfully in InfluxDB") return True