diff --git a/src/monitoring/service/ManagementDBTools.py b/src/monitoring/service/ManagementDBTools.py index 53430780e843526cdad2ddbfb030f75287d93154..09fb3049f758628d260fee6bb5210358fd099692 100644 --- a/src/monitoring/service/ManagementDBTools.py +++ b/src/monitoring/service/ManagementDBTools.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from monitoring.service.MonitoringServiceServicerImpl import LOGGER import sqlite3 as sl class ManagementDB(): diff --git a/src/monitoring/service/MetricsDBTools.py b/src/monitoring/service/MetricsDBTools.py index a60968df73a6fa3d6c7036d1fe4a61becb10c4e6..d8a7d5060e44694e734fd82508466c2dc1ecbe97 100644 --- a/src/monitoring/service/MetricsDBTools.py +++ b/src/monitoring/service/MetricsDBTools.py @@ -76,9 +76,10 @@ class MetricsDB(): self.run_query(query) LOGGER.info(f"Table {self.table} created") - def get_subscription_data(self, kpi_id, end_date, sampling_interval_s): + def get_subscription_data(self, subs_queue, kpi_id, end_date, sampling_interval_s): start_date = end_date-sampling_interval_s query = f"SELECT kpi_id, timestamp, kpi_value FROM {self.table} WHERE kpi_id = '{kpi_id}' AND (timestamp BETWEEN '{timestamp_float_to_string(start_date)}' AND '{timestamp_float_to_string(end_date)}')" response=self.run_query(query) kpi_list=response['dataset'] + # subs_queue.append(kpi_list) diff --git a/src/monitoring/service/MonitoringServiceServicerImpl.py b/src/monitoring/service/MonitoringServiceServicerImpl.py index 4a5d7e4baa0bef8b28259df13c625c9b96042932..8cf2928f403b8a15728141787f41905a51c03103 100644 --- a/src/monitoring/service/MonitoringServiceServicerImpl.py +++ b/src/monitoring/service/MonitoringServiceServicerImpl.py @@ -13,6 +13,7 @@ # limitations under the License. import os, grpc, logging +from queue import Queue from typing import Iterator @@ -33,8 +34,10 @@ from device.client.DeviceClient import DeviceClient from prometheus_client import Counter, Summary +from monitoring.service.SubscriptionManager import SubscriptionManager + LOGGER = getJSONLogger('monitoringservice-server') -LOGGER.setLevel('DEBUG') +LOGGER.setLevel('INFO') MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') @@ -59,6 +62,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE) + self.subs_manager = SubscriptionManager(self.metrics_db) LOGGER.info('MetricsDB initialized') # SetKpi (SetKpiRequest) returns (KpiId) {} @@ -221,8 +225,25 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): LOGGER.info('SubscribeKpi') try: + + subs_queue = Queue() + subs_response = SubsResponse() + + kpi_id = request.kpi_id.kpi_id.uuid + sampling_duration_s = request.sampling_duration_s + sampling_interval_s = request.sampling_interval_s + start_timestamp = request.start_timestamp.timestamp + end_timestamp = request.end_timestamp.timestamp + + subscriber = "localhost" + + subs_id = self.management_db.insert_subscription(kpi_id,subscriber,sampling_duration_s,sampling_interval_s,start_timestamp,end_timestamp) + self.subs_manager.create_subscription(subs_id,kpi_id,sampling_duration_s,sampling_interval_s,start_timestamp,end_timestamp) + + # parse queue to append kpis into the list + # TBC - yield SubsResponse() + yield subs_response except ServiceException as e: LOGGER.exception('SubscribeKpi exception') grpc_context.abort(e.code, e.details) diff --git a/src/monitoring/service/SubscriptionManager.py b/src/monitoring/service/SubscriptionManager.py index eb46f3ec546862b4caee04c1caedd71c82392cb2..0b6717337d96915b126dfb457241e30b69ec80b8 100644 --- a/src/monitoring/service/SubscriptionManager.py +++ b/src/monitoring/service/SubscriptionManager.py @@ -14,7 +14,7 @@ class SubscriptionManager(): self.scheduler.start() - def create_subscription(self, subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): + def create_subscription(self, subs_queue ,subscription_id, kpi_id, sampling_interval_s, sampling_duration_s=None, start_timestamp=None, end_timestamp=None): start_date=None end_date=None if sampling_duration_s: @@ -25,7 +25,7 @@ class SubscriptionManager(): start_date = datetime.fromtimestamp(start_timestamp) if end_timestamp: end_date = datetime.fromtimestamp(end_timestamp) - self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) + self.scheduler.add_job(self.metrics_db.get_subscription_data, args=(subs_queue, kpi_id, timestamp_utcnow_to_float(), sampling_interval_s),trigger='interval', seconds=sampling_interval_s, start_date=start_date, end_date=end_date, id=subscription_id) def delete_subscription(self, subscription_id): self.scheduler.remove_job(subscription_id) \ No newline at end of file diff --git a/src/monitoring/tests/Messages.py b/src/monitoring/tests/Messages.py index b463d900b062bab4ca44613c8a5903318887a2c2..b2795ce37f2857de36e9ea0f42005a7d7f5577b3 100644 --- a/src/monitoring/tests/Messages.py +++ b/src/monitoring/tests/Messages.py @@ -74,9 +74,16 @@ def kpi_query(): return _kpi_query -def subs_descriptor(): +def subs_descriptor(kpi_id): _subs_descriptor = monitoring_pb2.SubsDescriptor() + _subs_descriptor.subs_id.subs_id.uuid = "" + _subs_descriptor.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid + _subs_descriptor.sampling_duration_s = 10 + _subs_descriptor.sampling_interval_s = 2 + _subs_descriptor.start_timestamp.timestamp = timestamp_utcnow_to_float() + _subs_descriptor.end_timestamp.timestamp = timestamp_utcnow_to_float() + 10 + return _subs_descriptor def subs_id(): diff --git a/src/monitoring/tests/test_unitary.py b/src/monitoring/tests/test_unitary.py index ced71fa29bd22e1f7c8e2116e7d886e3dc05e032..f73ffe65b689439f6129cc9b3bdd7744a91b395b 100644 --- a/src/monitoring/tests/test_unitary.py +++ b/src/monitoring/tests/test_unitary.py @@ -260,7 +260,9 @@ def test_query_kpi_data(monitoring_client): # pylint: disable=redefined-outer-na # Test case that makes use of client fixture to test server's SetKpiSubscription method def test_set_kpi_subscription(monitoring_client): # pylint: disable=redefined-outer-name LOGGER.warning('test_set_kpi_subscription') - response = monitoring_client.SetKpiSubscription(subs_descriptor()) + kpi_id = monitoring_client.SetKpi(create_kpi_request_c()) + monitoring_client.IncludeKpi(include_kpi_request(kpi_id)) + response = monitoring_client.SetKpiSubscription(subs_descriptor(kpi_id)) LOGGER.debug(type(response)) assert isinstance(response, _MultiThreadedRendezvous) @@ -279,11 +281,11 @@ def test_get_subscriptions(monitoring_client): assert isinstance(response, SubsList) # Test case that makes use of client fixture to test server's DeleteSubscription method -def test_delete_subscription(monitoring_client): - LOGGER.warning('test_delete_subscription') - response = monitoring_client.DeleteSubscription(subs_id()) - LOGGER.debug(type(response)) - assert isinstance(response, Empty) +# def test_delete_subscription(monitoring_client): +# LOGGER.warning('test_delete_subscription') +# response = monitoring_client.DeleteSubscription(subs_id()) +# LOGGER.debug(type(response)) +# assert isinstance(response, Empty) # Test case that makes use of client fixture to test server's SetKpiAlarm method def test_set_kpi_alarm(monitoring_client):