Skip to content
Snippets Groups Projects
Commit 4cff2b80 authored by Carlos Natalino Da Silva's avatar Carlos Natalino Da Silva
Browse files

Merge branch 'feat/monitoring' into fix/ofc22-tests

parents 348bed26 92142ab1
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!4Compute component:
...@@ -9,13 +9,14 @@ Jinja2==3.0.3 ...@@ -9,13 +9,14 @@ Jinja2==3.0.3
ncclient==0.6.13 ncclient==0.6.13
p4runtime==1.3.0 p4runtime==1.3.0
paramiko==2.9.2 paramiko==2.9.2
influx-line-protocol==0.1.4 # influx-line-protocol==0.1.4
python-dateutil==2.8.2 python-dateutil==2.8.2
python-json-logger==2.0.2 python-json-logger==2.0.2
pytz==2021.3 pytz==2021.3
redis==4.1.2 redis==4.1.2
requests==2.27.1 requests==2.27.1
xmltodict==0.12.0 xmltodict==0.12.0
questdb==1.0.1
# pip's dependency resolver does not take into account installed packages. # pip's dependency resolver does not take into account installed packages.
# p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one # p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one
......
...@@ -19,16 +19,13 @@ import grpc ...@@ -19,16 +19,13 @@ import grpc
from common.rpc_method_wrapper.ServiceExceptions import ServiceException from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
#from common.proto import kpi_sample_types_pb2
from common.proto.context_pb2 import Empty, EventTypeEnum from common.proto.context_pb2 import Empty, EventTypeEnum
from common.logger import getJSONLogger
from monitoring.client.MonitoringClient import MonitoringClient from monitoring.client.MonitoringClient import MonitoringClient
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
from common.proto import monitoring_pb2 from common.proto import monitoring_pb2
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
class EventsDeviceCollector: class EventsDeviceCollector:
def __init__(self) -> None: # pylint: disable=redefined-outer-name def __init__(self) -> None: # pylint: disable=redefined-outer-name
self._events_queue = Queue() self._events_queue = Queue()
...@@ -74,7 +71,7 @@ class EventsDeviceCollector: ...@@ -74,7 +71,7 @@ class EventsDeviceCollector:
kpi_id_list = [] kpi_id_list = []
while not self._events_queue.empty(): while not self._events_queue.empty():
LOGGER.info('getting Kpi by KpiID') # LOGGER.info('getting Kpi by KpiID')
event = self.get_event(block=True) event = self.get_event(block=True)
if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE: if event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE:
device = self._context_client.GetDevice(event.device_id) device = self._context_client.GetDevice(event.device_id)
......
...@@ -12,18 +12,16 @@ ...@@ -12,18 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from influx_line_protocol import Metric from questdb.ingress import Sender, IngressError
import socket
import requests import requests
import json import json
import sys
import logging import logging
import datetime
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class MetricsDB(): class MetricsDB():
def __init__(self, host, ilp_port, rest_port, table): def __init__(self, host, ilp_port, rest_port, table):
self.socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.host=host self.host=host
self.ilp_port=int(ilp_port) self.ilp_port=int(ilp_port)
self.rest_port=rest_port self.rest_port=rest_port
...@@ -31,19 +29,30 @@ class MetricsDB(): ...@@ -31,19 +29,30 @@ class MetricsDB():
self.create_table() self.create_table()
def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value): def write_KPI(self,time,kpi_id,kpi_sample_type,device_id,endpoint_id,service_id,kpi_value):
self.socket.connect((self.host,self.ilp_port)) counter=0
metric = Metric(self.table) number_of_retries=10
metric.with_timestamp(time) while (counter<number_of_retries):
metric.add_tag('kpi_id', kpi_id) try:
metric.add_tag('kpi_sample_type', kpi_sample_type) with Sender(self.host, self.ilp_port) as sender:
metric.add_tag('device_id', device_id) sender.row(
metric.add_tag('endpoint_id', endpoint_id) self.table,
metric.add_tag('service_id', service_id) symbols={
metric.add_value('kpi_value', kpi_value) 'kpi_id': kpi_id,
str_metric = str(metric) 'kpi_sample_type': kpi_sample_type,
str_metric += "\n" 'device_id': device_id,
self.socket.sendall((str_metric).encode()) 'endpoint_id': endpoint_id,
self.socket.close() 'service_id': service_id},
columns={
'kpi_value': kpi_value},
at=datetime.datetime.fromtimestamp(time))
sender.flush()
counter=number_of_retries
LOGGER.info(f"KPI written")
except IngressError as ierr:
# LOGGER.info(ierr)
# LOGGER.info(f"Ingress Retry number {counter}")
counter=counter+1
def run_query(self, sql_query): def run_query(self, sql_query):
query_params = {'query': sql_query, 'fmt' : 'json'} query_params = {'query': sql_query, 'fmt' : 'json'}
......
...@@ -18,6 +18,7 @@ from typing import Iterator ...@@ -18,6 +18,7 @@ from typing import Iterator
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import get_setting, get_service_port_grpc, get_service_host from common.Settings import get_setting, get_service_port_grpc, get_service_host
from common.logger import getJSONLogger
from common.proto.context_pb2 import Empty from common.proto.context_pb2 import Empty
from common.proto.device_pb2 import MonitoringSettings from common.proto.device_pb2 import MonitoringSettings
from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_sample_types_pb2 import KpiSampleType
...@@ -26,23 +27,23 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDL ...@@ -26,23 +27,23 @@ from common.proto.monitoring_pb2 import AlarmResponse, AlarmDescriptor, AlarmIDL
KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \ KpiDescriptor, KpiList, KpiQuery, SubsDescriptor, SubscriptionID, AlarmID, KpiDescriptorList, \
MonitorKpiRequest, Kpi, AlarmSubscription MonitorKpiRequest, Kpi, AlarmSubscription
from common.rpc_method_wrapper.ServiceExceptions import ServiceException from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.tools.timestamp.Converters import timestamp_float_to_string
from monitoring.service import SqliteTools, MetricsDBTools from monitoring.service import SqliteTools, MetricsDBTools
from device.client.DeviceClient import DeviceClient from device.client.DeviceClient import DeviceClient
from prometheus_client import Counter, Summary from prometheus_client import Counter, Summary
LOGGER = logging.getLogger(__name__) LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary( MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary(
'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request') 'monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter') MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME") METRICSDB_HOSTNAME = os.environ.get("METRICSDB_HOSTNAME")
METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT") METRICSDB_ILP_PORT = os.environ.get("METRICSDB_ILP_PORT")
METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT") METRICSDB_REST_PORT = os.environ.get("METRICSDB_REST_PORT")
METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE") METRICSDB_TABLE = os.environ.get("METRICSDB_TABLE")
DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) ) DEVICESERVICE_SERVICE_HOST = get_setting('DEVICESERVICE_SERVICE_HOST', default=get_service_host(ServiceNameEnum.DEVICE) )
...@@ -57,7 +58,6 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): ...@@ -57,7 +58,6 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
self.sql_db = SqliteTools.SQLite('monitoring.db') self.sql_db = SqliteTools.SQLite('monitoring.db')
self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client self.deviceClient = DeviceClient(host=DEVICESERVICE_SERVICE_HOST, port=DEVICESERVICE_SERVICE_PORT_GRPC) # instantiate the client
# Set metrics_db client
self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE) self.metrics_db = MetricsDBTools.MetricsDB(METRICSDB_HOSTNAME,METRICSDB_ILP_PORT,METRICSDB_REST_PORT,METRICSDB_TABLE)
LOGGER.info('MetricsDB initialized') LOGGER.info('MetricsDB initialized')
...@@ -81,7 +81,6 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): ...@@ -81,7 +81,6 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id) kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data) kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc() # CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id return kpi_id
except ServiceException as e: except ServiceException as e:
...@@ -162,7 +161,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer): ...@@ -162,7 +161,7 @@ class MonitoringServiceServicerImpl(MonitoringServiceServicer):
deviceId = kpiDescriptor.device_id.device_uuid.uuid deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = timestamp_float_to_string(request.timestamp.timestamp) time_stamp = request.timestamp.timestamp
kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value')) kpi_value = getattr(request.kpi_value, request.kpi_value.WhichOneof('value'))
# Build the structure to be included as point in the MetricsDB # Build the structure to be included as point in the MetricsDB
......
...@@ -11,17 +11,13 @@ ...@@ -11,17 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import datetime
from common.proto import monitoring_pb2 from common.proto import monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.timestamp.Converters import timestamp_string_to_float from common.tools.timestamp.Converters import timestamp_string_to_float, timestamp_utcnow_to_float
def kpi():
_kpi = monitoring_pb2.Kpi()
_kpi.kpi_id.kpi_id.uuid = 'KPIID0000' # pylint: disable=maybe-no-member
return _kpi
def kpi_id(): def kpi_id():
_kpi_id = monitoring_pb2.KpiId() _kpi_id = monitoring_pb2.KpiId()
_kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member _kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member
...@@ -43,9 +39,9 @@ def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s): ...@@ -43,9 +39,9 @@ def monitor_kpi_request(kpi_uuid, monitoring_window_s, sampling_rate_s):
_monitor_kpi_request.sampling_rate_s = sampling_rate_s _monitor_kpi_request.sampling_rate_s = sampling_rate_s
return _monitor_kpi_request return _monitor_kpi_request
def include_kpi_request(): def include_kpi_request(kpi_id):
_include_kpi_request = monitoring_pb2.Kpi() _include_kpi_request = monitoring_pb2.Kpi()
_include_kpi_request.kpi_id.kpi_id.uuid = str(1) # pylint: disable=maybe-no-member _include_kpi_request.kpi_id.kpi_id.uuid = kpi_id.kpi_id.uuid
_include_kpi_request.timestamp.timestamp = timestamp_string_to_float("2021-10-12T13:14:42Z") _include_kpi_request.timestamp.timestamp = timestamp_utcnow_to_float()
_include_kpi_request.kpi_value.int32Val = 500 # pylint: disable=maybe-no-member _include_kpi_request.kpi_value.int32Val = 500 # pylint: disable=maybe-no-member
return _include_kpi_request return _include_kpi_request
...@@ -12,16 +12,18 @@ ...@@ -12,16 +12,18 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy, logging, os, pytest import copy, os, pytest
from time import sleep from time import sleep
from typing import Tuple from typing import Tuple
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import ( from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from common.logger import getJSONLogger
from common.orm.Database import Database from common.orm.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker from common.message_broker.MessageBroker import MessageBroker
from common.proto import monitoring_pb2
from common.proto.monitoring_pb2 import KpiId, KpiDescriptor from common.proto.monitoring_pb2 import KpiId, KpiDescriptor
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
...@@ -35,17 +37,17 @@ from device.service.driver_api.DriverInstanceCache import DriverInstanceCache ...@@ -35,17 +37,17 @@ from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS from device.service.drivers import DRIVERS
from monitoring.client.MonitoringClient import MonitoringClient from monitoring.client.MonitoringClient import MonitoringClient
from common.proto import context_pb2, monitoring_pb2
from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_sample_types_pb2 import KpiSampleType
from monitoring.service import SqliteTools, MetricsDBTools from monitoring.service import SqliteTools, MetricsDBTools
from monitoring.service.MonitoringService import MonitoringService from monitoring.service.MonitoringService import MonitoringService
from monitoring.service.EventTools import EventsDeviceCollector from monitoring.service.EventTools import EventsDeviceCollector
from monitoring.tests.Messages import create_kpi_request, include_kpi_request, kpi, kpi_id, monitor_kpi_request from monitoring.tests.Messages import create_kpi_request, include_kpi_request, monitor_kpi_request
from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID from monitoring.tests.Objects import DEVICE_DEV1, DEVICE_DEV1_CONNECT_RULES, DEVICE_DEV1_UUID
from monitoring.service.MonitoringServiceServicerImpl import LOGGER
LOGGER = logging.getLogger(__name__) # LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel(logging.DEBUG) # LOGGER.setLevel('DEBUG')
########################### ###########################
# Tests Setup # Tests Setup
...@@ -177,7 +179,7 @@ def test_monitor_kpi( ...@@ -177,7 +179,7 @@ def test_monitor_kpi(
monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name context_db_mb : Tuple[Database, MessageBroker] # pylint: disable=redefined-outer-name
): ):
LOGGER.warning('test_monitor_kpi begin') LOGGER.info('test_monitor_kpi begin')
context_database = context_db_mb[0] context_database = context_db_mb[0]
...@@ -210,14 +212,14 @@ def test_monitor_kpi( ...@@ -210,14 +212,14 @@ def test_monitor_kpi(
def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name def test_include_kpi(monitoring_client): # pylint: disable=redefined-outer-name
# make call to server # make call to server
LOGGER.warning('test_include_kpi requesting') LOGGER.warning('test_include_kpi requesting')
response = monitoring_client.IncludeKpi(include_kpi_request()) kpi_id = monitoring_client.SetKpi(create_kpi_request())
LOGGER.debug(str(response)) response = monitoring_client.IncludeKpi(include_kpi_request(kpi_id))
assert isinstance(response, Empty) assert isinstance(response, Empty)
# Test case that makes use of client fixture to test server's GetStreamKpi method # Test case that makes use of client fixture to test server's GetStreamKpi method
def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getstream_kpi begin') LOGGER.warning('test_getstream_kpi begin')
response = monitoring_client.GetStreamKpi(kpi()) response = monitoring_client.GetStreamKpi(monitoring_pb2.Kpi())
LOGGER.debug(str(response)) LOGGER.debug(str(response))
#assert isinstance(response, Kpi) #assert isinstance(response, Kpi)
...@@ -232,8 +234,9 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na ...@@ -232,8 +234,9 @@ def test_get_stream_kpi(monitoring_client): # pylint: disable=redefined-outer-na
def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name def test_get_kpidescritor_kpi(monitoring_client): # pylint: disable=redefined-outer-name
LOGGER.warning('test_getkpidescritor_kpi begin') LOGGER.warning('test_getkpidescritor_kpi begin')
response = monitoring_client.SetKpi(create_kpi_request()) response = monitoring_client.SetKpi(create_kpi_request())
# LOGGER.debug(str(response))
response = monitoring_client.GetKpiDescriptor(response) response = monitoring_client.GetKpiDescriptor(response)
LOGGER.debug(str(response)) # LOGGER.debug(str(response))
assert isinstance(response, KpiDescriptor) assert isinstance(response, KpiDescriptor)
def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name def test_sqlitedb_tools_insert_kpi(sql_db): # pylint: disable=redefined-outer-name
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment