# bodyRequests.py
# Immutable template for the default ``events`` value. A tuple is used so that
# no caller can ever mutate the default shared between calls.
_DEFAULT_SUBSCRIPTION_EVENTS = ("SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED")

def create_events_subscription(events=_DEFAULT_SUBSCRIPTION_EVENTS, notificationDestination="http://robot.testing", eventFilters=None, eventReq=None, requestTestNotification=None, supportedFeatures=None, websockNotifConfig=None):
    """Build the dictionary describing an events subscription.

    Args:
        events: Value stored under ``"events"``. When omitted, a fresh list
            ``["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"]`` is used.
            Any explicitly passed value (including ``None``) is stored as-is.
        notificationDestination: Value stored under
            ``"notificationDestination"``.
        eventFilters: Stored under ``"eventFilters"`` unless ``None``.
        eventReq: Stored under ``"eventReq"`` unless ``None``.
        requestTestNotification: Stored under ``"requestTestNotification"``
            unless ``None`` (``False`` is kept).
        supportedFeatures: Stored under ``"supportedFeatures"`` unless ``None``.
        websockNotifConfig: Stored under ``"websockNotifConfig"`` unless
            ``None``.

    Returns:
        dict: Always contains ``"events"`` and ``"notificationDestination"``;
        each optional key is present only when its argument is not ``None``.
    """
    # Hand out a new list each time the default is used; otherwise a caller
    # mutating result["events"] would alter the default for all later calls.
    if events is _DEFAULT_SUBSCRIPTION_EVENTS:
        events = list(_DEFAULT_SUBSCRIPTION_EVENTS)

    event_subscription = {
        "events": events,
        "notificationDestination": notificationDestination,
    }

    # Optional members, listed in the order they are inserted into the result.
    optional_members = (
        ("eventFilters", eventFilters),
        ("eventReq", eventReq),
        ("requestTestNotification", requestTestNotification),
        ("supportedFeatures", supportedFeatures),
        ("websockNotifConfig", websockNotifConfig),
    )
    for key, value in optional_members:
        # Identity check so falsy-but-meaningful values (False, 0, [], "")
        # are still included.
        if value is not None:
            event_subscription[key] = value

    return event_subscription

def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None):
    """Build an event-filter dictionary from the identifiers supplied.

    Args:
        aefIds: Stored under ``"aefIds"`` unless ``None``.
        apiIds: Stored under ``"apiIds"`` unless ``None``.
        apiInvokerIds: Stored under ``"apiInvokerIds"`` unless ``None``.

    Returns:
        dict: Contains one key per argument that is not ``None``.

    Raises:
        ValueError: If all three arguments are ``None``, i.e. there is no data
            to build a filter from.
    """
    # At least one criterion is required. A real exception class is raised
    # here because raising a plain string is itself a TypeError in Python 3.
    if aefIds is None and apiIds is None and apiInvokerIds is None:
        raise ValueError("Error, no data present to create event filter")

    capif_event_filter = {}
    if aefIds is not None:
        capif_event_filter['aefIds'] = aefIds
    if apiIds is not None:
        capif_event_filter['apiIds'] = apiIds
    if apiInvokerIds is not None:
        capif_event_filter['apiInvokerIds'] = apiInvokerIds
    return capif_event_filter

def create_default_event_req():
    """Return a newly built dictionary of default event-request values.

    Returns:
        dict: Keys ``grpRepTime``, ``immRep``, ``maxReportNbr``, ``monDur``,
        ``partitionCriteria``, ``repPeriod`` and ``sampRatio`` (in that
        order), each set to its fixed default. A new dictionary and a new
        ``partitionCriteria`` list are created on every call.
    """
    default_req = {}
    default_req["grpRepTime"] = 5
    default_req["immRep"] = True
    default_req["maxReportNbr"] = 0
    default_req["monDur"] = "2000-01-23T04:56:07+00:00"
    default_req["partitionCriteria"] = ["TAC", "GEOAREA"]
    default_req["repPeriod"] = 6
    default_req["sampRatio"] = 15
    return default_req

def create_websock_notif_config_default():
    """Return the default websocket notification configuration.

    Returns:
        dict: ``"requestWebsocketUri"`` set to ``True`` and ``"websocketUri"``
        set to the placeholder string ``"websocketUri"``. A new dictionary is
        created on every call.
    """
    return {
        "requestWebsocketUri": True,
        "websocketUri": "websocketUri"
    }