Skip to content
bodyRequests.py 3.48 KiB
Newer Older
Jorge Moratinos's avatar
Jorge Moratinos committed
def create_events_subscription(events=["SERVICE_API_AVAILABLE", "API_INVOKER_ONBOARDED"], notificationDestination="http://robot.testing", eventFilters=None, eventReq=None, requestTestNotification=None, supportedFeatures=None, websockNotifConfig=None):
    event_subscription = {
        "events": events,
        "notificationDestination": notificationDestination,
    }
    if eventFilters != None:
Jorge Moratinos's avatar
Jorge Moratinos committed
        event_subscription['eventFilters'] = eventFilters
Jorge Moratinos's avatar
Jorge Moratinos committed
        event_subscription['eventReq'] = eventReq
    if requestTestNotification != None:
Jorge Moratinos's avatar
Jorge Moratinos committed
        event_subscription['requestTestNotification'] = requestTestNotification
Jorge Moratinos's avatar
Jorge Moratinos committed
        event_subscription['supportedFeatures'] = supportedFeatures
    if websockNotifConfig != None:
Jorge Moratinos's avatar
Jorge Moratinos committed
        event_subscription['websockNotifConfig'] = websockNotifConfig

def create_capif_event_filter(aefIds=None, apiIds=None, apiInvokerIds=None):
    if aefIds == None and apiIds == None and apiInvokerIds:
Jorge Moratinos's avatar
Jorge Moratinos committed
        raise ("Error, no data present to create event filter")
    capif_event_filter = dict()
Jorge Moratinos's avatar
Jorge Moratinos committed
        capif_event_filter['aefIds'] = aefIds
Jorge Moratinos's avatar
Jorge Moratinos committed
        capif_event_filter['apiIds'] = apiIds
Jorge Moratinos's avatar
Jorge Moratinos committed
        capif_event_filter['apiInvokerIds'] = apiInvokerIds
def create_default_event_req():
    return {
        "grpRepTime": 5,
        "immRep": True,
        "maxReportNbr": 0,
        "monDur": "2000-01-23T04:56:07+00:00",
        "partitionCriteria": ["TAC", "GEOAREA"],
        "repPeriod": 6,
        "sampRatio": 15
    }

def create_websock_notif_config_default():
Jorge Moratinos's avatar
Jorge Moratinos committed
    return {
        "requestWebsocketUri": True,
        "websocketUri": "websocketUri"
Jorge Moratinos's avatar
Jorge Moratinos committed
    }
def create_notification_event(subscriptionId, event, serviceAPIDescriptions=None, apiIds=None, apiInvokerIds=None, accCtrlPolList=None, invocationLogs=None, apiTopoHide=None):
Jorge Moratinos's avatar
Jorge Moratinos committed
    result = {
        "subscriptionId": subscriptionId,
Jorge Moratinos's avatar
Jorge Moratinos committed
    count = 0
Jorge Moratinos's avatar
Jorge Moratinos committed
        if isinstance(serviceAPIDescriptions, list):
            result['eventDetail']['serviceAPIDescriptions'] = serviceAPIDescriptions
Jorge Moratinos's avatar
Jorge Moratinos committed
            result['eventDetail']['serviceAPIDescriptions'] = [
                serviceAPIDescriptions]
        count = count+1
Jorge Moratinos's avatar
Jorge Moratinos committed
        if isinstance(apiIds, list):
            result['eventDetail']['apiIds'] = apiIds
Jorge Moratinos's avatar
Jorge Moratinos committed
            result['eventDetail']['apiIds'] = [apiIds]
        count = count+1
Jorge Moratinos's avatar
Jorge Moratinos committed
        if isinstance(apiInvokerIds, list):
            result['eventDetail']['apiInvokerIds'] = apiInvokerIds
Jorge Moratinos's avatar
Jorge Moratinos committed
            result['eventDetail']['apiInvokerIds'] = [apiInvokerIds]
        count = count+1
Jorge Moratinos's avatar
Jorge Moratinos committed
        result['eventDetail']['accCtrlPolList'] = accCtrlPolList
        count = count+1
Jorge Moratinos's avatar
Jorge Moratinos committed
        if isinstance(invocationLogs, list):
            result['eventDetail']['invocationLogs'] = invocationLogs
Jorge Moratinos's avatar
Jorge Moratinos committed
            result['eventDetail']['invocationLogs'] = [invocationLogs]
        count = count+1
Jorge Moratinos's avatar
Jorge Moratinos committed
        if isinstance(apiTopoHide, list):
            result['eventDetail']['apiTopoHide'] = apiTopoHide
Jorge Moratinos's avatar
Jorge Moratinos committed
            result['eventDetail']['apiTopoHide'] = [apiTopoHide]
        count = count+1

    if count == 0:
        del result['eventDetail']

    return result