Newer
Older
from typing import Any, List, Tuple
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.Factory import get_database_backend
from common.orm.HighLevel import get_object, update_or_create_object
from common.orm.backend.BackendEnum import BackendEnum
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from common.type_checkers.Checkers import chk_integer
from context.client.ContextClient import ContextClient
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings

Lluis Gifre Renom
committed
from device.proto.device_pb2_grpc import DeviceServiceServicer
from device.service.drivers.openconfig.OpenConfigDriver import OpenConfigDriver
from .MonitoringLoops import MonitoringLoops
from .database.ConfigModel import (
ConfigModel, ConfigRuleModel, ORM_ConfigActionEnum, get_config_rules, grpc_config_rules_to_raw, update_config)
from .database.DatabaseTools import (
delete_device_from_context, get_device_driver_filter_fields, sync_device_from_context, sync_device_to_context,
update_device_in_local_database)
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel
from .database.KpiModel import KpiModel
from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
check_delete_errors, check_set_errors, check_subscribe_errors, check_unsubscribe_errors)

Lluis Gifre Renom
committed
LOGGER = logging.getLogger(__name__)
SERVICE_NAME = 'Device'
METHOD_NAMES = ['AddDevice', 'ConfigureDevice', 'DeleteDevice', 'GetInitialConfig', 'MonitorDeviceKpi']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)

Lluis Gifre Renom
committed
class DeviceServiceServicerImpl(DeviceServiceServicer):
def __init__(
self, context_client : ContextClient, driver_instance_cache : DriverInstanceCache,
monitoring_loops : MonitoringLoops):

Lluis Gifre Renom
committed
LOGGER.debug('Creating Servicer...')
self.context_client = context_client
self.database = Database(get_database_backend(backend=BackendEnum.INMEMORY))
self.driver_instance_cache = driver_instance_cache
self.monitoring_loops = monitoring_loops

