Newer
Older
def create_events_subscription(events=["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"], notificationDestination="http://robot.testing", eventFilters=None, eventReq=None, requestTestNotification=None, supportedFeatures=None, websockNotifConfig=None):
event_subscription = {
"events": events,
"notificationDestination": notificationDestination,
}
if eventFilters != None:
if eventReq != None:
if requestTestNotification != None:
event_subscription['requestTestNotification'] = requestTestNotification
if supportedFeatures != None:
event_subscription['supportedFeatures'] = supportedFeatures
if websockNotifConfig != None:
event_subscription['websockNotifConfig'] = websockNotifConfig
return event_subscription
def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None):
if aefIds == None and apiIds == None and apiInvokerIds:
raise ("Error, no data present to create event filter")
capif_event_filter = dict()
if aefIds != None:
if apiIds != None:
if apiInvokerIds != None:
capif_event_filter['apiInvokerIds'] = apiInvokerIds
return capif_event_filter
def create_default_event_req():
return {
"grpRepTime": 5,
"immRep": True,
"maxReportNbr": 0,
"monDur": "2000-01-23T04:56:07+00:00",
"partitionCriteria": ["TAC", "GEOAREA"],
"repPeriod": 6,
"sampRatio": 15
}
def create_websock_notif_config_default():
"requestWebsocketUri": True,
"websocketUri": "websocketUri"
def create_notification_event(subscriptionId, event, serviceAPIDescriptions=None, apiIds=None, apiInvokerIds=None, accCtrlPolList=None, invocationLogs=None, apiTopoHide=None):
"events": event,
"eventDetail": dict()
}
if serviceAPIDescriptions != None:
if isinstance(serviceAPIDescriptions, list):
result['eventDetail']['serviceAPIDescriptions'] = serviceAPIDescriptions
else:
result['eventDetail']['serviceAPIDescriptions'] = [
serviceAPIDescriptions]
count = count+1
if apiIds != None:
if isinstance(apiIds, list):
result['eventDetail']['apiIds'] = apiIds
else:
result['eventDetail']['apiIds'] = [apiIds]
count = count+1
if apiInvokerIds != None:
if isinstance(apiInvokerIds, list):
result['eventDetail']['apiInvokerIds'] = apiInvokerIds
else:
result['eventDetail']['apiInvokerIds'] = [apiInvokerIds]
count = count+1
if accCtrlPolList != None:
result['eventDetail']['accCtrlPolList'] = accCtrlPolList
count = count+1
if invocationLogs != None:
if isinstance(invocationLogs, list):
result['eventDetail']['invocationLogs'] = invocationLogs
else:
result['eventDetail']['invocationLogs'] = [invocationLogs]
count = count+1
if apiTopoHide != None:
if isinstance(apiTopoHide, list):
result['eventDetail']['apiTopoHide'] = apiTopoHide
else:
result['eventDetail']['apiTopoHide'] = [apiTopoHide]
count = count+1
if count == 0:
del result['eventDetail']
return result