Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging, re
from typing import Any, Dict, List, Tuple
from common.orm.Database import Database
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from common.tools.grpc.Tools import grpc_message_to_json
from context.client.ContextClient import ContextClient
from context.proto.kpi_sample_types_pb2 import KpiSampleType
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings

Lluis Gifre Renom
committed
from device.proto.device_pb2_grpc import DeviceServiceServicer
from .database.ConfigModel import (
ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DatabaseTools import (
delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type
from .database.RelationModels import EndPointMonitorKpiModel
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)
from .MonitoringLoops import MonitoringLoops

Lluis Gifre Renom
committed
LOGGER = logging.getLogger(__name__)
SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

Lluis Gifre Renom
committed
class DeviceServiceServicerImpl(DeviceServiceServicer):
self, database : Database, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops
) -> None:

Lluis Gifre Renom
committed
LOGGER.debug('Creating Servicer...')
self.database = database
self.driver_instance_cache = driver_instance_cache
self.monitoring_loops = monitoring_loops

Lluis Gifre Renom
committed
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER)
def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
connection_config_rules = {}
unexpected_config_rules = []
for config_rule in request.device_config.config_rules:
if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \
(config_rule.resource_key.startswith('_connect/')):
connection_config_rules[config_rule.resource_key.replace('_connect/', '')] = config_rule.resource_value
else:
unexpected_config_rules.append(config_rule)
if len(unexpected_config_rules) > 0:
unexpected_config_rules = grpc_message_to_json(request.device_config)
unexpected_config_rules = unexpected_config_rules['config_rules']
unexpected_config_rules = list(filter(
lambda cr: cr['resource_key'].replace('_connect/', '') not in connection_config_rules,
unexpected_config_rules))
str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
raise InvalidArgumentException(
'device.device_config.config_rules', str_unexpected_config_rules,
extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
'with "_connect/" tag. Others should be configured after adding the device.')
if len(request.device_endpoints) > 0:

