# (Repository blame-view residue: "Newer" / "Older"; Lluis Gifre Renom committed.)
import grpc, logging
from prometheus_client import Counter, Histogram
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
from common.database.api.context.topology.device.OperationalStatus import OperationalStatus, \
operationalstatus_enum_values, to_operationalstatus_enum
from common.exceptions.ServiceException import ServiceException
from device.proto.context_pb2 import DeviceId, Device, Empty

# (Repository blame-view residue: Lluis Gifre Renom committed.)
from device.proto.device_pb2_grpc import DeviceServiceServicer
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Prometheus metrics. Each RPC gets three counters (started / completed / failed) and one
# histogram; the histograms are applied to the RPC methods through their ``.time()`` decorator.

# ----- AddDevice ------------------------------------------------------------------------------------------------------
ADDDEVICE_COUNTER_STARTED = Counter(
    'device_adddevice_counter_started',
    'Device:AddDevice counter of requests started')
ADDDEVICE_COUNTER_COMPLETED = Counter(
    'device_adddevice_counter_completed',
    'Device:AddDevice counter of requests completed')
ADDDEVICE_COUNTER_FAILED = Counter(
    'device_adddevice_counter_failed',
    'Device:AddDevice counter of requests failed')
ADDDEVICE_HISTOGRAM_DURATION = Histogram(
    'device_adddevice_histogram_duration',
    'Device:AddDevice histogram of request duration')

# ----- ConfigureDevice ------------------------------------------------------------------------------------------------
CONFIGUREDEVICE_COUNTER_STARTED = Counter(
    'device_configuredevice_counter_started',
    'Device:ConfigureDevice counter of requests started')
CONFIGUREDEVICE_COUNTER_COMPLETED = Counter(
    'device_configuredevice_counter_completed',
    'Device:ConfigureDevice counter of requests completed')
CONFIGUREDEVICE_COUNTER_FAILED = Counter(
    'device_configuredevice_counter_failed',
    'Device:ConfigureDevice counter of requests failed')
CONFIGUREDEVICE_HISTOGRAM_DURATION = Histogram(
    'device_configuredevice_histogram_duration',
    'Device:ConfigureDevice histogram of request duration')

# ----- DeleteDevice ---------------------------------------------------------------------------------------------------
DELETEDEVICE_COUNTER_STARTED = Counter(
    'device_deletedevice_counter_started',
    'Device:DeleteDevice counter of requests started')
DELETEDEVICE_COUNTER_COMPLETED = Counter(
    'device_deletedevice_counter_completed',
    'Device:DeleteDevice counter of requests completed')
DELETEDEVICE_COUNTER_FAILED = Counter(
    'device_deletedevice_counter_failed',
    'Device:DeleteDevice counter of requests failed')
DELETEDEVICE_HISTOGRAM_DURATION = Histogram(
    'device_deletedevice_histogram_duration',
    'Device:DeleteDevice histogram of request duration')
