Newer
Older
from typing import Any, List, Tuple

Lluis Gifre Renom
committed
import grpc, logging
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from context.client.ContextClient import ContextClient
from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings

Lluis Gifre Renom
committed
from device.proto.device_pb2_grpc import DeviceServiceServicer
from .MonitoringLoops import MonitoringLoops
from .database.ConfigModel import (
ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DatabaseTools import (
delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)

Lluis Gifre Renom
committed
LOGGER = logging.getLogger(__name__)
SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

Lluis Gifre Renom
committed
class DeviceServiceServicerImpl(DeviceServiceServicer):
def __init__(
self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
monitoring_loops : MonitoringLoops):

Lluis Gifre Renom
committed
LOGGER.debug('Creating Servicer...')
self.context_client = context_client
self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
self.driver_instance_cache = driver_instance_cache
self.monitoring_loops = monitoring_loops

Lluis Gifre Renom
committed
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER)
def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
if len(request.device_config.config_rules) > 0:
raise InvalidArgumentException(
'device.device_config.config_rules', str(request.device_config.config_rules),
extra_details='RPC method AddDevice does not allow definition of Config Rules. '\
'Add the Device first, and then configure it.')
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device,_ = update_device_in_local_database(self.database, request)
driver_filter_fields = get_device_driver_filter_fields(db_device)
driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields)
driver.Connect()
running_config_rules = driver.GetConfig()
running_config_rules = [(ORM_ConfigActionEnum.SET, rule[0], rule[1]) for rule in running_config_rules]
LOGGER.info('[AddDevice] running_config_rules = {:s}'.format(str(running_config_rules)))
context_config_rules = get_config_rules(self.database, device_uuid, 'running')
LOGGER.info('[AddDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
# TODO: Compute diff between current context config and device config. The one in device is of higher priority
# (might happen another instance is updating config and context was not already updated)
update_config(self.database, device_uuid, 'running', running_config_rules)
initial_config_rules = driver.GetInitialConfig()
update_config(self.database, device_uuid, 'initial', initial_config_rules)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
context_config_rules = get_config_rules(self.database, device_uuid, 'running')
context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
db_device,_ = update_device_in_local_database(self.database, request)
request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
resources_to_set : List[Tuple[str, Any]] = [] # key, value
resources_to_delete : List[str] = [] # key
resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
for config_rule in request_config_rules:
action, key, value = config_rule
if action == ORM_ConfigActionEnum.SET:
if (key not in context_config_rules) or (context_config_rules[key] != value):
resources_to_set.append((key, value))
elif action == ORM_ConfigActionEnum.DELETE:
resources_to_delete.append(key)
# TODO: Implement configuration of subscriptions
# TODO: use of datastores (might be virtual ones) to enable rollbacks
errors = []
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
if len(errors) == 0:
results_setconfig = driver.SetConfig(resources_to_set)
errors.extend(check_set_errors(resources_to_set, results_setconfig))
if len(errors) == 0:
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
if len(errors) == 0:
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate))
if len(errors) == 0:
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate))
if len(errors) > 0:
raise OperationFailedException('ConfigureDevice', extra_details=errors)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
device_uuid = request.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None: return Empty()
self.driver_instance_cache.delete(device_uuid)
delete_device_from_context(db_device, self.context_client)
for db_endpoint_pk,_ in db_device.references(EndPointModel):
EndPointModel(self.database, db_endpoint_pk).delete()
for db_driver_pk,_ in db_device.references(DriverModel):
db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
ConfigRuleModel(self.database, db_config_rule_pk).delete()
db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
ConfigRuleModel(self.database, db_config_rule_pk).delete()
db_device.delete()
db_initial_config.delete()
db_running_config.delete()
return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
device_uuid = request.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
config_rules = {} if db_device is None else db_device.dump_initial_config()
return DeviceConfig(config_rules=config_rules)
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@safe_and_metered_rpc_method(METRICS, LOGGER)
def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
kpi_uuid = request.kpi_id.kpi_id.uuid
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
db_device = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
endpoint_id = request.kpi_descriptor.endpoint_id
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint = get_object(self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, {
'kpi_uuid' : request.kpi_id.kpi_id.uuid,
'kpi_description' : request.kpi_descriptor.kpi_description,
'kpi_sample_type' : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type),
'device_fk' : db_device,
'endpoint_fk' : db_endpoint,
'sampling_duration': request.sampling_duration_s,
'sampling_interval': request.sampling_interval_s,
})
db_kpi, updated = result
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('ConfigureDevice', extra_details=msg)
results = driver.SubscribeState([
(db_kpi.sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval),
])
assert len(results) == 4
for result in results: assert isinstance(result, bool) and result
self.monitoring_loops.add(device_uuid, driver)
raise NotImplementedError()
return Empty()