Skip to content
Snippets Groups Projects
Commit 6ba8fc54 authored by Shayan Hajipour's avatar Shayan Hajipour
Browse files

feat: L3VPN driver added

- L3VPN added to DeviceDriverEnum of context.proto
- ietf_l3vpn plugin added to src/device/service/drivers directory
parent 07b8adea
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!283Resolve "(CTTC) Implement L3 VPN SBI driver compatible with IETF L3 Service Model"
...@@ -215,6 +215,7 @@ enum DeviceDriverEnum { ...@@ -215,6 +215,7 @@ enum DeviceDriverEnum {
DEVICEDRIVER_IETF_ACTN = 10; DEVICEDRIVER_IETF_ACTN = 10;
DEVICEDRIVER_OC = 11; DEVICEDRIVER_OC = 11;
DEVICEDRIVER_QKD = 12; DEVICEDRIVER_QKD = 12;
DEVICEDRIVER_IETF_L3VPN = 13;
} }
enum DeviceOperationalStatusEnum { enum DeviceOperationalStatusEnum {
......
...@@ -81,6 +81,16 @@ DRIVERS.append( ...@@ -81,6 +81,16 @@ DRIVERS.append(
} }
])) ]))
from .ietf_l3vpn.driver import IetfL3VpnDriver # pylint: disable=wrong-import-position
DRIVERS.append(
(IetfL3VpnDriver, [
{
FilterFieldEnum.DEVICE_TYPE: DeviceTypeEnum.TERAFLOWSDN_CONTROLLER,
FilterFieldEnum.DRIVER: DeviceDriverEnum.DEVICEDRIVER_IETF_L3VPN,
}
]))
from .ietf_actn.IetfActnDriver import IetfActnDriver # pylint: disable=wrong-import-position from .ietf_actn.IetfActnDriver import IetfActnDriver # pylint: disable=wrong-import-position
DRIVERS.append( DRIVERS.append(
(IetfActnDriver, [ (IetfActnDriver, [
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List, Optional
import requests
from requests.auth import HTTPBasicAuth
from device.service.driver_api.ImportTopologyEnum import ImportTopologyEnum
GET_DEVICES_URL = "{:s}://{:s}:{:d}/tfs-api/devices"
GET_LINKS_URL = "{:s}://{:s}:{:d}/tfs-api/links"
L3VPN_URL = "{:s}://{:s}:{:d}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
TIMEOUT = 30
HTTP_OK_CODES = {
200, # OK
201, # Created
202, # Accepted
204, # No Content
}
MAPPING_STATUS = {
"DEVICEOPERATIONALSTATUS_UNDEFINED": 0,
"DEVICEOPERATIONALSTATUS_DISABLED": 1,
"DEVICEOPERATIONALSTATUS_ENABLED": 2,
}
MAPPING_DRIVER = {
"DEVICEDRIVER_UNDEFINED": 0,
"DEVICEDRIVER_OPENCONFIG": 1,
"DEVICEDRIVER_TRANSPORT_API": 2,
"DEVICEDRIVER_P4": 3,
"DEVICEDRIVER_IETF_NETWORK_TOPOLOGY": 4,
"DEVICEDRIVER_ONF_TR_532": 5,
"DEVICEDRIVER_XR": 6,
"DEVICEDRIVER_IETF_L2VPN": 7,
"DEVICEDRIVER_GNMI_OPENCONFIG": 8,
"DEVICEDRIVER_OPTICAL_TFS": 9,
"DEVICEDRIVER_IETF_ACTN": 10,
"DEVICEDRIVER_OC": 11,
}
MSG_ERROR = "Could not retrieve devices in remote TeraFlowSDN instance({:s}). status_code={:s} reply={:s}"
LOGGER = logging.getLogger(__name__)
class TfsApiClient:
def __init__(
self,
address: str,
port: int,
scheme: str = "http",
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
self._devices_url = GET_DEVICES_URL.format(scheme, address, port)
self._links_url = GET_LINKS_URL.format(scheme, address, port)
self._auth = (
HTTPBasicAuth(username, password)
if username is not None and password is not None
else None
)
def get_devices_endpoints(
self, import_topology: ImportTopologyEnum = ImportTopologyEnum.DEVICES
) -> List[Dict]:
LOGGER.debug("[get_devices_endpoints] begin")
LOGGER.debug(
"[get_devices_endpoints] import_topology={:s}".format(str(import_topology))
)
reply = requests.get(
self._devices_url, timeout=TIMEOUT, verify=False, auth=self._auth
)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._devices_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
if import_topology == ImportTopologyEnum.DISABLED:
raise Exception(
"Unsupported import_topology mode: {:s}".format(str(import_topology))
)
result = list()
for json_device in reply.json()["devices"]:
device_uuid: str = json_device["device_id"]["device_uuid"]["uuid"]
device_type: str = json_device["device_type"]
device_status = json_device["device_operational_status"]
device_url = "/devices/device[{:s}]".format(device_uuid)
device_data = {
"uuid": json_device["device_id"]["device_uuid"]["uuid"],
"name": json_device["name"],
"type": device_type,
"status": MAPPING_STATUS[device_status],
"drivers": [
MAPPING_DRIVER[driver] for driver in json_device["device_drivers"]
],
}
result.append((device_url, device_data))
for json_endpoint in json_device["device_endpoints"]:
endpoint_uuid = json_endpoint["endpoint_id"]["endpoint_uuid"]["uuid"]
endpoint_url = "/endpoints/endpoint[{:s}]".format(endpoint_uuid)
endpoint_data = {
"device_uuid": device_uuid,
"uuid": endpoint_uuid,
"name": json_endpoint["name"],
"type": json_endpoint["endpoint_type"],
}
result.append((endpoint_url, endpoint_data))
if import_topology == ImportTopologyEnum.DEVICES:
LOGGER.debug("[get_devices_endpoints] devices only; returning")
return result
reply = requests.get(
self._links_url, timeout=TIMEOUT, verify=False, auth=self._auth
)
if reply.status_code not in HTTP_OK_CODES:
msg = MSG_ERROR.format(
str(self._links_url), str(reply.status_code), str(reply)
)
LOGGER.error(msg)
raise Exception(msg)
for json_link in reply.json()["links"]:
link_uuid: str = json_link["link_id"]["link_uuid"]["uuid"]
link_url = "/links/link[{:s}]".format(link_uuid)
link_endpoint_ids = [
(
json_endpoint_id["device_id"]["device_uuid"]["uuid"],
json_endpoint_id["endpoint_uuid"]["uuid"],
)
for json_endpoint_id in json_link["link_endpoint_ids"]
]
link_data = {
"uuid": json_link["link_id"]["link_uuid"]["uuid"],
"name": json_link["name"],
"endpoints": link_endpoint_ids,
}
result.append((link_url, link_data))
LOGGER.debug("[get_devices_endpoints] topology; returning")
return result
def create_connectivity_service(self, l3vpn_data: dict) -> None:
try:
requests.post(L3VPN_URL, json=l3vpn_data, auth=self._auth)
except requests.exceptions.ConnectionError:
raise Exception("faild to send post request to TFS L3VPN NBI")
def delete_connectivity_service(self, service_uuid: str) -> None:
url = L3VPN_URL + f"/vpn-service={service_uuid}"
try:
requests.delete(url, auth=self._auth)
except requests.exceptions.ConnectionError:
raise Exception("faild to send delete request to TFS L3VPN NBI")
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TypedDict
import requests
class LANPrefixesDict(TypedDict):
lan: str
lan_tag: str
LOGGER = logging.getLogger(__name__)
SITE_NETWORK_ACCESS_TYPE = "ietf-l3vpn-svc:multipoint"
def service_exists(wim_url: str, auth, service_uuid: str) -> bool:
try:
get_connectivity_service(wim_url, auth, service_uuid)
return True
except: # pylint: disable=bare-except
return False
def get_all_active_connectivity_services(wim_url: str, auth):
try:
LOGGER.info("Sending get all connectivity services")
servicepoint = f"{wim_url}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
response = requests.get(servicepoint, auth=auth)
if response.status_code != requests.codes.ok:
raise Exception(
"Unable to get all connectivity services",
http_code=response.status_code,
)
return response
except requests.exceptions.ConnectionError:
raise Exception("Request Timeout", http_code=408)
def get_connectivity_service(wim_url, auth, service_uuid):
try:
LOGGER.info("Sending get connectivity service")
servicepoint = f"{wim_url}/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-service={service_uuid}"
response = requests.get(servicepoint, auth=auth)
if response.status_code != requests.codes.ok:
raise Exception(
"Unable to get connectivity service{:s}".format(str(service_uuid)),
http_code=response.status_code,
)
return response
except requests.exceptions.ConnectionError:
raise Exception("Request Timeout", http_code=408)
def create_l3vpn_datamodel(service_uuid, resource_value: dict) -> dict:
src_device_uuid: str = resource_value["src_device_name"]
src_endpoint_uuid: str = resource_value["src_endpoint_name"]
src_site_location: str = resource_value["src_site_location"]
src_ipv4_lan_prefixes: list[LANPrefixesDict] = resource_value.get(
"src_ipv4_lan_prefixes"
)
src_site_id: str = resource_value.get("src_site_id", f"site_{src_site_location}")
src_management_type: str = resource_value.get(
"src_management_type", "ietf-l3vpn-svc:provider-managed"
)
if src_management_type != "ietf-l3vpn-svc:provider-managed":
raise Exception("management type %s not supported", src_management_type)
src_role: str = "ietf-l3vpn-svc:hub-role"
src_ce_address: str = resource_value["src_ce_address"]
src_pe_address: str = resource_value["src_pe_address"]
src_ce_pe_network_prefix: int = resource_value["src_ce_pe_network_prefix"]
src_mtu: int = resource_value["src_mtu"]
src_input_bw: int = resource_value["src_input_bw"]
src_output_bw: int = resource_value["src_output_bw"]
src_qos_profile_id: str = resource_value["src_qos_profile_id"]
src_qos_profile_direction: str = resource_value["src_qos_profile_direction"]
src_qos_profile_latency: int = resource_value["src_qos_profile_latency"]
src_qos_profile_bw_guarantee: int = resource_value["src_qos_profile_bw_guarantee"]
dst_device_uuid = resource_value["dst_device_name"]
dst_endpoint_uuid = resource_value["dst_endpoint_name"]
dst_site_location: str = resource_value["dst_site_location"]
dst_ipv4_lan_prefixes: list[LANPrefixesDict] = resource_value.get(
"dst_ipv4_lan_prefixes"
)
dst_site_id: str = resource_value.get("dst_site_id", f"site_{dst_site_location}")
dst_management_type: str = resource_value.get(
"dst_management_type", "ietf-l3vpn-svc:provider-managed"
)
if dst_management_type != "ietf-l3vpn-svc:provider-managed":
raise Exception("management type %s not supported", dst_management_type)
dst_role: str = "ietf-l3vpn-svc:spoke-role"
dst_ce_address: str = resource_value["dst_ce_address"]
dst_pe_address: str = resource_value["dst_pe_address"]
dst_ce_pe_network_prefix: int = resource_value["dst_ce_pe_network_prefix"]
dst_mtu: int = resource_value["dst_mtu"]
dst_input_bw: int = resource_value["dst_input_bw"]
dst_output_bw: int = resource_value["dst_output_bw"]
dst_qos_profile_id: str = resource_value["dst_qos_profile_id"]
dst_qos_profile_direction: str = resource_value["dst_qos_profile_direction"]
dst_qos_profile_latency: int = resource_value["dst_qos_profile_latency"]
dst_qos_profile_bw_guarantee: int = resource_value["dst_qos_profile_bw_guarantee"]
# Create source site information
src_management = {"type": "ietf-l3vpn-svc:provider-managed"}
src_locations = {"location": [{"location-id": src_site_location}]}
src_devices = {
"device": [{"device-id": src_device_uuid, "location": src_site_location}]
}
src_site_lan_prefixes = [
{"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": src_ce_address}
for lp in src_ipv4_lan_prefixes
]
src_site_routing_protocols = {
"routing-protocol": [
{
"type": "ietf-l3vpn-svc:static",
"static": {
"cascaded-lan-prefixes": {
"ipv4-lan-prefixes": src_site_lan_prefixes
}
},
}
]
}
src_site_network_accesses = {
"site-network-access": [
{
"site-network-access-id": src_endpoint_uuid,
"site-network-access-type": SITE_NETWORK_ACCESS_TYPE,
"device-reference": src_device_uuid,
"vpn-attachment": {"vpn-id": service_uuid, "site-role": src_role},
"ip-connection": {
"ipv4": {
"address-allocation-type": "ietf-l3vpn-svc:static-address",
"addresses": {
"provider-address": src_pe_address,
"customer-address": src_ce_address,
"prefix-length": src_ce_pe_network_prefix,
},
}
},
"service": {
"svc-mtu": src_mtu,
"svc-input-bandwidth": src_input_bw,
"svc-output-bandwidth": src_output_bw,
"qos": {
"qos-profile": {
"classes": {
"class": [
{
"class-id": src_qos_profile_id,
"direction": src_qos_profile_direction,
"latency": {
"latency-boundary": src_qos_profile_latency
},
"bandwidth": {
"guaranteed-bw-percent": src_qos_profile_bw_guarantee
},
}
]
}
}
},
},
}
]
}
# Create destination site information
dst_management = {"type": "ietf-l3vpn-svc:provider-managed"}
dst_locations = {"location": [{"location-id": dst_site_location}]}
dst_devices = {
"device": [{"device-id": dst_device_uuid, "location": dst_site_location}]
}
dst_site_lan_prefixes = [
{"lan": lp["lan"], "lan-tag": lp["lan_tag"], "next-hop": dst_ce_address}
for lp in dst_ipv4_lan_prefixes
]
dst_site_routing_protocols = {
"routing-protocol": [
{
"type": "ietf-l3vpn-svc:static",
"static": {
"cascaded-lan-prefixes": {
"ipv4-lan-prefixes": dst_site_lan_prefixes
}
},
}
]
}
dst_site_network_accesses = {
"site-network-access": [
{
"site-network-access-id": dst_endpoint_uuid,
"site-network-access-type": SITE_NETWORK_ACCESS_TYPE,
"device-reference": dst_device_uuid,
"vpn-attachment": {"vpn-id": service_uuid, "site-role": dst_role},
"ip-connection": {
"ipv4": {
"address-allocation-type": "ietf-l3vpn-svc:static-address",
"addresses": {
"provider-address": dst_pe_address,
"customer-address": dst_ce_address,
"prefix-length": dst_ce_pe_network_prefix,
},
}
},
"service": {
"svc-mtu": dst_mtu,
"svc-input-bandwidth": dst_input_bw,
"svc-output-bandwidth": dst_output_bw,
"qos": {
"qos-profile": {
"classes": {
"class": [
{
"class-id": dst_qos_profile_id,
"direction": dst_qos_profile_direction,
"latency": {
"latency-boundary": dst_qos_profile_latency
},
"bandwidth": {
"guaranteed-bw-percent": dst_qos_profile_bw_guarantee
},
}
]
}
}
},
},
}
]
}
sites = {
"site": [
{
"site-id": src_site_id,
"management": src_management,
"locations": src_locations,
"devices": src_devices,
"routing-protocols": src_site_routing_protocols,
"site-network-accesses": src_site_network_accesses,
},
{
{
"site-id": dst_site_id,
"management": dst_management,
"locations": dst_locations,
"devices": dst_devices,
"routing-protocols": dst_site_routing_protocols,
"site-network-accesses": dst_site_network_accesses,
},
},
]
}
l3_vpn_data_model = {
"ietf-l3vpn-svc:l3vpn-svc": {
"vpn-services": {"vpn-service": [{"vpn-id": service_uuid}]},
"sites": sites,
}
}
return l3_vpn_data_model
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import threading
from typing import Any, Iterator, List, Optional, Tuple, Union
import requests
from requests.auth import HTTPBasicAuth
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import (
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
_Driver,
)
from device.service.driver_api.ImportTopologyEnum import (
ImportTopologyEnum,
get_import_topology,
)
from .TfsApiClient import TfsApiClient
from .Tools import (
create_l3vpn_datamodel,
get_all_active_connectivity_services,
get_connectivity_service,
service_exists,
)
LOGGER = logging.getLogger(__name__)
ALL_RESOURCE_KEYS = [
RESOURCE_ENDPOINTS,
RESOURCE_SERVICES,
]
DRIVER_NAME = "ietf_l3vpn"
METRICS_POOL = MetricsPool("Device", "Driver", labels={"driver": DRIVER_NAME})
class IetfL3VpnDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
super().__init__(DRIVER_NAME, address, port, **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
scheme = self.settings.get("scheme", "http")
username = self.settings.get("username")
password = self.settings.get("password")
self.tac = TfsApiClient(
self.address,
int(self.port),
scheme=scheme,
username=username,
password=password,
)
self.__auth = (
HTTPBasicAuth(username, password)
if username is not None and password is not None
else None
)
self.__tfs_nbi_root = "{:s}://{:s}:{:d}".format(
scheme, self.address, int(self.port)
)
self.__timeout = int(self.settings.get("timeout", 120))
self.__import_topology = get_import_topology(
self.settings, default=ImportTopologyEnum.DEVICES
)
def Connect(self) -> bool:
url = (
self.__tfs_nbi_root + "/restconf/data/ietf-l3vpn-svc:l3vpn-svc/vpn-services"
)
with self.__lock:
if self.__started.is_set():
return True
try:
requests.get(
url, timeout=self.__timeout, verify=False, auth=self.__auth
)
except requests.exceptions.Timeout:
LOGGER.exception(
"Timeout connecting {:s}".format(str(self.__tapi_root))
)
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception(
"Exception connecting {:s}".format(str(self.__tapi_root))
)
return False
else:
self.__started.set()
return True
def Disconnect(self) -> bool:
with self.__lock:
self.__terminate.set()
return True
@metered_subclass_method(METRICS_POOL)
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
with self.__lock:
return []
@metered_subclass_method(METRICS_POOL)
def GetConfig(
self, resource_keys: List[str] = []
) -> List[Tuple[str, Union[Any, None, Exception]]]:
chk_type("resources", resource_keys, list)
results = []
with self.__lock:
if len(resource_keys) == 0:
resource_keys = ALL_RESOURCE_KEYS
for i, resource_key in enumerate(resource_keys):
str_resource_name = "resource_key[#{:d}]".format(i)
try:
chk_string(str_resource_name, resource_key, allow_empty=False)
if resource_key == RESOURCE_ENDPOINTS:
# return endpoints through TFS NBI API and list-devices method
results.extend(
self.tac.get_devices_endpoints(self.__import_topology)
)
elif resource_key == RESOURCE_SERVICES:
# return all services through
reply = get_all_active_connectivity_services(
wim_url=self.__tfs_nbi_root, auth=self.__auth
)
results.extend(reply.json())
else:
# assume single-service retrieval
reply = get_connectivity_service(
self.__tfs_nbi_root, resource_key
)
results.append(reply.json())
except Exception as e: # pylint: disable=broad-except
LOGGER.exception(
"Unhandled error processing resource_key({:s})".format(
str(resource_key)
)
)
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def SetConfig(
self, resources: List[Tuple[str, Any]]
) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
with self.__lock:
for resource in resources:
LOGGER.info("resource = {:s}".format(str(resource)))
resource_key, resource_value = resource
try:
resource_value = json.loads(resource_value)
service_uuid = resource_value["uuid"] #! fix based on resources
if service_exists(self.__tfs_nbi_root, self.__auth, service_uuid):
exc = NotImplementedError(
"IETF L3VPN Service Update is still not supported"
)
results.append((resource[0], exc))
continue
l3vpn_datamodel = create_l3vpn_datamodel(
service_uuid, resource_value
)
self.tac.create_connectivity_service(l3vpn_datamodel)
results.append((resource_key, True))
except Exception as e: # pylint: disable=broad-except
LOGGER.exception(
"Unhandled error processing resource_key({:s})".format(
str(resource_key)
)
)
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def DeleteConfig(
self, resources: List[Tuple[str, Any]]
) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0:
return results
with self.__lock:
for resource in resources:
LOGGER.info("resource = {:s}".format(str(resource)))
resource_key, resource_value = resource
try:
resource_value = json.loads(resource_value)
service_uuid = resource_value["uuid"]
if service_exists(self.__tfs_nbi_root, self.__auth, service_uuid):
self.tac.delete_connectivity_service(service_uuid)
results.append((resource_key, True))
except Exception as e: # pylint: disable=broad-except
LOGGER.exception(
"Unhandled error processing resource_key({:s})".format(
str(resource_key)
)
)
results.append((resource_key, e))
return results
@metered_subclass_method(METRICS_POOL)
def SubscribeState(
self, subscriptions: List[Tuple[str, float, float]]
) -> List[Union[bool, Exception]]:
# TODO: IETF L3VPN does not support monitoring by now
return [False for _ in subscriptions]
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(
self, subscriptions: List[Tuple[str, float, float]]
) -> List[Union[bool, Exception]]:
# TODO: IETF L3VPN does not support monitoring by now
return [False for _ in subscriptions]
def GetState(
self, blocking=False, terminate: Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
# TODO: IETF L3VPN does not support monitoring by now
return []
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment