def create_events_subscription(
    events=["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"],
    notificationDestination="http://robot.testing",
    eventFilters=None,
    eventReq=None,
    requestTestNotification=None,
    supportedFeatures=None,
    websockNotifConfig=None,
):
    """Build an event subscription dictionary.

    ``events`` and ``notificationDestination`` are always present in the
    result. Every other argument is added under the key of the same name
    only when it is not ``None``.

    Args:
        events: Value stored under ``"events"``.
        notificationDestination: Value stored under
            ``"notificationDestination"``.
        eventFilters: Optional value for ``"eventFilters"``.
        eventReq: Optional value for ``"eventReq"``.
        requestTestNotification: Optional value for
            ``"requestTestNotification"``.
        supportedFeatures: Optional value for ``"supportedFeatures"``.
        websockNotifConfig: Optional value for ``"websockNotifConfig"``.

    Returns:
        dict: The assembled subscription.
    """
    # The signature (including the list default) is kept unchanged for
    # callers; the list is copied so the shared default object is never
    # handed out, otherwise mutating one result would alter later calls.
    if isinstance(events, list):
        events = list(events)

    event_subscription = {
        "events": events,
        "notificationDestination": notificationDestination,
    }

    optional_fields = {
        "eventFilters": eventFilters,
        "eventReq": eventReq,
        "requestTestNotification": requestTestNotification,
        "supportedFeatures": supportedFeatures,
        "websockNotifConfig": websockNotifConfig,
    }
    # Falsy values such as False or [] are still included; only None is
    # treated as "not provided".
    event_subscription.update(
        (key, value)
        for key, value in optional_fields.items()
        if value is not None
    )
    return event_subscription


def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None):
    """Build an event filter dictionary from the identifiers provided.

    Each argument that is not ``None`` is stored under the key of the same
    name.

    Args:
        aefIds: Optional value for ``"aefIds"``.
        apiIds: Optional value for ``"apiIds"``.
        apiInvokerIds: Optional value for ``"apiInvokerIds"``.

    Returns:
        dict: The filter, containing only the supplied entries.

    Raises:
        ValueError: If all three arguments are ``None``.
    """
    if aefIds is None and apiIds is None and apiInvokerIds is None:
        raise ValueError("Error, no data present to create event filter")

    candidates = {
        "aefIds": aefIds,
        "apiIds": apiIds,
        "apiInvokerIds": apiInvokerIds,
    }
    return {
        key: value for key, value in candidates.items() if value is not None
    }


def create_default_event_req():
    """Return a fixed ``eventReq`` dictionary with default values.

    A new dictionary is created on every call.
    """
    return {
        "grpRepTime": 5,
        "immRep": True,
        "maxReportNbr": 0,
        "monDur": "2000-01-23T04:56:07+00:00",
        "partitionCriteria": ["TAC", "GEOAREA"],
        "repPeriod": 6,
        "sampRatio": 15,
    }


def create_websock_notif_config_default():
    """Return a fixed ``websockNotifConfig`` dictionary with default values.

    A new dictionary is created on every call.
    """
    return {
        "requestWebsocketUri": True,
        "websocketUri": "websocketUri",
    }