Newer
Older
import grpc, json, logging
from typing import Any, List, Tuple
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from context.client.ContextClient import ContextClient
from context.proto.kpi_sample_types_pb2 import KpiSampleType
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings

Lluis Gifre Renom
committed
from device.proto.device_pb2_grpc import DeviceServiceServicer
from device.service.database.RelationModels import EndPointMonitorKpiModel
from .MonitoringLoops import MonitoringLoops
from .database.ConfigModel import (
ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DatabaseTools import (
delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)

Lluis Gifre Renom
committed
# Module-wide logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Service name and RPC method names from which the metrics below are created.
SERVICE_NAME = 'Device'
METHOD_NAMES = [
    'AddDevice',
    'ConfigureDevice',
    'DeleteDevice',
    'GetInitialConfig',
    'MonitorDeviceKpi',
]

# Metrics object passed to the `safe_and_metered_rpc_method` decorator of every RPC method.
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

Lluis Gifre Renom
committed
class DeviceServiceServicerImpl(DeviceServiceServicer):
    """gRPC servicer implementing the RPC methods of the Device service."""

    def __init__(
        self, context_client : ContextClient, database : Database, driver_instance_cache : DriverInstanceCache,
        monitoring_loops : MonitoringLoops
    ) -> None:
        """Keep references to the collaborators used by the RPC methods.

        Args:
            context_client: client handed to the context synchronization helpers.
            database: local database in which device-related records are stored.
            driver_instance_cache: cache from which per-device drivers are obtained.
            monitoring_loops: registry to which a device driver is added when a KPI is monitored.
        """
        LOGGER.debug('Creating Servicer...')
        self.context_client = context_client
        self.database = database
        self.driver_instance_cache = driver_instance_cache
        self.monitoring_loops = monitoring_loops
        LOGGER.debug('Servicer Created')
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@safe_and_metered_rpc_method(METRICS, LOGGER)
def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
    """Add a device: connect to it through a driver and store what the driver reports.

    Only connection Config Rules are accepted, i.e. rules with action CONFIGACTION_SET whose
    resource key starts with "_connect/". The request must not carry endpoints; endpoints are
    read from the driver instead. The device is stored in the local database with an empty
    device configuration, the endpoints and the running/initial configuration reported by the
    driver are stored, and the device is finally synchronized to the Context component.

    Args:
        request: the Device to add.
        context: gRPC servicer context (not used by this method).

    Returns:
        The DeviceId built from the stored device.

    Raises:
        InvalidArgumentException: if the request contains non-connection Config Rules, contains
            endpoints, or the "_connect/settings" value cannot be decoded as JSON.
    """
    connect_prefix = '_connect/'
    device_uuid = request.device_id.device_uuid.uuid

    connection_config_rules = {}
    unexpected_config_rules = []
    for config_rule in request.device_config.config_rules:
        is_connection_rule = (
            (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and
            config_rule.resource_key.startswith(connect_prefix))
        if is_connection_rule:
            # Strip only the leading tag; a later occurrence of the tag is part of the key itself.
            connection_key = config_rule.resource_key[len(connect_prefix):]
            connection_config_rules[connection_key] = config_rule.resource_value
        else:
            unexpected_config_rules.append(config_rule)

    if len(unexpected_config_rules) > 0:
        # Report exactly the rules rejected above, so that a rejected rule is never hidden because
        # its key happens to match an accepted connection rule.
        unexpected_config_rules = [
            MessageToDict(
                config_rule, including_default_value_fields=True, preserving_proto_field_name=True,
                use_integers_for_enums=True)
            for config_rule in unexpected_config_rules
        ]
        str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
        raise InvalidArgumentException(
            'device.device_config.config_rules', str_unexpected_config_rules,
            extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
                          'with "_connect/" tag. Others should be configured after adding the device.')

    if len(request.device_endpoints) > 0:
        # MessageToDict() takes a single message; convert the repeated field item by item.
        unexpected_endpoints = [
            MessageToDict(
                device_endpoint, including_default_value_fields=True, preserving_proto_field_name=True,
                use_integers_for_enums=True)
            for device_endpoint in request.device_endpoints
        ]
        str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
        raise InvalidArgumentException(
            'device.device_endpoints', str_unexpected_endpoints,
            extra_details='RPC method AddDevice does not accept endpoints. Endpoints are discovered through '\
                          'interrogation of the physical device.')

    # Remove device configuration: the connection rules are only used to obtain the driver below.
    json_request = MessageToDict(
        request, including_default_value_fields=True, preserving_proto_field_name=True,
        use_integers_for_enums=True)
    json_request['device_config'] = {}
    request = Device(**json_request)

    sync_device_from_context(device_uuid, self.context_client, self.database)
    db_device,_ = update_device_in_local_database(self.database, request)

    driver_filter_fields = get_device_driver_filter_fields(db_device)

    # Connection rules other than address/port/settings are not used.
    address = connection_config_rules.pop('address', None)
    port = connection_config_rules.pop('port', None)
    settings = connection_config_rules.pop('settings', '{}')
    try:
        settings = json.loads(settings)
    except ValueError as e:
        raise InvalidArgumentException(
            'device.device_config.config_rules[settings]', settings,
            extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e

    driver : _Driver = self.driver_instance_cache.get(
        device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
    driver.Connect()

    # Store the endpoints reported by the driver, and one monitor record per sample type they expose.
    endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
    for resource_key, resource_value in endpoints:
        endpoint_uuid = resource_value.get('uuid')
        endpoint_type = resource_value.get('type')
        str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
        db_endpoint, _ = update_or_create_object(
            self.database, EndPointModel, str_endpoint_key, {
                'device_fk'    : db_device,
                'endpoint_uuid': endpoint_uuid,
                'endpoint_type': endpoint_type,
                'resource_key' : resource_key,
            })

        sample_types = resource_value.get('sample_types', {})
        for sample_type, monitor_resource_key in sample_types.items():
            str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
            update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
                'endpoint_fk'    : db_endpoint,
                'resource_key'   : monitor_resource_key,
                'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
            })

    # Store the running configuration as SET rules with JSON-encoded values.
    running_config_rules = driver.GetConfig()
    running_config_rules = [
        (ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
        for config_rule in running_config_rules
    ]
    update_config(self.database, device_uuid, 'running', running_config_rules)

    # NOTE(review): unlike the running configuration, the initial configuration is stored exactly
    # as returned by the driver -- confirm GetInitialConfig() already yields rules in the format
    # update_config() expects.
    initial_config_rules = driver.GetInitialConfig()
    update_config(self.database, device_uuid, 'initial', initial_config_rules)

    sync_device_to_context(db_device, self.context_client)
    return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
    """Apply the Config Rules of the request to the device through its driver.

    SET rules are sent to the driver only when their key is not in the stored running
    configuration or the stored value differs; DELETE rules only when their key is stored.
    After a successful run, the running configuration is read back from the driver, stored,
    and the device is synchronized to the Context component.

    Args:
        request: the Device carrying the Config Rules to apply.
        context: gRPC servicer context (not used by this method).

    Returns:
        The DeviceId built from the stored device.

    Raises:
        OperationFailedException: if no driver is cached for the device, or the set/delete
            operations reported errors.
    """
    device_uuid = request.device_id.device_uuid.uuid

    sync_device_from_context(device_uuid, self.context_client, self.database)

    # Index the stored running configuration as resource key -> value.
    stored_rules = get_config_rules(self.database, device_uuid, 'running')
    stored_values = dict((stored_rule[1], stored_rule[2]) for stored_rule in stored_rules)

    db_device,_ = update_device_in_local_database(self.database, request)

    pending_set    : List[Tuple[str, Any]] = [] # key, value
    pending_delete : List[Tuple[str, Any]] = [] # key, value
    for action, key, value in grpc_config_rules_to_raw(request.device_config.config_rules):
        if action == ORM_ConfigActionEnum.SET:
            needs_update = (key not in stored_values) or (stored_values[key] != value)
            if needs_update:
                pending_set.append((key, value))
        elif action == ORM_ConfigActionEnum.DELETE:
            if key in stored_values:
                pending_delete.append((key, value))

    # TODO: use of datastores (might be virtual ones) to enable rollbacks
    driver : _Driver = self.driver_instance_cache.get(device_uuid)
    if driver is None:
        failures = ['Device({:s}) has not been added to this Device instance'.format(str(device_uuid))]
    else:
        set_results = driver.SetConfig(pending_set)
        failures = []
        failures.extend(check_set_errors(pending_set, set_results))
        # Deletions are only attempted when every set operation succeeded.
        if len(failures) == 0:
            delete_results = driver.DeleteConfig(pending_delete)
            failures.extend(check_delete_errors(pending_delete, delete_results))

    if len(failures) > 0:
        raise OperationFailedException('ConfigureDevice', extra_details=failures)

    # Read the configuration back from the driver and store it as SET rules with JSON values.
    refreshed_rules = []
    for reported_rule in driver.GetConfig():
        encoded_value = json.dumps(reported_rule[1], sort_keys=True)
        refreshed_rules.append((ORM_ConfigActionEnum.SET, reported_rule[0], encoded_value))
    update_config(self.database, device_uuid, 'running', refreshed_rules)

    sync_device_to_context(db_device, self.context_client)
    return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
    """Delete a device together with the records that reference it.

    The driver is removed from the driver cache, the device is deleted from the Context
    component, and then KPIs, endpoints (with their monitors), drivers, config rules, the
    device itself and its two config records are deleted from the local database.

    Args:
        request: identifier of the device to delete.
        context: gRPC servicer context (not used by this method).

    Returns:
        Empty, also when the device is not found in the local database.
    """
    database = self.database
    device_uuid = request.device_uuid.uuid

    sync_device_from_context(device_uuid, self.context_client, database)
    db_device : DeviceModel = get_object(database, DeviceModel, device_uuid, raise_if_not_found=False)
    if db_device is None:
        return Empty()

    def delete_references(owner, model_class) -> None:
        # Delete every `model_class` record listed by `owner.references(model_class)`.
        for referenced_pk, _ in owner.references(model_class):
            model_class(database, referenced_pk).delete()

    self.driver_instance_cache.delete(device_uuid)
    delete_device_from_context(db_device, self.context_client)

    delete_references(db_device, KpiModel)

    # Each endpoint is deleted right after its own monitors.
    for endpoint_pk, _ in db_device.references(EndPointModel):
        db_endpoint = EndPointModel(database, endpoint_pk)
        delete_references(db_endpoint, EndPointMonitorModel)
        db_endpoint.delete()

    delete_references(db_device, DriverModel)

    db_initial_config = ConfigModel(database, db_device.device_initial_config_fk)
    delete_references(db_initial_config, ConfigRuleModel)

    db_running_config = ConfigModel(database, db_device.device_running_config_fk)
    delete_references(db_running_config, ConfigRuleModel)

    # The device goes first, then the config records it points to.
    db_device.delete()
    db_initial_config.delete()
    db_running_config.delete()
    return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
    """Return the initial configuration stored for a device.

    Args:
        request: identifier of the device.
        context: gRPC servicer context (not used by this method).

    Returns:
        A DeviceConfig with the rules dumped from the stored device, or with no rules when
        the device is not found in the local database.
    """
    device_uuid = request.device_uuid.uuid
    sync_device_from_context(device_uuid, self.context_client, self.database)

    db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
    if db_device is None:
        # Unknown device: reply without config rules instead of failing.
        return DeviceConfig(config_rules={})
    return DeviceConfig(config_rules=db_device.dump_initial_config())
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
@safe_and_metered_rpc_method(METRICS, LOGGER)
def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
    """Start or stop monitoring a KPI on a device endpoint.

    When both `sampling_duration_s` and `sampling_interval_s` are greater than zero the KPI is
    stored and subscribed on the driver, and the driver is added to the monitoring loops.
    Otherwise the stored KPI identified by `kpi_id` is unsubscribed and its records deleted.

    In the subscribe path every lookup (device, endpoint, driver, endpoint monitor) is done
    before anything is written, so a request rejected by validation leaves no KPI record behind.

    Args:
        request: monitoring settings (KPI id, KPI descriptor, sampling duration and interval).
        context: gRPC servicer context (not used by this method).

    Returns:
        Empty.

    Raises:
        OperationFailedException: if a required device, endpoint, driver, endpoint monitor or
            KPI record is missing, or the driver reports subscribe/unsubscribe errors.
    """
    kpi_uuid = request.kpi_id.kpi_id.uuid

    subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
    if subscribe:
        device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid

        db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
        if db_device is None:
            msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        endpoint_id = request.kpi_descriptor.endpoint_id
        endpoint_uuid = endpoint_id.endpoint_uuid.uuid
        str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
        endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
        endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
        # The topology is appended to the endpoint key only when it is fully specified.
        if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
            str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
            str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
        db_endpoint : EndPointModel = get_object(
            self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
        if db_endpoint is None:
            msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
                str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        sample_type = request.kpi_descriptor.kpi_sample_type

        # Check that the endpoint supports the sample type BEFORE storing the KPI; otherwise a
        # rejected request would leave an orphan KPI record in the database.
        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
        db_endpoint_monitor : EndPointMonitorModel = get_object(
            self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
        if db_endpoint_monitor is None:
            msg = 'SampleType({:s}/{:s}) not supported for EndPoint({:s}).'.format(
                str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
                str(endpoint_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        attributes = {
            'kpi_uuid'         : kpi_uuid,
            'kpi_description'  : request.kpi_descriptor.kpi_description,
            'kpi_sample_type'  : grpc_to_enum__kpi_sample_type(sample_type),
            'device_fk'        : db_device,
            'endpoint_fk'      : db_endpoint,
            'sampling_duration': request.sampling_duration_s,
            'sampling_interval': request.sampling_interval_s,
        }
        db_kpi, _ = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)

        # Relate the endpoint monitor with the KPI; the relation is keyed by device and resource key.
        str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
        attributes = {
            'endpoint_monitor_fk': db_endpoint_monitor,
            'kpi_fk'             : db_kpi,
        }
        update_or_create_object(
            self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)

        resources_to_subscribe : List[Tuple[str, float, float]] = [ # key, sampling_duration, sampling_interval
            (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ]
        results_subscribestate = driver.SubscribeState(resources_to_subscribe)
        errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
        if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

        self.monitoring_loops.add(device_uuid, driver)

    else:
        db_kpi : KpiModel = get_object(
            self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
        if db_kpi is None:
            msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        db_device : DeviceModel = get_object(
            self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
        if db_device is None:
            msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
        device_uuid = db_device.device_uuid

        db_endpoint : EndPointModel = get_object(
            self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
        if db_endpoint is None:
            msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
        str_endpoint_key = db_endpoint.pk

        # Rebuild the endpoint monitor key from the sample type stored with the KPI.
        kpi_sample_type : ORM_KpiSampleType = db_kpi.kpi_sample_type
        sample_type = kpi_sample_type.value
        str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
        db_endpoint_monitor : EndPointMonitorModel = get_object(
            self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
        if db_endpoint_monitor is None:
            msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
        db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
            self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
        if db_endpoint_monitor_kpi is None:
            msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        resources_to_unsubscribe : List[Tuple[str, float, float]] = [ # key, sampling_duration, sampling_interval
            (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval),
        ]

        driver : _Driver = self.driver_instance_cache.get(device_uuid)
        if driver is None:
            msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
            raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

        results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
        errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
        if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

        # Records are deleted only after the driver accepted the unsubscription.
        db_endpoint_monitor_kpi.delete()
        db_kpi.delete()

        # The monitoring loop is intentionally not removed here: there is one monitoring loop per
        # device and it is re-used by different monitoring requests.

    return Empty()