import logging
from typing import List, Tuple

import grpc
from prometheus_client import Counter, Histogram

from common.Checkers import chk_options, chk_string
from common.database.api.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
from common.database.api.Database import Database
from common.database.api.context.OperationalStatus import (
    OperationalStatus,
    operationalstatus_enum_values,
    to_operationalstatus_enum,
)
from common.exceptions.ServiceException import ServiceException
from device.proto.context_pb2 import DeviceId, Device, Empty
from device.proto.device_pb2_grpc import DeviceServiceServicer

LOGGER = logging.getLogger(__name__)

# ----- Metrics: one started/completed/failed counter set plus a duration histogram per RPC method -----

# AddDevice
ADDDEVICE_COUNTER_STARTED = Counter(
    'device_adddevice_counter_started',
    'Device:AddDevice counter of requests started')
ADDDEVICE_COUNTER_COMPLETED = Counter(
    'device_adddevice_counter_completed',
    'Device:AddDevice counter of requests completed')
ADDDEVICE_COUNTER_FAILED = Counter(
    'device_adddevice_counter_failed',
    'Device:AddDevice counter of requests failed')
ADDDEVICE_HISTOGRAM_DURATION = Histogram(
    'device_adddevice_histogram_duration',
    'Device:AddDevice histogram of request duration')

# ConfigureDevice
CONFIGUREDEVICE_COUNTER_STARTED = Counter(
    'device_configuredevice_counter_started',
    'Device:ConfigureDevice counter of requests started')
CONFIGUREDEVICE_COUNTER_COMPLETED = Counter(
    'device_configuredevice_counter_completed',
    'Device:ConfigureDevice counter of requests completed')
CONFIGUREDEVICE_COUNTER_FAILED = Counter(
    'device_configuredevice_counter_failed',
    'Device:ConfigureDevice counter of requests failed')
CONFIGUREDEVICE_HISTOGRAM_DURATION = Histogram(
    'device_configuredevice_histogram_duration',
    'Device:ConfigureDevice histogram of request duration')

# DeleteDevice
DELETEDEVICE_COUNTER_STARTED = Counter(
    'device_deletedevice_counter_started',
    'Device:DeleteDevice counter of requests started')
DELETEDEVICE_COUNTER_COMPLETED = Counter(
    'device_deletedevice_counter_completed',
    'Device:DeleteDevice counter of requests completed')
DELETEDEVICE_COUNTER_FAILED = Counter(
    'device_deletedevice_counter_failed',
    'Device:DeleteDevice counter of requests failed')
DELETEDEVICE_HISTOGRAM_DURATION = Histogram(
    'device_deletedevice_histogram_duration',
    'Device:DeleteDevice histogram of request duration')


def _check_device_fields(request : Device, allow_empty_type : bool):
    """Validate the scalar fields of a Device request shared by AddDevice and ConfigureDevice.

    Args:
        request: the incoming Device message.
        allow_empty_type: forwarded as allow_empty when checking device_type.

    Returns:
        Tuple (device_id, device_type, device_config, device_opstat); device_opstat is the value
        produced by to_operationalstatus_enum().

    Raises:
        ServiceException: INVALID_ARGUMENT when a checker rejects a field or the operational status
            cannot be converted.
    """
    try:
        device_id = chk_string(
            'device.device_id.device_id.uuid', request.device_id.device_id.uuid, allow_empty=False)
        device_type = chk_string(
            'device.device_type', request.device_type, allow_empty=allow_empty_type)
        device_config = chk_string(
            'device.device_config.device_config', request.device_config.device_config, allow_empty=True)
        device_opstat = chk_options(
            'device.devOperationalStatus', request.devOperationalStatus, operationalstatus_enum_values())
    except Exception as e:
        LOGGER.exception('Invalid arguments:')
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

    device_opstat = to_operationalstatus_enum(device_opstat)
    # should not happen because gRPC limits accepted values in enums
    if device_opstat is None:                                           # pragma: no cover
        msg = 'Unsupported OperationalStatus({}).'                      # pragma: no cover
        msg = msg.format(request.devOperationalStatus)                  # pragma: no cover
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)   # pragma: no cover

    return device_id, device_type, device_config, device_opstat