Lluis Gifre Renom
committed
unexpected_endpoints = []
for device_endpoint in request.device_endpoints:
unexpected_endpoints.append(grpc_message_to_json(device_endpoint))
str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
raise InvalidArgumentException(
'device.device_endpoints', str_unexpected_endpoints,

Lluis Gifre Renom
committed
extra_details='RPC method AddDevice does not accept Endpoints. Endpoints are discovered through '\
'interrogation of the physical device.')
# Remove device configuration
json_request = grpc_message_to_json(request, use_integers_for_enums=True)
json_request['device_config'] = {}
request = Device(**json_request)
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device,_ = update_device_in_local_database(self.database, request)
driver_filter_fields = get_device_driver_filter_fields(db_device)
#LOGGER.info('[AddDevice] connection_config_rules = {:s}'.format(str(connection_config_rules)))
address = connection_config_rules.pop('address', None)
port = connection_config_rules.pop('port', None)
settings = connection_config_rules.pop('settings', '{}')
try:
settings = json.loads(settings)
except ValueError as e:
raise InvalidArgumentException(
'device.device_config.config_rules[settings]', settings,
extra_details='_connect/settings Config Rules provided cannot be decoded as JSON dictionary.') from e
driver : _Driver = self.driver_instance_cache.get(
device_uuid, filter_fields=driver_filter_fields, address=address, port=port, settings=settings)
driver.Connect()
endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
try:
for resource_key, resource_value in endpoints:
if isinstance(resource_value, Exception):
LOGGER.error('Error retrieving "{:s}": {:s}'.format(str(RESOURCE_ENDPOINTS), str(resource_value)))
continue
endpoint_uuid = resource_value.get('uuid')
endpoint_type = resource_value.get('type')
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
db_endpoint, _ = update_or_create_object(
self.database, EndPointModel, str_endpoint_key, {
'device_fk' : db_device,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint_type,
'resource_key' : resource_key,
sample_types : Dict[int, str] = resource_value.get('sample_types', {})
for sample_type, monitor_resource_key in sample_types.items():
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
update_or_create_object(self.database, EndPointMonitorModel, str_endpoint_monitor_key, {
'endpoint_fk' : db_endpoint,
'resource_key' : monitor_resource_key,
'kpi_sample_type': grpc_to_enum__kpi_sample_type(sample_type),
})
except: # pylint: disable=bare-except
LOGGER.exception('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
raw_running_config_rules = driver.GetConfig()
running_config_rules = []
for resource_key, resource_value in raw_running_config_rules:
if isinstance(resource_value, Exception):
msg = 'Error retrieving config rules: {:s} => {:s}'
LOGGER.error(msg.format(str(resource_key), str(resource_value)))
continue
config_rule = (ORM_ConfigActionEnum.SET, resource_key, json.dumps(resource_value, sort_keys=True))
running_config_rules.append(config_rule)
#for running_config_rule in running_config_rules:
# LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
update_config(self.database, device_uuid, 'running', running_config_rules)
initial_config_rules = driver.GetInitialConfig()
update_config(self.database, device_uuid, 'initial', initial_config_rules)
#LOGGER.info('[AddDevice] db_device = {:s}'.format(str(db_device.dump(
# include_config_rules=True, include_drivers=True, include_endpoints=True))))
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
context_config_rules = get_config_rules(self.database, device_uuid, 'running')
context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
#LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
db_device,_ = update_device_in_local_database(self.database, request)
request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
#LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
resources_to_set : List[Tuple[str, Any]] = [] # key, value
resources_to_delete : List[Tuple[str, Any]] = [] # key, value
for config_rule in request_config_rules:
action, key, value = config_rule
if action == ORM_ConfigActionEnum.SET:
if (key not in context_config_rules) or (context_config_rules[key] != value):
resources_to_set.append((key, value))
elif action == ORM_ConfigActionEnum.DELETE:
if key in context_config_rules:
resources_to_delete.append((key, value))
#LOGGER.info('[ConfigureDevice] resources_to_set = {:s}'.format(str(resources_to_set)))
#LOGGER.info('[ConfigureDevice] resources_to_delete = {:s}'.format(str(resources_to_delete)))
# TODO: use of datastores (might be virtual ones) to enable rollbacks
errors = []
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
if len(errors) == 0:
results_setconfig = driver.SetConfig(resources_to_set)
errors.extend(check_set_errors(resources_to_set, results_setconfig))
if len(errors) == 0:
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
if len(errors) > 0:
raise OperationFailedException('ConfigureDevice', extra_details=errors)
running_config_rules = driver.GetConfig()
running_config_rules = [
(ORM_ConfigActionEnum.SET, config_rule[0], json.dumps(config_rule[1], sort_keys=True))
for config_rule in running_config_rules
]
#for running_config_rule in running_config_rules:

Lluis Gifre Renom
committed
# LOGGER.info('[ConfigureDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
update_config(self.database, device_uuid, 'running', running_config_rules)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
device_uuid = request.device_uuid.uuid
self.monitoring_loops.remove(device_uuid)
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None: return Empty()
self.driver_instance_cache.delete(device_uuid)
delete_device_from_context(db_device, self.context_client)
for db_kpi_pk,_ in db_device.references(KpiModel):
db_kpi = get_object(self.database, KpiModel, db_kpi_pk)
for db_endpoint_monitor_kpi_pk,_ in db_kpi.references(EndPointMonitorKpiModel):
get_object(self.database, EndPointMonitorKpiModel, db_endpoint_monitor_kpi_pk).delete()
db_kpi.delete()
for db_endpoint_pk,_ in db_device.references(EndPointModel):
db_endpoint = EndPointModel(self.database, db_endpoint_pk)
for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
get_object(self.database, EndPointMonitorModel, db_endpoint_monitor_pk).delete()
for db_driver_pk,_ in db_device.references(DriverModel):
get_object(self.database, DriverModel, db_driver_pk).delete()
db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
get_object(self.database, ConfigRuleModel, db_config_rule_pk).delete()
db_device.delete()
db_initial_config.delete()
db_running_config.delete()
return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
device_uuid = request.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
config_rules = {} if db_device is None else db_device.dump_initial_config()
return DeviceConfig(config_rules=config_rules)
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
@safe_and_metered_rpc_method(METRICS, LOGGER)
def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
kpi_uuid = request.kpi_id.kpi_id.uuid
subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
if subscribe:
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None:
msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_id = request.kpi_descriptor.endpoint_id
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint : EndPointModel = get_object(
self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
if db_endpoint is None:
msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
sample_type = request.kpi_descriptor.kpi_sample_type
attributes = {
'kpi_uuid' : request.kpi_id.kpi_id.uuid,
'kpi_description' : request.kpi_descriptor.kpi_description,
'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type),
'device_fk' : db_device,
'endpoint_fk' : db_endpoint,
'sampling_duration': request.sampling_duration_s,
'sampling_interval': request.sampling_interval_s,
}
result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
db_kpi, updated = result
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
db_endpoint_monitor : EndPointMonitorModel = get_object(
self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
if db_endpoint_monitor is None:
msg = 'SampleType({:s}/{:s}) not supported for Device({:s})/EndPoint({:s}).'.format(
str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
str(device_uuid), str(endpoint_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
attributes = {
'endpoint_monitor_fk': db_endpoint_monitor,
'kpi_fk' : db_kpi,
}
result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
db_endpoint_monitor_kpi, updated = result
resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_subscribe.append(
(db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
self.monitoring_loops.add(device_uuid, driver)
else:
db_kpi : KpiModel = get_object(
self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
if db_kpi is None:
msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
db_device : DeviceModel = get_object(
self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
if db_device is None:
msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
device_uuid = db_device.device_uuid
db_endpoint : EndPointModel = get_object(
self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
if db_endpoint is None:
msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_uuid = db_endpoint.endpoint_uuid
str_endpoint_key = db_endpoint.pk
kpi_sample_type : ORM_KpiSampleTypeEnum = db_kpi.kpi_sample_type
sample_type = kpi_sample_type.value
str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
db_endpoint_monitor : EndPointMonitorModel = get_object(
self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
if db_endpoint_monitor is None:
msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
endpoint_monitor_resource_key = re.sub('[^A-Za-z0-9]', '.', db_endpoint_monitor.resource_key)
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
if db_endpoint_monitor_kpi is None:
msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_unsubscribe.append(
(db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
db_endpoint_monitor_kpi.delete()
db_kpi.delete()
# There is one monitoring loop per device; keep them active since they are re-used by different monitoring
# requests.
#self.monitoring_loops.remove(device_uuid)
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
# Subscriptions are not stored as classical driver config.
# TODO: consider adding it somehow in the configuration.
# Warning: GetConfig might be very slow in OpenConfig devices
#running_config_rules = [
# (config_rule[0], json.dumps(config_rule[1], sort_keys=True))
# for config_rule in driver.GetConfig()
#]
#context_config_rules = {
# config_rule[1]: config_rule[2]
# for config_rule in get_config_rules(self.database, device_uuid, 'running')
#}
## each in context, not in running => delete in context
## each in running, not in context => add to context
## each in context and in running, context.value != running.value => update in context
#running_config_rules_actions : List[Tuple[ORM_ConfigActionEnum, str, str]] = []
#for config_rule_key,config_rule_value in running_config_rules:
# running_config_rules_actions.append((ORM_ConfigActionEnum.SET, config_rule_key, config_rule_value))
# context_config_rules.pop(config_rule_key, None)
#for context_rule_key,context_rule_value in context_config_rules.items():
# running_config_rules_actions.append((ORM_ConfigActionEnum.DELETE, context_rule_key, context_rule_value))
##msg = '[MonitorDeviceKpi] running_config_rules_action[{:d}]: {:s}'
##for i,running_config_rules_action in enumerate(running_config_rules_actions):
## LOGGER.info(msg.format(i, str(running_config_rules_action)))
#update_config(self.database, device_uuid, 'running', running_config_rules_actions)
sync_device_to_context(db_device, self.context_client)