Skip to content
Snippets Groups Projects
Commit 35df1abb authored by Javier Mateos Najari's avatar Javier Mateos Najari
Browse files

feat(drivers): add morpheus driver

parent 6573fa12
No related branches found
No related tags found
1 merge request!330Draft: Resolve "(NAUDIT) SmartNIC support"
import logging, requests, threading, json
from typing import Any, Iterator, List, Optional, Tuple, Union, Dict
from queue import Queue
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from device.service.driver_api._Driver import _Driver
LOGGER = logging.getLogger(__name__)
DRIVER_NAME = 'morpheus'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class MorpheusApiDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
super().__init__(DRIVER_NAME, address, port, **settings)
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
scheme = self.settings.get('scheme', 'http')
self.__morpheus_root = '{:s}://{:s}:{:d}'.format(scheme, self.address, int(self.port))
self.__timeout = int(self.settings.get('timeout', 120))
self.__headers = {'Accept': 'application/yang-data+json', 'Content-Type': 'application/yang-data+json'}
self.__detection_thread = None
self.__pipeline_error_thread = None
size = self.settings.get('queue_events_size', 10)
self.__pipeline_error_queue = Queue(maxsize=size)
self.__detection_queue = Queue(maxsize=size)
def Connect(self) -> bool:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus'
with self.__lock:
if self.__started.is_set(): return True
try:
requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
self.__started.set()
return True
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(str(self.__morpheus_root)))
return False
except Exception: # pylint: disable=broad-except
LOGGER.exception('Exception connecting {:s}'.format(str(self.__morpheus_root)))
return False
def Disconnect(self) -> bool:
with self.__lock:
try:
if self.__detection_thread and self.__detection_thread.is_alive():
self.UnsubscribeDetectionEvent()
if self.__pipeline_thread and self.__pipeline_thread.is_alive():
self.UnsubscribePipelineError()
except Exception as e:
LOGGER.exception(f'Error during disconnect: {str(e)}')
self.__terminate.set()
return True
@metered_subclass_method(METRICS_POOL)
def GetInitialConfig(self) -> List[Tuple[str, Any]]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
with self.__lock:
try:
response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
if response.ok:
config = response.json()
result = []
for key, value in config.items():
result.append((key, value))
return result
except Exception as e:
LOGGER.exception('Exception getting initial config {:s}'.format(str(self.__morpheus_root)))
return []
@metered_subclass_method(METRICS_POOL)
def GetConfig(self, resource_keys : List[str] = []) -> List[Tuple[str, Union[Any, None, Exception]]]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
with self.__lock:
try:
response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
if response.ok:
config = response.json()
results = []
if not resource_keys:
for key, value in config.items():
results.append((key, value))
return results
for key in resource_keys:
try:
results.append(config[key])
except KeyError:
results.append(None)
except Exception as e:
results.append(e)
return results
return [(key, None) for key in resource_keys]
except Exception as e:
LOGGER.exception(f'Error getting config: {str(e)}')
return [(key, e) for key in resource_keys]
@metered_subclass_method(METRICS_POOL)
def SetConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/config'
results = []
with self.__lock:
config = dict(resources)
try:
response = requests.put(url,
headers=self.__headers,
json=config,
timeout=self.__timeout,
verify=False)
results.append(response.ok)
except Exception as e:
results.append(e)
return results
def GetState(self) -> List[Tuple[str, Any]]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/state'
with self.__lock:
try:
response = requests.get(url, headers=self.__headers, timeout=self.__timeout, verify=False)
if response.ok:
state = response.json()
result = []
for key, value in state.items():
result.append((key, value))
return result
return []
except Exception as e:
LOGGER.exception(f'Error getting state: {str(e)}')
return []
def StartPipeline(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/start'
with self.__lock:
try:
response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
response.raise_for_status()
return True
except Exception as e:
LOGGER.exception(f'Error starting pipeline: {e}')
return e
def StopPipeline(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/data/naudit-morpheus:morpheus/stop'
with self.__lock:
try:
response = requests.post(url, headers=self.__headers, timeout=self.__timeout, verify=False)
response.raise_for_status()
return True
except Exception as e:
LOGGER.exception(f'Error stopping pipeline: {e}')
return False
@metered_subclass_method(METRICS_POOL)
def SubscribeDetectionEvent(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/detection-event'
with self.__lock:
try:
self.__detection_thread = threading.Thread(
target=self.__handle_notification_stream,
args=(url, self.__detection_queue),
daemon=True
)
self.__detection_thread.start()
return True
except Exception as e:
LOGGER.exception(f'Error subscribing to detection events: {str(e)}')
return e
@metered_subclass_method(METRICS_POOL)
def UnsubscribeDetectionEvent(self) -> Union[bool, Exception]:
try:
if self.__detection_thread and self.__detection_thread.is_alive():
self.__detection_thread.join(timeout=5)
return True
except Exception as e:
LOGGER.exception(f'Error unsubscribing from detection events: {str(e)}')
return e
def GetDetectionEvent(self, blocking=False, terminate : Optional[threading.Event] = None) -> Iterator[Dict]:
while True:
if self.__terminate.is_set(): break
if terminate is not None and terminate.is_set(): break
try:
event = self.__detection_queue.get(block=blocking, timeout=0.1)
if event is not None:
yield event
except queue.Empty:
if blocking:
continue
return
def SubscribePipelineError(self) -> Union[bool, Exception]:
url = self.__morpheus_root + '/restconf/streams/naudit-morpheus:morpheus/pipeline-error'
with self.__lock:
try:
self.__pipeline_error_thread = threading.Thread(
target=self.__handle_notification_stream,
args=(url, self.__pipeline_error_queue),
daemon=True
)
self.__pipeline_error_thread.start()
return True
except Exception as e:
LOGGER.exception(f'Error subscribing to pipeline errors: {str(e)}')
return e
def UnsubscribePipelineError(self) -> Union[bool, Exception]:
try:
if self.__pipeline_error_thread and self.__pipeline_error_thread.is_alive():
self.__pipeline_error_thread.join(timeout=5)
return True
except Exception as e:
LOGGER.exception(f'Error unsubscribing from pipeline errors: {str(e)}')
return e
def GetPipelineError(self, blocking=False, terminate: Optional[threading.Event] = None) -> Iterator[Dict]:
while True:
if self.__terminate.is_set(): break
if terminate is not None and terminate.is_set(): break
try:
error = self.__pipeline_error_queue.get(block=blocking, timeout=0.1)
if error is not None:
yield error
except queue.Empty:
if blocking:
continue
return
def __handle_notification_stream(self, url: str, queue: Queue[Any]) -> None:
try:
with requests.get(url,
headers=self.__headers,
stream=True,
verify=False) as response:
if not response.ok:
LOGGER.error(f'Error connecting to event stream: {response.status_code}')
return
try:
event = response.json()
queue.put(event['data']['ietf-restconf:notification'])
except json.JSONDecodeError as e:
LOGGER.error(f'Error parsing event: {e}')
except Exception as e:
LOGGER.exception(f'Error in notification stream handler: {str(e)}')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment