Skip to content
Snippets Groups Projects
DeviceServiceServicerImpl.py 15.3 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import List, Tuple
import grpc, logging
from prometheus_client import Counter, Histogram
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.Checkers import chk_options, chk_string
from common.database.api.Database import Database
from common.database.api.context.OperationalStatus import OperationalStatus, operationalstatus_enum_values, \
    to_operationalstatus_enum
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.exceptions.ServiceException import ServiceException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from device.proto.context_pb2 import DeviceId, Device, Empty
from device.proto.device_pb2_grpc import DeviceServiceServicer

LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
DEFAULT_CONTEXT_ID = 'admin'
DEFAULT_TOPOLOGY_ID = 'admin'

ADDDEVICE_COUNTER_STARTED    = Counter  ('device_adddevice_counter_started',
                                          'Device:AddDevice counter of requests started'  )
ADDDEVICE_COUNTER_COMPLETED  = Counter  ('device_adddevice_counter_completed',
                                          'Device:AddDevice counter of requests completed')
ADDDEVICE_COUNTER_FAILED     = Counter  ('device_adddevice_counter_failed',
                                          'Device:AddDevice counter of requests failed'   )
ADDDEVICE_HISTOGRAM_DURATION = Histogram('device_adddevice_histogram_duration',
                                          'Device:AddDevice histogram of request duration')

CONFIGUREDEVICE_COUNTER_STARTED    = Counter  ('device_configuredevice_counter_started',
                                               'Device:ConfigureDevice counter of requests started'  )
CONFIGUREDEVICE_COUNTER_COMPLETED  = Counter  ('device_configuredevice_counter_completed',
                                               'Device:ConfigureDevice counter of requests completed')
CONFIGUREDEVICE_COUNTER_FAILED     = Counter  ('device_configuredevice_counter_failed',
                                               'Device:ConfigureDevice counter of requests failed'   )
CONFIGUREDEVICE_HISTOGRAM_DURATION = Histogram('device_configuredevice_histogram_duration',
                                               'Device:ConfigureDevice histogram of request duration')

DELETEDEVICE_COUNTER_STARTED    = Counter  ('device_deletedevice_counter_started',
                                            'Device:DeleteDevice counter of requests started'  )
DELETEDEVICE_COUNTER_COMPLETED  = Counter  ('device_deletedevice_counter_completed',
                                            'Device:DeleteDevice counter of requests completed')
DELETEDEVICE_COUNTER_FAILED     = Counter  ('device_deletedevice_counter_failed',
                                            'Device:DeleteDevice counter of requests failed'   )
DELETEDEVICE_HISTOGRAM_DURATION = Histogram('device_deletedevice_histogram_duration',
                                            'Device:DeleteDevice histogram of request duration')