Lluis Gifre Renom
committed
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS, LOGGER)
def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
connection_config_rules = {}
unexpected_config_rules = []
for config_rule in request.device_config.config_rules:
if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \
(config_rule.resource_key.startswith('_connect/')):
connection_config_rules[config_rule.resource_key.replace('_connect/', '')] = config_rule.resource_value
else:
unexpected_config_rules.append(config_rule)
if len(unexpected_config_rules) > 0:
unexpected_config_rules = MessageToDict(
request.device_config, including_default_value_fields=True,
preserving_proto_field_name=True, use_integers_for_enums=True)
unexpected_config_rules = unexpected_config_rules['config_rules']
unexpected_config_rules = list(filter(
lambda cr: cr['resource_key'].replace('_connect/', '') not in connection_config_rules,
unexpected_config_rules))
str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
'device.device_config.config_rules', str_unexpected_config_rules,
extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\
'with "_connect/" tag. Others should be configured after adding the device.')
if len(request.device_endpoints) > 0:
unexpected_endpoints = MessageToDict(
request.device_endpoints, including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=True)
str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
raise InvalidArgumentException(
'device.device_endpoints', str_unexpected_endpoints,
extra_details='RPC method AddDevice does not accept endpoints. Endpoints are discovered through '\
'interrogation of the physical device.')
# Remove device configuration
json_request = MessageToDict(
request, including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=True)
json_request['device_config'] = {}
request = Device(**json_request)
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device,_ = update_device_in_local_database(self.database, request)
driver_filter_fields = get_device_driver_filter_fields(db_device)
address = connection_config_rules.pop('address', None)
port = connection_config_rules.pop('port', None)
driver : _Driver = self.driver_instance_cache.get(
device_uuid, filter_fields=driver_filter_fields, address=address, port=port,
settings=connection_config_rules)
driver.Connect()
endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
for _, resource_value in endpoints:
endpoint_uuid = resource_value.get('name')
endpoint_type = resource_value.get('type')
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
update_or_create_object(
self.database, EndPointModel, str_endpoint_key, {
'device_fk' : db_device,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint_type,
})
running_config_rules = driver.GetConfig([RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES])
running_config_rules = [(ORM_ConfigActionEnum.SET, cr[0], json.dumps(cr[1])) for cr in running_config_rules]
#for running_config_rule in running_config_rules:
# LOGGER.info('[AddDevice] running_config_rule: {:s}'.format(str(running_config_rule)))
#context_config_rules = get_config_rules(self.database, device_uuid, 'running')
#LOGGER.info('[AddDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
# TODO: Compute diff between current context config and device config. The one in device is of higher priority
# (might happen another instance is updating config and context was not already updated)
update_config(self.database, device_uuid, 'running', running_config_rules)
initial_config_rules = driver.GetInitialConfig()
update_config(self.database, device_uuid, 'initial', initial_config_rules)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
device_id = request.device_id
device_uuid = device_id.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
context_config_rules = get_config_rules(self.database, device_uuid, 'running')
context_config_rules = {config_rule[1]: config_rule[2] for config_rule in context_config_rules}
LOGGER.info('[ConfigureDevice] context_config_rules = {:s}'.format(str(context_config_rules)))
db_device,_ = update_device_in_local_database(self.database, request)
request_config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
LOGGER.info('[ConfigureDevice] request_config_rules = {:s}'.format(str(request_config_rules)))
resources_to_set : List[Tuple[str, Any]] = [] # key, value
resources_to_delete : List[str] = [] # key
resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
for config_rule in request_config_rules:
action, key, value = config_rule
if action == ORM_ConfigActionEnum.SET:
if (key not in context_config_rules) or (context_config_rules[key] != value):
resources_to_set.append((key, value))
elif action == ORM_ConfigActionEnum.DELETE:
resources_to_delete.append(key)
# TODO: Implement configuration of subscriptions
# TODO: use of datastores (might be virtual ones) to enable rollbacks
errors = []
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
errors.append('Device({:s}) has not been added to this Device instance'.format(str(device_uuid)))
if len(errors) == 0:
results_setconfig = driver.SetConfig(resources_to_set)
errors.extend(check_set_errors(resources_to_set, results_setconfig))
if len(errors) == 0:
results_deleteconfig = driver.DeleteConfig(resources_to_delete)
errors.extend(check_delete_errors(resources_to_delete, results_deleteconfig))
if len(errors) == 0:
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors.extend(check_subscribe_errors(resources_to_delete, results_subscribestate))
if len(errors) == 0:
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors.extend(check_unsubscribe_errors(resources_to_delete, results_unsubscribestate))
if len(errors) > 0:
raise OperationFailedException('ConfigureDevice', extra_details=errors)
sync_device_to_context(db_device, self.context_client)
return DeviceId(**db_device.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
device_uuid = request.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
if db_device is None: return Empty()
self.driver_instance_cache.delete(device_uuid)
delete_device_from_context(db_device, self.context_client)
for db_endpoint_pk,_ in db_device.references(EndPointModel):
EndPointModel(self.database, db_endpoint_pk).delete()
for db_driver_pk,_ in db_device.references(DriverModel):
db_initial_config = ConfigModel(self.database, db_device.device_initial_config_fk)
for db_config_rule_pk,_ in db_initial_config.references(ConfigRuleModel):
ConfigRuleModel(self.database, db_config_rule_pk).delete()
db_running_config = ConfigModel(self.database, db_device.device_running_config_fk)
for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
ConfigRuleModel(self.database, db_config_rule_pk).delete()
db_device.delete()
db_initial_config.delete()
db_running_config.delete()
return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
device_uuid = request.device_uuid.uuid
sync_device_from_context(device_uuid, self.context_client, self.database)
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
config_rules = {} if db_device is None else db_device.dump_initial_config()
return DeviceConfig(config_rules=config_rules)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
kpi_uuid = request.kpi_id.kpi_id.uuid
device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
endpoint_id = request.kpi_descriptor.endpoint_id
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint : EndPointModel = get_object(
self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
#db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, {
'kpi_uuid' : request.kpi_id.kpi_id.uuid,
'kpi_description' : request.kpi_descriptor.kpi_description,
'kpi_sample_type' : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type),
'device_fk' : db_device,
'endpoint_fk' : db_endpoint,
'sampling_duration': request.sampling_duration_s,
'sampling_interval': request.sampling_interval_s,
})
db_kpi, updated = result
driver : _Driver = self.driver_instance_cache.get(device_uuid)
if driver is None:
msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
raise OperationFailedException('ConfigureDevice', extra_details=msg)
sampling_resource = driver.GetResource(db_endpoint.endpoint_uuid)
results = driver.SubscribeState([
(sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval),
])
assert len(results) == 4
for result in results: assert isinstance(result, bool) and result
self.monitoring_loops.add(device_uuid, driver)
return Empty()