# bodyRequests.py
def create_events_subscription(events=["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"], notificationDestination="http://robot.testing", eventFilters=None, eventReq=None, requestTestNotification=None, supportedFeatures=None, websockNotifConfig=None):
    """Build the request body for an events subscription.

    Args:
        events: Value stored under ``"events"``. When it is a list, a shallow
            copy is stored so the returned body never aliases the caller's
            list or the shared default list.
        notificationDestination: Value stored under
            ``"notificationDestination"``.
        eventFilters: Optional value for ``"eventFilters"``.
        eventReq: Optional value for ``"eventReq"``.
        requestTestNotification: Optional value for
            ``"requestTestNotification"``.
        supportedFeatures: Optional value for ``"supportedFeatures"``.
        websockNotifConfig: Optional value for ``"websockNotifConfig"``.

    Returns:
        dict: Always contains ``"events"`` and ``"notificationDestination"``;
        each optional key is present only when its argument is not ``None``.
    """
    # The default for `events` is a single list object shared by every call.
    # Copying it here keeps the signature unchanged while preventing a caller
    # that mutates body["events"] from silently changing later default bodies.
    if isinstance(events, list):
        events = list(events)

    event_subscription = {
        "events": events,
        "notificationDestination": notificationDestination,
    }

    optional_fields = (
        ("eventFilters", eventFilters),
        ("eventReq", eventReq),
        ("requestTestNotification", requestTestNotification),
        ("supportedFeatures", supportedFeatures),
        ("websockNotifConfig", websockNotifConfig),
    )
    for key, value in optional_fields:
        # Identity test on purpose: falsy values such as False, 0 or [] are
        # legitimate payload values and must still be emitted.
        if value is not None:
            event_subscription[key] = value

    return event_subscription

def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None):
    """Build an event filter dictionary from the identifiers supplied.

    Args:
        aefIds: Optional value stored under ``"aefIds"``.
        apiIds: Optional value stored under ``"apiIds"``.
        apiInvokerIds: Optional value stored under ``"apiInvokerIds"``.

    Returns:
        dict: Contains one key for every argument that is not ``None``.

    Raises:
        ValueError: If all three arguments are ``None``, i.e. there is
            nothing to build a filter from.
    """
    candidates = (
        ("aefIds", aefIds),
        ("apiIds", apiIds),
        ("apiInvokerIds", apiInvokerIds),
    )
    # Only None means "not supplied"; an explicitly passed empty list is kept.
    capif_event_filter = {key: value for key, value in candidates if value is not None}

    # Previously this guard tested `apiInvokerIds` for truthiness, so it fired
    # when invoker ids WERE supplied, and it raised a plain string (a
    # TypeError at runtime). Raise a real exception only when nothing is given.
    if not capif_event_filter:
        raise ValueError("Error, no data present to create event filter")

    return capif_event_filter

def create_default_event_req():
    """Return a new dictionary holding the fixed default ``eventReq`` values.

    A fresh dictionary (with a fresh ``partitionCriteria`` list) is built on
    every call, so callers may modify the result freely.
    """
    default_fields = (
        ("grpRepTime", 5),
        ("immRep", True),
        ("maxReportNbr", 0),
        ("monDur", "2000-01-23T04:56:07+00:00"),
        ("partitionCriteria", ["TAC", "GEOAREA"]),
        ("repPeriod", 6),
        ("sampRatio", 15),
    )
    return dict(default_fields)

def create_websock_notif_config_default():
    """Return a new dictionary with the default websocket notification config.

    The result sets ``"requestWebsocketUri"`` to ``True`` and
    ``"websocketUri"`` to the placeholder string ``"websocketUri"``.
    """
    return {
        "requestWebsocketUri": True,
        "websocketUri": "websocketUri"
    }
def create_notification_event(
    subscriptionId,
    event,
    serviceAPIDescriptions=None,
    apiIds=None,
    apiInvokerIds=None,
    accCtrlPolList=None,
    invocationLogs=None,
    apiTopoHide=None,
):
    """Build an event notification body for a subscription.

    Args:
        subscriptionId: Value stored under ``"subscriptionId"``.
        event: Value stored under ``"events"``.
        serviceAPIDescriptions: Optional detail; a non-list value is wrapped
            in a one-element list.
        apiIds: Optional detail, normalized to a list the same way.
        apiInvokerIds: Optional detail, normalized to a list the same way.
        accCtrlPolList: Optional detail, normalized to a list the same way.
        invocationLogs: Optional detail, normalized to a list the same way.
        apiTopoHide: Optional detail, normalized to a list the same way.

    Returns:
        dict: Contains ``"subscriptionId"`` and ``"events"``. An
        ``"eventDetail"`` dictionary is added only when at least one optional
        detail argument is not ``None``.
    """
    # Declared in the order the keys must appear inside "eventDetail".
    optional_details = (
        ("serviceAPIDescriptions", serviceAPIDescriptions),
        ("apiIds", apiIds),
        ("apiInvokerIds", apiInvokerIds),
        ("accCtrlPolList", accCtrlPolList),
        ("invocationLogs", invocationLogs),
        ("apiTopoHide", apiTopoHide),
    )

    # Every supplied detail is emitted as a list: lists pass through as-is,
    # anything else (including tuples and dicts) becomes a single-item list.
    event_detail = {
        key: value if isinstance(value, list) else [value]
        for key, value in optional_details
        if value is not None
    }

    notification = {
        "subscriptionId": subscriptionId,
        "events": event,
    }
    # Omit "eventDetail" entirely rather than sending an empty object.
    if event_detail:
        notification["eventDetail"] = event_detail

    return notification