class DeviceServiceServicerImpl(DeviceServiceServicer):
    """gRPC servicer exposing the AddDevice, ConfigureDevice and DeleteDevice RPCs.

    Every RPC follows the same skeleton: increment the "started" counter, validate the request,
    apply the change through ``self.database`` (always inside the default context and topology),
    build the reply and increment the "completed" counter. A ``ServiceException`` raised along the
    way aborts the call with the exception's own gRPC code and details; any other exception aborts
    the call with ``grpc.StatusCode.INTERNAL``. Both failure paths increment the "failed" counter.
    """

    def __init__(self, database):
        """Keep a reference to the database used by all RPCs.

        Args:
            database: object providing ``context(<context_id>)``; its concrete type is not visible
                in this module.
        """
        LOGGER.debug('Creating Servicer...')
        self.database = database
        LOGGER.debug('Servicer Created')

    @ADDDEVICE_HISTOGRAM_DURATION.time()
    def AddDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:
        """Create a new device, together with its endpoints, in the default topology.

        Args:
            request: device to create. Id and type are mandatory, the configuration may be empty and
                the operational status must not be KEEP_STATE. For each endpoint, port id and port
                type are mandatory; context, topology and device ids may be left empty, in which
                case the default context/topology and the parent device are assumed.
            grpc_context: gRPC context, used to abort the call on failure.

        Returns:
            The ``DeviceId`` of the created device. Nothing is returned explicitly on the failure
            paths, where ``grpc_context.abort`` is invoked instead:
              * INVALID_ARGUMENT: malformed field, KEEP_STATE status, or an endpoint referring to a
                context, topology or device other than the expected one;
              * ALREADY_EXISTS: the device already exists, or a port id is repeated in the request;
              * INTERNAL: any unexpected exception.
        """
        ADDDEVICE_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('AddDevice request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                device_id = chk_string('device.device_id.device_id.uuid',
                                       request.device_id.device_id.uuid,
                                       allow_empty=False)
                device_type = chk_string('device.device_type',
                                         request.device_type,
                                         allow_empty=False)
                device_config = chk_string('device.device_config.device_config',
                                           request.device_config.device_config,
                                           allow_empty=True)
                device_opstat = chk_options('device.devOperationalStatus',
                                            request.devOperationalStatus,
                                            operationalstatus_enum_values())
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            device_opstat = to_operationalstatus_enum(device_opstat)
            # should not happen because gRPC limits accepted values in enums
            if device_opstat is None:                                           # pragma: no cover
                msg = 'Unsupported OperationalStatus({}).'                      # pragma: no cover
                msg = msg.format(request.devOperationalStatus)                  # pragma: no cover
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)   # pragma: no cover

            if device_opstat == OperationalStatus.KEEP_STATE:
                msg = ' '.join([
                    'Device has to be created with either ENABLED/DISABLED Operational State.',
                    'Use KEEP_STATE only in configure Device methods.',
                ])
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if db_topology.devices.contains(device_id):
                msg = 'Device({}) already exists in the database.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

            # All endpoints are validated before anything is written, so an invalid endpoint never
            # leaves a partially created device behind.
            added_endpoint_uuids = set()
            endpoint_pairs : List[Tuple[str, str]] = []
            for i,endpoint in enumerate(request.endpointList):
                try:
                    ep_context_id = chk_string('endpoint[#{}].port_id.topoId.contextId.contextUuid.uuid'.format(i),
                                               endpoint.port_id.topoId.contextId.contextUuid.uuid,
                                               allow_empty=True)
                    ep_topology_id = chk_string('endpoint[#{}].port_id.topoId.topoId.uuid'.format(i),
                                                endpoint.port_id.topoId.topoId.uuid,
                                                allow_empty=True)
                    ep_device_id = chk_string('endpoint[#{}].port_id.dev_id.device_id.uuid'.format(i),
                                              endpoint.port_id.dev_id.device_id.uuid,
                                              allow_empty=True)
                    ep_port_id = chk_string('endpoint[#{}].port_id.port_id.uuid'.format(i),
                                            endpoint.port_id.port_id.uuid,
                                            allow_empty=False)
                    ep_port_type = chk_string('endpoint[#{}].port_type'.format(i),
                                              endpoint.port_type,
                                              allow_empty=False)
                except Exception as e:
                    LOGGER.exception('Invalid arguments:')
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

                if (len(ep_context_id) > 0) and (ep_context_id != DEFAULT_CONTEXT_ID):
                    msg = ' '.join([
                        'Unsupported Context({}) in Endpoint(#{}) of Device({}).',
                        'Only default Context({}) is currently supported.',
                        'Optionally, leave field empty to use default Context.',
                    ])
                    msg = msg.format(ep_context_id, i, device_id, DEFAULT_CONTEXT_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_context_id) == 0:
                    ep_context_id = DEFAULT_CONTEXT_ID

                if (len(ep_topology_id) > 0) and (ep_topology_id != DEFAULT_TOPOLOGY_ID):
                    msg = ' '.join([
                        'Unsupported Topology({}) in Endpoint(#{}) of Device({}).',
                        'Only default Topology({}) is currently supported.',
                        'Optionally, leave field empty to use default Topology.',
                    ])
                    msg = msg.format(ep_topology_id, i, device_id, DEFAULT_TOPOLOGY_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_topology_id) == 0:
                    ep_topology_id = DEFAULT_TOPOLOGY_ID

                if (len(ep_device_id) > 0) and (ep_device_id != device_id):
                    msg = ' '.join([
                        'Wrong Device({}) in Endpoint(#{}).',
                        'Parent specified in message is Device({}).',
                        'Optionally, leave field empty to use parent Device.',
                    ])
                    msg = msg.format(ep_device_id, i, device_id)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(ep_device_id) == 0:
                    ep_device_id = device_id

                if ep_port_id in added_endpoint_uuids:
                    msg = 'Duplicated Port({}) in Endpoint(#{}) of Device({}).'
                    msg = msg.format(ep_port_id, i, device_id)
                    raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

                added_endpoint_uuids.add(ep_port_id)
                endpoint_pairs.append((ep_port_id, ep_port_type))

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_device = db_topology.device(device_id).create(device_type, device_config, device_opstat)
            for port_id,port_type in endpoint_pairs:
                db_device.endpoint(port_id).create(port_type)

            # ----- Compose reply --------------------------------------------------------------------------------------
            # Same reply construction as ConfigureDevice.
            reply = DeviceId(**db_device.dump_id())
            LOGGER.debug('AddDevice reply: {}'.format(str(reply)))
            ADDDEVICE_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            LOGGER.exception('AddDevice exception')
            ADDDEVICE_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('AddDevice exception')                 # pragma: no cover
            ADDDEVICE_COUNTER_FAILED.inc()                          # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @CONFIGUREDEVICE_HISTOGRAM_DURATION.time()
    def ConfigureDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:
        """Update the configuration and/or the operational status of an existing device.

        Args:
            request: device data. The id is mandatory. A non-empty configuration replaces the stored
                one; an operational status other than KEEP_STATE replaces the stored one. The
                endpoint list must be empty and the device type cannot be changed.
            grpc_context: gRPC context, used to abort the call on failure.

        Returns:
            The ``DeviceId`` of the updated device. Nothing is returned explicitly on the failure
            paths, where ``grpc_context.abort`` is invoked instead:
              * INVALID_ARGUMENT: malformed field, device type change, or endpoints present;
              * NOT_FOUND: the device does not exist;
              * FAILED_PRECONDITION: the stored device has no (or an empty) device_type attribute;
              * ABORTED: the request carries neither a configuration nor a status change;
              * INTERNAL: any unexpected exception.
        """
        CONFIGUREDEVICE_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('ConfigureDevice request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                device_id = chk_string('device.device_id.device_id.uuid',
                                       request.device_id.device_id.uuid,
                                       allow_empty=False)
                device_type = chk_string('device.device_type',
                                         request.device_type,
                                         allow_empty=True)
                device_config = chk_string('device.device_config.device_config',
                                           request.device_config.device_config,
                                           allow_empty=True)
                device_opstat = chk_options('device.devOperationalStatus',
                                            request.devOperationalStatus,
                                            operationalstatus_enum_values())
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            device_opstat = to_operationalstatus_enum(device_opstat)
            # should not happen because gRPC limits accepted values in enums
            if device_opstat is None:                                           # pragma: no cover
                msg = 'Unsupported OperationalStatus({}).'                      # pragma: no cover
                msg = msg.format(request.devOperationalStatus)                  # pragma: no cover
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)   # pragma: no cover

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if not db_topology.devices.contains(device_id):
                msg = 'Device({}) does not exist in the database.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            db_device = db_topology.device(device_id)
            db_device_attributes = db_device.attributes.get(attributes=['device_type'])
            # should not happen, device creation through Database API ensures all fields are always present
            if len(db_device_attributes) == 0:                                                   # pragma: no cover
                msg = 'Attribute device_type for Device({}) does not exist in the database.'     # pragma: no cover
                msg = msg.format(device_id)                                                      # pragma: no cover
                raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)                 # pragma: no cover

            db_device_type = db_device_attributes.get('device_type')
            # should not happen, device creation through Database API ensures all fields are always present
            if len(db_device_type) == 0:                                                         # pragma: no cover
                msg = 'Attribute device_type for Device({}) is empty in the database.'           # pragma: no cover
                msg = msg.format(device_id)                                                      # pragma: no cover
                raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)                 # pragma: no cover

            # NOTE(review): the guarding condition was lost from the source. The request's
            # device_type is validated with allow_empty=True, so an empty value is treated here as
            # "type not specified" and only a non-empty, different type is rejected -- confirm
            # against the upstream repository.
            if (len(device_type) > 0) and (db_device_type != device_type):
                msg = 'Device({}) has Type({}) in the database. Cannot be changed to Type({}).'
                msg = msg.format(device_id, db_device_type, device_type)
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            if len(request.endpointList) > 0:
                msg = 'Endpoints belonging to Device({}) cannot be modified.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            # Only the fields the caller actually set are written back.
            update_attributes = {}

            if len(device_config) > 0:
                update_attributes['device_config'] = device_config

            if device_opstat != OperationalStatus.KEEP_STATE:
                update_attributes['device_operational_status'] = device_opstat

            if len(update_attributes) == 0:
                msg = ' '.join([
                    'Any change has been requested for Device({}).',
                    'Either specify a new configuration or a new device operational status.',
                ])
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.ABORTED, msg)

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_device.update(update_attributes=update_attributes)

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = DeviceId(**db_device.dump_id())
            LOGGER.debug('ConfigureDevice reply: {}'.format(str(reply)))
            CONFIGUREDEVICE_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            LOGGER.exception('ConfigureDevice exception')
            CONFIGUREDEVICE_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('ConfigureDevice exception')           # pragma: no cover
            CONFIGUREDEVICE_COUNTER_FAILED.inc()                    # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover

    @DELETEDEVICE_HISTOGRAM_DURATION.time()
    def DeleteDevice(self, request : DeviceId, grpc_context : grpc.ServicerContext) -> Empty:
        """Delete an existing device from the default topology.

        Args:
            request: identifier of the device to delete; its uuid must not be empty.
            grpc_context: gRPC context, used to abort the call on failure.

        Returns:
            An ``Empty`` message. Nothing is returned explicitly on the failure paths, where
            ``grpc_context.abort`` is invoked instead:
              * INVALID_ARGUMENT: malformed device id;
              * NOT_FOUND: the device does not exist;
              * INTERNAL: any unexpected exception.
        """
        DELETEDEVICE_COUNTER_STARTED.inc()
        try:
            LOGGER.debug('DeleteDevice request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                device_id = chk_string('device_id.device_id.uuid',
                                       request.device_id.uuid,
                                       allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if not db_topology.devices.contains(device_id):
                msg = 'Device({}) does not exist in the database.'
                msg = msg.format(device_id)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            # ----- Implement changes in the database ------------------------------------------------------------------
            db_topology.device(device_id).delete()

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = Empty()
            LOGGER.debug('DeleteDevice reply: {}'.format(str(reply)))
            DELETEDEVICE_COUNTER_COMPLETED.inc()
            return reply
        except ServiceException as e:
            LOGGER.exception('DeleteDevice exception')
            DELETEDEVICE_COUNTER_FAILED.inc()
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('DeleteDevice exception')              # pragma: no cover
            DELETEDEVICE_COUNTER_FAILED.inc()                       # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover