Loading src/device/service/driver_api/DriverFactory.py 0 → 100644 +50 −0 Original line number Diff line number Diff line from typing import Dict, Set from device.service.driver_api.QueryFields import QUERY_FIELDS from device.service.driver_api._Driver import _Driver from device.service.driver_api.Exceptions import MultipleResultsForQueryException, UnsatisfiedQueryException, \ UnsupportedDriverClassException, UnsupportedQueryFieldException, UnsupportedQueryFieldValueException class DriverFactory: def __init__(self) -> None: self.__indices : Dict[str, Dict[str, Set[_Driver]]] = {} # Dict{field_name => Dict{field_value => Set{Driver}}} def register_driver_class(self, driver_class, **query_fields): if not issubclass(driver_class, _Driver): raise UnsupportedDriverClassException(str(driver_class)) driver_name = driver_class.__name__ unsupported_query_fields = set(query_fields.keys()).difference(set(QUERY_FIELDS.keys())) if len(unsupported_query_fields) > 0: raise UnsupportedQueryFieldException(unsupported_query_fields, driver_class_name=driver_name) for field_name, field_value in query_fields.items(): field_indice = self.__indices.setdefault(field_name, dict()) field_enum_values = QUERY_FIELDS.get(field_name) if field_enum_values is not None and field_value not in field_enum_values: raise UnsupportedQueryFieldValueException( field_name, field_value, field_enum_values, driver_class_name=driver_name) field_indice_drivers = field_indice.setdefault(field_name, set()) field_indice_drivers.add(driver_class) def get_driver_class(self, **query_fields) -> _Driver: unsupported_query_fields = set(query_fields.keys()).difference(set(QUERY_FIELDS.keys())) if len(unsupported_query_fields) > 0: raise UnsupportedQueryFieldException(unsupported_query_fields) candidate_driver_classes = None for field_name, field_value in query_fields.items(): field_indice = self.__indices.get(field_name) if field_indice is None: continue field_enum_values = QUERY_FIELDS.get(field_name) if field_enum_values is not None and field_value not in field_enum_values: raise UnsupportedQueryFieldValueException(field_name, field_value, field_enum_values) field_indice_drivers = field_indice.get(field_name) if field_indice_drivers is None: continue candidate_driver_classes = set().union(field_indice_drivers) if candidate_driver_classes is None else \ candidate_driver_classes.intersection(field_indice_drivers) if len(candidate_driver_classes) == 0: raise UnsatisfiedQueryException(query_fields) if len(candidate_driver_classes) > 1: # TODO: Consider choosing driver with more query fields being satisfied (i.e., the most restrictive one) raise MultipleResultsForQueryException(query_fields, {d.__name__ for d in candidate_driver_classes}) return candidate_driver_classes.pop() src/device/service/driver_api/Exceptions.py 0 → 100644 +30 −0 Original line number Diff line number Diff line class MultipleResultsForQueryException(Exception): def __init__(self, query_fields, driver_names): super().__init__('Multiple Drivers({}) satisfy QueryFields({})'.format(str(driver_names), str(query_fields))) class UnsatisfiedQueryException(Exception): def __init__(self, query_fields): super().__init__('No Driver satisfies QueryFields({})'.format(str(query_fields))) class UnsupportedDriverClassException(Exception): def __init__(self, driver_class_name): super().__init__('Class({}) is not a subclass of _Driver'.format(str(driver_class_name))) class UnsupportedQueryFieldException(Exception): def __init__(self, unsupported_query_fields, driver_class_name=None): if driver_class_name: msg = 'QueryFields({}) specified by Driver({}) are not supported'.format( str(unsupported_query_fields), str(driver_class_name)) else: msg = 'QueryFields({}) specified in query are not supported'.format(str(unsupported_query_fields)) super().__init__(msg) class UnsupportedQueryFieldValueException(Exception): def __init__(self, query_field_name, query_field_value, allowed_query_field_values, driver_class_name=None): if driver_class_name: msg = 'QueryField({}={}) specified by Driver({}) is not supported. Allowed values are {}'.format( str(query_field_name), str(query_field_value), str(driver_class_name), str(allowed_query_field_values)) else: msg = 'QueryField({}={}) specified in query is not supported. Allowed values are {}'.format( str(query_field_name), str(query_field_value), str(allowed_query_field_values)) super().__init__(msg) src/device/service/driver_api/QueryFields.py 0 → 100644 +33 −0 Original line number Diff line number Diff line from enum import Enum class DeviceTypeQueryFieldEnum(Enum): OPTICAL_ROADM = 'optical-roadm' OPTICAL_TRANDPONDER = 'optical-trandponder' PACKET_ROUTER = 'packet-router' PACKET_SWITCH = 'packet-switch' class ProtocolQueryFieldEnum(Enum): SOFTWARE = 'software' GRPC = 'grpc' RESTAPI = 'restapi' NETCONF = 'netconf' GNMI = 'gnmi' RESTCONF = 'restconf' class DataModelQueryFieldEnum(Enum): EMULATED = 'emu' OPENCONFIG = 'oc' P4 = 'p4' TRANSPORT_API = 'tapi' IETF_NETWORK_TOPOLOGY = 'ietf-netw-topo' ONF_TR_352 = 'onf-tr-352' # Map allowed query fields to allowed values per query field. If no restriction (free text) None is specified QUERY_FIELDS = { 'device_type' : {i.value for i in DeviceTypeQueryFieldEnum}, 'protocol' : {i.value for i in ProtocolQueryFieldEnum}, 'data_model' : {i.value for i in DataModelQueryFieldEnum}, 'vendor' : None, 'model' : None, 'serial_number': None, } src/device/service/driver_api/_Driver.py 0 → 100644 +113 −0 Original line number Diff line number Diff line from typing import Any, Iterator, List, Tuple, Union class _Driver: def __init__(self, address : str, port : int, **kwargs) -> None: """ Initialize Driver. Parameters: address : str The address of the device port : int The port of the device **kwargs Extra attributes can be configured using kwargs. """ raise NotImplementedError() def Connect(self) -> bool: """ Connect to the Device. Returns: succeeded : bool Boolean variable indicating if connection succeeded. """ raise NotImplementedError() def Disconnect(self) -> bool: """ Disconnect from the Device. Returns: succeeded : bool Boolean variable indicating if disconnection succeeded. """ raise NotImplementedError() def GetConfig(self, resource : List[str]) -> List[Union[str, None]]: """ Retrieve running configuration of entire device, or selected resources. Parameters: resources : List[str] List of keys pointing to the resources to be retrieved. Returns: values : List[Union[str, None]] List of values for resource keys requested. Values should be in the same order than resource keys requested. If a resource is not found, None should be specified in the List for that resource. """ raise NotImplementedError() def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[bool]: """ Create/Update configuration for a list of resources. Parameters: resources : List[Tuple[str, Any]] List of tuples, each containing a resource_key pointing the resource to be modified, and a resource_value containing the new value to be set. Returns: results : List[bool] List of results for changes in resource keys requested. Each result is a boolean value indicating if the change succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def DeleteConfig(self, resource : List[str]) -> List[bool]: """ Delete configuration for a list of resources. Parameters: resources : List[str] List of keys pointing to the resources to be deleted. Returns: results : List[bool] List of results for delete resource keys requested. Each result is a boolean value indicating if the delete succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def SubscribeState(self, resources : List[Tuple[str, float]]) -> List[bool]: """ Subscribe to state information of entire device, or selected resources. Subscriptions are incremental. Driver should keep track of requested resources. Parameters: resources : List[Tuple[str, float]] List of tuples, each containing a resource_key pointing the resource to be subscribed, and a sampling_rate in seconds defining the desired monitoring periodicity for that resource. Note: sampling_rate values must be a strictly positive floats. Returns: results : List[bool] List of results for subscriptions to resource keys requested. Each result is a boolean value indicating if the subscription succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def UnsubscribeState(self, resources : List[str]) -> List[bool]: """ Unsubscribe from state information of entire device, or selected resources. Subscriptions are incremental. Driver should keep track of requested resources. Parameters: resources : List[str] List of resource_keys pointing the resource to be unsubscribed. Returns: results : List[bool] List of results for unsubscriptions from resource keys requested. Each result is a boolean value indicating if the unsubscription succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def GetState(self) -> Iterator[Tuple[str, Any]]: """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be called once and will block until values are available. When values are available, it should yield each of them and block again until new values are available. When the driver is destroyed, GetState() can return instead of yield to terminate the loop. Example: for resource_key,resource_value in my_driver.GetState(): process(resource_key,resource_value) Returns: results : Iterator[Tuple[str, Any]] List of tuples with state samples, each containing a resource_key and a resource_value. Only resources with an active subscription must be retrieved. Periodicity of the samples is specified when creating the subscription using method SubscribeState(). Order of yielded values is arbitrary. """ raise NotImplementedError() src/device/service/driver_api/__init__.py 0 → 100644 +0 −0 Empty file added. Loading
src/device/service/driver_api/DriverFactory.py 0 → 100644 +50 −0 Original line number Diff line number Diff line from typing import Dict, Set from device.service.driver_api.QueryFields import QUERY_FIELDS from device.service.driver_api._Driver import _Driver from device.service.driver_api.Exceptions import MultipleResultsForQueryException, UnsatisfiedQueryException, \ UnsupportedDriverClassException, UnsupportedQueryFieldException, UnsupportedQueryFieldValueException class DriverFactory: def __init__(self) -> None: self.__indices : Dict[str, Dict[str, Set[_Driver]]] = {} # Dict{field_name => Dict{field_value => Set{Driver}}} def register_driver_class(self, driver_class, **query_fields): if not issubclass(driver_class, _Driver): raise UnsupportedDriverClassException(str(driver_class)) driver_name = driver_class.__name__ unsupported_query_fields = set(query_fields.keys()).difference(set(QUERY_FIELDS.keys())) if len(unsupported_query_fields) > 0: raise UnsupportedQueryFieldException(unsupported_query_fields, driver_class_name=driver_name) for field_name, field_value in query_fields.items(): field_indice = self.__indices.setdefault(field_name, dict()) field_enum_values = QUERY_FIELDS.get(field_name) if field_enum_values is not None and field_value not in field_enum_values: raise UnsupportedQueryFieldValueException( field_name, field_value, field_enum_values, driver_class_name=driver_name) field_indice_drivers = field_indice.setdefault(field_name, set()) field_indice_drivers.add(driver_class) def get_driver_class(self, **query_fields) -> _Driver: unsupported_query_fields = set(query_fields.keys()).difference(set(QUERY_FIELDS.keys())) if len(unsupported_query_fields) > 0: raise UnsupportedQueryFieldException(unsupported_query_fields) candidate_driver_classes = None for field_name, field_value in query_fields.items(): field_indice = self.__indices.get(field_name) if field_indice is None: continue field_enum_values = QUERY_FIELDS.get(field_name) if field_enum_values is not None and field_value not in field_enum_values: raise UnsupportedQueryFieldValueException(field_name, field_value, field_enum_values) field_indice_drivers = field_indice.get(field_name) if field_indice_drivers is None: continue candidate_driver_classes = set().union(field_indice_drivers) if candidate_driver_classes is None else \ candidate_driver_classes.intersection(field_indice_drivers) if len(candidate_driver_classes) == 0: raise UnsatisfiedQueryException(query_fields) if len(candidate_driver_classes) > 1: # TODO: Consider choosing driver with more query fields being satisfied (i.e., the most restrictive one) raise MultipleResultsForQueryException(query_fields, {d.__name__ for d in candidate_driver_classes}) return candidate_driver_classes.pop()
src/device/service/driver_api/Exceptions.py 0 → 100644 +30 −0 Original line number Diff line number Diff line class MultipleResultsForQueryException(Exception): def __init__(self, query_fields, driver_names): super().__init__('Multiple Drivers({}) satisfy QueryFields({})'.format(str(driver_names), str(query_fields))) class UnsatisfiedQueryException(Exception): def __init__(self, query_fields): super().__init__('No Driver satisfies QueryFields({})'.format(str(query_fields))) class UnsupportedDriverClassException(Exception): def __init__(self, driver_class_name): super().__init__('Class({}) is not a subclass of _Driver'.format(str(driver_class_name))) class UnsupportedQueryFieldException(Exception): def __init__(self, unsupported_query_fields, driver_class_name=None): if driver_class_name: msg = 'QueryFields({}) specified by Driver({}) are not supported'.format( str(unsupported_query_fields), str(driver_class_name)) else: msg = 'QueryFields({}) specified in query are not supported'.format(str(unsupported_query_fields)) super().__init__(msg) class UnsupportedQueryFieldValueException(Exception): def __init__(self, query_field_name, query_field_value, allowed_query_field_values, driver_class_name=None): if driver_class_name: msg = 'QueryField({}={}) specified by Driver({}) is not supported. Allowed values are {}'.format( str(query_field_name), str(query_field_value), str(driver_class_name), str(allowed_query_field_values)) else: msg = 'QueryField({}={}) specified in query is not supported. Allowed values are {}'.format( str(query_field_name), str(query_field_value), str(allowed_query_field_values)) super().__init__(msg)
src/device/service/driver_api/QueryFields.py 0 → 100644 +33 −0 Original line number Diff line number Diff line from enum import Enum class DeviceTypeQueryFieldEnum(Enum): OPTICAL_ROADM = 'optical-roadm' OPTICAL_TRANDPONDER = 'optical-trandponder' PACKET_ROUTER = 'packet-router' PACKET_SWITCH = 'packet-switch' class ProtocolQueryFieldEnum(Enum): SOFTWARE = 'software' GRPC = 'grpc' RESTAPI = 'restapi' NETCONF = 'netconf' GNMI = 'gnmi' RESTCONF = 'restconf' class DataModelQueryFieldEnum(Enum): EMULATED = 'emu' OPENCONFIG = 'oc' P4 = 'p4' TRANSPORT_API = 'tapi' IETF_NETWORK_TOPOLOGY = 'ietf-netw-topo' ONF_TR_352 = 'onf-tr-352' # Map allowed query fields to allowed values per query field. If no restriction (free text) None is specified QUERY_FIELDS = { 'device_type' : {i.value for i in DeviceTypeQueryFieldEnum}, 'protocol' : {i.value for i in ProtocolQueryFieldEnum}, 'data_model' : {i.value for i in DataModelQueryFieldEnum}, 'vendor' : None, 'model' : None, 'serial_number': None, }
src/device/service/driver_api/_Driver.py 0 → 100644 +113 −0 Original line number Diff line number Diff line from typing import Any, Iterator, List, Tuple, Union class _Driver: def __init__(self, address : str, port : int, **kwargs) -> None: """ Initialize Driver. Parameters: address : str The address of the device port : int The port of the device **kwargs Extra attributes can be configured using kwargs. """ raise NotImplementedError() def Connect(self) -> bool: """ Connect to the Device. Returns: succeeded : bool Boolean variable indicating if connection succeeded. """ raise NotImplementedError() def Disconnect(self) -> bool: """ Disconnect from the Device. Returns: succeeded : bool Boolean variable indicating if disconnection succeeded. """ raise NotImplementedError() def GetConfig(self, resource : List[str]) -> List[Union[str, None]]: """ Retrieve running configuration of entire device, or selected resources. Parameters: resources : List[str] List of keys pointing to the resources to be retrieved. Returns: values : List[Union[str, None]] List of values for resource keys requested. Values should be in the same order than resource keys requested. If a resource is not found, None should be specified in the List for that resource. """ raise NotImplementedError() def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[bool]: """ Create/Update configuration for a list of resources. Parameters: resources : List[Tuple[str, Any]] List of tuples, each containing a resource_key pointing the resource to be modified, and a resource_value containing the new value to be set. Returns: results : List[bool] List of results for changes in resource keys requested. Each result is a boolean value indicating if the change succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def DeleteConfig(self, resource : List[str]) -> List[bool]: """ Delete configuration for a list of resources. Parameters: resources : List[str] List of keys pointing to the resources to be deleted. Returns: results : List[bool] List of results for delete resource keys requested. Each result is a boolean value indicating if the delete succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def SubscribeState(self, resources : List[Tuple[str, float]]) -> List[bool]: """ Subscribe to state information of entire device, or selected resources. Subscriptions are incremental. Driver should keep track of requested resources. Parameters: resources : List[Tuple[str, float]] List of tuples, each containing a resource_key pointing the resource to be subscribed, and a sampling_rate in seconds defining the desired monitoring periodicity for that resource. Note: sampling_rate values must be a strictly positive floats. Returns: results : List[bool] List of results for subscriptions to resource keys requested. Each result is a boolean value indicating if the subscription succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def UnsubscribeState(self, resources : List[str]) -> List[bool]: """ Unsubscribe from state information of entire device, or selected resources. Subscriptions are incremental. Driver should keep track of requested resources. Parameters: resources : List[str] List of resource_keys pointing the resource to be unsubscribed. Returns: results : List[bool] List of results for unsubscriptions from resource keys requested. Each result is a boolean value indicating if the unsubscription succeeded. Results should be in the same order than resource keys requested. """ raise NotImplementedError() def GetState(self) -> Iterator[Tuple[str, Any]]: """ Retrieve last collected values for subscribed resources. Operates as a generator, so this method should be called once and will block until values are available. When values are available, it should yield each of them and block again until new values are available. When the driver is destroyed, GetState() can return instead of yield to terminate the loop. Example: for resource_key,resource_value in my_driver.GetState(): process(resource_key,resource_value) Returns: results : Iterator[Tuple[str, Any]] List of tuples with state samples, each containing a resource_key and a resource_value. Only resources with an active subscription must be retrieved. Periodicity of the samples is specified when creating the subscription using method SubscribeState(). Order of yielded values is arbitrary. """ raise NotImplementedError()