def _as_list(value):
    """Return *value* unchanged if it is a list, otherwise wrap it in a one-item list."""
    return value if isinstance(value, list) else [value]


def create_events_subscription(
    events=None,
    notificationDestination="http://robot.testing",
    eventFilters=None,
    eventReq=None,
    requestTestNotification=None,
    supportedFeatures=None,
    websockNotifConfig=None,
):
    """Build an event subscription body as a plain dict.

    Args:
        events: Events to subscribe to. When omitted (``None``), a fresh
            list ``["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"]`` is
            used.
        notificationDestination: Value stored under
            ``"notificationDestination"``.
        eventFilters, eventReq, requestTestNotification, supportedFeatures,
        websockNotifConfig: Optional values; each one is added under the key
            of the same name only when it is not ``None``.

    Returns:
        A dict that always contains ``"events"`` and
        ``"notificationDestination"``, plus every optional key supplied.
    """
    # Build the default per call: a list literal in the signature would be
    # shared by every returned subscription, so mutating one result would
    # silently change the default for all later calls.
    if events is None:
        events = ["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"]

    event_subscription = {
        "events": events,
        "notificationDestination": notificationDestination,
    }

    optional_fields = (
        ("eventFilters", eventFilters),
        ("eventReq", eventReq),
        ("requestTestNotification", requestTestNotification),
        ("supportedFeatures", supportedFeatures),
        ("websockNotifConfig", websockNotifConfig),
    )
    for key, value in optional_fields:
        # Only None means "not supplied"; falsy values such as [] or False
        # are still written to the body.
        if value is not None:
            event_subscription[key] = value

    return event_subscription


def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None):
    """Build an event filter dict from the identifiers that were supplied.

    Args:
        aefIds: Optional value stored under ``"aefIds"``.
        apiIds: Optional value stored under ``"apiIds"``.
        apiInvokerIds: Optional value stored under ``"apiInvokerIds"``.

    Returns:
        A dict holding only the arguments that are not ``None``.

    Raises:
        ValueError: If all three arguments are ``None``.
    """
    if aefIds is None and apiIds is None and apiInvokerIds is None:
        raise ValueError("Error, no data present to create event filter")

    capif_event_filter = {}
    if aefIds is not None:
        capif_event_filter['aefIds'] = aefIds
    if apiIds is not None:
        capif_event_filter['apiIds'] = apiIds
    if apiInvokerIds is not None:
        capif_event_filter['apiInvokerIds'] = apiInvokerIds
    return capif_event_filter


def create_default_event_req():
    """Return a fixed sample ``eventReq`` dict (a new dict on every call)."""
    return {
        "grpRepTime": 5,
        "immRep": True,
        "maxReportNbr": 0,
        "monDur": "2000-01-23T04:56:07+00:00",
        "partitionCriteria": ["TAC", "GEOAREA"],
        "repPeriod": 6,
        "sampRatio": 15
    }


def create_websock_notif_config_default():
    """Return a fixed sample ``websockNotifConfig`` dict (a new dict on every call)."""
    return {
        "requestWebsocketUri": True,
        "websocketUri": "websocketUri"
    }


def create_notification_event(
    subscriptionId,
    event,
    serviceAPIDescriptions=None,
    apiIds=None,
    apiInvokerIds=None,
    accCtrlPolList=None,
    invocationLogs=None,
    apiTopoHide=None,
):
    """Build a notification event dict for a subscription.

    Args:
        subscriptionId: Value stored under ``"subscriptionId"``.
        event: Value stored under ``"events"``.
        serviceAPIDescriptions, apiIds, apiInvokerIds, invocationLogs,
        apiTopoHide: Optional details. A value that is not already a list is
            wrapped in a one-item list before being stored.
        accCtrlPolList: Optional detail stored as given (never wrapped).

    Returns:
        A dict with ``"subscriptionId"`` and ``"events"``. An
        ``"eventDetail"`` dict is included only when at least one optional
        detail is not ``None``.
    """
    # (key, value, wrap_in_list) in the order the keys must appear.
    detail_fields = (
        ("serviceAPIDescriptions", serviceAPIDescriptions, True),
        ("apiIds", apiIds, True),
        ("apiInvokerIds", apiInvokerIds, True),
        ("accCtrlPolList", accCtrlPolList, False),
        ("invocationLogs", invocationLogs, True),
        ("apiTopoHide", apiTopoHide, True),
    )

    event_detail = {}
    for key, value, wrap in detail_fields:
        if value is None:
            continue
        event_detail[key] = _as_list(value) if wrap else value

    result = {
        "subscriptionId": subscriptionId,
        "events": event,
    }
    # Omit "eventDetail" entirely when no detail was supplied. An empty list
    # detail still counts as supplied because the key is present.
    if event_detail:
        result["eventDetail"] = event_detail
    return result