From 3a1c47f0ef308100ded95cb023932e0ad54ac6be Mon Sep 17 00:00:00 2001 From: Javier Moreno <francisco.moreno.external@atos.net> Date: Tue, 27 Sep 2022 09:04:55 +0200 Subject: [PATCH] WIP - Subscriptions implementation --- src/monitoring/service/ManagementDBTools.py | 2 +- src/monitoring/service/MetricsDBTools.py | 3 ++- .../service/MonitoringServiceServicerImpl.py | 25 +++++++++++++++++-- src/monitoring/service/SubscriptionManager.py | 4 +-- src/monitoring/tests/Messages.py | 9 ++++++- src/monitoring/tests/test_unitary.py | 14 ++++++----- 6 files changed, 44 insertions(+), 13 deletions(-) diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 53430780e..09fb3049f 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from monitoring.service.MonitoringServiceServicerImpl import LOGGER import sqlite3 as sl class ManagementDB(): diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index a60968df7..d8a7d5060 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -76,9 +76,10 @@ class MetricsDB(): self.run_query(query) LOGGER.info(f"Table {self.table} created") - def get_subscription_data(self, kpi_id, end_date, sampling_interval_s): + def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s): start_date = end_date-sampling_interval_s query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" response=self.run_query(query) kpi_list=response['dataset'] + # subs_queue.append(kpi_list) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 4a5d7e4ba..8cf2928f4 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -13,6 +13,7 @@ # limitations under the License. import os, grpc, logging +from queue import Queue from typing import Iterator @@ -33,8 +34,10 @@ from device.client.DeviceClient import DeviceClient from prometheus_client import Counter, Summary +from monitoring.service.SubscriptionManager import SubscriptionManager + LOGGER = getJSONLogger('monitoringservice-server') -LOGGER.setLevel('DEBUG') +LOGGER.setLevel('INFO') MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') @@ -59,6 +62,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE) + self.subs_manager = SubscriptionManager(self.metrics_db) LOGGER.info('MetricsDB initialized') # SetKpi (SetKpiRequest) returns (KpiId) {} @@ -221,8 +225,25 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('SubscribeKpi') try: + + subs_queue = Queue() + subs_response = SubsResponse() + + kpi_id = request.kpi_id.kpi_id.uuid + sampling_duration_s = request.sampling_duration_s + sampling_interval_s = request.sampling_interval_s + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + subscriber = "localhost" + + subs_id = self.management_db.insert_subscription(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp,end_timestamp) + self.subs_manager.create_subscription(subs_id,kpi_id,sampling_duration_s,sampling_interval_s,start_timestamp,end_timestamp) + + # parse queue to append kpis into the list + # TBC - yield SubsResponse() + yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index eb46f3ec5..0b6717337 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -14,7 +14,7 @@ class SubscriptionManager(): self.scheduler.start() - def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): + def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date=None end_date=None if sampling_duration_s: @@ -25,7 +25,7 @@ class SubscriptionManager(): start_date = datetime.fromtimestamp(start_timestamp) if end_timestamp: end_date = datetime.fromtimestamp(end_timestamp) - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) def delete_subscription(self, subscription_id): self.scheduler.remove_job(subscription_id) \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index b463d900b..b2795ce37 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -74,9 +74,16 @@ def kpi_query(): return _kpi_query -def subs_descriptor(): +def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() + _subs_descriptor.subs_id.subs_id.uuid = "" + _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid + _subs_descriptor.sampling_duration_s = 10 + _subs_descriptor.sampling_interval_s = 2 + _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() + _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + return _subs_descriptor def subs_id(): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ced71fa29..f73ffe65b 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -260,7 +260,9 @@ def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-na # Test case that makes use of client fixture to test server's SetKpiSubscription method def test_set_kpi_subscription(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') - response = monitoring_client.SetKpiSubscription(subs_descriptor()) + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) LOGGER.debug(type(response)) assert isinstance(response, _MultiThreadedRendezvous) @@ -279,11 +281,11 @@ def test_get_subscriptions(monitoring_client): assert isinstance(response, SubsList) # Test case that makes use of client fixture to test server's DeleteSubscription method -def test_delete_subscription(monitoring_client): - LOGGER.warning('test_delete_subscription') - response = monitoring_client.DeleteSubscription(subs_id()) - LOGGER.debug(type(response)) - assert isinstance(response, Empty) +# def test_delete_subscription(monitoring_client): +# LOGGER.warning('test_delete_subscription') +# response = monitoring_client.DeleteSubscription(subs_id()) +# LOGGER.debug(type(response)) +# assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client): -- GitLab