Commit d21562c7 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Partial implementation of Device monitoring logic:

- Implemented Database models for KpiModel and KpiSampleType
- Added MonitorDeviceKpi to DeviceClient
- Added MonitorDeviceKpi to DeviceServiceServicerImpl (partial implementation)
- Created MonitoringLoops class (partial implementation)
parent 01a4b284
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
import grpc, logging
from common.tools.client.RetryDecorator import retry, delay_exponential
from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceStub

LOGGER = logging.getLogger(__name__)
@@ -21,7 +22,7 @@ class DeviceClient:
        self.stub = DeviceServiceStub(self.channel)

    def close(self):
        if(self.channel is not None): self.channel.close()
        if self.channel is not None: self.channel.close()
        self.channel = None
        self.stub = None

@@ -52,3 +53,10 @@ class DeviceClient:
        response = self.stub.GetInitialConfig(request)
        LOGGER.debug('GetInitialConfig result: {:s}'.format(str(response)))
        return response

    @retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
    def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty:
        LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(str(request)))
        response = self.stub.MonitorDeviceKpi(request)
        LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(str(response)))
        return response
+8 −4
Original line number Diff line number Diff line
@@ -6,8 +6,9 @@ from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from context.client.ContextClient import ContextClient
from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server
from device.service.DeviceServiceServicerImpl import DeviceServiceServicerImpl
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
from .MonitoringLoops import MonitoringLoops
from .driver_api.DriverInstanceCache import DriverInstanceCache

BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)
@@ -15,10 +16,12 @@ LOGGER = logging.getLogger(__name__)
class DeviceService:
    def __init__(
        self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
        address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD):
        monitoring_loops : MonitoringLoops, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD):

        self.context_client = context_client
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
        self.address = address
        self.port = port
        self.endpoint = None
@@ -37,7 +40,8 @@ class DeviceService:
        self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))

        self.device_servicer = DeviceServiceServicerImpl(self.context_client, self.driver_instance_cache)
        self.device_servicer = DeviceServiceServicerImpl(
            self.context_client, self.driver_instance_cache, self.monitoring_loops)
        add_DeviceServiceServicer_to_server(self.device_servicer, self.server)

        self.health_servicer = HealthServicer(
+64 −7
Original line number Diff line number Diff line
@@ -2,37 +2,46 @@ from typing import Any, List, Tuple
import grpc, logging
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.orm.HighLevel import get_object
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from context.client.ContextClient import ContextClient
from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceServicer
from device.service.driver_api.Tools import (
    check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)
from .MonitoringLoops import MonitoringLoops
from .database.ConfigModel import (
    ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
from .database.DatabaseTools import (
    delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
    update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
    check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)

LOGGER = logging.getLogger(__name__)

SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig']
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

class DeviceServiceServicerImpl(DeviceServiceServicer):
    def __init__(self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache):
    def __init__(
        self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
        monitoring_loops : MonitoringLoops):

        LOGGER.debug('Creating Servicer...')
        self.context_client = context_client
        self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS, LOGGER)
@@ -171,3 +180,51 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):

        config_rules = {} if db_device is None else db_device.dump_initial_config()
        return DeviceConfig(config_rules=config_rules)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
        kpi_uuid = request.kpi_id.kpi_id.uuid

        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
        db_device = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)

        endpoint_id = request.kpi_descriptor.endpoint_id
        endpoint_uuid = endpoint_id.endpoint_uuid.uuid
        endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
        if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
        str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
        endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
        if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
            str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
        db_endpoint = get_object(self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)

        db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
        result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, {
            'kpi_uuid'         : request.kpi_id.kpi_id.uuid,
            'kpi_description'  : request.kpi_descriptor.kpi_description,
            'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type),
            'device_fk'        : db_device,
            'endpoint_fk'      : db_endpoint,
            'sampling_duration': request.sampling_duration_s,
            'sampling_interval': request.sampling_interval_s,
        })
        db_kpi, updated = result

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('ConfigureDevice', extra_details=msg)

        results = driver.SubscribeState([
            (db_kpi.sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ])
        assert len(results) == 4
        for result in results: assert isinstance(result, bool) and result


        self.monitoring_loops.add()

        raise NotImplementedError()
        return Empty()
+3 −0
Original line number Diff line number Diff line
class MonitoringLoops:
    def __init__(self) -> None:
        pass
+7 −7
Original line number Diff line number Diff line
@@ -3,10 +3,11 @@ from prometheus_client import start_http_server
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT
from device.service.DeviceService import DeviceService
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from .DeviceService import DeviceService
from .MonitoringLoops import MonitoringLoops
from .driver_api.DriverFactory import DriverFactory
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .drivers import DRIVERS

terminate = threading.Event()
LOGGER = None
@@ -43,15 +44,14 @@ def main():
            str(context_service_host), str(context_service_port)))
    context_client = ContextClient(context_service_host, context_service_port)

    # TODO: start monitoring loops to periodically report to Monitoring the collected data

    # Initialize Driver framework
    driver_factory = DriverFactory(DRIVERS)
    driver_instance_cache = DriverInstanceCache(driver_factory)
    monitoring_loops = MonitoringLoops()

    # Starting device service
    grpc_service = DeviceService(
        context_client, driver_instance_cache, port=grpc_service_port, max_workers=max_workers,
        context_client, driver_instance_cache, monitoring_loops, port=grpc_service_port, max_workers=max_workers,
        grace_period=grace_period)
    grpc_service.start()

Loading