from typing import List, Tuple import grpc, logging from prometheus_client import Counter, Histogram from common.Checkers import chk_options, chk_string from common.database.api.Database import Database from common.database.api.context.OperationalStatus import OperationalStatus, operationalstatus_enum_values, \ to_operationalstatus_enum from common.exceptions.ServiceException import ServiceException from device.proto.context_pb2 import DeviceId, Device, Empty from device.proto.device_pb2_grpc import DeviceServiceServicer LOGGER = logging.getLogger(__name__) DEFAULT_CONTEXT_ID = 'admin' DEFAULT_TOPOLOGY_ID = 'admin' ADDDEVICE_COUNTER_STARTED = Counter ('device_adddevice_counter_started', 'Device:AddDevice counter of requests started' ) ADDDEVICE_COUNTER_COMPLETED = Counter ('device_adddevice_counter_completed', 'Device:AddDevice counter of requests completed') ADDDEVICE_COUNTER_FAILED = Counter ('device_adddevice_counter_failed', 'Device:AddDevice counter of requests failed' ) ADDDEVICE_HISTOGRAM_DURATION = Histogram('device_adddevice_histogram_duration', 'Device:AddDevice histogram of request duration') CONFIGUREDEVICE_COUNTER_STARTED = Counter ('device_configuredevice_counter_started', 'Device:ConfigureDevice counter of requests started' ) CONFIGUREDEVICE_COUNTER_COMPLETED = Counter ('device_configuredevice_counter_completed', 'Device:ConfigureDevice counter of requests completed') CONFIGUREDEVICE_COUNTER_FAILED = Counter ('device_configuredevice_counter_failed', 'Device:ConfigureDevice counter of requests failed' ) CONFIGUREDEVICE_HISTOGRAM_DURATION = Histogram('device_configuredevice_histogram_duration', 'Device:ConfigureDevice histogram of request duration') DELETEDEVICE_COUNTER_STARTED = Counter ('device_deletedevice_counter_started', 'Device:DeleteDevice counter of requests started' ) DELETEDEVICE_COUNTER_COMPLETED = Counter ('device_deletedevice_counter_completed', 'Device:DeleteDevice counter of requests completed') DELETEDEVICE_COUNTER_FAILED = Counter ('device_deletedevice_counter_failed', 'Device:DeleteDevice counter of requests failed' ) DELETEDEVICE_HISTOGRAM_DURATION = Histogram('device_deletedevice_histogram_duration', 'Device:DeleteDevice histogram of request duration') class DeviceServiceServicerImpl(DeviceServiceServicer): def __init__(self, database : Database): LOGGER.debug('Creating Servicer...') self.database = database LOGGER.debug('Servicer Created') @ADDDEVICE_HISTOGRAM_DURATION.time() def AddDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId: ADDDEVICE_COUNTER_STARTED.inc() try: LOGGER.debug('AddDevice request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- try: device_uuid = chk_string('device_uuid', request.device_id.device_id.uuid, allow_empty=False) device_type = chk_string('device_type', request.device_type, allow_empty=False) device_config = chk_string('device_config', request.device_config.device_config, allow_empty=True) device_opstat = chk_options('devOperationalStatus', request.devOperationalStatus, operationalstatus_enum_values()) except Exception as e: LOGGER.exception('Invalid arguments:') raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) device_opstat = to_operationalstatus_enum(device_opstat) if device_opstat == OperationalStatus.KEEP_STATE: msg = ' '.join([ 'Device has to be created with either ENABLED/DISABLED Operational State.', 'Use KEEP_STATE only in configure Device methods.', ]) raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) db_context = self.database.context(DEFAULT_CONTEXT_ID).create() db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create() if db_topology.devices.contains(device_uuid): msg = 'device_uuid({}) already exists.' msg = msg.format(device_uuid) raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg) added_endpoint_uuids = set() endpoint_pairs : List[Tuple[str, str]] = [] for i,endpoint in enumerate(request.endpointList): contextId = endpoint.port_id.topoId.contextId.contextUuid.uuid if (len(contextId) > 0) and (contextId != DEFAULT_CONTEXT_ID): msg = ' '.join([ 'Unsupported context_id({}) in endpoint #{}.', 'Only default context_id({}) is currently supported.', 'Optionally, leave field empty to use default context_id.', ]) msg = msg.format(contextId, i, DEFAULT_CONTEXT_ID) raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) elif len(contextId) == 0: contextId = DEFAULT_CONTEXT_ID topoId = endpoint.port_id.topoId.topoId.uuid if (len(topoId) > 0) and (topoId != DEFAULT_TOPOLOGY_ID): msg = ' '.join([ 'Unsupported topology_id({}) in endpoint #{}.', 'Only default topology_id({}) is currently supported.', 'Optionally, leave field empty to use default topology_id.', ]) msg = msg.format(topoId, i, DEFAULT_TOPOLOGY_ID) raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) elif len(topoId) == 0: topoId = DEFAULT_TOPOLOGY_ID dev_id = endpoint.port_id.dev_id.device_id.uuid if (len(dev_id) > 0) and (dev_id != device_uuid): msg = ' '.join([ 'Wrong device_id({}) in endpoint #{}.', 'Parent specified in message is device_id({}).', 'Optionally, leave field empty to use parent device_id.', ]) msg = msg.format(dev_id, i, device_uuid) raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) elif len(dev_id) == 0: dev_id = device_uuid try: port_id = chk_string('port_uuid', endpoint.port_id.port_id.uuid, allow_empty=False) port_type = chk_string('port_type', endpoint.port_type, allow_empty=False) except Exception as e: LOGGER.exception('Invalid arguments:') raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) if port_id in added_endpoint_uuids: msg = 'Duplicated port_id({}) in device_id({}).' msg = msg.format(port_id, device_uuid) raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg) added_endpoint_uuids.add(port_id) endpoint_pairs.append((port_id, port_type)) # ----- Implement changes in database ---------------------------------------------------------------------- db_device = db_topology.device(device_uuid).create(device_type, device_config, device_opstat) for port_id,port_type in endpoint_pairs: db_device.endpoint(port_id).create(port_type) # ----- Compose reply -------------------------------------------------------------------------------------- reply = DeviceId(device_id=dict(uuid=device_uuid)) LOGGER.debug('AddDevice reply: {}'.format(str(reply))) ADDDEVICE_COUNTER_COMPLETED.inc() return reply except ServiceException as e: grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('AddDevice exception') # pragma: no cover ADDDEVICE_COUNTER_FAILED.inc() # pragma: no cover grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # pragma: no cover @CONFIGUREDEVICE_HISTOGRAM_DURATION.time() def ConfigureDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId: CONFIGUREDEVICE_COUNTER_STARTED.inc() try: LOGGER.info('ConfigureDevice request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- try: device_uuid = chk_string('device_uuid', request.device_id.device_id.uuid, allow_empty=False) device_type = chk_string('device_type', request.device_type, allow_empty=True) device_config = chk_string('device_config', request.device_config.device_config, allow_empty=True) device_opstat = chk_options('devOperationalStatus', request.devOperationalStatus, operationalstatus_enum_values()) except Exception as e: LOGGER.exception('Invalid arguments:') raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) device_opstat = to_operationalstatus_enum(device_opstat) if device_opstat is None: msg = 'Unsupported OperationalStatus({}).' msg = msg.format(request.devOperationalStatus) raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) db_context = self.database.context(DEFAULT_CONTEXT_ID).create() db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create() if not db_topology.devices.contains(device_uuid): msg = 'device_uuid({}) does not exist.' msg = msg.format(device_uuid) raise ServiceException(grpc.StatusCode.NOT_FOUND, msg) db_device = db_topology.device(device_uuid) db_device_attributes = db_device.attributes.get(attributes=['device_type']) if len(db_device_attributes) == 0: msg = 'attribute device_type for device_uuid({}) does not exist.' msg = msg.format(device_uuid) raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg) db_device_type = db_device_attributes.get('device_type') if len(db_device_type) == 0: msg = 'attribute device_type for device_uuid({}) is empty.' msg = msg.format(device_uuid) raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg) if db_device_type != device_type: msg = 'Device({}) has Type({}). Cannot be changed to Type({}).' msg = msg.format(device_uuid, db_device_type, device_type) raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) if len(request.endpointList) > 0: msg = 'Endpoints belonging to Device({}) cannot be modified.' msg = msg.format(device_uuid) raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg) update_attributes = {} if len(device_config) > 0: update_attributes['device_config'] = device_config if device_opstat != OperationalStatus.KEEP_STATE: update_attributes['device_operational_status'] = device_opstat LOGGER.info('update_attributes={}'.format(str(update_attributes))) if len(update_attributes) == 0: msg = ' '.join([ 'Any change has been requested for Device({}).', 'Either specify a new configuration or a new device state.', ]) msg = msg.format(device_uuid) raise ServiceException(grpc.StatusCode.ABORTED, msg) # ----- Implement changes in database ---------------------------------------------------------------------- db_device.update(update_attributes=update_attributes) # ----- Compose reply -------------------------------------------------------------------------------------- reply = DeviceId(device_id=dict(uuid=device_uuid)) LOGGER.info('ConfigureDevice reply: {}'.format(str(reply))) CONFIGUREDEVICE_COUNTER_COMPLETED.inc() return reply except ServiceException as e: grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('ConfigureDevice exception') # pragma: no cover CONFIGUREDEVICE_COUNTER_FAILED.inc() # pragma: no cover grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # pragma: no cover @DELETEDEVICE_HISTOGRAM_DURATION.time() def DeleteDevice(self, request : DeviceId, grpc_context : grpc.ServicerContext) -> Empty: DELETEDEVICE_COUNTER_STARTED.inc() try: LOGGER.debug('DeleteDevice request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- try: device_uuid = chk_string('device_uuid', request.device_id.uuid, allow_empty=False) except Exception as e: LOGGER.exception('Invalid arguments:') raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) db_context = self.database.context(DEFAULT_CONTEXT_ID).create() db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create() if not db_topology.devices.contains(device_uuid): msg = 'device_uuid({}) does not exist.' msg = msg.format(device_uuid) raise ServiceException(grpc.StatusCode.NOT_FOUND, msg) # ----- Implement changes in database ---------------------------------------------------------------------- db_topology.device(device_uuid).delete() # ----- Compose reply -------------------------------------------------------------------------------------- reply = Empty() LOGGER.debug('DeleteDevice reply: {}'.format(str(reply))) DELETEDEVICE_COUNTER_COMPLETED.inc() return reply except ServiceException as e: grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('DeleteDevice exception') # pragma: no cover DELETEDEVICE_COUNTER_FAILED.inc() # pragma: no cover grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # pragma: no cover