Newer
Older

Lluis Gifre Renom
committed
import grpc, logging
from prometheus_client import Counter, Histogram
from common.Checkers import chk_options, chk_string
from common.database.api.Database import Database
from common.database.api.context.OperationalStatus import OperationalStatus, operationalstatus_enum_values, \
to_operationalstatus_enum
from common.exceptions.ServiceException import ServiceException
from device.proto.context_pb2 import DeviceId, Device, Empty

Lluis Gifre Renom
committed
from device.proto.device_pb2_grpc import DeviceServiceServicer
LOGGER = logging.getLogger(__name__)
DEFAULT_CONTEXT_ID = 'admin'
DEFAULT_TOPOLOGY_ID = 'admin'

Lluis Gifre Renom
committed
ADDDEVICE_COUNTER_STARTED = Counter ('device_adddevice_counter_started',
'Device:AddDevice counter of requests started' )
ADDDEVICE_COUNTER_COMPLETED = Counter ('device_adddevice_counter_completed',
'Device:AddDevice counter of requests completed')
ADDDEVICE_COUNTER_FAILED = Counter ('device_adddevice_counter_failed',
'Device:AddDevice counter of requests failed' )
ADDDEVICE_HISTOGRAM_DURATION = Histogram('device_adddevice_histogram_duration',
'Device:AddDevice histogram of request duration')
CONFIGUREDEVICE_COUNTER_STARTED = Counter ('device_configuredevice_counter_started',
'Device:ConfigureDevice counter of requests started' )
CONFIGUREDEVICE_COUNTER_COMPLETED = Counter ('device_configuredevice_counter_completed',
'Device:ConfigureDevice counter of requests completed')
CONFIGUREDEVICE_COUNTER_FAILED = Counter ('device_configuredevice_counter_failed',
'Device:ConfigureDevice counter of requests failed' )
CONFIGUREDEVICE_HISTOGRAM_DURATION = Histogram('device_configuredevice_histogram_duration',
'Device:ConfigureDevice histogram of request duration')
DELETEDEVICE_COUNTER_STARTED = Counter ('device_deletedevice_counter_started',
'Device:DeleteDevice counter of requests started' )
DELETEDEVICE_COUNTER_COMPLETED = Counter ('device_deletedevice_counter_completed',
'Device:DeleteDevice counter of requests completed')
DELETEDEVICE_COUNTER_FAILED = Counter ('device_deletedevice_counter_failed',
'Device:DeleteDevice counter of requests failed' )
DELETEDEVICE_HISTOGRAM_DURATION = Histogram('device_deletedevice_histogram_duration',
'Device:DeleteDevice histogram of request duration')
class DeviceServiceServicerImpl(DeviceServiceServicer):

Lluis Gifre Renom
committed
LOGGER.debug('Creating Servicer...')
self.database = database
LOGGER.debug('Servicer Created')
@ADDDEVICE_HISTOGRAM_DURATION.time()
def AddDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:

Lluis Gifre Renom
committed
ADDDEVICE_COUNTER_STARTED.inc()
try:
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
LOGGER.debug('AddDevice request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
try:
device_uuid = chk_string('device_uuid', request.device_id.device_id.uuid, allow_empty=False)
device_type = chk_string('device_type', request.device_type, allow_empty=False)
device_config = chk_string('device_config', request.device_config.device_config, allow_empty=True)
device_opstat = chk_options('devOperationalStatus', request.devOperationalStatus,
operationalstatus_enum_values())
except Exception as e:
LOGGER.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
device_opstat = to_operationalstatus_enum(device_opstat)
if device_opstat == OperationalStatus.KEEP_STATE:
msg = ' '.join([
'Device has to be created with either ENABLED/DISABLED Operational State.',
'Use KEEP_STATE only in configure Device methods.',
])
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()
if db_topology.devices.contains(device_uuid):
msg = 'device_uuid({}) already exists.'
msg = msg.format(device_uuid)
raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)
added_endpoint_uuids = set()
endpoint_pairs : List[Tuple[str, str]] = []
for i,endpoint in enumerate(request.endpointList):
contextId = endpoint.port_id.topoId.contextId.contextUuid.uuid
if (len(contextId) > 0) and (contextId != DEFAULT_CONTEXT_ID):
msg = ' '.join([
'Unsupported context_id({}) in endpoint #{}.',
'Only default context_id({}) is currently supported.',
'Optionally, leave field empty to use default context_id.',
])
msg = msg.format(contextId, i, DEFAULT_CONTEXT_ID)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
elif len(contextId) == 0:
contextId = DEFAULT_CONTEXT_ID
topoId = endpoint.port_id.topoId.topoId.uuid
if (len(topoId) > 0) and (topoId != DEFAULT_TOPOLOGY_ID):
msg = ' '.join([
'Unsupported topology_id({}) in endpoint #{}.',
'Only default topology_id({}) is currently supported.',
'Optionally, leave field empty to use default topology_id.',
])
msg = msg.format(topoId, i, DEFAULT_TOPOLOGY_ID)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
elif len(topoId) == 0:
topoId = DEFAULT_TOPOLOGY_ID
dev_id = endpoint.port_id.dev_id.device_id.uuid
if (len(dev_id) > 0) and (dev_id != device_uuid):
msg = ' '.join([
'Wrong device_id({}) in endpoint #{}.',
'Parent specified in message is device_id({}).',
'Optionally, leave field empty to use parent device_id.',
])
msg = msg.format(dev_id, i, device_uuid)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
elif len(dev_id) == 0:
dev_id = device_uuid
try:
port_id = chk_string('port_uuid', endpoint.port_id.port_id.uuid, allow_empty=False)
port_type = chk_string('port_type', endpoint.port_type, allow_empty=False)
except Exception as e:
LOGGER.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
if port_id in added_endpoint_uuids:
msg = 'Duplicated port_id({}) in device_id({}).'
msg = msg.format(port_id, device_uuid)
raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)
added_endpoint_uuids.add(port_id)
endpoint_pairs.append((port_id, port_type))
# ----- Implement changes in database ----------------------------------------------------------------------
db_device = db_topology.device(device_uuid).create(device_type, device_config, device_opstat)
for port_id,port_type in endpoint_pairs: db_device.endpoint(port_id).create(port_type)
# ----- Compose reply --------------------------------------------------------------------------------------
reply = DeviceId(device_id=dict(uuid=device_uuid))
LOGGER.debug('AddDevice reply: {}'.format(str(reply)))

