Commit 492e852b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Merge branch 'feat/device-monitoring' into 'develop'

First functional version of Device monitoring

See merge request teraflow-h2020/controller!48
parents a9e39340 6e789330
Loading
Loading
Loading
Loading

manifests/.gitignore

0 → 100644
+4 −0
Original line number Diff line number Diff line
# Internal manifest used for local testings.

# CTTC section:
cttc-ols.yaml
+0 −1
Original line number Diff line number Diff line
@@ -37,7 +37,6 @@ coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
    context/tests/test_unitary.py

coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
    device/tests/test_unitary_driverapi.py \
    device/tests/test_unitary.py

coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \
+7 −7
Original line number Diff line number Diff line
import grpc, logging
from common.tools.client.RetryDecorator import retry, delay_exponential
from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
#from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceStub

LOGGER = logging.getLogger(__name__)
@@ -54,9 +54,9 @@ class DeviceClient:
        LOGGER.debug('GetInitialConfig result: {:s}'.format(str(response)))
        return response

    #@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
    #def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty:
    #    LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(str(request)))
    #    response = self.stub.MonitorDeviceKpi(request)
    #    LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(str(response)))
    #    return response
    @retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
    def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty:
        LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(str(request)))
        response = self.stub.MonitorDeviceKpi(request)
        LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(str(response)))
        return response
+17 −10
Original line number Diff line number Diff line
@@ -3,26 +3,29 @@ from concurrent import futures
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from context.client.ContextClient import ContextClient
from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server
from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
#from .MonitoringLoops import MonitoringLoops
from monitoring.client.monitoring_client import MonitoringClient
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .DeviceServiceServicerImpl import DeviceServiceServicerImpl
from .MonitoringLoops import MonitoringLoops

BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)

class DeviceService:
    def __init__(
        self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
        #monitoring_loops : MonitoringLoops,
        address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD):
        self, context_client : ContextClient, monitoring_client : MonitoringClient,
        driver_instance_cache : DriverInstanceCache,
        address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD):

        self.context_client = context_client
        self.monitoring_client = monitoring_client
        self.driver_instance_cache = driver_instance_cache
        #self.monitoring_loops = monitoring_loops
        self.address = address
        self.port = port
        self.endpoint = None
@@ -33,18 +36,21 @@ class DeviceService:
        self.pool = None
        self.server = None

        self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
        self.monitoring_loops = MonitoringLoops(monitoring_client, self.database)

    def start(self):
        self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
        LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
            str(self.endpoint), str(self.max_workers)))

        self.monitoring_loops.start()

        self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))

        self.device_servicer = DeviceServiceServicerImpl(
            self.context_client, self.driver_instance_cache,
            #self.monitoring_loops
        )
            self.context_client, self.database, self.driver_instance_cache, self.monitoring_loops)
        add_DeviceServiceServicer_to_server(self.device_servicer, self.server)

        self.health_servicer = HealthServicer(
@@ -63,4 +69,5 @@ class DeviceService:
        LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
        self.health_servicer.enter_graceful_shutdown()
        self.server.stop(self.grace_period)
        self.monitoring_loops.stop()
        LOGGER.debug('Service stopped')
+197 −96

File changed.

Preview size limit exceeded, changes collapsed.

Loading