class DeviceServiceServicerImpl(DeviceServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(self, database : Database):
        LOGGER.debug('Creating Servicer...')
        self.database = database
        LOGGER.debug('Servicer Created')

    @ADDDEVICE_HISTOGRAM_DURATION.time()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def AddDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            LOGGER.debug('AddDevice request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                device_uuid = chk_string('device_uuid', request.device_id.device_id.uuid, allow_empty=False)
                device_type = chk_string('device_type', request.device_type, allow_empty=False)
                device_config = chk_string('device_config', request.device_config.device_config, allow_empty=True)
                device_opstat = chk_options('devOperationalStatus', request.devOperationalStatus,
                                            operationalstatus_enum_values())
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            device_opstat = to_operationalstatus_enum(device_opstat)
            if device_opstat == OperationalStatus.KEEP_STATE:
                msg = ' '.join([
                    'Device has to be created with either ENABLED/DISABLED Operational State.',
                    'Use KEEP_STATE only in configure Device methods.',
                ])
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if db_topology.devices.contains(device_uuid):
                msg = 'device_uuid({}) already exists.'
                msg = msg.format(device_uuid)
                raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

            added_endpoint_uuids = set()
            endpoint_pairs : List[Tuple[str, str]] = []
            for i,endpoint in enumerate(request.endpointList):
                contextId = endpoint.port_id.topoId.contextId.contextUuid.uuid
                if (len(contextId) > 0) and (contextId != DEFAULT_CONTEXT_ID):
                    msg = ' '.join([
                        'Unsupported context_id({}) in endpoint #{}.',
                        'Only default context_id({}) is currently supported.',
                        'Optionally, leave field empty to use default context_id.',
                    ])
                    msg = msg.format(contextId, i, DEFAULT_CONTEXT_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(contextId) == 0:
                    contextId = DEFAULT_CONTEXT_ID

                topoId = endpoint.port_id.topoId.topoId.uuid
                if (len(topoId) > 0) and (topoId != DEFAULT_TOPOLOGY_ID):
                    msg = ' '.join([
                        'Unsupported topology_id({}) in endpoint #{}.',
                        'Only default topology_id({}) is currently supported.',
                        'Optionally, leave field empty to use default topology_id.',
                    ])
                    msg = msg.format(topoId, i, DEFAULT_TOPOLOGY_ID)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(topoId) == 0:
                    topoId = DEFAULT_TOPOLOGY_ID

                dev_id = endpoint.port_id.dev_id.device_id.uuid
                if (len(dev_id) > 0) and (dev_id != device_uuid):
                    msg = ' '.join([
                        'Wrong device_id({}) in endpoint #{}.',
                        'Parent specified in message is device_id({}).',
                        'Optionally, leave field empty to use parent device_id.',
                    ])
                    msg = msg.format(dev_id, i, device_uuid)
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
                elif len(dev_id) == 0:
                    dev_id = device_uuid

                try:
                    port_id = chk_string('port_uuid', endpoint.port_id.port_id.uuid, allow_empty=False)
                    port_type = chk_string('port_type', endpoint.port_type, allow_empty=False)
                except Exception as e:
                    LOGGER.exception('Invalid arguments:')
                    raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

                if port_id in added_endpoint_uuids:
                    msg = 'Duplicated port_id({}) in device_id({}).'
                    msg = msg.format(port_id, device_uuid)
                    raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)

                added_endpoint_uuids.add(port_id)
                endpoint_pairs.append((port_id, port_type))

            # ----- Implement changes in database ----------------------------------------------------------------------
            db_device = db_topology.device(device_uuid).create(device_type, device_config, device_opstat)
            for port_id,port_type in endpoint_pairs: db_device.endpoint(port_id).create(port_type)

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = DeviceId(device_id=dict(uuid=device_uuid))
            LOGGER.debug('AddDevice reply: {}'.format(str(reply)))
            ADDDEVICE_COUNTER_COMPLETED.inc()
            return reply
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('AddDevice exception')                 # pragma: no cover
            ADDDEVICE_COUNTER_FAILED.inc()                          # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def ConfigureDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:
        CONFIGUREDEVICE_COUNTER_STARTED.inc()
        try:
            LOGGER.info('ConfigureDevice request: {}'.format(str(request)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                device_uuid = chk_string('device_uuid', request.device_id.device_id.uuid, allow_empty=False)
                device_type = chk_string('device_type', request.device_type, allow_empty=True)
                device_config = chk_string('device_config', request.device_config.device_config, allow_empty=True)
                device_opstat = chk_options('devOperationalStatus', request.devOperationalStatus,
                                            operationalstatus_enum_values())
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            device_opstat = to_operationalstatus_enum(device_opstat)
            if device_opstat is None:
                msg = 'Unsupported OperationalStatus({}).'
                msg = msg.format(request.devOperationalStatus)
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if not db_topology.devices.contains(device_uuid):
                msg = 'device_uuid({}) does not exist.'
                msg = msg.format(device_uuid)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            db_device = db_topology.device(device_uuid)
            db_device_attributes = db_device.attributes.get(attributes=['device_type'])
            if len(db_device_attributes) == 0:
                msg = 'attribute device_type for device_uuid({}) does not exist.'
                msg = msg.format(device_uuid)
                raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)

            db_device_type = db_device_attributes.get('device_type')
            if len(db_device_type) == 0:
                msg = 'attribute device_type for device_uuid({}) is empty.'
                msg = msg.format(device_uuid)
                raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)

            if db_device_type != device_type:
                msg = 'Device({}) has Type({}). Cannot be changed to Type({}).'
                msg = msg.format(device_uuid, db_device_type, device_type)
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            if len(request.endpointList) > 0:
                msg = 'Endpoints belonging to Device({}) cannot be modified.'
                msg = msg.format(device_uuid)
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)

            update_attributes = {}

            if len(device_config) > 0:
                update_attributes['device_config'] = device_config
            
            if device_opstat != OperationalStatus.KEEP_STATE:
                update_attributes['device_operational_status'] = device_opstat

            LOGGER.info('update_attributes={}'.format(str(update_attributes)))

            if len(update_attributes) == 0:
                msg = ' '.join([
                    'Any change has been requested for Device({}).',
                    'Either specify a new configuration or a new device state.',
                ])
                msg = msg.format(device_uuid)
                raise ServiceException(grpc.StatusCode.ABORTED, msg)

            # ----- Implement changes in database ----------------------------------------------------------------------
            db_device.update(update_attributes=update_attributes)

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = DeviceId(device_id=dict(uuid=device_uuid))
            LOGGER.info('ConfigureDevice reply: {}'.format(str(reply)))
            CONFIGUREDEVICE_COUNTER_COMPLETED.inc()
            return reply
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('ConfigureDevice exception')           # pragma: no cover
            CONFIGUREDEVICE_COUNTER_FAILED.inc()                    # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def DeleteDevice(self, request : DeviceId, grpc_context : grpc.ServicerContext) -> Empty:
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
            LOGGER.debug('DeleteDevice request: {}'.format(str(request)))

            # ----- Validate request data and pre-conditions -----------------------------------------------------------
            try:
                device_uuid = chk_string('device_uuid', request.device_id.uuid, allow_empty=False)
            except Exception as e:
                LOGGER.exception('Invalid arguments:')
                raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))

            db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
            db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()

            if not db_topology.devices.contains(device_uuid):
                msg = 'device_uuid({}) does not exist.'
                msg = msg.format(device_uuid)
                raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)

            # ----- Implement changes in database ----------------------------------------------------------------------
            db_topology.device(device_uuid).delete()

            # ----- Compose reply --------------------------------------------------------------------------------------
            reply = Empty()
            LOGGER.debug('DeleteDevice reply: {}'.format(str(reply)))
            DELETEDEVICE_COUNTER_COMPLETED.inc()
            return reply
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
        except ServiceException as e:
            grpc_context.abort(e.code, e.details)
        except Exception as e:                                      # pragma: no cover
            LOGGER.exception('DeleteDevice exception')              # pragma: no cover
            DELETEDEVICE_COUNTER_FAILED.inc()                       # pragma: no cover
            grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))    # pragma: no cover