Lluis Gifre Renom
committed
ADDDEVICE_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e:
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('AddDevice exception') # pragma: no cover
ADDDEVICE_COUNTER_FAILED.inc() # pragma: no cover
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # pragma: no cover

Lluis Gifre Renom
committed
@CONFIGUREDEVICE_HISTOGRAM_DURATION.time()
def ConfigureDevice(self, request : Device, grpc_context : grpc.ServicerContext) -> DeviceId:

Lluis Gifre Renom
committed
CONFIGUREDEVICE_COUNTER_STARTED.inc()
try:
LOGGER.info('ConfigureDevice request: {}'.format(str(request)))
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# ----- Validate request data and pre-conditions -----------------------------------------------------------
try:
device_uuid = chk_string('device_uuid', request.device_id.device_id.uuid, allow_empty=False)
device_type = chk_string('device_type', request.device_type, allow_empty=True)
device_config = chk_string('device_config', request.device_config.device_config, allow_empty=True)
device_opstat = chk_options('devOperationalStatus', request.devOperationalStatus,
operationalstatus_enum_values())
except Exception as e:
LOGGER.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
device_opstat = to_operationalstatus_enum(device_opstat)
if device_opstat is None:
msg = 'Unsupported OperationalStatus({}).'
msg = msg.format(request.devOperationalStatus)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()
if not db_topology.devices.contains(device_uuid):
msg = 'device_uuid({}) does not exist.'
msg = msg.format(device_uuid)
raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
db_device = db_topology.device(device_uuid)
db_device_attributes = db_device.attributes.get(attributes=['device_type'])
if len(db_device_attributes) == 0:
msg = 'attribute device_type for device_uuid({}) does not exist.'
msg = msg.format(device_uuid)
raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)
db_device_type = db_device_attributes.get('device_type')
if len(db_device_type) == 0:
msg = 'attribute device_type for device_uuid({}) is empty.'
msg = msg.format(device_uuid)
raise ServiceException(grpc.StatusCode.FAILED_PRECONDITION, msg)
if db_device_type != device_type:
msg = 'Device({}) has Type({}). Cannot be changed to Type({}).'
msg = msg.format(device_uuid, db_device_type, device_type)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
if len(request.endpointList) > 0:
msg = 'Endpoints belonging to Device({}) cannot be modified.'
msg = msg.format(device_uuid)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
update_attributes = {}
if len(device_config) > 0:
update_attributes['device_config'] = device_config
if device_opstat != OperationalStatus.KEEP_STATE:
update_attributes['device_operational_status'] = device_opstat
LOGGER.info('update_attributes={}'.format(str(update_attributes)))
if len(update_attributes) == 0:
msg = ' '.join([
'Any change has been requested for Device({}).',
'Either specify a new configuration or a new device state.',
])
msg = msg.format(device_uuid)
raise ServiceException(grpc.StatusCode.ABORTED, msg)
# ----- Implement changes in database ----------------------------------------------------------------------
db_device.update(update_attributes=update_attributes)
# ----- Compose reply --------------------------------------------------------------------------------------
reply = DeviceId(device_id=dict(uuid=device_uuid))

Lluis Gifre Renom
committed
LOGGER.info('ConfigureDevice reply: {}'.format(str(reply)))
CONFIGUREDEVICE_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e:
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('ConfigureDevice exception') # pragma: no cover
CONFIGUREDEVICE_COUNTER_FAILED.inc() # pragma: no cover
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # pragma: no cover

Lluis Gifre Renom
committed
@DELETEDEVICE_HISTOGRAM_DURATION.time()
def DeleteDevice(self, request : DeviceId, grpc_context : grpc.ServicerContext) -> Empty:

Lluis Gifre Renom
committed
DELETEDEVICE_COUNTER_STARTED.inc()
try:
LOGGER.debug('DeleteDevice request: {}'.format(str(request)))
# ----- Validate request data and pre-conditions -----------------------------------------------------------
try:
device_uuid = chk_string('device_uuid', request.device_id.uuid, allow_empty=False)
except Exception as e:
LOGGER.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
db_context = self.database.context(DEFAULT_CONTEXT_ID).create()
db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID).create()
if not db_topology.devices.contains(device_uuid):
msg = 'device_uuid({}) does not exist.'
msg = msg.format(device_uuid)
raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
# ----- Implement changes in database ----------------------------------------------------------------------
db_topology.device(device_uuid).delete()
# ----- Compose reply --------------------------------------------------------------------------------------
reply = Empty()
LOGGER.debug('DeleteDevice reply: {}'.format(str(reply)))

Lluis Gifre Renom
committed
DELETEDEVICE_COUNTER_COMPLETED.inc()
return reply
except ServiceException as e:
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('DeleteDevice exception') # pragma: no cover
DELETEDEVICE_COUNTER_FAILED.inc() # pragma: no cover
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) # pragma: no cover