def _check_endpoint(index : int, endpoint, device_id : str) -> Tuple[str, str]:
    """Validate one entry of request.endpointList and return its (port_id, port_type).

    Context, topology and device identifiers may be left empty; when given they must match the
    default context, the default topology and the parent device respectively.

    Raises:
        ServiceException: INVALID_ARGUMENT when a checker rejects a field or an identifier does not
            match the expected value.
    """
    try:
        ep_context_id = chk_string(
            'endpoint[#{}].port_id.topoId.contextId.contextUuid.uuid'.format(index),
            endpoint.port_id.topoId.contextId.contextUuid.uuid, allow_empty=True)
        ep_topology_id = chk_string(
            'endpoint[#{}].port_id.topoId.topoId.uuid'.format(index),
            endpoint.port_id.topoId.topoId.uuid, allow_empty=True)
        ep_device_id = chk_string(
            'endpoint[#{}].port_id.dev_id.device_id.uuid'.format(index),
            endpoint.port_id.dev_id.device_id.uuid, allow_empty=True)
        ep_port_id = chk_string(
            'endpoint[#{}].port_id.port_id.uuid'.format(index),
            endpoint.port_id.port_id.uuid, allow_empty=False)
        ep_port_type = chk_string(
            'endpoint[#{}].port_type'.format(index),
            endpoint.port_type, allow_empty=False)
    except Exception as e:
        LOGGER.exception('Invalid arguments:')
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

    if (len(ep_context_id) > 0) and (ep_context_id != DEFAULT_CONTEXT_ID):
        msg = ' '.join([
            'Unsupported Context({}) in Endpoint(#{}) of Device({}).',
            'Only default Context({}) is currently supported.',
            'Optionally, leave field empty to use default Context.',
        ])
        msg = msg.format(ep_context_id, index, device_id, DEFAULT_CONTEXT_ID)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    if (len(ep_topology_id) > 0) and (ep_topology_id != DEFAULT_TOPOLOGY_ID):
        msg = ' '.join([
            'Unsupported Topology({}) in Endpoint(#{}) of Device({}).',
            'Only default Topology({}) is currently supported.',
            'Optionally, leave field empty to use default Topology.',
        ])
        msg = msg.format(ep_topology_id, index, device_id, DEFAULT_TOPOLOGY_ID)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    if (len(ep_device_id) > 0) and (ep_device_id != device_id):
        msg = ' '.join([
            'Wrong Device({}) in Endpoint(#{}).',
            'Parent specified in message is Device({}).',
            'Optionally, leave field empty to use parent Device.',
        ])
        msg = msg.format(ep_device_id, index, device_id)
        raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

    return ep_port_id, ep_port_type


