Skip to content
Snippets Groups Projects
MockOSM.py 4.2 KiB
Newer Older
import logging
from .WimconnectorIETFL2VPN import WimconnectorIETFL2VPN

LOGGER = logging.getLogger(__name__)

WIM_USERNAME = 'admin'
WIM_PASSWORD = 'admin'

# Ref: https://osm.etsi.org/wikipub/index.php/WIM
WIM_MAPPING  = [
    {
        'device-id'           : 'dev-1',            # pop_switch_dpid
        #'device_interface_id' : ??,                # pop_switch_port
        'service_endpoint_id' : 'ep-1',             # wan_service_endpoint_id
        'service_mapping_info': {                   # wan_service_mapping_info, other extra info
            'bearer': {'bearer-reference': 'dev-1:ep-1:10.0.0.1'},
            'site-id': '1',
        },
        #'switch_dpid'         : ??,                # wan_switch_dpid
        #'switch_port'         : ??,                # wan_switch_port
        #'datacenter_id'       : ??,                # vim_account
    },
    {
        'device-id'           : 'dev-2',            # pop_switch_dpid
        #'device_interface_id' : ??,                # pop_switch_port
        'service_endpoint_id' : 'ep-2',             # wan_service_endpoint_id
        'service_mapping_info': {                   # wan_service_mapping_info, other extra info
            'bearer': {'bearer-reference': 'dev-2:ep-2:10.0.0.2'},
            'site-id': '2',
        },
        #'switch_dpid'         : ??,                # wan_switch_dpid
        #'switch_port'         : ??,                # wan_switch_port
        #'datacenter_id'       : ??,                # vim_account
    },
    {
        'device-id'           : 'dev-3',            # pop_switch_dpid
        #'device_interface_id' : ??,                # pop_switch_port
        'service_endpoint_id' : 'ep-3',             # wan_service_endpoint_id
        'service_mapping_info': {                   # wan_service_mapping_info, other extra info
            'bearer': {'bearer-reference': 'dev-3:ep-3:10.0.0.3'},
            'site-id': '3',
        },
        #'switch_dpid'         : ??,                # wan_switch_dpid
        #'switch_port'         : ??,                # wan_switch_port
        #'datacenter_id'       : ??,                # vim_account
    },
]

SERVICE_TYPE = 'ELINE'
SERVICE_CONNECTION_POINTS_1 = [
    {'service_endpoint_id': 'ep-1',
        'service_endpoint_encapsulation_type': 'dot1q',
        'service_endpoint_encapsulation_info': {'vlan': 1234}},
    {'service_endpoint_id': 'ep-2',
        'service_endpoint_encapsulation_type': 'dot1q',
        'service_endpoint_encapsulation_info': {'vlan': 1234}},
]

SERVICE_CONNECTION_POINTS_2 = [
    {'service_endpoint_id': 'ep-3',
        'service_endpoint_encapsulation_type': 'dot1q',
        'service_endpoint_encapsulation_info': {'vlan': 1234}},
]

class MockOSM:
    def __init__(self, wim_url):
        wim = {'wim_url': wim_url}
        wim_account = {'user': WIM_USERNAME, 'password': WIM_PASSWORD}
        config = {'mapping_not_needed': False, 'service_endpoint_mapping': WIM_MAPPING}
        self.wim = WimconnectorIETFL2VPN(wim, wim_account, config=config)
        self.service_uuid = None
        self.conn_info = None

    def create_connectivity_service(self):
        self.wim.check_credentials()
        LOGGER.info('[create_connectivity_service] connection_points={:s}'.format(str(SERVICE_CONNECTION_POINTS_1)))
        result = self.wim.create_connectivity_service(SERVICE_TYPE, SERVICE_CONNECTION_POINTS_1)
        LOGGER.info('[create_connectivity_service] result={:s}'.format(str(result)))
        self.service_uuid, self.conn_info = result

    def get_connectivity_service_status(self):
        self.wim.check_credentials()
        result = self.wim.get_connectivity_service_status(self.service_uuid, conn_info=self.conn_info)
        LOGGER.info('[get_connectivity_service] result={:s}'.format(str(result)))

    def edit_connectivity_service(self):
        self.wim.check_credentials()
        LOGGER.info('[edit_connectivity_service] connection_points={:s}'.format(str(SERVICE_CONNECTION_POINTS_2)))
        self.wim.edit_connectivity_service(
            self.service_uuid, conn_info=self.conn_info, connection_points=SERVICE_CONNECTION_POINTS_2)

    def delete_connectivity_service(self):
        self.wim.check_credentials()
        self.wim.delete_connectivity_service(self.service_uuid, conn_info=self.conn_info)