diff --git a/run_tests_locally.sh b/run_tests_locally.sh
index 954d19946d13b3fe4c5086b0d71361978f0eb5f9..71ce3d61f20ee3cf0cf68ce760ea1eaebebf46ed 100755
--- a/run_tests_locally.sh
+++ b/run_tests_locally.sh
@@ -38,7 +38,6 @@ rm -f $COVERAGEFILE
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
device/tests/test_unitary.py
- #device/tests/test_unitary_driverapi.py \
#coverage run --rcfile=$RCFILE --append -m pytest -s --log-level=INFO --verbose \
# l3_centralizedattackdetector/tests/test_unitary.py
diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py
index 25bff405e962981b4978d7b6c0f574b6319dff40..e452d8fdd0cd8325d7167f90b072409cae2635bf 100644
--- a/src/device/service/DeviceServiceServicerImpl.py
+++ b/src/device/service/DeviceServiceServicerImpl.py
@@ -7,6 +7,7 @@ from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, OperationFailedException
from context.client.ContextClient import ContextClient
+from context.proto.kpi_sample_types_pb2 import KpiSampleType
from device.proto.context_pb2 import ConfigActionEnum, Device, DeviceConfig, DeviceId, Empty
from device.proto.device_pb2 import MonitoringSettings
from device.proto.device_pb2_grpc import DeviceServiceServicer
@@ -20,7 +21,7 @@ from .database.DatabaseTools import (
from .database.DeviceModel import DeviceModel, DriverModel
from .database.EndPointModel import EndPointModel, EndPointMonitorModel
from .database.KpiModel import KpiModel
-from .database.KpiSampleType import grpc_to_enum__kpi_sample_type
+from .database.KpiSampleType import ORM_KpiSampleType, grpc_to_enum__kpi_sample_type
from .driver_api._Driver import _Driver, RESOURCE_ENDPOINTS #, RESOURCE_INTERFACES, RESOURCE_NETWORK_INSTANCES
from .driver_api.DriverInstanceCache import DriverInstanceCache
from .driver_api.Tools import (
@@ -108,7 +109,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
driver.Connect()
endpoints = driver.GetConfig([RESOURCE_ENDPOINTS])
- LOGGER.info('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
+ #LOGGER.info('[AddDevice] endpoints = {:s}'.format(str(endpoints)))
for resource_key, resource_value in endpoints:
endpoint_uuid = resource_value.get('uuid')
endpoint_type = resource_value.get('type')
@@ -224,7 +225,10 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
KpiModel(self.database, db_kpi_pk).delete()
for db_endpoint_pk,_ in db_device.references(EndPointModel):
- EndPointModel(self.database, db_endpoint_pk).delete()
+ db_endpoint = EndPointModel(self.database, db_endpoint_pk)
+ for db_endpoint_monitor_pk,_ in db_endpoint.references(EndPointMonitorModel):
+ EndPointMonitorModel(self.database, db_endpoint_monitor_pk).delete()
+ db_endpoint.delete()
for db_driver_pk,_ in db_device.references(DriverModel):
DriverModel(self.database, db_driver_pk).delete()
@@ -256,62 +260,63 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
kpi_uuid = request.kpi_id.kpi_id.uuid
- device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
- db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
- if db_device is None:
- msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
- raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
- driver : _Driver = self.driver_instance_cache.get(device_uuid)
- if driver is None:
- msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
- raise OperationFailedException('ConfigureDevice', extra_details=msg)
-
- endpoint_id = request.kpi_descriptor.endpoint_id
- endpoint_uuid = endpoint_id.endpoint_uuid.uuid
- str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
- endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
- endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
- if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
- str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
- str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
- db_endpoint : EndPointModel = get_object(
- self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
- if db_endpoint is None:
- msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
- str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
- raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
- sample_type = request.kpi_descriptor.kpi_sample_type
- str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
- db_endpoint_monitor : EndPointMonitorModel = get_object(
- self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
- if db_endpoint_monitor is None:
- msg = 'Device({:s})/EndPoint({:s})/EndPointMonitor({:s}) not found.'.format(
- str(device_uuid), str(endpoint_uuid), str(sample_type))
- raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
-
- subscribe = True
+ subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
if subscribe:
+ device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
+
+ db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid, raise_if_not_found=False)
+ if db_device is None:
+ msg = 'Device({:s}) has not been added to this Device instance.'.format(str(device_uuid))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+ endpoint_id = request.kpi_descriptor.endpoint_id
+ endpoint_uuid = endpoint_id.endpoint_uuid.uuid
+ str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
+ endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
+ endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
+ if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
+ str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
+ str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
+ db_endpoint : EndPointModel = get_object(
+ self.database, EndPointModel, str_endpoint_key, raise_if_not_found=False)
+ if db_endpoint is None:
+ msg = 'Device({:s})/EndPoint({:s}) not found. EndPointKey({:s})'.format(
+ str(device_uuid), str(endpoint_uuid), str(str_endpoint_key))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+ driver : _Driver = self.driver_instance_cache.get(device_uuid)
+ if driver is None:
+ msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+ sample_type = request.kpi_descriptor.kpi_sample_type
+
attributes = {
'kpi_uuid' : request.kpi_id.kpi_id.uuid,
'kpi_description' : request.kpi_descriptor.kpi_description,
- 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(request.kpi_descriptor.kpi_sample_type),
+ 'kpi_sample_type' : grpc_to_enum__kpi_sample_type(sample_type),
'device_fk' : db_device,
'endpoint_fk' : db_endpoint,
'sampling_duration': request.sampling_duration_s,
'sampling_interval': request.sampling_interval_s,
}
- LOGGER.info('kpi.attributes = {:s}'.format(str(attributes)))
result : Tuple[KpiModel, bool] = update_or_create_object(self.database, KpiModel, kpi_uuid, attributes)
db_kpi, updated = result
+ str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
+ db_endpoint_monitor : EndPointMonitorModel = get_object(
+ self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
+ if db_endpoint_monitor is None:
+ msg = 'SampleType({:s}/{:s}) not supported for EndPoint({:s}).'.format(
+ str(sample_type), str(KpiSampleType.Name(sample_type).upper().replace('KPISAMPLETYPE_', '')),
+ str(endpoint_uuid))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
attributes = {
'endpoint_monitor_fk': db_endpoint_monitor,
'kpi_fk' : db_kpi,
}
- LOGGER.info('epm_kpi.attributes = {:s}'.format(str(attributes)))
result : Tuple[EndPointMonitorKpiModel, bool] = update_or_create_object(
self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, attributes)
db_endpoint_monitor_kpi, updated = result
@@ -319,7 +324,6 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
resources_to_subscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
resources_to_subscribe.append(
(db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
- LOGGER.info('[MonitorDeviceKpi] resources_to_subscribe = {:s}'.format(str(resources_to_subscribe)))
results_subscribestate = driver.SubscribeState(resources_to_subscribe)
errors = check_subscribe_errors(resources_to_subscribe, results_subscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
@@ -327,12 +331,61 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
self.monitoring_loops.add(device_uuid, driver)
else:
+ db_kpi : KpiModel = get_object(
+ self.database, KpiModel, kpi_uuid, raise_if_not_found=False)
+ if db_kpi is None:
+ msg = 'Kpi({:s}) not found'.format(str(kpi_uuid))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+ db_device : DeviceModel = get_object(
+ self.database, DeviceModel, db_kpi.device_fk, raise_if_not_found=False)
+ if db_device is None:
+ msg = 'Device({:s}) not found'.format(str(db_kpi.device_fk))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+ device_uuid = db_device.device_uuid
+
+ db_endpoint : EndPointModel = get_object(
+ self.database, EndPointModel, db_kpi.endpoint_fk, raise_if_not_found=False)
+ if db_endpoint is None:
+ msg = 'EndPoint({:s}) not found'.format(str(db_kpi.endpoint_fk))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+ endpoint_uuid = db_endpoint.endpoint_uuid
+ str_endpoint_key = db_endpoint.pk
+
+ kpi_sample_type : ORM_KpiSampleType = db_kpi.kpi_sample_type
+ sample_type = kpi_sample_type.value
+ str_endpoint_monitor_key = key_to_str([str_endpoint_key, str(sample_type)])
+ db_endpoint_monitor : EndPointMonitorModel = get_object(
+ self.database, EndPointMonitorModel, str_endpoint_monitor_key, raise_if_not_found=False)
+ if db_endpoint_monitor is None:
+ msg = 'EndPointMonitor({:s}) not found.'.format(str(str_endpoint_monitor_key))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
+ str_endpoint_monitor_kpi_key = key_to_str([device_uuid, db_endpoint_monitor.resource_key], separator=':')
+ db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
+ self.database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
+ if db_endpoint_monitor_kpi is None:
+ msg = 'EndPointMonitorKpi({:s}) not found.'.format(str(str_endpoint_monitor_kpi_key))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
resources_to_unsubscribe : List[Tuple[str, float, float]] = [] # key, sampling_duration, sampling_interval
- LOGGER.info('[MonitorDeviceKpi] resources_to_unsubscribe = {:s}'.format(str(resources_to_unsubscribe)))
+ resources_to_unsubscribe.append(
+ (db_endpoint_monitor.resource_key, db_kpi.sampling_duration, db_kpi.sampling_interval))
+
+ driver : _Driver = self.driver_instance_cache.get(device_uuid)
+ if driver is None:
+ msg = 'Device({:s}) has not been added to this Device instance'.format(str(device_uuid))
+ raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
+
results_unsubscribestate = driver.UnsubscribeState(resources_to_unsubscribe)
errors = check_unsubscribe_errors(resources_to_unsubscribe, results_unsubscribestate)
if len(errors) > 0: raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)
- self.monitoring_loops.remove(device_uuid)
+ db_endpoint_monitor_kpi.delete()
+ db_kpi.delete()
+
+ # There is one monitoring loop per device; keep them active since they are re-used by different monitoring
+ # requests.
+ #self.monitoring_loops.remove(device_uuid)
return Empty()
diff --git a/src/device/service/MonitoringLoops.py b/src/device/service/MonitoringLoops.py
index e3eb6a94190936a2d2bc66fc8e0333c24be5dee9..2e96e8df1dfd7a050aae004b5d5a41bae469e438 100644
--- a/src/device/service/MonitoringLoops.py
+++ b/src/device/service/MonitoringLoops.py
@@ -20,7 +20,7 @@ class MonitoringLoop:
self._running = threading.Event()
self._terminate = threading.Event()
self._samples_stream = self._driver.GetState(blocking=True)
- self._collector_thread = threading.Thread(target=self._collect, daemon=False)
+ self._collector_thread = threading.Thread(target=self._collect, daemon=True)
def _collect(self) -> None:
for sample in self._samples_stream:
@@ -49,7 +49,7 @@ class MonitoringLoops:
self._terminate = threading.Event()
self._lock = threading.Lock()
self._device_uuid__to__monitoring_loop : Dict[str, MonitoringLoop] = {}
- self._exporter_thread = threading.Thread(target=self._export, daemon=False)
+ self._exporter_thread = threading.Thread(target=self._export, daemon=True)
def add(self, device_uuid : str, driver : _Driver) -> None:
with self._lock:
@@ -85,14 +85,12 @@ class MonitoringLoops:
while not self._terminate.is_set():
try:
sample = self._samples_queue.get(block=True, timeout=QUEUE_GET_WAIT_TIMEOUT)
- LOGGER.info('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
+ #LOGGER.debug('[MonitoringLoops:_export] sample={:s}'.format(str(sample)))
except queue.Empty:
continue
device_uuid, timestamp, endpoint_monitor_resource_key, value = sample
str_endpoint_monitor_kpi_key = key_to_str([device_uuid, endpoint_monitor_resource_key], separator=':')
- LOGGER.info('[MonitoringLoops:_export] retrieving EndPointMonitorKpiModel {:s}'.format(
- str(str_endpoint_monitor_kpi_key)))
#db_entries = self._database.dump()
#LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
@@ -102,25 +100,17 @@ class MonitoringLoops:
db_endpoint_monitor_kpi : EndPointMonitorKpiModel = get_object(
self._database, EndPointMonitorKpiModel, str_endpoint_monitor_kpi_key, raise_if_not_found=False)
- LOGGER.info('[MonitoringLoops:_export] db_endpoint_monitor_kpi = {:s}'.format(
- str(db_endpoint_monitor_kpi)))
if db_endpoint_monitor_kpi is None:
LOGGER.warning('EndPointMonitorKpi({:s}) not found'.format(str_endpoint_monitor_kpi_key))
continue
str_kpi_key = db_endpoint_monitor_kpi.kpi_fk
- LOGGER.info('[MonitoringLoops:_export] retrieving KpiModel {:s}'.format(
- str(str_kpi_key)))
db_kpi : KpiModel = get_object(
self._database, KpiModel, str_kpi_key, raise_if_not_found=False)
- LOGGER.info('[MonitoringLoops:_export] db_kpi = {:s}'.format(
- str(db_kpi)))
if db_kpi is None:
LOGGER.warning('Kpi({:s}) not found'.format(str_kpi_key))
continue
- LOGGER.info('[MonitoringLoops:_export] formatting kpi... = {:s}/{:s}'.format(
- str(type(value)), str(value)))
if isinstance(value, int):
kpi_value_field_name = 'intVal'
kpi_value_field_cast = int
@@ -134,16 +124,11 @@ class MonitoringLoops:
kpi_value_field_name = 'stringVal'
kpi_value_field_cast = str
- LOGGER.info('[MonitoringLoops:_export] kpi_value_field_name = {:s}'.format(str(kpi_value_field_name)))
- LOGGER.info('[MonitoringLoops:_export] kpi_value_field_cast = {:s}'.format(str(kpi_value_field_cast)))
-
- kpi_data = {
- 'kpi_id' : {'kpi_id': db_kpi.kpi_uuid},
- 'timestamp': str(timestamp),
- 'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
- }
- LOGGER.info('[MonitoringLoops:_export] sending sample: {:s}'.format(str(kpi_data)))
- LOGGER.info('[MonitoringLoops:_export] self._monitoring_client: {:s}'.format(str(self._monitoring_client)))
- LOGGER.info('[MonitoringLoops:_export] self._monitoring_client.IncludeKpi: {:s}'.format(str(self._monitoring_client.IncludeKpi)))
- self._monitoring_client.IncludeKpi(Kpi(**kpi_data))
- LOGGER.info('[MonitoringLoops:_export] sample sent: {:s}'.format(str(kpi_data)))
+ try:
+ self._monitoring_client.IncludeKpi(Kpi(**{
+ 'kpi_id' : {'kpi_id': {'uuid': db_kpi.kpi_uuid}},
+ 'timestamp': str(timestamp),
+ 'kpi_value': {kpi_value_field_name: kpi_value_field_cast(value)}
+ }))
+ except: # pylint: disable=bare-except
+ LOGGER.exception('Unable to format/send Kpi')
diff --git a/src/device/service/database/KpiSampleType.py b/src/device/service/database/KpiSampleType.py
index e5c4c5bbc0407e7dd3650ca4ff2f2e95a1202472..397b208a57537201cc891e18159c975edf7147a9 100644
--- a/src/device/service/database/KpiSampleType.py
+++ b/src/device/service/database/KpiSampleType.py
@@ -4,11 +4,11 @@ from device.proto.kpi_sample_types_pb2 import KpiSampleType
from .Tools import grpc_to_enum
class ORM_KpiSampleType(Enum):
- UNKNOWN = KpiSampleType.UNKNOWN
- PACKETS_TRANSMITTED = KpiSampleType.PACKETS_TRANSMITTED
- PACKETS_RECEIVED = KpiSampleType.PACKETS_RECEIVED
- BYTES_TRANSMITTED = KpiSampleType.BYTES_TRANSMITTED
- BYTES_RECEIVED = KpiSampleType.BYTES_RECEIVED
+ UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN
+ PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
+ PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
+ BYTES_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_BYTES_TRANSMITTED
+ BYTES_RECEIVED = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED
grpc_to_enum__kpi_sample_type = functools.partial(
grpc_to_enum, KpiSampleType, ORM_KpiSampleType)
diff --git a/src/device/tests/MockMonitoringServiceServicerImpl.py b/src/device/tests/MockMonitoringServiceServicerImpl.py
index 913df99705944c1c638ad9287f6347bb352b6278..1787acdaa68fd80037ab1f4d14d7f86599f9ac14 100644
--- a/src/device/tests/MockMonitoringServiceServicerImpl.py
+++ b/src/device/tests/MockMonitoringServiceServicerImpl.py
@@ -1,7 +1,7 @@
import logging
from queue import Queue
from monitoring.proto.context_pb2 import Empty
-from monitoring.proto.monitoring_pb2 import IncludeKpiRequest
+from monitoring.proto.monitoring_pb2 import Kpi
from monitoring.proto.monitoring_pb2_grpc import MonitoringServiceServicer
LOGGER = logging.getLogger(__name__)
@@ -10,8 +10,6 @@ class MockMonitoringServiceServicerImpl(MonitoringServiceServicer):
def __init__(self, queue_samples : Queue):
self.queue_samples = queue_samples
- def IncludeKpi(self, request : IncludeKpiRequest, context) -> Empty:
- LOGGER.info('[IncludeKpi] begin')
+ def IncludeKpi(self, request : Kpi, context) -> Empty:
self.queue_samples.put(request)
- LOGGER.info('[IncludeKpi] end')
return Empty()
diff --git a/src/device/tests/test_unitary.py b/src/device/tests/test_unitary.py
index 6af5dee62892f8bd61781fca23b3e84e281564a9..ccd1a1f8205711d7c2fdbe85ef6f653b8f58a486 100644
--- a/src/device/tests/test_unitary.py
+++ b/src/device/tests/test_unitary.py
@@ -1,5 +1,6 @@
-import copy, grpc, json, logging, math, operator, os, pytest, time
-from typing import Any, Dict, List, Tuple
+import copy, grpc, json, logging, operator, os, pytest, time
+from datetime import datetime
+from typing import Tuple
from queue import Queue, Empty
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
@@ -20,32 +21,21 @@ from device.proto.context_pb2 import ConfigActionEnum, Context, Device, Topology
from device.proto.device_pb2 import MonitoringSettings
from device.proto.kpi_sample_types_pb2 import KpiSampleType
from device.service.DeviceService import DeviceService
-from device.service.MonitoringLoops import MonitoringLoops
-from device.service.database.KpiSampleType import ORM_KpiSampleType
-from device.service.driver_api._Driver import _Driver, RESOURCE_ENDPOINTS
+from device.service.driver_api._Driver import _Driver
from device.service.driver_api.DriverFactory import DriverFactory
from device.service.driver_api.DriverInstanceCache import DriverInstanceCache
from device.service.drivers import DRIVERS
from device.tests.MockMonitoringService import MockMonitoringService
-from device.tests.Tools import config_rule_set, endpoint, endpoint_id
+from device.tests.Tools import endpoint_id
from monitoring.Config import (
GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as MONITORING_GRPC_MAX_WORKERS,
GRPC_GRACE_PERIOD as MONITORING_GRPC_GRACE_PERIOD)
from monitoring.client.monitoring_client import MonitoringClient
from .CommonObjects import CONTEXT, TOPOLOGY
from .Device_Emulated import (
- DEVICE_EMU,
- DEVICE_EMU_CONFIG_ADDRESSES,
- DEVICE_EMU_CONFIG_ENDPOINTS,
- DEVICE_EMU_CONNECT_RULES,
- DEVICE_EMU_DECONFIG_ADDRESSES,
- DEVICE_EMU_DECONFIG_ENDPOINTS,
- DEVICE_EMU_ENDPOINTS,
- DEVICE_EMU_ENDPOINTS_COOKED,
- DEVICE_EMU_ID,
- DEVICE_EMU_RECONFIG_ADDRESSES,
- DEVICE_EMU_UUID,
- RSRC_EP)
+ DEVICE_EMU, DEVICE_EMU_CONFIG_ADDRESSES, DEVICE_EMU_CONFIG_ENDPOINTS, DEVICE_EMU_CONNECT_RULES,
+ DEVICE_EMU_DECONFIG_ADDRESSES, DEVICE_EMU_DECONFIG_ENDPOINTS, DEVICE_EMU_ENDPOINTS_COOKED, DEVICE_EMU_ID,
+ DEVICE_EMU_RECONFIG_ADDRESSES, DEVICE_EMU_UUID)
try:
from .Device_OpenConfig_Infinera import(
DEVICE_OC, DEVICE_OC_CONFIG_RULES, DEVICE_OC_DECONFIG_RULES, DEVICE_OC_CONNECT_RULES, DEVICE_OC_ID,
@@ -60,9 +50,6 @@ except ImportError:
# DEVICE_OC_UUID)
-ENABLE_OPENCONFIG = False
-
-
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@@ -200,10 +187,10 @@ def test_device_emulated_get(
device_service : DeviceService): # pylint: disable=redefined-outer-name
initial_config = device_client.GetInitialConfig(DeviceId(**DEVICE_EMU_ID))
- LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))
+ #LOGGER.info('initial_config = {:s}'.format(grpc_message_to_json_string(initial_config)))
device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
- LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
+ #LOGGER.info('device_data = {:s}'.format(grpc_message_to_json_string(device_data)))
def test_device_emulated_configure(
@@ -215,10 +202,9 @@ def test_device_emulated_configure(
assert driver is not None
driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
- LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
+ #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED)
for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
- LOGGER.info('endpoint_cooked = {:s}'.format(str(endpoint_cooked)))
assert endpoint_cooked in driver_config
DEVICE_EMU_WITH_CONFIG_RULES = copy.deepcopy(DEVICE_EMU)
@@ -236,15 +222,13 @@ def test_device_emulated_configure(
driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
- LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
+ #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES)
for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
endpoint_cooked = copy.deepcopy(endpoint_cooked)
endpoint_cooked[1]['enabled'] = True
- LOGGER.info('endpoint_cooked = {:s}'.format(str(endpoint_cooked)))
assert endpoint_cooked in driver_config
for config_rule in DEVICE_EMU_CONFIG_ADDRESSES:
- LOGGER.info('config_rule = {:s}'.format(str(config_rule)))
assert (config_rule['resource_key'], json.loads(config_rule['resource_value'])) in driver_config
device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
@@ -254,8 +238,8 @@ def test_device_emulated_configure(
(ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value)
for config_rule in device_data.device_config.config_rules
]
- LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
- '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
+ #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
+ # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
RESULTING_CONFIG_ENDPOINTS = {cr['resource_key']:cr for cr in copy.deepcopy(DEVICE_EMU_CONFIG_ENDPOINTS)}
for endpoint_cooked in DEVICE_EMU_ENDPOINTS_COOKED:
values = json.loads(RESULTING_CONFIG_ENDPOINTS[endpoint_cooked[0]]['resource_value'])
@@ -272,7 +256,6 @@ def test_device_emulated_configure(
json.loads(json.dumps(config_rule['resource_value'])))
assert config_rule in config_rules
-
# Try to reconfigure...
DEVICE_EMU_WITH_RECONFIG_RULES = copy.deepcopy(DEVICE_EMU)
@@ -293,15 +276,14 @@ def test_device_emulated_configure(
else:
RESULTING_CONFIG_RULES[reconfig_rule['resource_key']] = reconfig_rule
RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values()
- LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))
+ #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))
driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
- LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
+ #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == len(RESULTING_CONFIG_RULES)
for config_rule in RESULTING_CONFIG_RULES:
resource = [config_rule['resource_key'], json.loads(config_rule['resource_value'])]
- LOGGER.info('resource = {:s}'.format(str(resource)))
assert resource in driver_config
device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
@@ -309,12 +291,11 @@ def test_device_emulated_configure(
(ConfigActionEnum.Name(config_rule.action), config_rule.resource_key, config_rule.resource_value)
for config_rule in device_data.device_config.config_rules
]
- LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
- '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
+ #LOGGER.info('device_data.device_config.config_rules = \n{:s}'.format(
+ # '\n'.join(['{:s} {:s} = {:s}'.format(*config_rule) for config_rule in config_rules])))
for config_rule in RESULTING_CONFIG_RULES:
config_rule = (
ConfigActionEnum.Name(config_rule['action']), config_rule['resource_key'], config_rule['resource_value'])
- LOGGER.info('config_rule = {:s}'.format(str(config_rule)))
assert config_rule in config_rules
@@ -333,22 +314,29 @@ def test_device_emulated_monitor(
#LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == len(DEVICE_EMU_ENDPOINTS_COOKED) + len(DEVICE_EMU_CONFIG_ADDRESSES)
- SAMPLING_DURATION_SEC = 1.0
+ SAMPLING_DURATION_SEC = 3.0
SAMPLING_INTERVAL_SEC = 0.5
NUM_SAMPLES_EXPECTED = SAMPLING_DURATION_SEC / SAMPLING_INTERVAL_SEC
+
+ ENDPOINT_UUID = 'EP2'
+ SAMPLE_TYPE = KpiSampleType.KPISAMPLETYPE_BYTES_RECEIVED
+ SAMPLE_TYPE_NAME = str(KpiSampleType.Name(SAMPLE_TYPE)).upper().replace('KPISAMPLETYPE_', '')
+ KPI_UUID = '{:s}-{:s}-{:s}-kpi_uuid'.format(DEVICE_EMU_UUID, ENDPOINT_UUID, str(SAMPLE_TYPE))
MONITORING_SETTINGS = {
- 'kpi_id' : {'kpi_id': {'uuid': 'dev1-monit1-uuid'}},
+ 'kpi_id' : {'kpi_id': {'uuid': KPI_UUID}},
'kpi_descriptor': {
- 'kpi_description': 'My DEVICE1 Monitoring KPI #1',
- 'kpi_sample_type': KpiSampleType.BYTES_RECEIVED,
+ 'kpi_description': 'Metric {:s} for Endpoint {:s} in Device {:s}'.format(
+ SAMPLE_TYPE_NAME, ENDPOINT_UUID, DEVICE_EMU_UUID),
+ 'kpi_sample_type': SAMPLE_TYPE,
'device_id': DEVICE_EMU_ID,
- 'endpoint_id': endpoint_id(DEVICE_EMU_ID, 'EP2'),
+ 'endpoint_id': endpoint_id(DEVICE_EMU_ID, ENDPOINT_UUID),
},
'sampling_duration_s': SAMPLING_DURATION_SEC,
'sampling_interval_s': SAMPLING_INTERVAL_SEC,
}
# Start monitoring the device
+ t_start_monitoring = datetime.timestamp(datetime.utcnow())
device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS))
# wait to receive the expected number of samples
@@ -358,17 +346,32 @@ def test_device_emulated_monitor(
received_samples = []
while (len(received_samples) < NUM_SAMPLES_EXPECTED) and (time.time() - time_ini < SAMPLING_DURATION_SEC * 1.5):
try:
- sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED)
- LOGGER.info('sample = {:s}'.format(str(sample)))
- received_samples.append(sample)
+ received_sample = queue_samples.get(block=True, timeout=SAMPLING_INTERVAL_SEC / NUM_SAMPLES_EXPECTED)
+ #LOGGER.info('received_sample = {:s}'.format(str(received_sample)))
+ received_samples.append(received_sample)
except Empty:
continue
- assert len(received_samples) == NUM_SAMPLES_EXPECTED
- # TODO: Validate the received samples
- LOGGER.info('received_samples = {:s}'.format(str(received_samples)))
- raise Exception()
+ t_end_monitoring = datetime.timestamp(datetime.utcnow())
+ LOGGER.info('received_samples = {:s}'.format(str(received_samples)))
+ assert len(received_samples) == NUM_SAMPLES_EXPECTED
+ for received_sample in received_samples:
+ assert received_sample.kpi_id.kpi_id.uuid == KPI_UUID
+ assert isinstance(received_sample.timestamp, str)
+ timestamp = float(received_sample.timestamp)
+ assert timestamp > t_start_monitoring
+ assert timestamp < t_end_monitoring
+ assert received_sample.kpi_value.HasField('floatVal')
+ assert isinstance(received_sample.kpi_value.floatVal, float)
+
+ # Unsubscribe monitoring
+ MONITORING_SETTINGS_UNSUBSCRIBE = {
+ 'kpi_id' : {'kpi_id': {'uuid': KPI_UUID}},
+ 'sampling_duration_s': -1, # negative value in sampling_duration_s or in sampling_interval_s means unsibscribe
+ 'sampling_interval_s': -1, # kpi_id is mandatory to unsibscribe
+ }
+ device_client.MonitorDeviceKpi(MonitoringSettings(**MONITORING_SETTINGS_UNSUBSCRIBE))
def test_device_emulated_deconfigure(
@@ -380,7 +383,7 @@ def test_device_emulated_deconfigure(
assert driver is not None
driver_config = driver.GetConfig()
- LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
+ #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU)
DEVICE_EMU_WITH_DECONFIG_RULES['device_operational_status'] = \
@@ -396,12 +399,12 @@ def test_device_emulated_deconfigure(
RESULTING_CONFIG_RULES = RESULTING_CONFIG_RULES.values()
driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
- LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
+ #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == len(RESULTING_CONFIG_RULES)
- LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))
+ #LOGGER.info('RESULTING_CONFIG_RULES = {:s}'.format(str(RESULTING_CONFIG_RULES)))
for config_rule in RESULTING_CONFIG_RULES:
config_rule = [config_rule['resource_key'], json.loads(config_rule['resource_value'])]
- LOGGER.info('config_rule = {:s}'.format(str(config_rule)))
+ #LOGGER.info('config_rule = {:s}'.format(str(config_rule)))
assert config_rule in driver_config
DEVICE_EMU_WITH_DECONFIG_RULES = copy.deepcopy(DEVICE_EMU)
@@ -410,7 +413,7 @@ def test_device_emulated_deconfigure(
driver_config = sorted(driver.GetConfig(), key=operator.itemgetter(0))
driver_config = json.loads(json.dumps(driver_config)) # prevent integer keys to fail matching with string keys
- LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
+ #LOGGER.info('driver_config = {:s}'.format(str(driver_config)))
assert len(driver_config) == 0
device_data = context_client.GetDevice(DeviceId(**DEVICE_EMU_ID))
diff --git a/src/device/tests/test_unitary_driverapi.py b/src/device/tests/test_unitary_driverapi.py
deleted file mode 100644
index ef355155b381837f4e01a5d7eba502f45448cc79..0000000000000000000000000000000000000000
--- a/src/device/tests/test_unitary_driverapi.py
+++ /dev/null
@@ -1,58 +0,0 @@
-
-PATH_IF_TX_PKTS = PATH_IF + 'state/tx_packets_per_second'
-PATH_IF_RX_PKTS = PATH_IF + 'state/rx_packets_per_second'
-
-DEVICE_STATE_IF1_TX_PKTS = PATH_IF_TX_PKTS.format('IF1')
-DEVICE_STATE_IF1_RX_PKTS = PATH_IF_RX_PKTS.format('IF1')
-DEVICE_STATE_IF2_TX_PKTS = PATH_IF_TX_PKTS.format('IF2')
-DEVICE_STATE_IF2_RX_PKTS = PATH_IF_RX_PKTS.format('IF2')
-
-
-def test_device_driverapi_emulated_subscriptions(
- device_driverapi_emulated : EmulatedDriver): # pylint: disable=redefined-outer-name
-
- duration = 10.0
- interval = 1.5
- results = device_driverapi_emulated.SubscribeState([
- (DEVICE_STATE_IF1_TX_PKTS, duration, interval),
- (DEVICE_STATE_IF1_RX_PKTS, duration, interval),
- (DEVICE_STATE_IF2_TX_PKTS, duration, interval),
- (DEVICE_STATE_IF2_RX_PKTS, duration, interval),
- ])
- LOGGER.info('results:\n{:s}'.format(', '.join(map(str, results))))
- assert len(results) == 4
- for result in results: assert isinstance(result, bool) and result
-
- stored_config = device_driverapi_emulated.GetConfig()
- LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config))))
-
- time.sleep(duration + 1.0) # let time to generate samples, plus 1 second extra time
-
- samples = []
- for sample in device_driverapi_emulated.GetState(blocking=False):
- LOGGER.info('sample: {:s}'.format(str(sample)))
- timestamp,resource_key,resource_value = sample
- samples.append((timestamp, resource_key, resource_value))
- LOGGER.info('samples:\n{:s}'.format('\n'.join(map(str, samples))))
- assert len(samples) == 4 * (math.floor(duration/interval) + 1)
-
- results = device_driverapi_emulated.UnsubscribeState([
- (DEVICE_STATE_IF1_TX_PKTS, duration, interval),
- (DEVICE_STATE_IF1_RX_PKTS, duration, interval),
- (DEVICE_STATE_IF2_TX_PKTS, duration, interval),
- (DEVICE_STATE_IF2_RX_PKTS, duration, interval),
- ])
- LOGGER.info('results:\n{:s}'.format(', '.join(map(str, results))))
- assert len(results) == 4
- for result in results: assert isinstance(result, bool) and result
-
- stored_config = device_driverapi_emulated.GetConfig()
- LOGGER.info('stored_config:\n{:s}'.format('\n'.join(map(str, stored_config))))
- device_config_if2 = list(filter(lambda row: '10.2.0.1' not in row[0], copy.deepcopy(DEVICE_CONFIG_IF2)))
- assert len(stored_config) == len(DEVICE_CONFIG_ENDPOINTS) + len(DEVICE_CONFIG_IF1) + len(device_config_if2)
- for config_row in stored_config:
- assert (config_row in DEVICE_CONFIG_ENDPOINTS) or (config_row in DEVICE_CONFIG_IF1) or \
- (config_row in device_config_if2)
- for config_row in DEVICE_CONFIG_ENDPOINTS: assert config_row in stored_config
- for config_row in DEVICE_CONFIG_IF1: assert config_row in stored_config
- for config_row in device_config_if2: assert config_row in stored_config
diff --git a/src/device/tests/test_unitary_service.py b/src/device/tests/test_unitary_service.py
deleted file mode 100644
index 8d9591dd6492395a44adc25e4a54eaebc8ff9121..0000000000000000000000000000000000000000
--- a/src/device/tests/test_unitary_service.py
+++ /dev/null
@@ -1,270 +0,0 @@
-import copy, grpc, logging, pytest
-from google.protobuf.json_format import MessageToDict
-from common.database.Factory import get_database, DatabaseEngineEnum
-from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
-from common.database.api.context.topology.device.OperationalStatus import OperationalStatus
-from common.tests.Assertions import validate_device_id, validate_empty
-from device.client.DeviceClient import DeviceClient
-from device.proto.context_pb2 import Device, DeviceId
-from device.service.DeviceService import DeviceService
-from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
-
-port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports
-
-LOGGER = logging.getLogger(__name__)
-LOGGER.setLevel(logging.DEBUG)
-
-# use "copy.deepcopy" to prevent propagating forced changes during tests
-CONTEXT_ID = {'contextUuid': {'uuid': DEFAULT_CONTEXT_ID}}
-TOPOLOGY_ID = {'contextId': copy.deepcopy(CONTEXT_ID), 'topoId': {'uuid': DEFAULT_TOPOLOGY_ID}}
-DEVICE_ID = {'device_id': {'uuid': 'DEV1'}}
-DEVICE = {
- 'device_id': copy.deepcopy(DEVICE_ID),
- 'device_type': 'ROADM',
- 'device_config': {'device_config': ''},
- 'devOperationalStatus': OperationalStatus.ENABLED.value,
- 'endpointList' : [
- {
- 'port_id': {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': copy.deepcopy(DEVICE_ID), 'port_id': {'uuid' : 'EP2'}},
- 'port_type': 'WDM'
- },
- {
- 'port_id': {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': copy.deepcopy(DEVICE_ID), 'port_id': {'uuid' : 'EP3'}},
- 'port_type': 'WDM'
- },
- {
- 'port_id': {'topoId': copy.deepcopy(TOPOLOGY_ID), 'dev_id': copy.deepcopy(DEVICE_ID), 'port_id': {'uuid' : 'EP4'}},
- 'port_type': 'WDM'
- },
- ]
-}
-
-@pytest.fixture(scope='session')
-def device_database():
- _database = get_database(engine=DatabaseEngineEnum.INMEMORY)
- return _database
-
-@pytest.fixture(scope='session')
-def device_service(device_database):
- _service = DeviceService(
- device_database, port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD)
- _service.start()
- yield _service
- _service.stop()
-
-@pytest.fixture(scope='session')
-def device_client(device_service):
- _client = DeviceClient(address='127.0.0.1', port=port)
- yield _client
- _client.close()
-
-def test_add_device_wrong_attributes(device_client : DeviceClient):
- # should fail with device uuid is empty
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['device_id']['device_id']['uuid'] = ''
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'device.device_id.device_id.uuid() is out of range: '\
- 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
- assert e.value.details() == msg
-
- # should fail with device type is empty
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['device_type'] = ''
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'device.device_type() is out of range: '\
- 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
- assert e.value.details() == msg
-
- # should fail with wrong device operational status
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'Method(AddDevice) does not accept OperationalStatus(KEEP_STATE). '\
- 'Permitted values for Method(AddDevice) are OperationalStatus([\'DISABLED\', \'ENABLED\']).'
- assert e.value.details() == msg
-
-def test_add_device_wrong_endpoint(device_client : DeviceClient):
- # should fail with unsupported endpoint context
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = 'wrong-context'
- request = Device(**copy_device)
- device_client.AddDevice(request)
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'Context(wrong-context) in Endpoint(#0) of '\
- 'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Contexts({\'admin\'}). '\
- 'Optionally, leave field empty to use predefined Context(admin).'
- assert e.value.details() == msg
-
- # should fail with unsupported endpoint topology
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = 'wrong-topo'
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'Context(admin)/Topology(wrong-topo) in Endpoint(#0) of '\
- 'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Topologies({\'admin\'}). '\
- 'Optionally, leave field empty to use predefined Topology(admin).'
- assert e.value.details() == msg
-
- # should fail with wrong endpoint device
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = 'wrong-device'
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'Context(admin)/Topology(admin)/Device(wrong-device) in Endpoint(#0) of '\
- 'Context(admin)/Topology(admin)/Device(DEV1) mismatches acceptable Devices({\'DEV1\'}). '\
- 'Optionally, leave field empty to use predefined Device(DEV1).'
- assert e.value.details() == msg
-
- # should fail with endpoint port uuid is empty
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['endpointList'][0]['port_id']['port_id']['uuid'] = ''
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'endpoint_id[#0].port_id.uuid() is out of range: '\
- 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
- assert e.value.details() == msg
-
- # should fail with endpoint port type is empty
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['endpointList'][0]['port_type'] = ''
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'endpoint[#0].port_type() is out of range: '\
- 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
- assert e.value.details() == msg
-
- # should fail with duplicate port in device
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['endpointList'][1]['port_id']['port_id']['uuid'] = 'EP2'
- device_client.AddDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'Duplicated Context(admin)/Topology(admin)/Device(DEV1)/Port(EP2) in Endpoint(#1) of '\
- 'Context(admin)/Topology(admin)/Device(DEV1).'
- assert e.value.details() == msg
-
-def test_add_device(device_client : DeviceClient):
- # should work
- validate_device_id(MessageToDict(
- device_client.AddDevice(Device(**DEVICE)),
- including_default_value_fields=True, preserving_proto_field_name=True,
- use_integers_for_enums=False))
-
-def test_add_device_duplicate(device_client : DeviceClient):
- # should fail with device already exists
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- device_client.AddDevice(Device(**DEVICE))
- assert e.value.code() == grpc.StatusCode.ALREADY_EXISTS
- msg = 'Context(admin)/Topology(admin)/Device(DEV1) already exists in the database.'
- assert e.value.details() == msg
-
-def test_delete_device_empty_uuid(device_client : DeviceClient):
- # should fail with device uuid is empty
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device_id = copy.deepcopy(DEVICE_ID)
- copy_device_id['device_id']['uuid'] = ''
- device_client.DeleteDevice(DeviceId(**copy_device_id))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'device_id.device_id.uuid() is out of range: '\
- 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
- assert e.value.details() == msg
-
-def test_delete_device_not_found(device_client : DeviceClient):
- # should fail with device not found
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device_id = copy.deepcopy(DEVICE_ID)
- copy_device_id['device_id']['uuid'] = 'wrong-device-id'
- device_client.DeleteDevice(DeviceId(**copy_device_id))
- assert e.value.code() == grpc.StatusCode.NOT_FOUND
- msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.'
- assert e.value.details() == msg
-
-def test_delete_device(device_client : DeviceClient):
- # should work
- validate_empty(MessageToDict(
- device_client.DeleteDevice(DeviceId(**DEVICE_ID)),
- including_default_value_fields=True, preserving_proto_field_name=True,
- use_integers_for_enums=False))
-
-def test_configure_device_empty_device_uuid(device_client : DeviceClient):
- # should fail with device uuid is empty
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['device_id']['device_id']['uuid'] = ''
- device_client.ConfigureDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'device.device_id.device_id.uuid() is out of range: '\
- 'allow_empty(False) min_length(None) max_length(None) allowed_lengths(None).'
- assert e.value.details() == msg
-
-def test_configure_device_not_found(device_client : DeviceClient):
- # should fail with device not found
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['device_id']['device_id']['uuid'] = 'wrong-device-id'
- device_client.ConfigureDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.NOT_FOUND
- msg = 'Context(admin)/Topology(admin)/Device(wrong-device-id) does not exist in the database.'
- assert e.value.details() == msg
-
-def test_add_device_default_endpoint_context_topology_device(device_client : DeviceClient):
- # should work
- copy_device = copy.deepcopy(DEVICE)
- copy_device['endpointList'][0]['port_id']['topoId']['contextId']['contextUuid']['uuid'] = ''
- copy_device['endpointList'][0]['port_id']['topoId']['topoId']['uuid'] = ''
- copy_device['endpointList'][0]['port_id']['dev_id']['device_id']['uuid'] = ''
- validate_device_id(MessageToDict(
- device_client.AddDevice(Device(**copy_device)),
- including_default_value_fields=True, preserving_proto_field_name=True,
- use_integers_for_enums=False))
-
-def test_configure_device_wrong_attributes(device_client : DeviceClient):
- # should fail with device type is wrong
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['device_type'] = 'wrong-type'
- device_client.ConfigureDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- msg = 'Device(DEV1) has Type(ROADM) in the database. Cannot be changed to Type(wrong-type).'
- assert e.value.details() == msg
-
- # should fail with endpoints cannot be modified
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- device_client.ConfigureDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
- assert e.value.details() == 'Endpoints belonging to Device(DEV1) cannot be modified.'
-
- # should fail with any change detected
- with pytest.raises(grpc._channel._InactiveRpcError) as e:
- copy_device = copy.deepcopy(DEVICE)
- copy_device['device_config']['device_config'] = ''
- copy_device['devOperationalStatus'] = OperationalStatus.KEEP_STATE.value
- copy_device['endpointList'].clear()
- device_client.ConfigureDevice(Device(**copy_device))
- assert e.value.code() == grpc.StatusCode.ABORTED
- msg = 'Any change has been requested for Device(DEV1). '\
- 'Either specify a new configuration or a new device operational status.'
- assert e.value.details() == msg
-
-def test_configure_device(device_client : DeviceClient):
- # should work
- copy_device = copy.deepcopy(DEVICE)
- copy_device['device_config']['device_config'] = ''
- copy_device['devOperationalStatus'] = OperationalStatus.DISABLED.value
- copy_device['endpointList'].clear()
- validate_device_id(MessageToDict(
- device_client.ConfigureDevice(Device(**copy_device)),
- including_default_value_fields=True, preserving_proto_field_name=True,
- use_integers_for_enums=False))