class DeviceServiceServicerImpl(DeviceServiceServicer):
    """Device service servicer backed by the Database API.

    Every RPC operates on the topology DEFAULT_TOPOLOGY_ID of the context DEFAULT_CONTEXT_ID.
    Each RPC increments its STARTED counter on entry and then exactly one of COMPLETED (reply
    returned) or FAILED (request aborted, whether rejected or crashed).
    """

    def __init__(self, database : Database):
        """Store the Database handle used by all RPC methods."""
        LOGGER.debug('Creating Servicer...')
        self.database = database
        LOGGER.debug('Servicer Created')

    def _get_default_topology(self):
        """Return the default topology, calling create() on the default context and topology."""
        db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
        return db_context.topology(DEFAULT_TOPOLOGY_ID).create()

    @ADDDEVICE_HISTOGRAM_DURATION.time()
    def AddDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:
        """Create a device and its endpoints in the default topology and return its DeviceId.

        The request is aborted through grpc_context with:
            INVALID_ARGUMENT: a field fails validation, the operational status is KEEP_STATE, or an
                endpoint references a context/topology/device other than the expected one.
            ALREADY_EXISTS: the device is already in the topology, or a port id is repeated.
            INTERNAL: any other exception.
        """
        ADDDEVICE_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('AddDevice request: %s', request)

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            device_id, device_type, device_config, device_opstat = _check_device_fields(
                request, allow_empty_type=False)

            if device_opstat == OperationalStatus.KEEP_STATE:
                msg = ' '.join([
                    'Device has to be created with either ENABLED/DISABLED Operational State.',
                    'Use KEEP_STATE only in configure Device methods.',
                ])
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            db_topology = self._get_default_topology()

            if db_topology.devices.contains(device_id):
                msg = 'Device({}) already exists in the database.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

            # All endpoints are validated before anything is written to the database.
            added_endpoint_uuids = set()
            endpoint_pairs : List[Tuple[str, str]] = []
            for i, endpoint in enumerate(request.endpointList):
                ep_port_id, ep_port_type = _check_endpoint(i, endpoint, device_id)

                if ep_port_id in added_endpoint_uuids:
                    msg = 'Duplicated Port({}) in Endpoint(#{}) of Device({}).'
                    msg = msg.format(ep_port_id, i, device_id)
                    raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

                added_endpoint_uuids.add(ep_port_id)
                endpoint_pairs.append((ep_port_id, ep_port_type))

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_device = db_topology.device(device_id).create(device_type, device_config, device_opstat)
            for port_id, port_type in endpoint_pairs:
                db_device.endpoint(port_id).create(port_type)

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = DeviceId(**db_device.dump_id())
            LOGGER.debug('AddDevice reply: %s', reply)
            ADDDEVICE_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            # Rejected requests count as failed so that started == completed + failed.
            ADDDEVICE_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('AddDevice exception')                 # pragma: no cover
            ADDDEVICE_COUNTER_FAILED.inc()                          # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @CONFIGUREDEVICE_HISTOGRAM_DURATION.time()
    def ConfigureDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:
        """Update the configuration and/or operational status of an existing device.

        The request is aborted through grpc_context with:
            INVALID_ARGUMENT: a field fails validation, device_type differs from the stored type, or
                the request carries endpoints.
            NOT_FOUND: the device is not in the default topology.
            FAILED_PRECONDITION: the stored device_type attribute is missing or empty.
            ABORTED: neither a configuration nor an operational status other than KEEP_STATE is given.
            INTERNAL: any other exception.
        """
        CONFIGUREDEVICE_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('ConfigureDevice request: %s', request)

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            # NOTE(review): an empty device_type passes this check, but it can never equal the stored
            # (non-empty) type, so it is still rejected by the type comparison below. Confirm whether
            # empty should mean "unchanged" or be refused here.
            device_id, device_type, device_config, device_opstat = _check_device_fields(
                request, allow_empty_type=True)

            db_topology = self._get_default_topology()

            if not db_topology.devices.contains(device_id):
                msg = 'Device({}) does not exist in the database.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            db_device = db_topology.device(device_id)
            db_device_attributes = db_device.attributes.get(attributes=['device_type'])
            # should not happen, device creation through Database API ensures all fields are always present
            if len(db_device_attributes) == 0:                                                  # pragma: no cover
                msg = 'Attribute device_type for Device({}) does not exist in the database.'    # pragma: no cover
                msg = msg.format(device_id)                                                     # pragma: no cover
                raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)                # pragma: no cover

            db_device_type = db_device_attributes.get('device_type')
            # should not happen, device creation through Database API ensures all fields are always present
            if len(db_device_type) == 0:                                                        # pragma: no cover
                msg = 'Attribute device_type for Device({}) is empty in the database.'          # pragma: no cover
                msg = msg.format(device_id)                                                     # pragma: no cover
                raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)                # pragma: no cover

            if db_device_type != device_type:
                msg = 'Device({}) has Type({}) in the database. Cannot be changed to Type({}).'
                msg = msg.format(device_id, db_device_type, device_type)
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            if len(request.endpointList) > 0:
                msg = 'Endpoints belonging to Device({}) cannot be modified.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            # Only the fields that carry a new value are written.
            update_attributes = {}
            if len(device_config) > 0:
                update_attributes['device_config'] = device_config
            if device_opstat != OperationalStatus.KEEP_STATE:
                update_attributes['device_operational_status'] = device_opstat

            if len(update_attributes) == 0:
                # NOTE(review): the wording presumably means "No change has been requested"; the text
                # is left untouched because clients may match on it.
                msg = ' '.join([
                    'Any change has been requested for Device({}).',
                    'Either specify a new configuration or a new device operational status.',
                ])
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.ABORTED, msg)

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_device.update(update_attributes=update_attributes)

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = DeviceId(**db_device.dump_id())
            LOGGER.debug('ConfigureDevice reply: %s', reply)
            CONFIGUREDEVICE_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            # Rejected requests count as failed so that started == completed + failed.
            CONFIGUREDEVICE_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('ConfigureDevice exception')           # pragma: no cover
            CONFIGUREDEVICE_COUNTER_FAILED.inc()                    # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @DELETEDEVICE_HISTOGRAM_DURATION.time()
    def DeleteDevice(self, request : DeviceId, grpc_context : grpc.ServicerContext) -> Empty:
        """Delete an existing device from the default topology and return Empty.

        The request is aborted through grpc_context with:
            INVALID_ARGUMENT: the device uuid fails validation.
            NOT_FOUND: the device is not in the default topology.
            INTERNAL: any other exception.
        """
        DELETEDEVICE_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('DeleteDevice request: %s', request)

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                device_id = chk_string('device_id.device_id.uuid', request.device_id.uuid, allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) from e

            db_topology = self._get_default_topology()

            if not db_topology.devices.contains(device_id):
                msg = 'Device({}) does not exist in the database.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_topology.device(device_id).delete()

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = Empty()
            LOGGER.debug('DeleteDevice reply: %s', reply)
            DELETEDEVICE_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            # Rejected requests count as failed so that started == completed + failed.
            DELETEDEVICE_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('DeleteDevice exception')              # pragma: no cover
            DELETEDEVICE_COUNTER_FAILED.inc()                       # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover