def create_events_subscription(events=["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"],notificationDestination="http://robot.testing", eventFilters=None, eventReq=None, requestTestNotification=None, supportedFeatures=None,websockNotifConfig=None): event_subscription={ "events": events, "notificationDestination": notificationDestination, } if eventFilters != None: event_subscription['eventFilters']=eventFilters if eventReq != None: event_subscription['eventReq']=eventReq if requestTestNotification != None: event_subscription['requestTestNotification']=requestTestNotification if supportedFeatures != None: event_subscription['supportedFeatures']=supportedFeatures if websockNotifConfig != None: event_subscription['websockNotifConfig']=websockNotifConfig return event_subscription def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None): if aefIds == None and apiIds == None and apiInvokerIds: raise("Error, no data present to create event filter") capif_event_filter=dict() if aefIds != None: capif_event_filter['aefIds']=aefIds if apiIds != None: capif_event_filter['apiIds']=apiIds if apiInvokerIds != None: capif_event_filter['apiInvokerIds']=apiInvokerIds return capif_event_filter def create_default_event_req(): return { "grpRepTime": 5, "immRep": True, "maxReportNbr": 0, "monDur": "2000-01-23T04:56:07+00:00", "partitionCriteria": ["TAC", "GEOAREA"], "repPeriod": 6, "sampRatio": 15 } def create_websock_notif_config_default(): return { "requestWebsocketUri": True, "websocketUri": "websocketUri" } def create_notification_event(subscriptionId, event, serviceAPIDescriptions=None, apiIds=None, apiInvokerIds=None, accCtrlPolList=None, invocationLogs=None, apiTopoHide=None): result={ "subscriptionId":subscriptionId, "events": event, "eventDetail": dict() } count=0 if serviceAPIDescriptions != None: if isinstance(serviceAPIDescriptions,list): result['eventDetail']['serviceAPIDescriptions']=serviceAPIDescriptions else: result['eventDetail']['serviceAPIDescriptions']=[serviceAPIDescriptions] count=count+1 if apiIds != None: if isinstance(apiIds,list): result['eventDetail']['apiIds']=apiIds else: result['eventDetail']['apiIds']=[apiIds] count=count+1 if apiInvokerIds != None: if isinstance(apiInvokerIds,list): result['eventDetail']['apiInvokerIds']=apiInvokerIds else: result['eventDetail']['apiInvokerIds']=[apiInvokerIds] count=count+1 if accCtrlPolList != None: if isinstance(accCtrlPolList,list): result['eventDetail']['accCtrlPolList']=accCtrlPolList else: result['eventDetail']['accCtrlPolList']=[accCtrlPolList] count=count+1 if invocationLogs != None: if isinstance(invocationLogs,list): result['eventDetail']['invocationLogs']=invocationLogs else: result['eventDetail']['invocationLogs']=[invocationLogs] count=count+1 if apiTopoHide != None: if isinstance(apiTopoHide,list): result['eventDetail']['apiTopoHide']=apiTopoHide else: result['eventDetail']['apiTopoHide']=[apiTopoHide] count=count+1 if count == 0: del result['eventDetail'] return result