Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def create_events_subscription(events=["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"], notificationDestination="http://robot.testing", eventFilters=None, eventReq=None, requestTestNotification=None, supportedFeatures=None, websockNotifConfig=None):
    """Build the body of an event subscription request as a plain dict.

    ``events`` and ``notificationDestination`` are always present in the
    result. Every other argument is added under the key of the same name
    only when it is not ``None``.

    Args:
        events: Event names to subscribe to. A list is shallow-copied into
            the result; any other value is stored as given.
        notificationDestination: Value stored under
            ``notificationDestination``.
        eventFilters: Optional value for the ``eventFilters`` key.
        eventReq: Optional value for the ``eventReq`` key.
        requestTestNotification: Optional value for the
            ``requestTestNotification`` key.
        supportedFeatures: Optional value for the ``supportedFeatures`` key.
        websockNotifConfig: Optional value for the ``websockNotifConfig`` key.

    Returns:
        dict: The subscription body.
    """
    # The default for ``events`` is a single list object shared by every call.
    # Copying it (and any caller-supplied list) keeps a later mutation of the
    # returned body from leaking into the default or into the caller's list.
    if isinstance(events, list):
        events = list(events)

    event_subscription = {
        "events": events,
        "notificationDestination": notificationDestination,
    }

    optional_fields = {
        "eventFilters": eventFilters,
        "eventReq": eventReq,
        "requestTestNotification": requestTestNotification,
        "supportedFeatures": supportedFeatures,
        "websockNotifConfig": websockNotifConfig,
    }
    for key, value in optional_fields.items():
        # Falsy values such as False, 0 or "" are still meaningful payloads,
        # so only None means "omit this key".
        if value is not None:
            event_subscription[key] = value

    return event_subscription
def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None):
    """Build an event filter dict from the identifier arguments supplied.

    Each argument that is not ``None`` is stored under the key of the same
    name; arguments left as ``None`` are omitted.

    Args:
        aefIds: Optional value for the ``aefIds`` key.
        apiIds: Optional value for the ``apiIds`` key.
        apiInvokerIds: Optional value for the ``apiInvokerIds`` key.

    Returns:
        dict: The filter, containing only the supplied keys.

    Raises:
        ValueError: If all three arguments are ``None``.
    """
    candidates = {
        "aefIds": aefIds,
        "apiIds": apiIds,
        "apiInvokerIds": apiInvokerIds,
    }
    capif_event_filter = {
        key: value for key, value in candidates.items() if value is not None
    }

    # A filter needs at least one criterion. Raising a real exception class is
    # required: raising a bare string fails with TypeError instead.
    if not capif_event_filter:
        raise ValueError("Error, no data present to create event filter")

    return capif_event_filter
def create_default_event_req():
    """Return a new dict holding a fixed set of default ``eventReq`` values.

    A fresh dict (with a fresh ``partitionCriteria`` list) is built on every
    call, so callers may modify the result freely.
    """
    default_event_req = {}
    default_event_req["grpRepTime"] = 5
    default_event_req["immRep"] = True
    default_event_req["maxReportNbr"] = 0
    default_event_req["monDur"] = "2000-01-23T04:56:07+00:00"
    default_event_req["partitionCriteria"] = ["TAC", "GEOAREA"]
    default_event_req["repPeriod"] = 6
    default_event_req["sampRatio"] = 15
    return default_event_req
def create_websock_notif_config_default():
    """Return a new dict holding a default ``websockNotifConfig`` value.

    Returns:
        dict: ``requestWebsocketUri`` set to ``True`` and ``websocketUri`` set
        to the placeholder string ``"websocketUri"``.
    """
    # The key/value lines must sit inside a returned dict literal; on their
    # own they are not valid statements.
    return {
        "requestWebsocketUri": True,
        "websocketUri": "websocketUri",
    }
def create_notification_event(subscriptionId, event, serviceAPIDescriptions=None, apiIds=None, apiInvokerIds=None, accCtrlPolList=None, invocationLogs=None, apiTopoHide=None):
    """Build an event notification body as a plain dict.

    The result always contains ``subscriptionId`` and ``events``. Each detail
    argument that is not ``None`` is stored under ``eventDetail`` using the
    key of the same name; a value that is not already a list is wrapped in a
    one-element list. When no detail argument is supplied, the
    ``eventDetail`` key is omitted entirely.

    Args:
        subscriptionId: Value stored under ``subscriptionId``.
        event: Value stored under ``events``.
        serviceAPIDescriptions: Optional detail, list or single item.
        apiIds: Optional detail, list or single item.
        apiInvokerIds: Optional detail, list or single item.
        accCtrlPolList: Optional detail, list or single item.
        invocationLogs: Optional detail, list or single item.
        apiTopoHide: Optional detail, list or single item.
            NOTE(review): wrapped in a list like the other details; confirm
            that the receiving API expects a list for this field.

    Returns:
        dict: The notification body.
    """
    detail_candidates = {
        "serviceAPIDescriptions": serviceAPIDescriptions,
        "apiIds": apiIds,
        "apiInvokerIds": apiInvokerIds,
        "accCtrlPolList": accCtrlPolList,
        "invocationLogs": invocationLogs,
        "apiTopoHide": apiTopoHide,
    }

    # Every detail follows the same rule: skip None, pass lists through
    # unchanged, wrap anything else. Handling apiTopoHide with the same rule
    # keeps an absent value from being recorded as [None].
    event_detail = {
        key: value if isinstance(value, list) else [value]
        for key, value in detail_candidates.items()
        if value is not None
    }

    result = {
        "subscriptionId": subscriptionId,
        "events": event,
    }
    # An empty list is still a supplied detail, so the test is on whether any
    # key was added, not on whether the values are non-empty.
    if event_detail:
        result["eventDetail"] = event_detail
    return result