# ServiceHandler_L3NM_EMU.py
from copy import deepcopy
from service.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum, ServiceStatusEnum, ServiceTypeEnum
from service.service.service_handlers.Tools import config_rule_set, constraint, device_id, endpoint_id
from .CommonObjects import CONTEXT_ID

# Use "deepcopy" whenever a shared fixture object is embedded in another one, so
# that changes forced on one object during a test do not propagate to the others.

# Handler name; it is the first element of the TEST_SERVICE_HANDLER tuple.
SERVICE_HANDLER_NAME = 'l3nm_emulated'

# UUID of the service described by the fixtures in this module.
SERVICE_UUID = 'SVC_L3NM_EMU'

# Service identifier. CONTEXT_ID is copied so that edits made to this
# dictionary never reach the shared context object.
SERVICE_ID = dict(
    context_id=deepcopy(CONTEXT_ID),
    service_uuid=dict(uuid=SERVICE_UUID),
)

# Bare service descriptor: an L3NM service in PLANNED status with no endpoints,
# constraints or config rules yet. It holds its own copy of SERVICE_ID.
SERVICE_DESCRIPTOR = dict(
    service_id=deepcopy(SERVICE_ID),
    service_type=ServiceTypeEnum.SERVICETYPE_L3NM,
    service_endpoint_ids=[],
    service_constraints=[],
    service_status=dict(service_status=ServiceStatusEnum.SERVICESTATUS_PLANNED),
    service_config=dict(config_rules=[]),
)

DEVICE1_UUID = 'EMULATED-ROUTER-1'
DEVICE2_UUID = 'EMULATED-ROUTER-2'

# device_uuid => {device_settings}
# Both devices share every setting except the address, so the table is
# generated from (uuid, address) pairs; each entry gets its own 'drivers' list.
DEVICE_ATTRIBUTES = {
    emulated_uuid: {
        'type'    : 'emulated',
        'address' : emulated_address,
        'port'    : '830',
        'drivers' : [DeviceDriverEnum.DEVICEDRIVER_UNDEFINED],
        'endpoint': 'EP100',
    }
    for emulated_uuid, emulated_address in (
        (DEVICE1_UUID, '10.95.86.155'),
        (DEVICE2_UUID, '10.96.86.149'),
    )
}

# Devices taking part in the service; the endpoint list below follows this order.
SERVICE_DEVICE_UUIDS = [DEVICE1_UUID, DEVICE2_UUID]

# One endpoint id per service device, using the endpoint configured for that
# device in DEVICE_ATTRIBUTES.
SERVICE_ENDPOINT_IDS = [
    endpoint_id(uuid_, DEVICE_ATTRIBUTES[uuid_]['endpoint']) for uuid_ in SERVICE_DEVICE_UUIDS
]

# Config rules of the service: one service-wide 'settings' rule followed by one
# per-endpoint settings rule for each device, in DEVICE1, DEVICE2 order.
SERVICE_CONFIG_RULES = [
    config_rule_set('settings', {
        'route_distinguisher': '60001:801',
        'mtu': 1512,
        'address_families': ['IPV4'],
    }),
] + [
    config_rule_set(
        'device[{:s}]/endpoint[{:s}]/settings'.format(rule_uuid, DEVICE_ATTRIBUTES[rule_uuid]['endpoint']),
        {'router_id': rule_router_id, 'sub_interface_index': 1},
    )
    for rule_uuid, rule_router_id in (
        (DEVICE1_UUID, '10.0.0.1'),
        (DEVICE2_UUID, '10.0.0.2'),
    )
]

# Constraints attached to the service, built from (name, value) pairs.
SERVICE_CONSTRAINTS = [
    constraint(constraint_name, constraint_value)
    for constraint_name, constraint_value in (
        ('latency_ms', 15.2),
        ('jitter_us', 1.2),
    )
]

def get_device_descriptor(device_uuid, enabled=True):
    """Build the descriptor dictionary of one test device.

    Args:
        device_uuid: key of the device in DEVICE_ATTRIBUTES.
        enabled: when true (the default) the descriptor carries
            DEVICEOPERATIONALSTATUS_ENABLED, otherwise
            DEVICEOPERATIONALSTATUS_DISABLED.

    Returns:
        A new dict with the keys 'device_id', 'device_type', 'device_config',
        'device_operational_status', 'device_drivers' and 'device_endpoints'.
        The config rules and endpoints lists are empty.

    Raises:
        KeyError: if device_uuid is not a key of DEVICE_ATTRIBUTES.
    """
    device_attributes = DEVICE_ATTRIBUTES[device_uuid]
    device_operational_status = (
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
        if enabled else
        DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED)
    return {
        'device_id': device_id(device_uuid),
        'device_type': device_attributes['type'],
        'device_config': {'config_rules': []},
        'device_operational_status': device_operational_status,
        # Copy the drivers list: handing out the list stored in
        # DEVICE_ATTRIBUTES would let a test that edits the descriptor
        # silently alter the shared fixture for every later test.
        'device_drivers': deepcopy(device_attributes['drivers']),
        'device_endpoints': [],
    }

def get_connect_rules(device_uuid):
    """Return the three '_connect/...' config rules used for a test device.

    The rules set the address to '127.0.0.1', the port to '0', and declare four
    '10Gbps' endpoints (EP1, EP2, EP3, EP100) with no sample types.

    Args:
        device_uuid: accepted but not used; the same rules are returned
            whatever its value.

    Returns:
        A list holding the address, port and settings rules, in that order.
    """
    address_rule = config_rule_set('_connect/address', '127.0.0.1')
    port_rule = config_rule_set('_connect/port',    '0')

    endpoint_entries = []
    for name in ('EP1', 'EP2', 'EP3', 'EP100'):
        endpoint_entries.append({'uuid': name, 'type': '10Gbps', 'sample_types': []})
    settings_rule = config_rule_set('_connect/settings', {'endpoints': endpoint_entries})

    return [address_rule, port_rule, settings_rule]

# (handler name, fixtures) pair bundling everything defined in this module.
# The fixtures are referenced, not copied.
TEST_SERVICE_HANDLER = (
    SERVICE_HANDLER_NAME,
    dict(
        service_id=SERVICE_ID,
        service_descriptor=SERVICE_DESCRIPTOR,
        service_endpoint_ids=SERVICE_ENDPOINT_IDS,
        service_config_rules=SERVICE_CONFIG_RULES,
        service_constraints=SERVICE_CONSTRAINTS,
        service_device_uuids=SERVICE_DEVICE_UUIDS,
        get_device_descriptor=get_device_descriptor,
        get_connect_rules=get_connect_rules,
    ),
)