Commit d937aecd authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

First working version of Device monitoring features

parent b4267383
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -38,7 +38,6 @@ rm -f $COVERAGEFILE

coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
    device/tests/test_unitary.py
    #device/tests/test_unitary_driverapi.py \

#coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \
#    l3_centralizedattackdetector/tests/test_unitary.py
+98 −45
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@ from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from context.client.ContextClient import ContextClient
from context.proto.kpi_sample_types_pb2 import KpiSampleType
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceServicer
@@ -20,7 +21,7 @@ from .database.DatabaseTools import (
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
from .database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
@@ -108,7 +109,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
        driver.Connect()

        endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
        LOGGER.info('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
        #LOGGER.info('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
        for resource_key, resource_value in endpoints:
            endpoint_uuid = resource_value.get('uuid')
            endpoint_type = resource_value.get('type')
@@ -224,7 +225,10 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
            KpiModel(self.database, db_kpi_pk).delete()

        for db_endpoint_pk,_ in db_device.references(EndPointModel):
            EndPointModel(self.database, db_endpoint_pk).delete()
            db_endpoint = EndPointModel(self.database, db_endpoint_pk)
            for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
                EndPointMonitorModel(self.database, db_endpoint_monitor_pk).delete()
            db_endpoint.delete()

        for db_driver_pk,_ in db_device.references(DriverModel):
            DriverModel(self.database, db_driver_pk).delete()
@@ -256,17 +260,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
        kpi_uuid = request.kpi_id.kpi_id.uuid

        subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
        if subscribe:
            device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid

            db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
            if db_device is None:
                msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('ConfigureDevice', extra_details=msg)

            endpoint_id = request.kpi_descriptor.endpoint_id
            endpoint_uuid = endpoint_id.endpoint_uuid.uuid
            str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
@@ -282,36 +284,39 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
                    str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        sample_type = request.kpi_descriptor.kpi_sample_type
        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
        db_endpoint_monitor : EndPointMonitorModel = get_object(
            self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
        if db_endpoint_monitor is None:
            msg = 'Device({:s})/EndPoint({:s})/EndPointMonitor({:s}) not found.'.format(
                str(device_uuid), str(endpoint_uuid), str(sample_type))
            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        subscribe = True
        if subscribe:
            sample_type = request.kpi_descriptor.kpi_sample_type

            attributes = {
                'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
                'kpi_description'  : request.kpi_descriptor.kpi_description,
                'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type),
                'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(sample_type),
                'device_fk'        : db_device,
                'endpoint_fk'      : db_endpoint,
                'sampling_duration': request.sampling_duration_s,
                'sampling_interval': request.sampling_interval_s,
            }
            LOGGER.info('kpi.attributes = {:s}'.format(str(attributes)))
            result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
            db_kpi, updated = result

            str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
            db_endpoint_monitor : EndPointMonitorModel = get_object(
                self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
            if db_endpoint_monitor is None:
                msg = 'SampleType({:s}/{:s}) not supported for EndPoint({:s}).'.format(
                    str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
                    str(endpoint_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
            attributes = {
                'endpoint_monitor_fk': db_endpoint_monitor,
                'kpi_fk'             : db_kpi,
            }
            LOGGER.info('epm_kpi.attributes = {:s}'.format(str(attributes)))
            result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
            db_endpoint_monitor_kpi, updated = result
@@ -319,7 +324,6 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
            resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
            resources_to_subscribe.append(
                (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
            LOGGER.info('[MonitorDeviceKpi] resources_to_subscribe = {:s}'.format(str(resources_to_subscribe)))
            results_subscribestate = driver.SubscribeState(resources_to_subscribe)
            errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
@@ -327,12 +331,61 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
            self.monitoring_loops.add(device_uuid, driver)

        else:
            db_kpi : KpiModel = get_object(
                self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
            if db_kpi is None:
                msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            db_device : DeviceModel = get_object(
                self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
            if db_device is None:
                msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            device_uuid = db_device.device_uuid

            db_endpoint : EndPointModel = get_object(
                self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
            if db_endpoint is None:
                msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            endpoint_uuid = db_endpoint.endpoint_uuid
            str_endpoint_key = db_endpoint.pk

            kpi_sample_type : ORM_KpiSampleType = db_kpi.kpi_sample_type
            sample_type = kpi_sample_type.value
            str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
            db_endpoint_monitor : EndPointMonitorModel = get_object(
                self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
            if db_endpoint_monitor is None:
                msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            if db_endpoint_monitor_kpi is None:
                msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
            LOGGER.info('[MonitorDeviceKpi] resources_to_unsubscribe = {:s}'.format(str(resources_to_unsubscribe)))
            resources_to_unsubscribe.append(
                (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))

            driver : _Driver = self.driver_instance_cache.get(device_uuid)
            if driver is None:
                msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

            results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
            errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

            self.monitoring_loops.remove(device_uuid)
            db_endpoint_monitor_kpi.delete()
            db_kpi.delete()

            # There is one monitoring loop per device; keep them active since they are re-used by different monitoring
            # requests.
            #self.monitoring_loops.remove(device_uuid)

        return Empty()
+11 −26
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ class MonitoringLoop:
        self._running = threading.Event()
        self._terminate = threading.Event()
        self._samples_stream = self._driver.GetState(blocking=True)
        self._collector_thread = threading.Thread(target=self._collect, daemon=False)
        self._collector_thread = threading.Thread(target=self._collect, daemon=True)

    def _collect(self) -> None:
        for sample in self._samples_stream:
@@ -49,7 +49,7 @@ class MonitoringLoops:
        self._terminate = threading.Event()
        self._lock = threading.Lock()
        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
        self._exporter_thread = threading.Thread(target=self._export, daemon=False)
        self._exporter_thread = threading.Thread(target=self._export, daemon=True)

    def add(self, device_uuid : str, driver : _Driver) -> None:
        with self._lock:
@@ -85,14 +85,12 @@ class MonitoringLoops:
        while not self._terminate.is_set():
            try:
                sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
                LOGGER.info('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
                #LOGGER.debug('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
            except queue.Empty:
                continue

            device_uuid, timestamp, endpoint_monitor_resource_key, value = sample
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
            LOGGER.info('[MonitoringLoops:_export] retrieving EndPointMonitorKpiModel {:s}'.format(
                str(str_endpoint_monitor_kpi_key)))

            #db_entries = self._database.dump()
            #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
@@ -102,25 +100,17 @@ class MonitoringLoops:

            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            LOGGER.info('[MonitoringLoops:_export] db_endpoint_monitor_kpi = {:s}'.format(
                str(db_endpoint_monitor_kpi)))
            if db_endpoint_monitor_kpi is None:
                LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key))
                continue

            str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
            LOGGER.info('[MonitoringLoops:_export] retrieving KpiModel {:s}'.format(
                str(str_kpi_key)))
            db_kpi : KpiModel = get_object(
                self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
            LOGGER.info('[MonitoringLoops:_export] db_kpi = {:s}'.format(
                str(db_kpi)))
            if db_kpi is None:
                LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key))
                continue

            LOGGER.info('[MonitoringLoops:_export] formatting kpi... = {:s}/{:s}'.format(
                str(type(value)), str(value)))
            if isinstance(value, int):
                kpi_value_field_name = 'intVal'
                kpi_value_field_cast = int
@@ -134,16 +124,11 @@ class MonitoringLoops:
                kpi_value_field_name = 'stringVal'
                kpi_value_field_cast = str

            LOGGER.info('[MonitoringLoops:_export] kpi_value_field_name = {:s}'.format(str(kpi_value_field_name)))
            LOGGER.info('[MonitoringLoops:_export] kpi_value_field_cast = {:s}'.format(str(kpi_value_field_cast)))

            kpi_data = {
                'kpi_id'   : {'kpi_id': db_kpi.kpi_uuid},
            try:
                self._monitoring_client.IncludeKpi(Kpi(**{
                    'kpi_id'   : {'kpi_id': {'uuid': db_kpi.kpi_uuid}},
                    'timestamp': str(timestamp),
                    'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
            }
            LOGGER.info('[MonitoringLoops:_export] sending sample: {:s}'.format(str(kpi_data)))
            LOGGER.info('[MonitoringLoops:_export] self._monitoring_client: {:s}'.format(str(self._monitoring_client)))
            LOGGER.info('[MonitoringLoops:_export] self._monitoring_client.IncludeKpi: {:s}'.format(str(self._monitoring_client.IncludeKpi)))
            self._monitoring_client.IncludeKpi(Kpi(**kpi_data))
            LOGGER.info('[MonitoringLoops:_export] sample sent: {:s}'.format(str(kpi_data)))
                }))
            except: # pylint: disable=bare-except
                LOGGER.exception('Unable to format/send Kpi')
+5 −5
Original line number Diff line number Diff line
@@ -4,11 +4,11 @@ from device.proto.kpi_sample_types_pb2 import KpiSampleType
from .Tools import grpc_to_enum

class ORM_KpiSampleType(Enum):
    UNKNOWN             = KpiSampleType.UNKNOWN
    PACKETS_TRANSMITTED = KpiSampleType.PACKETS_TRANSMITTED
    PACKETS_RECEIVED    = KpiSampleType.PACKETS_RECEIVED
    BYTES_TRANSMITTED   = KpiSampleType.BYTES_TRANSMITTED
    BYTES_RECEIVED      = KpiSampleType.BYTES_RECEIVED
    UNKNOWN             = KpiSampleType.KPISAMPLETYPE_UNKNOWN
    PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
    PACKETS_RECEIVED    = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    BYTES_TRANSMITTED   = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED
    BYTES_RECEIVED      = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED

grpc_to_enum__kpi_sample_type = functools.partial(
    grpc_to_enum, KpiSampleType, ORM_KpiSampleType)
+2 −4
Original line number Diff line number Diff line
import logging
from queue import Queue
from monitoring.proto.context_pb2 import Empty
from monitoring.proto.monitoring_pb2 import IncludeKpiRequest
from monitoring.proto.monitoring_pb2 import Kpi
from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceServicer

LOGGER = logging.getLogger(__name__)
@@ -10,8 +10,6 @@ class MockMonitoringServiceServicerImpl(MonitoringServiceServicer):
    def __init__(self, queue_samples : Queue):
        self.queue_samples = queue_samples

    def IncludeKpi(self, request : IncludeKpiRequest, context) -> Empty:
        LOGGER.info('[IncludeKpi] begin')
    def IncludeKpi(self, request : Kpi, context) -> Empty:
        self.queue_samples.put(request)
        LOGGER.info('[IncludeKpi] end')
        return Empty()
Loading