From dfa0d98d2938b887c4a2d640fe10a19f525b9f74 Mon Sep 17 00:00:00 2001 From: Lluis Gifre <lluis.gifre@cttc.es> Date: Fri, 5 Nov 2021 22:10:00 +0100 Subject: [PATCH] Several changes: - added missing proto files in genproto.sh - created .gitignore files to prevent sharing device credentials and data - improved MonitoringLoops class with one thread per driver for data collection, a unique aggregation queue and one single thread for data export to monitoring - added GetResource method to Driver API - improved DriverFactory to select the driver that better fits with filter fields in terms of number of correspondences in filter fields. Drivers with some mismatch filter are discarded. - implemented proper initialization and connection of drivers to real devices - started to implement OpenConfig driver with support for Infinera Packet Routers --- data/.gitignore | 1 + src/device/genproto.sh | 4 + src/device/proto/context_pb2.py | 117 +---- src/device/proto/monitoring_pb2.py | 452 ++++++++++++++++++ .../service/DeviceServiceServicerImpl.py | 38 +- src/device/service/MonitoringLoops.py | 71 ++- src/device/service/__main__.py | 27 +- .../service/driver_api/DriverFactory.py | 33 +- .../service/driver_api/DriverInstanceCache.py | 4 + src/device/service/driver_api/Exceptions.py | 5 - src/device/service/driver_api/FilterFields.py | 10 +- src/device/service/driver_api/_Driver.py | 13 +- src/device/service/drivers/__init__.py | 13 +- .../drivers/emulated/EmulatedDriver.py | 8 +- .../drivers/openconfig/OpenConfigDriver.py | 245 ++++++++++ .../handlers/InfineraDeviceHandler.py | 18 + .../drivers/openconfig/handlers/__init__.py | 7 + .../transport_api/TransportApiDriver.py | 0 .../service/drivers/transport_api/__init__.py | 0 src/device/tests/.gitignore | 1 + src/device/tests/example_objects.py | 47 +- src/device/tests/test_unitary.py | 42 +- 22 files changed, 945 insertions(+), 211 deletions(-) create mode 100644 data/.gitignore create mode 100644 src/device/proto/monitoring_pb2.py create mode 100644 src/device/service/drivers/openconfig/OpenConfigDriver.py create mode 100644 src/device/service/drivers/openconfig/handlers/InfineraDeviceHandler.py create mode 100644 src/device/service/drivers/openconfig/handlers/__init__.py create mode 100644 src/device/service/drivers/transport_api/TransportApiDriver.py create mode 100644 src/device/service/drivers/transport_api/__init__.py create mode 100644 src/device/tests/.gitignore diff --git a/data/.gitignore b/data/.gitignore new file mode 100644 index 000000000..128ac08dd --- /dev/null +++ b/data/.gitignore @@ -0,0 +1 @@ +drx30-01.xml diff --git a/src/device/genproto.sh b/src/device/genproto.sh index c5f9ec20f..31632fb89 100755 --- a/src/device/genproto.sh +++ b/src/device/genproto.sh @@ -26,10 +26,14 @@ touch proto/__init__.py python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto context.proto python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto device.proto python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto kpi_sample_types.proto +python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto monitoring.proto rm proto/context_pb2_grpc.py rm proto/kpi_sample_types_pb2_grpc.py +rm proto/monitoring_pb2_grpc.py sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/context_pb2.py sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/device_pb2.py sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/device_pb2_grpc.py +sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/kpi_sample_types_pb2.py +sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/monitoring_pb2.py diff --git a/src/device/proto/context_pb2.py b/src/device/proto/context_pb2.py index e2773a40b..658c58897 100644 --- a/src/device/proto/context_pb2.py +++ b/src/device/proto/context_pb2.py @@ -716,31 +716,6 @@ _TOPOLOGYEVENT = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=780, - serialized_end=839, -) - - -_TOPOLOGYLIST = _descriptor.Descriptor( - name='TopologyList', - full_name='context.TopologyList', - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ _descriptor.FieldDescriptor( name='topology_id', full_name='context.TopologyEvent.topology_id', index=1, number=2, type=11, cpp_type=10, label=1, @@ -999,9 +974,9 @@ _DEVICEEVENT = _descriptor.Descriptor( ) -_DEVICE = _descriptor.Descriptor( - name='Device', - full_name='context.Device', +_LINKID = _descriptor.Descriptor( + name='LinkId', + full_name='context.LinkId', filename=None, file=DESCRIPTOR, containing_type=None, @@ -1014,41 +989,6 @@ _DEVICE = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='device_type', full_name='context.Device.device_type', index=1, - number=2, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=b"".decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='device_config', full_name='context.Device.device_config', index=2, - number=3, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='device_operational_status', full_name='context.Device.device_operational_status', index=3, - number=4, type=14, cpp_type=8, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='device_drivers', full_name='context.Device.device_drivers', index=4, - number=5, type=14, cpp_type=8, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='device_endpoints', full_name='context.Device.device_endpoints', index=5, - number=6, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -2138,43 +2078,12 @@ LinkId = _reflection.GeneratedProtocolMessageType('LinkId', (_message.Message,), }) _sym_db.RegisterMessage(LinkId) -_LINK = _descriptor.Descriptor( - name='Link', - full_name='context.Link', - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name='link_id', full_name='context.Link.link_id', index=0, - number=1, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='link_endpoint_ids', full_name='context.Link.link_endpoint_ids', index=1, - number=2, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=1609, - serialized_end=1697, -) +Link = _reflection.GeneratedProtocolMessageType('Link', (_message.Message,), { + 'DESCRIPTOR' : _LINK, + '__module__' : 'context_pb2' + # @@protoc_insertion_point(class_scope:context.Link) + }) +_sym_db.RegisterMessage(Link) LinkIdList = _reflection.GeneratedProtocolMessageType('LinkIdList', (_message.Message,), { 'DESCRIPTOR' : _LINKIDLIST, @@ -2246,12 +2155,12 @@ ServiceEvent = _reflection.GeneratedProtocolMessageType('ServiceEvent', (_messag }) _sym_db.RegisterMessage(ServiceEvent) -DeviceList = _reflection.GeneratedProtocolMessageType('DeviceList', (_message.Message,), { - 'DESCRIPTOR' : _DEVICELIST, +EndPointId = _reflection.GeneratedProtocolMessageType('EndPointId', (_message.Message,), { + 'DESCRIPTOR' : _ENDPOINTID, '__module__' : 'context_pb2' - # @@protoc_insertion_point(class_scope:context.DeviceList) + # @@protoc_insertion_point(class_scope:context.EndPointId) }) -_sym_db.RegisterMessage(DeviceList) +_sym_db.RegisterMessage(EndPointId) EndPoint = _reflection.GeneratedProtocolMessageType('EndPoint', (_message.Message,), { 'DESCRIPTOR' : _ENDPOINT, diff --git a/src/device/proto/monitoring_pb2.py b/src/device/proto/monitoring_pb2.py new file mode 100644 index 000000000..b313ebb68 --- /dev/null +++ b/src/device/proto/monitoring_pb2.py @@ -0,0 +1,452 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: monitoring.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from . import context_pb2 as context__pb2 +from . import kpi_sample_types_pb2 as kpi__sample__types__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='monitoring.proto', + package='monitoring', + syntax='proto3', + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\x10monitoring.proto\x12\nmonitoring\x1a\rcontext.proto\x1a\x16kpi_sample_types.proto\"\xda\x01\n\rKpiDescriptor\x12\x17\n\x0fkpi_description\x18\x01 \x01(\t\x12\x38\n\x0fkpi_sample_type\x18\x02 \x01(\x0e\x32\x1f.kpi_sample_types.KpiSampleType\x12$\n\tdevice_id\x18\x03 \x01(\x0b\x32\x11.context.DeviceId\x12(\n\x0b\x65ndpoint_id\x18\x04 \x01(\x0b\x32\x13.context.EndPointId\x12&\n\nservice_id\x18\x05 \x01(\x0b\x32\x12.context.ServiceId\"p\n\x11MonitorKpiRequest\x12!\n\x06kpi_id\x18\x01 \x01(\x0b\x32\x11.monitoring.KpiId\x12\x1b\n\x13sampling_duration_s\x18\x02 \x01(\x02\x12\x1b\n\x13sampling_interval_s\x18\x03 \x01(\x02\"&\n\x05KpiId\x12\x1d\n\x06kpi_id\x18\x01 \x01(\x0b\x32\r.context.Uuid\"d\n\x03Kpi\x12!\n\x06kpi_id\x18\x01 \x01(\x0b\x32\x11.monitoring.KpiId\x12\x11\n\ttimestamp\x18\x02 \x01(\t\x12\'\n\tkpi_value\x18\x04 \x01(\x0b\x32\x14.monitoring.KpiValue\"a\n\x08KpiValue\x12\x10\n\x06intVal\x18\x01 \x01(\rH\x00\x12\x12\n\x08\x66loatVal\x18\x02 \x01(\x02H\x00\x12\x13\n\tstringVal\x18\x03 \x01(\tH\x00\x12\x11\n\x07\x62oolVal\x18\x04 \x01(\x08H\x00\x42\x07\n\x05value\",\n\x07KpiList\x12!\n\x08kpi_list\x18\x01 \x03(\x0b\x32\x0f.monitoring.Kpi2\xf3\x02\n\x11MonitoringService\x12;\n\tCreateKpi\x12\x19.monitoring.KpiDescriptor\x1a\x11.monitoring.KpiId\"\x00\x12\x42\n\x10GetKpiDescriptor\x12\x11.monitoring.KpiId\x1a\x19.monitoring.KpiDescriptor\"\x00\x12/\n\nIncludeKpi\x12\x0f.monitoring.Kpi\x1a\x0e.context.Empty\"\x00\x12=\n\nMonitorKpi\x12\x1d.monitoring.MonitorKpiRequest\x1a\x0e.context.Empty\"\x00\x12\x36\n\x0cGetStreamKpi\x12\x11.monitoring.KpiId\x1a\x0f.monitoring.Kpi\"\x00\x30\x01\x12\x35\n\rGetInstantKpi\x12\x11.monitoring.KpiId\x1a\x0f.monitoring.Kpi\"\x00\x62\x06proto3' + , + dependencies=[context__pb2.DESCRIPTOR,kpi__sample__types__pb2.DESCRIPTOR,]) + + + + +_KPIDESCRIPTOR = _descriptor.Descriptor( + name='KpiDescriptor', + full_name='monitoring.KpiDescriptor', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='kpi_description', full_name='monitoring.KpiDescriptor.kpi_description', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='kpi_sample_type', full_name='monitoring.KpiDescriptor.kpi_sample_type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='device_id', full_name='monitoring.KpiDescriptor.device_id', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='endpoint_id', full_name='monitoring.KpiDescriptor.endpoint_id', index=3, + number=4, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='service_id', full_name='monitoring.KpiDescriptor.service_id', index=4, + number=5, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=72, + serialized_end=290, +) + + +_MONITORKPIREQUEST = _descriptor.Descriptor( + name='MonitorKpiRequest', + full_name='monitoring.MonitorKpiRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='kpi_id', full_name='monitoring.MonitorKpiRequest.kpi_id', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='sampling_duration_s', full_name='monitoring.MonitorKpiRequest.sampling_duration_s', index=1, + number=2, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='sampling_interval_s', full_name='monitoring.MonitorKpiRequest.sampling_interval_s', index=2, + number=3, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=292, + serialized_end=404, +) + + +_KPIID = _descriptor.Descriptor( + name='KpiId', + full_name='monitoring.KpiId', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='kpi_id', full_name='monitoring.KpiId.kpi_id', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=406, + serialized_end=444, +) + + +_KPI = _descriptor.Descriptor( + name='Kpi', + full_name='monitoring.Kpi', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='kpi_id', full_name='monitoring.Kpi.kpi_id', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='timestamp', full_name='monitoring.Kpi.timestamp', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='kpi_value', full_name='monitoring.Kpi.kpi_value', index=2, + number=4, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=446, + serialized_end=546, +) + + +_KPIVALUE = _descriptor.Descriptor( + name='KpiValue', + full_name='monitoring.KpiValue', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='intVal', full_name='monitoring.KpiValue.intVal', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='floatVal', full_name='monitoring.KpiValue.floatVal', index=1, + number=2, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='stringVal', full_name='monitoring.KpiValue.stringVal', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='boolVal', full_name='monitoring.KpiValue.boolVal', index=3, + number=4, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + _descriptor.OneofDescriptor( + name='value', full_name='monitoring.KpiValue.value', + index=0, containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[]), + ], + serialized_start=548, + serialized_end=645, +) + + +_KPILIST = _descriptor.Descriptor( + name='KpiList', + full_name='monitoring.KpiList', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='kpi_list', full_name='monitoring.KpiList.kpi_list', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=647, + serialized_end=691, +) + +_KPIDESCRIPTOR.fields_by_name['kpi_sample_type'].enum_type = kpi__sample__types__pb2._KPISAMPLETYPE +_KPIDESCRIPTOR.fields_by_name['device_id'].message_type = context__pb2._DEVICEID +_KPIDESCRIPTOR.fields_by_name['endpoint_id'].message_type = context__pb2._ENDPOINTID +_KPIDESCRIPTOR.fields_by_name['service_id'].message_type = context__pb2._SERVICEID +_MONITORKPIREQUEST.fields_by_name['kpi_id'].message_type = _KPIID +_KPIID.fields_by_name['kpi_id'].message_type = context__pb2._UUID +_KPI.fields_by_name['kpi_id'].message_type = _KPIID +_KPI.fields_by_name['kpi_value'].message_type = _KPIVALUE +_KPIVALUE.oneofs_by_name['value'].fields.append( + _KPIVALUE.fields_by_name['intVal']) +_KPIVALUE.fields_by_name['intVal'].containing_oneof = _KPIVALUE.oneofs_by_name['value'] +_KPIVALUE.oneofs_by_name['value'].fields.append( + _KPIVALUE.fields_by_name['floatVal']) +_KPIVALUE.fields_by_name['floatVal'].containing_oneof = _KPIVALUE.oneofs_by_name['value'] +_KPIVALUE.oneofs_by_name['value'].fields.append( + _KPIVALUE.fields_by_name['stringVal']) +_KPIVALUE.fields_by_name['stringVal'].containing_oneof = _KPIVALUE.oneofs_by_name['value'] +_KPIVALUE.oneofs_by_name['value'].fields.append( + _KPIVALUE.fields_by_name['boolVal']) +_KPIVALUE.fields_by_name['boolVal'].containing_oneof = _KPIVALUE.oneofs_by_name['value'] +_KPILIST.fields_by_name['kpi_list'].message_type = _KPI +DESCRIPTOR.message_types_by_name['KpiDescriptor'] = _KPIDESCRIPTOR +DESCRIPTOR.message_types_by_name['MonitorKpiRequest'] = _MONITORKPIREQUEST +DESCRIPTOR.message_types_by_name['KpiId'] = _KPIID +DESCRIPTOR.message_types_by_name['Kpi'] = _KPI +DESCRIPTOR.message_types_by_name['KpiValue'] = _KPIVALUE +DESCRIPTOR.message_types_by_name['KpiList'] = _KPILIST +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +KpiDescriptor = _reflection.GeneratedProtocolMessageType('KpiDescriptor', (_message.Message,), { + 'DESCRIPTOR' : _KPIDESCRIPTOR, + '__module__' : 'monitoring_pb2' + # @@protoc_insertion_point(class_scope:monitoring.KpiDescriptor) + }) +_sym_db.RegisterMessage(KpiDescriptor) + +MonitorKpiRequest = _reflection.GeneratedProtocolMessageType('MonitorKpiRequest', (_message.Message,), { + 'DESCRIPTOR' : _MONITORKPIREQUEST, + '__module__' : 'monitoring_pb2' + # @@protoc_insertion_point(class_scope:monitoring.MonitorKpiRequest) + }) +_sym_db.RegisterMessage(MonitorKpiRequest) + +KpiId = _reflection.GeneratedProtocolMessageType('KpiId', (_message.Message,), { + 'DESCRIPTOR' : _KPIID, + '__module__' : 'monitoring_pb2' + # @@protoc_insertion_point(class_scope:monitoring.KpiId) + }) +_sym_db.RegisterMessage(KpiId) + +Kpi = _reflection.GeneratedProtocolMessageType('Kpi', (_message.Message,), { + 'DESCRIPTOR' : _KPI, + '__module__' : 'monitoring_pb2' + # @@protoc_insertion_point(class_scope:monitoring.Kpi) + }) +_sym_db.RegisterMessage(Kpi) + +KpiValue = _reflection.GeneratedProtocolMessageType('KpiValue', (_message.Message,), { + 'DESCRIPTOR' : _KPIVALUE, + '__module__' : 'monitoring_pb2' + # @@protoc_insertion_point(class_scope:monitoring.KpiValue) + }) +_sym_db.RegisterMessage(KpiValue) + +KpiList = _reflection.GeneratedProtocolMessageType('KpiList', (_message.Message,), { + 'DESCRIPTOR' : _KPILIST, + '__module__' : 'monitoring_pb2' + # @@protoc_insertion_point(class_scope:monitoring.KpiList) + }) +_sym_db.RegisterMessage(KpiList) + + + +_MONITORINGSERVICE = _descriptor.ServiceDescriptor( + name='MonitoringService', + full_name='monitoring.MonitoringService', + file=DESCRIPTOR, + index=0, + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_start=694, + serialized_end=1065, + methods=[ + _descriptor.MethodDescriptor( + name='CreateKpi', + full_name='monitoring.MonitoringService.CreateKpi', + index=0, + containing_service=None, + input_type=_KPIDESCRIPTOR, + output_type=_KPIID, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='GetKpiDescriptor', + full_name='monitoring.MonitoringService.GetKpiDescriptor', + index=1, + containing_service=None, + input_type=_KPIID, + output_type=_KPIDESCRIPTOR, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='IncludeKpi', + full_name='monitoring.MonitoringService.IncludeKpi', + index=2, + containing_service=None, + input_type=_KPI, + output_type=context__pb2._EMPTY, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='MonitorKpi', + full_name='monitoring.MonitoringService.MonitorKpi', + index=3, + containing_service=None, + input_type=_MONITORKPIREQUEST, + output_type=context__pb2._EMPTY, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='GetStreamKpi', + full_name='monitoring.MonitoringService.GetStreamKpi', + index=4, + containing_service=None, + input_type=_KPIID, + output_type=_KPI, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='GetInstantKpi', + full_name='monitoring.MonitoringService.GetInstantKpi', + index=5, + containing_service=None, + input_type=_KPIID, + output_type=_KPI, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), +]) +_sym_db.RegisterServiceDescriptor(_MONITORINGSERVICE) + +DESCRIPTOR.services_by_name['MonitoringService'] = _MONITORINGSERVICE + +# @@protoc_insertion_point(module_scope) diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index ea42ab282..f46619874 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -7,8 +7,9 @@ from common.orm.backend.BackendEnum import BackendEnum from common.orm.backend.Tools import key_to_str from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException +from common.type_checkers.Checkers import chk_integer from context.client.ContextClient import ContextClient -from device.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty +from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty from device.proto.device_pb2 import MonitoringSettings from device.proto.device_pb2_grpc import DeviceServiceServicer from .MonitoringLoops import MonitoringLoops @@ -49,17 +50,30 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): device_id = request.device_id device_uuid = device_id.device_uuid.uuid - if len(request.device_config.config_rules) > 0: + connection_config_rules = {} + unexpected_config_rules = [] + for config_rule in request.device_config.config_rules: + if (config_rule.action == ConfigActionEnum.CONFIGACTION_SET) and \ + (config_rule.resource_key.startswith('_connect/')): + connection_config_rules[config_rule.resource_key.replace('_connect/', '')] = config_rule.resource_value + else: + unexpected_config_rules.append(config_rule) + if len(unexpected_config_rules) > 0: raise InvalidArgumentException( - 'device.device_config.config_rules', str(request.device_config.config_rules), - extra_details='RPC method AddDevice does not allow definition of Config Rules. '\ - 'Add the Device first, and then configure it.') + 'device.device_config.config_rules', str(unexpected_config_rules), + extra_details='RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.') sync_device_from_context(device_uuid, self.context_client, self.database) db_device,_ = update_device_in_local_database(self.database, request) driver_filter_fields = get_device_driver_filter_fields(db_device) - driver : _Driver = self.driver_instance_cache.get(device_uuid, driver_filter_fields) + + address = connection_config_rules.pop('address', None) + port = connection_config_rules.pop('port', None) + driver : _Driver = self.driver_instance_cache.get( + device_uuid, filter_fields=driver_filter_fields, address=address, port=port, + settings=connection_config_rules) driver.Connect() running_config_rules = driver.GetConfig() @@ -186,7 +200,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): kpi_uuid = request.kpi_id.kpi_id.uuid device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid - db_device = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) + db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False) endpoint_id = request.kpi_descriptor.endpoint_id endpoint_uuid = endpoint_id.endpoint_uuid.uuid @@ -198,9 +212,10 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0: str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid]) str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') - db_endpoint = get_object(self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) + db_endpoint : EndPointModel = get_object( + self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False) - db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False) + #db_kpi_prev = get_object(self.database, KpiModel, kpi_uuid, raise_if_not_found=False) result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, { 'kpi_uuid' : request.kpi_id.kpi_id.uuid, 'kpi_description' : request.kpi_descriptor.kpi_description, @@ -217,13 +232,14 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid)) raise OperationFailedException('ConfigureDevice', extra_details=msg) + sampling_resource = driver.GetResource(db_endpoint.endpoint_uuid) + results = driver.SubscribeState([ - (db_kpi.sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval), + (sampling_resource, db_kpi.sampling_duration, db_kpi.sampling_interval), ]) assert len(results) == 4 for result in results: assert isinstance(result, bool) and result self.monitoring_loops.add(device_uuid, driver) - raise NotImplementedError() return Empty() diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py index 48c185fed..13842c00c 100644 --- a/src/device/service/MonitoringLoops.py +++ b/src/device/service/MonitoringLoops.py @@ -1,39 +1,30 @@ -import threading -from queue import Queue +import logging, queue, threading from typing import Dict from monitoring.client.monitoring_client import MonitoringClient from monitoring.proto.monitoring_pb2 import Kpi from .driver_api._Driver import _Driver +LOGGER = logging.getLogger(__name__) QUEUE_GET_WAIT_TIMEOUT = 0.5 class MonitoringLoop: - def __init__(self, driver : _Driver, monitoring_client : MonitoringClient) -> None: + def __init__(self, driver : _Driver, samples_queue : queue.Queue) -> None: self._driver = driver - self._monitoring_client = monitoring_client - self._samples_queue = Queue() + self._samples_queue = samples_queue self._running = threading.Event() self._terminate = threading.Event() self._samples_stream = self._driver.GetState(blocking=True) self._collector_thread = threading.Thread(target=self._collect, daemon=False) - self._exporter_thread = threading.Thread(target=self._export, daemon=False) def _collect(self) -> None: - for event in self._samples_stream: + for sample in self._samples_stream: if self._terminate.is_set(): break + LOGGER.info('[MonitoringLoop:_collect] sample={:s}'.format(str(sample))) # TODO: add timestamp (if not present) - self._samples_queue.put_nowait(event) - - def _export(self) -> None: - while not self._terminate.is_set(): - sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) - # TODO: find in database the KpiId, format KPI and send to Monitoring - kpi_data = {} - self._monitoring_client.IncludeKpi(Kpi(**kpi_data)) + self._samples_queue.put_nowait(sample) def start(self): self._collector_thread.start() - self._exporter_thread.start() self._running.set() @property @@ -43,20 +34,50 @@ class MonitoringLoop: self._terminate.set() self._samples_stream.cancel() self._collector_thread.join() - self._exporter_thread.join() class MonitoringLoops: def __init__(self, monitoring_client : MonitoringClient) -> None: - self.monitoring_client = monitoring_client + self._monitoring_client = monitoring_client + self._samples_queue = queue.Queue() + self._running = threading.Event() + self._terminate = threading.Event() + self._lock = threading.Lock() self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {} + self._exporter_thread = threading.Thread(target=self._export, daemon=False) def add(self, device_uuid : str, driver : _Driver) -> None: - monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) - if (monitoring_loop is not None) and monitoring_loop.is_running: return - self._device_uuid__to__monitoring_loop.setdefault(device_uuid, MonitoringLoop(driver, self.monitoring_client)) + with self._lock: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if (monitoring_loop is not None) and monitoring_loop.is_running: return + monitoring_loop = MonitoringLoop(driver, self._samples_queue) + self._device_uuid__to__monitoring_loop[device_uuid] = monitoring_loop + monitoring_loop.start() def remove(self, device_uuid : str) -> None: - monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) - if monitoring_loop is None: return - if monitoring_loop.is_running: monitoring_loop.stop() - self._device_uuid__to__monitoring_loop.pop(device_uuid, None) + with self._lock: + monitoring_loop = self._device_uuid__to__monitoring_loop.get(device_uuid) + if monitoring_loop is None: return + if monitoring_loop.is_running: monitoring_loop.stop() + self._device_uuid__to__monitoring_loop.pop(device_uuid, None) + + def start(self): + self._exporter_thread.start() + self._running.set() + + @property + def is_running(self): return self._running.is_set() + + def stop(self): + self._terminate.set() + self._exporter_thread.join() + + def _export(self) -> None: + while not self._terminate.is_set(): + try: + sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT) + LOGGER.info('[MonitoringLoops:_export] sample={:s}'.format(str(sample))) + except queue.Empty: + continue + # TODO: find in database the KpiId, format KPI and send to Monitoring + kpi_data = {} + self._monitoring_client.IncludeKpi(Kpi(**kpi_data)) diff --git a/src/device/service/__main__.py b/src/device/service/__main__.py index fcc0e6a70..2893d25a5 100644 --- a/src/device/service/__main__.py +++ b/src/device/service/__main__.py @@ -3,6 +3,7 @@ from prometheus_client import start_http_server from common.Settings import get_setting from context.client.ContextClient import ContextClient from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT +from monitoring.client.monitoring_client import MonitoringClient from .DeviceService import DeviceService from .MonitoringLoops import MonitoringLoops from .driver_api.DriverFactory import DriverFactory @@ -19,13 +20,15 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def main(): global LOGGER # pylint: disable=global-statement - grpc_service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) - context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=None ) - context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=None ) + grpc_service_port = get_setting('DEVICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT) + max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) + grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD) + log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) + metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) + context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=None ) + context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=None ) + monitoring_service_host = get_setting('MONITORINGSERVICE_SERVICE_HOST', default=None ) + monitoring_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=None ) logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) @@ -44,21 +47,29 @@ def main(): str(context_service_host), str(context_service_port))) context_client = ContextClient(context_service_host, context_service_port) + # Initialize Monitoring Client + if monitoring_service_host is None or monitoring_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Monitoring component'.format( + str(monitoring_service_host), str(monitoring_service_port))) + monitoring_client = MonitoringClient(monitoring_service_host, monitoring_service_port) + # Initialize Driver framework driver_factory = DriverFactory(DRIVERS) driver_instance_cache = DriverInstanceCache(driver_factory) - monitoring_loops = MonitoringLoops() + monitoring_loops = MonitoringLoops(monitoring_client) # Starting device service grpc_service = DeviceService( context_client, driver_instance_cache, monitoring_loops, port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) grpc_service.start() + monitoring_loops.start() # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass LOGGER.info('Terminating...') + monitoring_loops.stop() grpc_service.stop() driver_instance_cache.terminate() diff --git a/src/device/service/driver_api/DriverFactory.py b/src/device/service/driver_api/DriverFactory.py index c1ac703b4..10b4961fc 100644 --- a/src/device/service/driver_api/DriverFactory.py +++ b/src/device/service/driver_api/DriverFactory.py @@ -1,8 +1,10 @@ -import logging +import logging, operator +from enum import Enum from typing import Any, Dict, Iterable, List, Set, Tuple from device.service.driver_api._Driver import _Driver -from device.service.driver_api.Exceptions import MultipleResultsForFilterException, UnsatisfiedFilterException, \ - UnsupportedDriverClassException, UnsupportedFilterFieldException, UnsupportedFilterFieldValueException +from device.service.driver_api.Exceptions import ( + UnsatisfiedFilterException, UnsupportedDriverClassException, UnsupportedFilterFieldException, + UnsupportedFilterFieldValueException) from device.service.driver_api.FilterFields import FILTER_FIELD_ALLOWED_VALUES, FilterFieldEnum LOGGER = logging.getLogger(__name__) @@ -20,7 +22,7 @@ class DriverFactory: if not issubclass(driver_class, _Driver): raise UnsupportedDriverClassException(str(driver_class)) driver_name = driver_class.__name__ - supported_filter_fields = {filter_field.value for filter_field in set(FILTER_FIELD_ALLOWED_VALUES.keys())} + supported_filter_fields = set(FILTER_FIELD_ALLOWED_VALUES.keys()) unsupported_filter_fields = set(filter_fields.keys()).difference(supported_filter_fields) if len(unsupported_filter_fields) > 0: raise UnsupportedFilterFieldException(unsupported_filter_fields, driver_class_name=driver_name) @@ -31,18 +33,19 @@ class DriverFactory: if not isinstance(field_values, Iterable) or isinstance(field_values, str): field_values = [field_values] for field_value in field_values: + if isinstance(field_value, Enum): field_value = field_value.value if field_enum_values is not None and field_value not in field_enum_values: raise UnsupportedFilterFieldValueException( field_name, field_value, field_enum_values, driver_class_name=driver_name) - field_indice_drivers = field_indice.setdefault(field_name, set()) + field_indice_drivers = field_indice.setdefault(field_value, set()) field_indice_drivers.add(driver_class) def get_driver_class(self, **filter_fields) -> _Driver: - supported_filter_fields = {filter_field.value for filter_field in set(FILTER_FIELD_ALLOWED_VALUES.keys())} + supported_filter_fields = set(FILTER_FIELD_ALLOWED_VALUES.keys()) unsupported_filter_fields = set(filter_fields.keys()).difference(supported_filter_fields) if len(unsupported_filter_fields) > 0: raise UnsupportedFilterFieldException(unsupported_filter_fields) - candidate_driver_classes = None + candidate_driver_classes : Dict[_Driver, int] = None # number of filter hits per driver for field_name, field_values in filter_fields.items(): field_indice = self.__indices.get(field_name) if field_indice is None: continue @@ -54,15 +57,17 @@ class DriverFactory: for field_value in field_values: if field_enum_values is not None and field_value not in field_enum_values: raise UnsupportedFilterFieldValueException(field_name, field_value, field_enum_values) - field_indice_drivers = field_indice.get(field_name) + field_indice_drivers = field_indice.get(field_value) if field_indice_drivers is None: continue field_candidate_driver_classes = field_candidate_driver_classes.union(field_indice_drivers) - candidate_driver_classes = field_indice_drivers if candidate_driver_classes is None else \ - candidate_driver_classes.intersection(field_indice_drivers) + if candidate_driver_classes is None: + candidate_driver_classes = {k:1 for k in field_candidate_driver_classes} + else: + for candidate_driver_class in candidate_driver_classes: + if candidate_driver_class not in field_candidate_driver_classes: continue + candidate_driver_classes[candidate_driver_class] += 1 if len(candidate_driver_classes) == 0: raise UnsatisfiedFilterException(filter_fields) - if len(candidate_driver_classes) > 1: - # TODO: Consider choosing driver with more query fields being satisfied (i.e., the most restrictive one) - raise MultipleResultsForFilterException(filter_fields, {d.__name__ for d in candidate_driver_classes}) - return candidate_driver_classes.pop() + candidate_driver_classes = sorted(candidate_driver_classes.items(), key=operator.itemgetter(1), reverse=True) + return candidate_driver_classes[0][0] diff --git a/src/device/service/driver_api/DriverInstanceCache.py b/src/device/service/driver_api/DriverInstanceCache.py index 603a6429b..f960e37cb 100644 --- a/src/device/service/driver_api/DriverInstanceCache.py +++ b/src/device/service/driver_api/DriverInstanceCache.py @@ -29,7 +29,11 @@ class DriverInstanceCache: if driver_instance is not None: return driver_instance if len(filter_fields) == 0: return None + MSG = 'Selecting driver for device({:s}) with filter_fields({:s})...' + LOGGER.info(MSG.format(str(device_uuid), str(filter_fields))) driver_class = self._driver_factory.get_driver_class(**filter_fields) + MSG = 'Driver({:s}) selected for device({:s}) with filter_fields({:s})...' + LOGGER.info(MSG.format(str(driver_class.__name__), str(device_uuid), str(filter_fields))) driver_instance : _Driver = driver_class(address, port, **settings) self._device_uuid__to__driver_instance[device_uuid] = driver_instance return driver_instance diff --git a/src/device/service/driver_api/Exceptions.py b/src/device/service/driver_api/Exceptions.py index 9e70278a3..a3a5e9871 100644 --- a/src/device/service/driver_api/Exceptions.py +++ b/src/device/service/driver_api/Exceptions.py @@ -1,8 +1,3 @@ -class MultipleResultsForFilterException(Exception): - def __init__(self, filter_fields, driver_names): - msg = 'Multiple Drivers({:s}) satisfy FilterFields({:s})' - super().__init__(msg.format(str(driver_names), str(filter_fields))) - class UnsatisfiedFilterException(Exception): def __init__(self, filter_fields): msg = 'No Driver satisfies FilterFields({:s})' diff --git a/src/device/service/driver_api/FilterFields.py b/src/device/service/driver_api/FilterFields.py index c27a2ac90..15bbc0fc7 100644 --- a/src/device/service/driver_api/FilterFields.py +++ b/src/device/service/driver_api/FilterFields.py @@ -17,9 +17,9 @@ class FilterFieldEnum(Enum): # Map allowed filter fields to allowed values per Filter field. If no restriction (free text) None is specified FILTER_FIELD_ALLOWED_VALUES = { - FilterFieldEnum.DEVICE_TYPE : {i.value for i in DeviceTypeFilterFieldEnum}, - FilterFieldEnum.DRIVER : {i.value for i in ORM_DeviceDriverEnum}, - FilterFieldEnum.VENDOR : None, - FilterFieldEnum.MODEL : None, - FilterFieldEnum.SERIAL_NUMBER : None, + FilterFieldEnum.DEVICE_TYPE.value : {i.value for i in DeviceTypeFilterFieldEnum}, + FilterFieldEnum.DRIVER.value : {i.value for i in ORM_DeviceDriverEnum}, + FilterFieldEnum.VENDOR.value : None, + FilterFieldEnum.MODEL.value : None, + FilterFieldEnum.SERIAL_NUMBER.value : None, } diff --git a/src/device/service/driver_api/_Driver.py b/src/device/service/driver_api/_Driver.py index 34274159e..b79661faa 100644 --- a/src/device/service/driver_api/_Driver.py +++ b/src/device/service/driver_api/_Driver.py @@ -1,4 +1,4 @@ -from typing import Any, Iterator, List, Tuple, Union +from typing import Any, Iterator, List, Optional, Tuple, Union class _Driver: def __init__(self, address : str, port : int, **settings) -> None: @@ -50,6 +50,17 @@ class _Driver: """ raise NotImplementedError() + def GetResource(self, endpoint_uuid : str) -> Optional[str]: + """ Retrieve the endpoint path for subscriptions. + Parameters: + endpoint_uuid : str + Target endpoint UUID + Returns: + resource_path : Optional[str] + The path of the endpoint, or None if it is not found. + """ + raise NotImplementedError() + def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: """ Create/Update configuration for a list of resources. Parameters: diff --git a/src/device/service/drivers/__init__.py b/src/device/service/drivers/__init__.py index 6d8a20863..ae0415b78 100644 --- a/src/device/service/drivers/__init__.py +++ b/src/device/service/drivers/__init__.py @@ -1,6 +1,6 @@ from ..driver_api.FilterFields import FilterFieldEnum, DeviceTypeFilterFieldEnum, ORM_DeviceDriverEnum from .emulated.EmulatedDriver import EmulatedDriver -#from .openconfig.OpenConfigDriver import OpenConfigDriver +from .openconfig.OpenConfigDriver import OpenConfigDriver #from .transport_api.TransportAPIDriver import TransportAPIDriver DRIVERS = [ @@ -10,11 +10,12 @@ DRIVERS = [ FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, } ]), - #(OpenConfigDriver, [ - # { - # FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, - # } - #]), + (OpenConfigDriver, [ + { + FilterFieldEnum.DEVICE_TYPE: DeviceTypeFilterFieldEnum.PACKET_ROUTER, + FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.OPENCONFIG, + } + ]), #(TransportAPIDriver, [ # { # FilterFieldEnum.DRIVER : ORM_DeviceDriverEnum.TRANSPORT_API, diff --git a/src/device/service/drivers/emulated/EmulatedDriver.py b/src/device/service/drivers/emulated/EmulatedDriver.py index 5854af777..3328a29de 100644 --- a/src/device/service/drivers/emulated/EmulatedDriver.py +++ b/src/device/service/drivers/emulated/EmulatedDriver.py @@ -1,6 +1,6 @@ import anytree, logging, pytz, queue, random, threading from datetime import datetime, timedelta -from typing import Any, Iterator, List, Tuple, Union +from typing import Any, Iterator, List, Optional, Tuple, Union from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.job import Job from apscheduler.jobstores.memory import MemoryJobStore @@ -77,6 +77,12 @@ class EmulatedDriver(_Driver): results.extend(dump_subtree(resource_node)) return results + def GetResource(self, endpoint_uuid : str) -> Optional[str]: + chk_string('endpoint_uuid', endpoint_uuid) + return { + #'key': 'value', + }.get(endpoint_uuid) + def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: chk_type('resources', resources, list) if len(resources) == 0: return [] diff --git a/src/device/service/drivers/openconfig/OpenConfigDriver.py b/src/device/service/drivers/openconfig/OpenConfigDriver.py new file mode 100644 index 000000000..d76962a06 --- /dev/null +++ b/src/device/service/drivers/openconfig/OpenConfigDriver.py @@ -0,0 +1,245 @@ +import logging, ncclient.manager, pytz, queue, threading +import xml.dom.minidom +from typing import Any, Iterator, List, Optional, Tuple, Union +#import anytree, random +#from datetime import datetime, timedelta +#from apscheduler.executors.pool import ThreadPoolExecutor +#from apscheduler.job import Job +#from apscheduler.jobstores.memory import MemoryJobStore +#from apscheduler.schedulers.background import BackgroundScheduler +from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type +from device.service.driver_api._Driver import _Driver +from device.service.drivers.openconfig.handlers import DEFAULT_HANDLER, HANDLERS +#from .AnyTreeTools import TreeNode, dump_subtree, get_subnode, set_subnode_value + +logging.getLogger('ncclient.transport.ssh').setLevel(logging.WARNING) + +LOGGER = logging.getLogger(__name__) + +#def do_sampling(resource_key : str, out_samples : queue.Queue): +# out_samples.put_nowait((datetime.timestamp(datetime.utcnow()), resource_key, random.random())) + +class OpenConfigDriver(_Driver): + def __init__(self, address : str, port : int, **settings) -> None: # pylint: disable=super-init-not-called + self.__address = address + self.__port = port + self.__settings = settings + self.__lock = threading.Lock() + #self.__initial = TreeNode('.') + #self.__running = TreeNode('.') + self.__started = threading.Event() + self.__terminate = threading.Event() + self.__netconf_manager : ncclient.manager.Manager = None + #self.__scheduler = BackgroundScheduler(daemon=True) # scheduler used to emulate sampling events + #self.__scheduler.configure( + # jobstores = {'default': MemoryJobStore()}, + # executors = {'default': ThreadPoolExecutor(max_workers=1)}, + # job_defaults = {'coalesce': False, 'max_instances': 3}, + # timezone=pytz.utc) + #self.__out_samples = queue.Queue() + + def Connect(self) -> bool: + with self.__lock: + if self.__started.is_set(): return True + username = self.__settings.get('username') + password = self.__settings.get('password') + handler_name = self.__settings.get('handler') + handler = HANDLERS.get(handler_name, DEFAULT_HANDLER) + self.__netconf_manager = ncclient.manager.connect_ssh( + host=self.__address, port=self.__port, username=username, password=password, hostkey_verify=False, + device_params=handler) + # Connect triggers activation of sampling events that will be scheduled based on subscriptions + #self.__scheduler.start() + self.__started.set() + return True + + def Disconnect(self) -> bool: + with self.__lock: + # Trigger termination of loops and processes + self.__terminate.set() + # If not started, assume it is already disconnected + if not self.__started.is_set(): return True + # Disconnect triggers deactivation of sampling events + #self.__scheduler.shutdown() + self.__netconf_manager.close_session() + return True + + def GetInitialConfig(self) -> List[Tuple[str, Any]]: + with self.__lock: + return [] + + def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]: + chk_type('resources', resource_keys, list) + for i,resource_key in enumerate(resource_keys): + chk_string('resource_key[#{:d}]'.format(i), resource_key, allow_empty=False) + + results = [] + with self.__lock: + if len(resource_keys) == 0: + config = self.__netconf_manager.get_config(source='running').data_xml + with open('../data/drx30-01.xml', mode='w', encoding='UTF-8') as f: + dom = xml.dom.minidom.parseString(config) + f.write(dom.toprettyxml()) + return results + +# resolver = anytree.Resolver(pathattr='name') +# for i,resource_key in enumerate(resource_keys): +# str_resource_name = 'resource_key[#{:d}]'.format(i) +# try: +# resource_path = resource_key.split('/') +# except Exception as e: # pylint: disable=broad-except +# LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) +# results.append((resource_key, e)) # if validation fails, store the exception +# continue +# +# resource_node = get_subnode(resolver, self.__running, resource_path, default=None) +# # if not found, resource_node is None +# if resource_node is None: continue +# results.extend(dump_subtree(resource_node)) +# return results +# +# def GetResource(self, endpoint_uuid : str) -> Optional[str]: +# chk_string('endpoint_uuid', endpoint_uuid) +# return { +# #'key': 'value', +# }.get(endpoint_uuid) +# +# def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: +# chk_type('resources', resources, list) +# if len(resources) == 0: return [] +# results = [] +# resolver = anytree.Resolver(pathattr='name') +# with self.__lock: +# for i,resource in enumerate(resources): +# str_resource_name = 'resources[#{:d}]'.format(i) +# try: +# chk_type(str_resource_name, resource, (list, tuple)) +# chk_length(str_resource_name, resource, min_length=2, max_length=2) +# resource_key,resource_value = resource +# resource_path = resource_key.split('/') +# except Exception as e: # pylint: disable=broad-except +# LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) +# results.append(e) # if validation fails, store the exception +# continue +# +# set_subnode_value(resolver, self.__running, resource_path, resource_value) +# results.append(True) +# return results +# +# def DeleteConfig(self, resource_keys : List[str]) -> List[Union[bool, Exception]]: +# chk_type('resources', resource_keys, list) +# if len(resource_keys) == 0: return [] +# results = [] +# resolver = anytree.Resolver(pathattr='name') +# with self.__lock: +# for i,resource_key in enumerate(resource_keys): +# str_resource_name = 'resource_key[#{:d}]'.format(i) +# try: +# chk_string(str_resource_name, resource_key, allow_empty=False) +# resource_path = resource_key.split('/') +# except Exception as e: # pylint: disable=broad-except +# LOGGER.exception('Exception validating {:s}: {:s}'.format(str_resource_name, str(resource_key))) +# results.append(e) # if validation fails, store the exception +# continue +# +# resource_node = get_subnode(resolver, self.__running, resource_path, default=None) +# # if not found, resource_node is None +# if resource_node is None: +# results.append(False) +# continue +# +# parent = resource_node.parent +# children = list(parent.children) +# children.remove(resource_node) +# parent.children = tuple(children) +# results.append(True) +# return results +# +# def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: +# chk_type('subscriptions', subscriptions, list) +# if len(subscriptions) == 0: return [] +# results = [] +# resolver = anytree.Resolver(pathattr='name') +# with self.__lock: +# for i,subscription in enumerate(subscriptions): +# str_subscription_name = 'subscriptions[#{:d}]'.format(i) +# try: +# chk_type(str_subscription_name, subscription, (list, tuple)) +# chk_length(str_subscription_name, subscription, min_length=3, max_length=3) +# resource_key,sampling_duration,sampling_interval = subscription +# chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) +# resource_path = resource_key.split('/') +# chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) +# chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) +# except Exception as e: # pylint: disable=broad-except +# LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) +# results.append(e) # if validation fails, store the exception +# continue +# +# start_date,end_date = None,None +# if sampling_duration <= 1.e-12: +# start_date = datetime.utcnow() +# end_date = start_date + timedelta(seconds=sampling_duration) +# +# job_id = 'k={:s}/d={:f}/i={:f}'.format(resource_key, sampling_duration, sampling_interval) +# job = self.__scheduler.add_job( +# do_sampling, args=(resource_key, self.__out_samples), kwargs={}, +# id=job_id, trigger='interval', seconds=sampling_interval, +# start_date=start_date, end_date=end_date, timezone=pytz.utc) +# +# subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] +# set_subnode_value(resolver, self.__running, subscription_path, job) +# results.append(True) +# return results +# +# def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: +# chk_type('subscriptions', subscriptions, list) +# if len(subscriptions) == 0: return [] +# results = [] +# resolver = anytree.Resolver(pathattr='name') +# with self.__lock: +# for i,resource in enumerate(subscriptions): +# str_subscription_name = 'resources[#{:d}]'.format(i) +# try: +# chk_type(str_subscription_name, resource, (list, tuple)) +# chk_length(str_subscription_name, resource, min_length=3, max_length=3) +# resource_key,sampling_duration,sampling_interval = resource +# chk_string(str_subscription_name + '.resource_key', resource_key, allow_empty=False) +# resource_path = resource_key.split('/') +# chk_float(str_subscription_name + '.sampling_duration', sampling_duration, min_value=0) +# chk_float(str_subscription_name + '.sampling_interval', sampling_interval, min_value=0) +# except Exception as e: # pylint: disable=broad-except +# LOGGER.exception('Exception validating {:s}: {:s}'.format(str_subscription_name, str(resource_key))) +# results.append(e) # if validation fails, store the exception +# continue +# +# subscription_path = resource_path + ['{:.3f}:{:.3f}'.format(sampling_duration, sampling_interval)] +# subscription_node = get_subnode(resolver, self.__running, subscription_path) +# +# # if not found, resource_node is None +# if subscription_node is None: +# results.append(False) +# continue +# +# job : Job = getattr(subscription_node, 'value', None) +# if job is None or not isinstance(job, Job): +# raise Exception('Malformed subscription node or wrong resource key: {:s}'.format(str(resource))) +# job.remove() +# +# parent = subscription_node.parent +# children = list(parent.children) +# children.remove(subscription_node) +# parent.children = tuple(children) +# +# results.append(True) +# return results +# +# def GetState(self, blocking=False) -> Iterator[Tuple[str, Any]]: +# while not self.__terminate.is_set(): +# try: +# sample = self.__out_samples.get(block=blocking, timeout=0.1) +# except queue.Empty: +# if blocking: continue +# return +# if sample is None: continue +# yield sample diff --git a/src/device/service/drivers/openconfig/handlers/InfineraDeviceHandler.py b/src/device/service/drivers/openconfig/handlers/InfineraDeviceHandler.py new file mode 100644 index 000000000..ebf0af2e4 --- /dev/null +++ b/src/device/service/drivers/openconfig/handlers/InfineraDeviceHandler.py @@ -0,0 +1,18 @@ +# Handler for Infinera device specific information through YANG. +from ncclient.xml_ import BASE_NS_1_0 +from ncclient.devices.default import DefaultDeviceHandler + +class InfineraDeviceHandler(DefaultDeviceHandler): + _EXEMPT_ERRORS = [] + + def get_capabilities(self): + return [ + 'urn:ietf:params:netconf:base:1.0', + 'urn:ietf:params:netconf:base:1.1', + ] + + def get_xml_base_namespace_dict(self): + return {None: BASE_NS_1_0} + + def get_xml_extra_prefix_kwargs(self): + return {'nsmap': self.get_xml_base_namespace_dict()} diff --git a/src/device/service/drivers/openconfig/handlers/__init__.py b/src/device/service/drivers/openconfig/handlers/__init__.py new file mode 100644 index 000000000..109853e3b --- /dev/null +++ b/src/device/service/drivers/openconfig/handlers/__init__.py @@ -0,0 +1,7 @@ +from .InfineraDeviceHandler import InfineraDeviceHandler + +HANDLERS = { + 'infinera': {'handler': InfineraDeviceHandler}, +} + +DEFAULT_HANDLER = {'name': 'default'} diff --git a/src/device/service/drivers/transport_api/TransportApiDriver.py b/src/device/service/drivers/transport_api/TransportApiDriver.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/device/service/drivers/transport_api/__init__.py b/src/device/service/drivers/transport_api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/device/tests/.gitignore b/src/device/tests/.gitignore new file mode 100644 index 000000000..b34bbec2c --- /dev/null +++ b/src/device/tests/.gitignore @@ -0,0 +1 @@ +_device_credentials.py diff --git a/src/device/tests/example_objects.py b/src/device/tests/example_objects.py index 7894fbc00..b84df119f 100644 --- a/src/device/tests/example_objects.py +++ b/src/device/tests/example_objects.py @@ -2,6 +2,20 @@ from copy import deepcopy from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID from context.proto.context_pb2 import ConfigActionEnum, DeviceDriverEnum, DeviceOperationalStatusEnum +try: + from ._device_credentials import ( + DEVICE1_UUID, DEVICE1_TYPE, DEVICE1_ADDRESS, DEVICE1_PORT, DEVICE1_USERNAME, DEVICE1_PASSWORD, DEVICE1_HANDLER, + DEVICE1_DRIVERS) +except ImportError: + DEVICE1_UUID = 'DEV1' + DEVICE1_TYPE = 'packet-router' + DEVICE1_ADDRESS = '127.0.0.1' + DEVICE1_PORT = '830' + DEVICE1_USERNAME = 'username' + DEVICE1_PASSWORD = 'password' + DEVICE1_HANDLER = 'default' + DEVICE1_DRIVERS = [DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG] + # Some example objects to be used by the tests # Helper methods @@ -33,21 +47,26 @@ TOPOLOGY = { 'link_ids': [], } -DEVICE1_UUID = 'DEV1' DEVICE1_ID = {'device_uuid': {'uuid': DEVICE1_UUID}} DEVICE1 = { 'device_id': deepcopy(DEVICE1_ID), - 'device_type': 'packet-router', - 'device_config': {'config_rules': [ - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value1'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value2'), - config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value3'), - ]}, - 'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED, - 'device_drivers': [DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG, DeviceDriverEnum.DEVICEDRIVER_P4], - 'device_endpoints': [ - endpoint(TOPOLOGY_ID, DEVICE1_ID, 'EP2', 'port-packet-100G'), - endpoint(TOPOLOGY_ID, DEVICE1_ID, 'EP3', 'port-packet-100G'), - endpoint(TOPOLOGY_ID, DEVICE1_ID, 'EP100', 'port-packet-10G'), - ], + 'device_type': DEVICE1_TYPE, + 'device_config': {'config_rules': []}, + 'device_operational_status': DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED, + 'device_drivers': DEVICE1_DRIVERS, + 'device_endpoints': [], } + +DEVICE1_CONNECT_RULES = [ + config_rule(ConfigActionEnum.CONFIGACTION_SET, '_connect/address', DEVICE1_ADDRESS ), + config_rule(ConfigActionEnum.CONFIGACTION_SET, '_connect/port', DEVICE1_PORT ), + config_rule(ConfigActionEnum.CONFIGACTION_SET, '_connect/username', DEVICE1_USERNAME), + config_rule(ConfigActionEnum.CONFIGACTION_SET, '_connect/password', DEVICE1_PASSWORD), + config_rule(ConfigActionEnum.CONFIGACTION_SET, '_connect/handler', DEVICE1_HANDLER ), +] + +DEVICE1_CONFIG_RULES = [ + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc1/value', 'value1'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc2/value', 'value2'), + config_rule(ConfigActionEnum.CONFIGACTION_SET, 'dev/rsrc3/value', 'value3'), +] diff --git a/src/device/tests/test_unitary.py b/src/device/tests/test_unitary.py index 37848f4ad..e4132171c 100644 --- a/src/device/tests/test_unitary.py +++ b/src/device/tests/test_unitary.py @@ -17,11 +17,14 @@ from device.Config import ( from device.client.DeviceClient import DeviceClient from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology from device.service.DeviceService import DeviceService +from device.service.MonitoringLoops import MonitoringLoops from device.service.driver_api._Driver import _Driver from device.service.driver_api.DriverFactory import DriverFactory from device.service.driver_api.DriverInstanceCache import DriverInstanceCache from device.service.drivers import DRIVERS -from .example_objects import CONTEXT, DEVICE1, DEVICE1_ID, DEVICE1_UUID, TOPOLOGY, config_rule +from monitoring.client.monitoring_client import MonitoringClient +from .example_objects import ( + CONTEXT, DEVICE1, DEVICE1_CONFIG_RULES, DEVICE1_CONNECT_RULES, DEVICE1_ID, DEVICE1_UUID, TOPOLOGY, config_rule) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -70,14 +73,17 @@ def context_client(context_service : ContextService): # pylint: disable=redefine _client.close() @pytest.fixture(scope='session') -def device_service(context_client : ContextClient): # pylint: disable=redefined-outer-name - driver_factory = DriverFactory(DRIVERS) - driver_instance_cache = DriverInstanceCache(driver_factory) +def device_service(context_client : ContextClient): # pylint: disable=redefined-outer-name + _driver_factory = DriverFactory(DRIVERS) + _driver_instance_cache = DriverInstanceCache(_driver_factory) + _monitoring_loops = MonitoringLoops(None) # TODO: replace by monitoring client + _monitoring_loops.start() _service = DeviceService( - context_client, driver_instance_cache, port=DEVICE_GRPC_SERVICE_PORT, max_workers=DEVICE_GRPC_MAX_WORKERS, - grace_period=DEVICE_GRPC_GRACE_PERIOD) + context_client, _driver_instance_cache, _monitoring_loops, port=DEVICE_GRPC_SERVICE_PORT, + max_workers=DEVICE_GRPC_MAX_WORKERS, grace_period=DEVICE_GRPC_GRACE_PERIOD) _service.start() yield _service + _monitoring_loops.stop() _service.stop() @pytest.fixture(scope='session') @@ -90,7 +96,7 @@ def grpc_message_to_json_string(message): return str(MessageToDict( message, including_default_value_fields=True, preserving_proto_field_name=True, use_integers_for_enums=False)) -def test_device_add( +def test_device_add_configure( context_client : ContextClient, # pylint: disable=redefined-outer-name device_client : DeviceClient, # pylint: disable=redefined-outer-name device_service : DeviceService): # pylint: disable=redefined-outer-name @@ -99,18 +105,20 @@ def test_device_add( context_client.SetTopology(Topology(**TOPOLOGY)) with pytest.raises(grpc.RpcError) as e: - device_client.AddDevice(Device(**DEVICE1)) + DEVICE1_WITH_EXTRA_RULES = copy.deepcopy(DEVICE1) + DEVICE1_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE1_CONNECT_RULES) + DEVICE1_WITH_EXTRA_RULES['device_config']['config_rules'].extend(DEVICE1_CONFIG_RULES) + device_client.AddDevice(Device(**DEVICE1_WITH_EXTRA_RULES)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT - msg = 'device.device_config.config_rules(['\ - 'action: CONFIGACTION_SET\nresource_key: "dev/rsrc1/value"\nresource_value: "value1"\n, '\ - 'action: CONFIGACTION_SET\nresource_key: "dev/rsrc2/value"\nresource_value: "value2"\n, '\ - 'action: CONFIGACTION_SET\nresource_key: "dev/rsrc3/value"\nresource_value: "value3"\n]) is invalid; '\ - 'RPC method AddDevice does not allow definition of Config Rules. Add the Device first, and then configure it.' - assert e.value.details() == msg + msg_head = 'device.device_config.config_rules([' + msg_tail = ']) is invalid; RPC method AddDevice only accepts connection Config Rules that should start '\ + 'with "_connect/" tag. Others should be configured after adding the device.' + except_msg = str(e.value.details()) + assert except_msg.startswith(msg_head) and except_msg.endswith(msg_tail) - DEVICE1_WITHOUT_RULES = copy.deepcopy(DEVICE1) - DEVICE1_WITHOUT_RULES['device_config']['config_rules'].clear() - device_client.AddDevice(Device(**DEVICE1_WITHOUT_RULES)) + DEVICE1_WITH_CONNECT_RULES = copy.deepcopy(DEVICE1) + DEVICE1_WITH_CONNECT_RULES['device_config']['config_rules'].extend(DEVICE1_CONNECT_RULES) + device_client.AddDevice(Device(**DEVICE1_WITH_CONNECT_RULES)) driver : _Driver = device_service.driver_instance_cache.get(DEVICE1_UUID) # we know the driver exists now assert driver is not None -- GitLab