Commit 3c6595fd authored by Lluis Gifre Renom
Browse files

Intermediate backup device monitoring.

parent d3dcedca
Loading
Loading
Loading
Loading
+16 −6
Original line number Diff line number Diff line
@@ -3,25 +3,29 @@ from concurrent import futures
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from context.client.ContextClient import ContextClient
from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server
from monitoring.client.monitoring_client import MonitoringClient
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
from .MonitoringLoops import MonitoringLoops
from .driver_api.DriverInstanceCache import DriverInstanceCache

BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)

class DeviceService:
    def __init__(
        self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
        monitoring_loops : MonitoringLoops, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD):
        self, context_client : ContextClient, monitoring_client : MonitoringClient,
        driver_instance_cache : DriverInstanceCache,
        address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD):

        self.context_client = context_client
        self.monitoring_client = monitoring_client
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
        self.address = address
        self.port = port
        self.endpoint = None
@@ -32,16 +36,21 @@ class DeviceService:
        self.pool = None
        self.server = None

        self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.monitoring_loops = MonitoringLoops(monitoring_client, self.database)

    def start(self):
        self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
        LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
            str(self.endpoint), str(self.max_workers)))

        self.monitoring_loops.start()

        self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))

        self.device_servicer = DeviceServiceServicerImpl(
            self.context_client, self.driver_instance_cache, self.monitoring_loops)
            self.context_client, self.database, self.driver_instance_cache, self.monitoring_loops)
        add_DeviceServiceServicer_to_server(self.device_servicer, self.server)

        self.health_servicer = HealthServicer(
@@ -60,4 +69,5 @@ class DeviceService:
        LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
        self.health_servicer.enter_graceful_shutdown()
        self.server.stop(self.grace_period)
        self.monitoring_loops.stop()
        LOGGER.debug('Service stopped')
+70 −53
Original line number Diff line number Diff line
@@ -2,9 +2,7 @@ import grpc, json, logging
from typing import Any, List, Tuple
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
@@ -12,6 +10,7 @@ from context.client.ContextClient import ContextClient
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceServicer
from device.service.database.RelationModels import EndPointMonitorKpiModel
from .MonitoringLoops import MonitoringLoops
from .database.ConfigModel import (
    ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
@@ -21,8 +20,8 @@ from .database.DatabaseTools import (
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
    check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)
@@ -35,12 +34,12 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class DeviceServiceServicerImpl(DeviceServiceServicer):
    def __init__(
        self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
        self, context_client : ContextClient, database : Database, driver_instance_cache : DriverInstanceCache,
        monitoring_loops : MonitoringLoops):

        LOGGER.debug('Creating Servicer...')
        self.context_client = context_client
        self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.database = database
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
        LOGGER.debug('Servicer Created')
@@ -259,11 +258,17 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):

        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
        if db_device is None:
            msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('ConfigureDevice', extra_details=msg)

        endpoint_id = request.kpi_descriptor.endpoint_id
        endpoint_uuid = endpoint_id.endpoint_uuid.uuid
        endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
        if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
        str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
        endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
@@ -272,8 +277,22 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
        db_endpoint : EndPointModel = get_object(
            self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
        sampling_resource = db_endpoint.resource_key
        if db_endpoint is None:
            msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
                str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        sample_type = request.kpi_descriptor.kpi_sample_type
        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
        db_endpoint_monitor : EndPointMonitorModel = get_object(
            self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
        if db_endpoint_monitor is None:
            msg = 'Device({:s})/EndPoint({:s})/EndPointMonitor({:s}) not found.'.format(
                str(device_uuid), str(endpoint_uuid), str(sample_type))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        subscribe = True
        if subscribe:
            attributes = {
                'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
                'kpi_description'  : request.kpi_descriptor.kpi_description,
@@ -283,39 +302,37 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
                'sampling_duration': request.sampling_duration_s,
                'sampling_interval': request.sampling_interval_s,
            }
        LOGGER.info('attributes = {:s}'.format(str(attributes)))

        #db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
            LOGGER.info('kpi.attributes = {:s}'.format(str(attributes)))
            result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
            db_kpi, updated = result

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('ConfigureDevice', extra_details=msg)

        #resources_to_subscribe   : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
        #resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
        #LOGGER.info('[ConfigureDevice] resources_to_subscribe = {:s}'.format(str(resources_to_subscribe)))
        #LOGGER.info('[ConfigureDevice] resources_to_unsubscribe = {:s}'.format(str(resources_to_unsubscribe)))
        # TODO: Implement configuration of subscriptions

        #if len(errors) == 0:
        #    results_subscribestate = driver.SubscribeState(resources_to_subscribe)
        #    errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate))
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
            attributes = {
                'endpoint_monitor_fk': db_endpoint_monitor,
                'kpi_fk'             : db_kpi,
            }
            LOGGER.info('epm_kpi.attributes = {:s}'.format(str(attributes)))
            result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
                self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
            db_endpoint_monitor_kpi, updated = result

            resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
            resources_to_subscribe.append(
                (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
            LOGGER.info('[MonitorDeviceKpi] resources_to_subscribe = {:s}'.format(str(resources_to_subscribe)))
            results_subscribestate = driver.SubscribeState(resources_to_subscribe)
            errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

        #if len(errors) == 0:
        #    results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
        #    errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate))
            self.monitoring_loops.add(device_uuid, driver)

        subscriptions = [
            (sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ]
        results = driver.SubscribeState(subscriptions)
        LOGGER.info('results = {:s}'.format(str(results)))
        assert len(results) == len(subscriptions)
        for result in results: assert isinstance(result, bool) and result
        else:
            resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
            LOGGER.info('[MonitorDeviceKpi] resources_to_unsubscribe = {:s}'.format(str(resources_to_unsubscribe)))
            results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
            errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
            if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

        self.monitoring_loops.add(device_uuid, driver)
            self.monitoring_loops.remove(device_uuid)

        return Empty()
+67 −10
Original line number Diff line number Diff line
@@ -2,9 +2,11 @@ import logging, queue, threading
from typing import Dict
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from device.service.database.KpiModel import KpiModel
from common.orm.backend.Tools import key_to_str
from device.service.database.RelationModels import EndPointMonitorKpiModel
from monitoring.client.monitoring_client import MonitoringClient
from monitoring.proto.monitoring_pb2 import Kpi
from .database.KpiModel import KpiModel
from .driver_api._Driver import _Driver

LOGGER = logging.getLogger(__name__)
@@ -39,9 +41,9 @@ class MonitoringLoop:
        self._collector_thread.join()

class MonitoringLoops:
    def __init__(self, monitoring_client : MonitoringClient) -> None:
    def __init__(self, monitoring_client : MonitoringClient, database : Database) -> None:
        self._monitoring_client = monitoring_client
        self._database = None
        self._database = database
        self._samples_queue = queue.Queue()
        self._running = threading.Event()
        self._terminate = threading.Event()
@@ -49,9 +51,6 @@ class MonitoringLoops:
        self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
        self._exporter_thread = threading.Thread(target=self._export, daemon=False)

    def set_database(self, database : Database) -> None:
        self._database = database

    def add(self, device_uuid : str, driver : _Driver) -> None:
        with self._lock:
            monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid)
@@ -79,6 +78,10 @@ class MonitoringLoops:
        self._exporter_thread.join()

    def _export(self) -> None:
        if self._database is None:
            LOGGER.error('[MonitoringLoops:_export] Database not set. Terminating Exporter.')
            return

        while not self._terminate.is_set():
            try:
                sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
@@ -86,7 +89,61 @@ class MonitoringLoops:
            except queue.Empty:
                continue

            get_object(self._database, KpiModel)
            self._database
            kpi_data = {}
            device_uuid, timestamp, endpoint_monitor_resource_key, value = sample
            str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
            LOGGER.info('[MonitoringLoops:_export] retrieving EndPointMonitorKpiModel {:s}'.format(
                str(str_endpoint_monitor_kpi_key)))

            #db_entries = self._database.dump()
            #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
            #for db_entry in db_entries:
            #    LOGGER.info('  [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
            #LOGGER.info('-----------------------------------------------------------')

            db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
                self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
            LOGGER.info('[MonitoringLoops:_export] db_endpoint_monitor_kpi = {:s}'.format(
                str(db_endpoint_monitor_kpi)))
            if db_endpoint_monitor_kpi is None:
                LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key))
                continue

            str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
            LOGGER.info('[MonitoringLoops:_export] retrieving KpiModel {:s}'.format(
                str(str_kpi_key)))
            db_kpi : KpiModel = get_object(
                self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
            LOGGER.info('[MonitoringLoops:_export] db_kpi = {:s}'.format(
                str(db_kpi)))
            if db_kpi is None:
                LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key))
                continue

            LOGGER.info('[MonitoringLoops:_export] formatting kpi... = {:s}/{:s}'.format(
                str(type(value)), str(value)))
            if isinstance(value, int):
                kpi_value_field_name = 'intVal'
                kpi_value_field_cast = int
            elif isinstance(value, float):
                kpi_value_field_name = 'floatVal'
                kpi_value_field_cast = float
            elif isinstance(value, bool):
                kpi_value_field_name = 'boolVal'
                kpi_value_field_cast = bool
            else:
                kpi_value_field_name = 'stringVal'
                kpi_value_field_cast = str

            LOGGER.info('[MonitoringLoops:_export] kpi_value_field_name = {:s}'.format(str(kpi_value_field_name)))
            LOGGER.info('[MonitoringLoops:_export] kpi_value_field_cast = {:s}'.format(str(kpi_value_field_cast)))

            kpi_data = {
                'kpi_id'   : {'kpi_id': db_kpi.kpi_uuid},
                'timestamp': str(timestamp),
                'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
            }
            LOGGER.info('[MonitoringLoops:_export] sending sample: {:s}'.format(str(kpi_data)))
            LOGGER.info('[MonitoringLoops:_export] self._monitoring_client: {:s}'.format(str(self._monitoring_client)))
            LOGGER.info('[MonitoringLoops:_export] self._monitoring_client.IncludeKpi: {:s}'.format(str(self._monitoring_client.IncludeKpi)))
            self._monitoring_client.IncludeKpi(Kpi(**kpi_data))
            LOGGER.info('[MonitoringLoops:_export] sample sent: {:s}'.format(str(kpi_data)))
+1 −4
Original line number Diff line number Diff line
@@ -58,20 +58,17 @@ def main():
    # Initialize Driver framework
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    monitoring_loops = MonitoringLoops(monitoring_client)

    # Starting device service
    grpc_service = DeviceService(
        context_client, driver_instance_cache, monitoring_loops, port=grpc_service_port, max_workers=max_workers,
        context_client, monitoring_client, driver_instance_cache, port=grpc_service_port, max_workers=max_workers,
        grace_period=grace_period)
    grpc_service.start()
    monitoring_loops.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=0.1): pass

    LOGGER.info('Terminating...')
    monitoring_loops.stop()
    grpc_service.stop()
    driver_instance_cache.terminate()

+13 −0
Original line number Diff line number Diff line
import logging
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.model.Model import Model
from .EndPointModel import EndPointMonitorModel
from .KpiModel import KpiModel

LOGGER = logging.getLogger(__name__)

class EndPointMonitorKpiModel(Model): # pylint: disable=abstract-method
    """Relation model associating an endpoint monitor with a KPI.

    Each entry holds one foreign key to an EndPointMonitorModel and one
    foreign key to a KpiModel.
    """
    # Primary key identifying this relation entry.
    pk = PrimaryKeyField()
    # Foreign key to the EndPointMonitorModel side of the relation.
    endpoint_monitor_fk = ForeignKeyField(EndPointMonitorModel)
    # Foreign key to the KpiModel side of the relation.
    kpi_fk = ForeignKeyField(KpiModel)
Loading