"#### Subscribing to application termination\n",
"\n",
"The purpose is to create a new subscription to \n",
"the MEC application termination notification as describe in ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.6b Receiving event notifications on MEC application instance \n",
"terminations"
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def send_subscribe_termination(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> object:\n",
" \"\"\"\n",
" Subscribe to the MEC application termination notifications.\n",
" :param sandbox_name: The MEC Sandbox instance to use\n",
" :param app_inst_id: The MEC application instance identifier\n",
" :return: The HTTP respone, the subscription ID and the resource URL on success, None otherwise\n",
" :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.6b Receiving event notifications on MEC application instance termination\n",
" \"\"\"\n",
" global MEC_PLTF, logger, service_api\n",
"\n",
" logger.debug('>>> send_subscribe_termination: ' + app_inst_id.id)\n",
" try:\n",
" url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/subscriptions'\n",
" logger.debug('send_subscribe_termination: url: ' + url)\n",
" path_params = {}\n",
" path_params['sandbox_name'] = sandbox_name\n",
" path_params['mec_pltf'] = MEC_PLTF\n",
" path_params['app_inst_id'] = app_inst_id.id\n",
" header_params = {}\n",
" # HTTP header `Accept`\n",
" header_params['Accept'] = 'application/json' # noqa: E501\n",
" # HTTP header `Content-Type`\n",
" header_params['Content-Type'] = 'application/json' # noqa: E501\n",
" # Body\n",
" dict_body = {}\n",
" dict_body['subscriptionType'] = 'AppTerminationNotificationSubscription'\n",
" dict_body['callbackReference'] = 'http://yanngarcia.ddns.net/mec011/v2/termination' # FIXME To be parameterized\n",
" dict_body['appInstanceId'] = app_inst_id.id\n",
" (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)\n",
" return (result, extract_sub_id(headers['Location']), headers['Location'])\n",
" except ApiException as e:\n",
" logger.error('Exception when calling call_api: %s\\n' % e)\n",
" return None\n",
" # End of function send_subscribe_termination"
]
},
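The request sent by `send_subscribe_termination` boils down to a path template and a small JSON body. The sketch below is only an illustration and not part of the notebook's API: it assembles both pieces without any network access, so they can be inspected before calling `service_api.call_api`. The helper name `build_termination_subscription`, the sandbox name `sbx-demo`, the platform name `mep1` and the callback URL are assumptions made for the example. Passing the callback as a parameter is one possible way to address the `FIXME` in the function above.

```python
import json


def build_termination_subscription(sandbox_name, mec_pltf, app_instance_id, callback_url):
    """Hypothetical helper: return (path, body) for the subscription request."""
    path = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/subscriptions'.format(
        sandbox_name=sandbox_name, mec_pltf=mec_pltf, app_inst_id=app_instance_id)
    body = {
        'subscriptionType': 'AppTerminationNotificationSubscription',
        'callbackReference': callback_url,  # passed in instead of being hard coded
        'appInstanceId': app_instance_id,
    }
    return path, body


# Placeholder values, for illustration only
path, body = build_termination_subscription(
    'sbx-demo', 'mep1', 'app-1234', 'http://callback.example/mec011/v2/termination')
print(path)
print(json.dumps(body, indent=2))
```

Keeping the path and body construction separate from the HTTP call makes the request easy to log or check in isolation.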
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Extracting subscription identifier\n",
"\n",
"This helper function extracts the subscription identifier from any subscription URL."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def extract_sub_id(resource_url: str) -> str:\n",
" \"\"\"\n",
" Extract the subscription identifier from the specified subscription URL.\n",
" :param resource_url: The subscription URL\n",
" :return: The subscription identifier on success, None otherwise\n",
" \"\"\"\n",
" global logger\n",
"\n",
" logger.debug('>>> extract_sub_id: resource_url: ' + resource_url)\n",
"\n",
" res = urllib3.util.parse_url(resource_url)\n",
" if res is not None and res.path is not None and res.path != '':\n",
" id = res.path.rsplit('/', 1)[-1]\n",
" if id is not None:\n",
" return id\n",
" return None\n",
" # End of function extract_sub_id"
]
},
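To see what this extraction does on a concrete URL, here is a minimal standalone sketch. It uses the standard library `urllib.parse` instead of `urllib3` so that it runs without the notebook's imports. The function name `last_path_segment` and the example URL are assumptions made for illustration, and unlike the helper above it ignores a trailing slash.

```python
from typing import Optional
from urllib.parse import urlparse


def last_path_segment(resource_url: str) -> Optional[str]:
    """Hypothetical stand-in: return the last path segment of a URL, or None."""
    path = urlparse(resource_url).path.rstrip('/')
    if not path:
        return None
    return path.rsplit('/', 1)[-1]


# Made-up subscription URL, shaped like the Location header of a subscription response
example_url = 'http://mec.example/sbx-demo/mep1/mec_app_support/v2/applications/app-1234/subscriptions/sub-42'
print(last_path_segment(example_url))
```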
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Delete subscription to application termination"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def delete_subscribe_termination(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo, sub_id: str) -> int:\n",
" \"\"\"\n",
" Delete the subscrition to the AppTermination notification.\n",
" :param sandbox_name: The MEC Sandbox instance to use\n",
" :param app_inst_id: The MEC application instance identifier\n",
" :param sub_id: The subscription identifier\n",
" :return: 0 on success, -1 otherwise\n",
" \"\"\"\n",
" global MEC_PLTF, logger, service_api\n",
" logger.debug('>>> delete_subscribe_termination: ' + app_inst_id.id)\n",
" url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/subscriptions/{sub_id}'\n",
" logger.debug('delete_subscribe_termination: url: ' + url)\n",
" path_params = {}\n",
" path_params['sandbox_name'] = sandbox_name\n",
" path_params['mec_pltf'] = MEC_PLTF\n",
" path_params['app_inst_id'] = app_inst_id.id\n",
" path_params['sub_id'] = sub_id\n",
" header_params = {}\n",
" # HTTP header `Accept`\n",
" header_params['Accept'] = 'application/json' # noqa: E501\n",
" # HTTP header `Content-Type`\n",
" header_params['Content-Type'] = 'application/json' # noqa: E501\n",
" service_api.call_api(url, 'DELETE', header_params=header_params, path_params = path_params, async_req=False)\n",
" return 0\n",
" except ApiException as e:\n",
" logger.error('Exception when calling call_api: %s\\n' % e)\n",
" # End of function delete_subscribe_termination"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When the MEC application instance is notified to gracefully terminate, it provides to the MEC platform \n",
"that the application has completed its application level related terminate/stop actiono\n",
"\n",
"Reference: ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.3 MEC application graceful termination/stopp."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def send_termination_confirmation(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> int:\n",
" \"\"\"\n",
" Send the confirm_termination to indicate that the MEC application is terminating gracefully.\n",
" :param sandbox_name: The MEC Sandbox instance to use\n",
" :param app_inst_id: The MEC application instance identifier\n",
" :return: 0 on success, -1 otherwise\n",
" :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.3 MEC application graceful termination/stop\n",
" \"\"\"\n",
" global MEC_PLTF, logger, service_api\n",
"\n",
" logger.debug('>>> send_termination_confirmation: ' + app_inst_id.id)\n",
" try:\n",
" url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/confirm_termination'\n",
" logger.debug('send_termination_confirmation: url: ' + url)\n",
" path_params = {}\n",
" path_params['sandbox_name'] = sandbox_name\n",
" path_params['mec_pltf'] = MEC_PLTF\n",
" path_params['app_inst_id'] = app_inst_id.id\n",
" header_params = {}\n",
" # HTTP header `Accept`\n",
" header_params['Accept'] = 'application/json' # noqa: E501\n",
" # HTTP header `Content-Type`\n",
" header_params['Content-Type'] = 'application/json' # noqa: E501\n",
" # JSON indication READY\n",
" dict_body = {}\n",
" dict_body['operationAction'] = 'TERMINATING'\n",
" service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)\n",
" return 0\n",
" except ApiException as e:\n",
" logger.error('Exception when calling call_api: %s\\n' % e)\n",
" return -1\n",
" # End of function send_termination_confirmation"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, it is time now to create the our fifth iteration of our MEC application.\n",
"\n",
"The sequence is the following:\n",
"- Login\n",
"- Print sandbox identifier\n",
"- Print available network scenarios\n",
"- Activate a network scenario\n",
"- Request for a new application instance identifier\n",
"- Subscribe to AppTerminationNotificationSubscription\n",
"- Check list of services\n",
"- Delete AppTerminationNotification subscription\n",
"- Delete our application instance identifier\n",
"- Deactivate a network scenario\n",
"- Logout\n",
"- Check that logout is effective\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"def process_main():\n",
" \"\"\"\n",
" This is the second sprint of our skeleton of our MEC application:\n",
" - Login\n",
" - Print sandbox identifier\n",
" - Print available network scenarios\n",
" - Activate a network scenario\n",
" - Request for a new application instance identifier\n",
" \n",
" - Subscribe to AppTermination Notification\n",
" - Delete AppTerminationNotification subscription\n",
" - Delete our application instance identifier\n",
" - Deactivate a network scenario\n",
" - Logout\n",
" - Check that logout is effective\n",
" \"\"\" \n",
" global logger, nw_scenarios\n",
" logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
" logger.debug('\\t pwd= ' + os.getcwd())\n",
"\n",
" # Login\n",
" sandbox = process_login()\n",
" if sandbox is None:\n",
" logger.error('Failed to instanciate a MEC Sandbox')\n",
" return\n",
"\n",
" # Print sandbox identifier\n",
" logger.info('Sandbox created: ' + sandbox)\n",
" # Wait for the MEC Sandbox is running\n",
" time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n",
"\n",
" # Print available network scenarios\n",
" nw_scenarios = get_network_scenarios(sandbox)\n",
" if nw_scenarios is None:\n",
" logger.error('Failed to retrieve the list of network scenarios')\n",
" logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))\n",
" logger.info('nw_scenarios: %s', str(nw_scenarios))\n",
" # Wait for the MEC Sandbox is running\n",
" time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n",
" else:\n",
" logger.info('nw_scenarios: No scenario available')\n",
"\n",
" # Activate a network scenario based on a list of criterias (hard coded!!!)\n",
" if activate_network_scenario(sandbox) == -1:\n",
" logger.error('Failed to activate network scenario')\n",
" logger.info('Network scenario activated: ' + nw_scenarios[nw_scenario_idx].id)\n",
" time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running\n",
" # Request for a new application instance identifier\n",
" app_inst_id = request_application_instance_id(sandbox)\n",
" if app_inst_id == None:\n",
" logger.error('Failed to request an application instance identifier')\n",
" logger.info('app_inst_id: %s', str(app_inst_id))\n",
" time.sleep(STABLE_TIME_OUT)\n",
" sub_id = None\n",
" if send_ready_confirmation(sandbox, app_inst_id) == -1:\n",
" logger.error('Failed to send confirm_ready')\n",
" else:\n",
" # Subscribe to AppTerminationNotificationSubscription\n",
" result, sub_id, res_url = send_subscribe_termination(sandbox, app_inst_id)\n",
" if sub_id == None:\n",
" logger.error('Failed to do the subscription')\n",
" else:\n",
" logger.info('sub_id: %s', sub_id)\n",
" data = json.loads(result.data)\n",
" logger.info('data: ' + str(data))\n",
"\n",
" # Any processing here\n",
" time.sleep(STABLE_TIME_OUT)\n",
"\n",
" # Delete AppTerminationNotification subscription\n",
" if sub_id is not None:\n",
" if delete_subscribe_termination(sandbox, app_inst_id, sub_id) == -1:\n",
" logger.error('Failed to delete the application instance identifier')\n",
" else:\n",
" logger.info('app_inst_id deleted: ' + app_inst_id.id)\n",
" # Delete the application instance identifier\n",
" if delete_application_instance_id(sandbox, app_inst_id.id) == -1:\n",
" logger.error('Failed to delete the application instance identifier')\n",
" logger.info('app_inst_id deleted: ' + app_inst_id.id)\n",
"\n",
" # Deactivate a network scenario based on a list of criterias (hard coded!!!)\n",
" if deactivate_network_scenario(sandbox) == -1:\n",
" logger.error('Failed to deactivate network scenario')\n",
" logger.info('Network scenario deactivated: ' + nw_scenarios[nw_scenario_idx].id)\n",
" time.sleep(2 * STABLE_TIME_OUT)\n",
"\n",
" # Logout\n",
" process_logout(sandbox)\n",
"\n",
" # Check that logout is effective\n",
" logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')\n",
" logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
" # End of function process_main\n",
"\n",
"if __name__ == '__main__':\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"### Conclusion: Create two procedures for the setup and the termination of our MEC application\n"
"cell_type": "markdown",
"metadata": {},
"source": [
"#### The procedure for the setup of a MEC application\n",
"This function provides the steps to setup a MEC application and to be ready to use the MEC service exposed by the created MEC Sandbox.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def mec_app_setup():\n",
" This function provides the steps to setup a MEC application:\n",
" - Login\n",
" - Print sandbox identifier\n",
" - Print available network scenarios\n",
" - Activate a network scenario\n",
" - Request for a new application instance identifier\n",
" - Send READY confirmation\n",
" - Subscribe to AppTermination Notification\n",
" :return The MEC Sandbox instance, the MEC application instance identifier and the subscription identifier on success, None otherwise\n",
" global logger\n",
"\n",
" # Login\n",
" sandbox = process_login()\n",
" if sandbox is None:\n",
" logger.error('Failed to instanciate a MEC Sandbox')\n",
" return None\n",
" # Wait for the MEC Sandbox is running\n",
" time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n",
"\n",
" # Print available network scenarios\n",
" nw_scenarios = get_network_scenarios(sandbox)\n",
" if nw_scenarios is None:\n",
" logger.error('Failed to retrieve the list of network scenarios')\n",
" elif len(nw_scenarios) != 0:\n",
" # Wait for the MEC Sandbox is running\n",
" time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n",
" else:\n",
" logger.info('nw_scenarios: No scenario available')\n",
"\n",
" # Activate a network scenario based on a list of criterias (hard coded!!!)\n",
" if activate_network_scenario(sandbox) == -1:\n",
" logger.error('Failed to activate network scenario')\n",
" else:\n",
" # Wait for the MEC services are running\n",
" time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running\n",
"\n",
" # Request for a new application instance identifier\n",
" app_inst_id = request_application_instance_id(sandbox)\n",
" if app_inst_id == None:\n",
" logger.error('Failed to request an application instance identifier')\n",
" # Wait for the MEC services are terminated\n",
" time.sleep(STABLE_TIME_OUT)\n",
" # Send READY confirmation\n",
" sub_id = None\n",
" if send_ready_confirmation(sandbox, app_inst_id) == -1:\n",
" logger.error('Failed to send confirm_ready')\n",
" else:\n",
" # Subscribe to AppTerminationNotificationSubscription\n",
" result, sub_id, res_url = send_subscribe_termination(sandbox, app_inst_id)\n",
" if sub_id == None:\n",
" logger.error('Failed to do the subscription')\n",
"\n",
" return (sandbox, app_inst_id, sub_id)\n",
" # End of function mec_app_setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### The procedure for the termination of a MEC application\n",
"\n",
"This function provides the steps to terminate a MEC application.\n",
"NOTE: All subscriptions done outside of the mec_app_setup function are not deleted."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def mec_app_termination(sandbox_name: str, app_inst_id:swagger_client.models.ApplicationInfo, sub_id: str):\n",
" \"\"\"\n",
" This function provides the steps to setup a MEC application:\n",
" - Login\n",
" - Print sandbox identifier\n",
" - Print available network scenarios\n",
" - Activate a network scenario\n",
" - Request for a new application instance identifier\n",
" - Send READY confirmation\n",
" - Subscribe to AppTermination Notification\n",
" :param sandbox_name: The MEC Sandbox instance to use\n",
" :param app_inst_id: The MEC application instance identifier\n",
" :param sub_id: The subscription identifier\n",
" global logger\n",
"\n",
" # Delete AppTerminationNotification subscription\n",
" if sub_id is not None:\n",
" if delete_subscribe_termination(sandbox_name, app_inst_id, sub_id) == -1:\n",
" logger.error('Failed to delete the application instance identifier')\n",
"\n",
" # Delete the application instance identifier\n",
" if delete_application_instance_id(sandbox_name, app_inst_id.id) == -1:\n",
" logger.error('Failed to delete the application instance identifier')\n",
" # Wait for the MEC services are terminated\n",
" time.sleep(STABLE_TIME_OUT)\n",
"\n",
" # Deactivate a network scenario based on a list of criterias (hard coded!!!)\n",
" if deactivate_network_scenario(sandbox_name) == -1:\n",
" logger.error('Failed to deactivate network scenario')\n",
" else:\n",
" # Wait for the MEC services are terminated\n",
" time.sleep(2 * STABLE_TIME_OUT)\n",
" process_logout(sandbox_name)\n",
" # End of function mec_app_termination"
]
},
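Because `mec_app_setup` and `mec_app_termination` form a pair, one way to guarantee that the sandbox is released even when the processing step fails is a `try`/`finally` wrapper. The sketch below is only an illustration of that pattern: `run_mec_app` and the `fake_*` functions are hypothetical stand-ins, not functions from this notebook, and no MEC Sandbox is contacted.

```python
def run_mec_app(setup, work, teardown):
    """
    Hypothetical wrapper: call setup(), then work(*ctx), and always teardown(*ctx).
    Returns True when work() completed, False when setup() failed.
    """
    ctx = setup()
    if ctx is None:
        return False
    try:
        work(*ctx)
    finally:
        # Runs even when work() raises, so the resources are always released
        teardown(*ctx)
    return True


# Stand-ins for mec_app_setup / mec_app_termination, for illustration only
calls = []

def fake_setup():
    calls.append('setup')
    return ('sbx-demo', 'app-1234', 'sub-42')

def fake_work(sandbox_name, app_inst_id, sub_id):
    calls.append('work')
    raise RuntimeError('simulated failure during processing')

def fake_teardown(sandbox_name, app_inst_id, sub_id):
    calls.append('teardown')

try:
    run_mec_app(fake_setup, fake_work, fake_teardown)
except RuntimeError as e:
    print('processing failed:', e)
print(calls)
```

The termination step still runs after the simulated failure, which is the property that matters when a sandbox and a network scenario have been allocated.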
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following cell describes the new basic MEC application architecture. It will be used in the rest of this titorial."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def process_main():\n",
" \"\"\"\n",
" This is the second sprint of our skeleton of our MEC application:\n",
" - Mec application setup\n",
" - Get UU unicast provisioning information\n",
" - Mec application termination\n",
" \"\"\" \n",
" global logger\n",
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
"\n",
" logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
" logger.debug('\\t pwd= ' + os.getcwd())\n",
"\n",
" # Setup the MEC application\n",
" (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n",
"\n",
" # Any processing here\n",
" logger.info('sandbox_name: ' + sandbox_name)\n",
" logger.info('app_inst_id: ' + app_inst_id.id)\n",
" if sub_id is not None:\n",
" logger.info('sub_id: ' + sub_id)\n",
" time.sleep(STABLE_TIME_OUT)\n",
"\n",
" # Terminate the MEC application\n",
" mec_app_termination(sandbox_name, app_inst_id, sub_id)\n",
"\n",
" logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
" # End of function process_main\n",
"\n",
"if __name__ == '__main__':\n",
" process_main()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create our second MEC application: how to use MEC Services\n",
"\n",
"After doing the logging, network scenario activation, MEC application instance creation steps, we are ready to exploit the MEC services exposed by the MEC Sandbox.\n",
"\n",
"In this clause, we use the following functionalities provided by MEC-030:\n",
"- Getting UU unicast provisioning information (ETSI GS MEC 030 Clause 5.5.1)\n",
"- Subscribe to the V2X message distribution server (ETSI GS MEC 030 Clause 5.5.7)\n",
"- Delete subscription\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Getting UU unicast provisioning information\n",
"\n",
"The purpose is to query provisioning information for V2X communication over Uu unicast."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def send_uu_unicast_provisioning_info(sandbox_name: str, ecgi: str) -> object:\n",
" \"\"\"\n",
" Request for V2X communication over Uu unicast information\n",
" :param sandbox_name: The MEC Sandbox instance to use\n",
" :param ecgi: Comma separated list of locations to identify a cell of a base station or a particular geographical area\n",
" :return The Uu unicast provisioning information on success, None otherwise\n",
" :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.1 Sending a request for provisioning information for V2X communication over Uu unicast\n",
" \"\"\"\n",
" global MEC_PLTF, logger, service_api\n",
" logger.debug('>>> send_uu_unicast_provisioning_info: ' + ecgi)\n",
" url = '/{sandbox_name}/{mec_pltf}/vis/v2/queries/uu_unicast_provisioning_info'\n",
" logger.debug('send_uu_unicast_provisioning_info: url: ' + url)\n",
" path_params = {}\n",
" path_params['sandbox_name'] = sandbox_name\n",
" path_params['mec_pltf'] = MEC_PLTF\n",
" query_params = []\n",
" query_params.append(('location_info', 'ecgi,' + ecgi))\n",
" header_params = {}\n",
" # HTTP header `Accept`\n",
" header_params['Accept'] = 'application/json' # noqa: E501\n",
" # HTTP header `Content-Type`\n",
" header_params['Content-Type'] = 'application/json' # noqa: E501\n",
" (result, status, header) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, query_params=query_params, async_req=False)\n",
" return (result, status, header)\n",
" except ApiException as e:\n",
" logger.error('Exception when calling call_api: %s\\n' % e)\n",
" # End of function send_uu_unicast_provisioning_info"
]
},
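The `location_info` query parameter built by `send_uu_unicast_provisioning_info` is the literal `ecgi,` followed by the comma separated cell identifiers. The sketch below shows, with the standard library only, what that parameter looks like once URL-encoded. The helper name `build_location_query` is an assumption made for illustration, and how the generated client actually encodes commas is not stated in this notebook, so both forms are printed.

```python
from urllib.parse import urlencode


def build_location_query(ecgi_list):
    """Hypothetical helper: build the location_info query parameter from a list of ECGIs."""
    value = 'ecgi,' + ','.join(ecgi_list)
    return [('location_info', value)]


query_params = build_location_query(['268708941961', '268711972264'])

# Default encoding percent-encodes the commas
encoded_default = urlencode(query_params)
# Declaring ',' as safe keeps the commas readable
encoded_readable = urlencode(query_params, safe=',')

print(encoded_default)
print(encoded_readable)
```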
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's create the our second MEC application.\n",
"The sequence is the following:\n",
"- Mec application setup\n",
"- Get UU unicast provisioning information\n",
"- Mec application termination\n",
"\n",
"Note that the UU unicast provisioning information is returned as a JSON string. To de-serialized it into a Python data structure, please refer to clause [Subscribing to V2X message distribution server](#subscribing_to_v2x_message_distribution_server)."
"execution_count": null,
"outputs": [],
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
"source": [
"def process_main():\n",
" \"\"\"\n",
" This is the second sprint of our skeleton of our MEC application:\n",
" - Mec application setup\n",
" - Get UU unicast provisioning information\n",
" - Mec application termination\n",
" \"\"\" \n",
" global logger, nw_scenarios\n",
"\n",
" logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
" logger.debug('\\t pwd= ' + os.getcwd())\n",
"\n",
" # Setup the MEC application\n",
" (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n",
"\n",
" # Get UU unicast provisioning information\n",
" ecgi = \"268708941961,268711972264\" # List of ecgi spearated by a ','\n",
" result, status, header = send_uu_unicast_provisioning_info(sandbox_name, ecgi)\n",
" logger.info('UU unicast provisioning information: status: %s', str(status))\n",
" if status != 200:\n",
" logger.error('Failed to get UU unicast provisioning information')\n",
" else:\n",
" logger.info('UU unicast provisioning information: %s', str(result.data))\n",
"\n",
" # Any processing comes here\n",
" logger.info('body: ' + str(result.data))\n",
" data = json.loads(result.data)\n",
" logger.info('data: ' + str(data))\n",
"\n",
" # Terminate the MEC application\n",
" mec_app_termination(sandbox_name, app_inst_id, sub_id)\n",
"\n",
" logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
" # End of function process_main\n",
"\n",
"if __name__ == '__main__':\n",
" process_main()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Subscribing to V2X message distribution server\n",
"\n",
"Here, we need to come back to the MEC 030 standard to create the type V2xMsgSubscription. It involves the creation of a set of basic types described below.\n",
"\n",
"Note: These new type shall be 'JSON' serializable. It means that they have to implement the following methods:\n",
"```python\n",
"to_dict()\n",
"to_str()\n",
"__repr__()\n",
"__eq__()\n",
"__ne__()\n",
"```\n",
"\n",
"Reference: Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.5.13 Type: LinkType\n",
"\n",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class LinkType(object):\n",
" swagger_types = {'href': 'str'}\n",
" attribute_map = {'href': 'href'}\n",
" def __init__(self, href=None): # noqa: E501\n",
" self._href = None\n",
" if href is not None:\n",
" self._href = href\n",
" @property\n",
" def href(self):\n",
" return self._href\n",
" @href.setter\n",
" def href(self, href):\n",
" self._href = href\n",
" def to_dict(self):\n",
" result = {}\n",
" for attr, _ in six.iteritems(self.swagger_types):\n",
" value = getattr(self, attr)\n",
" if isinstance(value, list):\n",
" result[attr] = list(map(\n",
" lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
" value\n",
" ))\n",
" elif hasattr(value, 'to_dict'):\n",
" result[attr] = value.to_dict()\n",
" elif isinstance(value, dict):\n",
" result[attr] = dict(map(\n",
" lambda item: (item[0], item[1].to_dict())\n",
" if hasattr(item[1], 'to_dict') else item,\n",
" value.items()\n",
" ))\n",
" else:\n",
" result[attr] = value\n",
" if issubclass(LinkType, dict):\n",
" for key, value in self.items():\n",
" result[key] = value\n",
" return result\n",
" def to_str(self):\n",
" return pprint.pformat(self.to_dict())\n",
" def __repr__(self):\n",
" return self.to_str()\n",
" def __eq__(self, other):\n",
" if not isinstance(other, LinkType):\n",
" return False\n",
" return self.__dict__ == other.__dict__\n",
" def __ne__(self, other):\n",
" return not self == other\n",
"\n",
"class Links(object):\n",
" swagger_types = {'self': 'LinkType'}\n",
" attribute_map = {'self': 'self'}\n",
" def __init__(self, self_=None): # noqa: E501\n",
" self._self = None\n",
" if self_ is not None:\n",
" self._self = self_\n",
" @property\n",
" def self_(self):\n",
" return self._self\n",
" @self_.setter\n",
" def self_(self, self_):\n",
" self._self = self_\n",
" def to_dict(self):\n",
" result = {}\n",
" for attr, _ in six.iteritems(self.swagger_types):\n",
" value = getattr(self, attr)\n",
" if isinstance(value, list):\n",
" result[attr] = list(map(\n",
" lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
" value\n",
" ))\n",
" elif hasattr(value, 'to_dict'):\n",
" result[attr] = value.to_dict()\n",
" elif isinstance(value, dict):\n",
" result[attr] = dict(map(\n",
" lambda item: (item[0], item[1].to_dict())\n",
" if hasattr(item[1], 'to_dict') else item,\n",
" value.items()\n",
" ))\n",
" else:\n",
" result[attr] = value\n",
" if issubclass(Links, dict):\n",
" for key, value in self.items():\n",
" result[key] = value\n",
" return result\n",
" def to_str(self):\n",
" return pprint.pformat(self.to_dict())\n",
" def __repr__(self):\n",
" return self.to_str()\n",
" def __eq__(self, other):\n",
" if not isinstance(other, Links):\n",
" return False\n",
" return self.__dict__ == other.__dict__\n",
" def __ne__(self, other):\n",
" return not self == other\n",
"\n",
"class TimeStamp(object):\n",
" swagger_types = {'seconds': 'int', 'nano_seconds': 'int'}\n",
" attribute_map = {'seconds': 'seconds', 'nano_seconds': 'nanoSeconds'}\n",
" def __init__(self, seconds=None, nano_seconds=None): # noqa: E501\n",
" self._seconds = None\n",
" self._nano_seconds = None\n",
" if seconds is not None:\n",
" self._seconds = seconds\n",
" if nano_seconds is not None:\n",
" self._nano_seconds = nano_seconds\n",
" @property\n",
" def seconds(self):\n",
" return self._seconds\n",
" @seconds.setter\n",
" def seconds(self, seconds):\n",
" self._seconds = seconds\n",
" @property\n",
" def nano_seconds(self):\n",
" return self._nano_seconds\n",
" @nano_seconds.setter\n",
" def nano_seconds(self, nano_seconds):\n",
" self._nano_seconds = nano_seconds\n",
" def to_dict(self):\n",
" result = {}\n",
" for attr, _ in six.iteritems(self.swagger_types):\n",
" value = getattr(self, attr)\n",
" if isinstance(value, list):\n",
" result[attr] = list(map(\n",
" lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
" value\n",
" ))\n",
" elif hasattr(value, 'to_dict'):\n",
" result[attr] = value.to_dict()\n",
" elif isinstance(value, dict):\n",
" result[attr] = dict(map(\n",
" lambda item: (item[0], item[1].to_dict())\n",
" if hasattr(item[1], 'to_dict') else item,\n",
" value.items()\n",
" ))\n",
" else:\n",
" result[attr] = value\n",
" if issubclass(TimeStamp, dict):\n",
" for key, value in self.items():\n",
" result[key] = value\n",
" return result\n",
" def to_str(self):\n",
" return pprint.pformat(self.to_dict())\n",
" def __repr__(self):\n",
" return self.to_str()\n",
" def __eq__(self, other):\n",
" if not isinstance(other, TimeStamp):\n",
" return False\n",
" return self.__dict__ == other.__dict__\n",
" def __ne__(self, other):\n",
" return not self == other"
]
},
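In the classes above, `to_dict()` returns a dictionary keyed by the Python attribute names listed in `swagger_types`, while `attribute_map` records the corresponding JSON key names. The sketch below illustrates the difference on a trimmed-down class. `MiniTimeStamp` and `to_wire_dict` are hypothetical names introduced for this example only, and the renaming step is one possible approach, not necessarily what the generated client does internally.

```python
import json


class MiniTimeStamp:
    """Hypothetical, trimmed-down stand-in for the TimeStamp class."""
    swagger_types = {'seconds': 'int', 'nano_seconds': 'int'}
    attribute_map = {'seconds': 'seconds', 'nano_seconds': 'nanoSeconds'}

    def __init__(self, seconds=None, nano_seconds=None):
        self.seconds = seconds
        self.nano_seconds = nano_seconds

    def to_dict(self):
        # Keys are the Python attribute names
        return {attr: getattr(self, attr) for attr in self.swagger_types}


def to_wire_dict(obj):
    """Rename Python attribute names to JSON key names using attribute_map, skipping unset values."""
    return {obj.attribute_map[attr]: value
            for attr, value in obj.to_dict().items()
            if value is not None}


ts = MiniTimeStamp(seconds=1700000000, nano_seconds=500)
python_view = ts.to_dict()
wire = json.dumps(to_wire_dict(ts), sort_keys=True)
print(python_view)
print(wire)
```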
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Subscribing to V2X message distribution server\n",
"\n",
"The cell below implements the V2xMsgSubscription data structure.\"}\r",
"Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.3.5 Type: V2xMsgSubscription\n",
"\n",
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class V2xMsgSubscription(object):\n",
" swagger_types = {'links': 'Links', 'callback_reference': 'str', 'filter_criteria': 'V2xMsgSubscriptionFilterCriteria', 'request_test_notification': 'bool', 'subscription_type': 'str'}\n",
" attribute_map = {'links': 'Links', 'callback_reference': 'callbackReference', 'filter_criteria': 'filterCriteria', 'request_test_notification': 'requestTestNotification', 'subscription_type': 'subscriptionType'}\n",
" def __init__(self, links=None, callback_reference=None, filter_criteria=None, request_test_notification=None): # noqa: E501\n",
" self._links = None\n",
" self._callback_reference = None\n",
" self._filter_criteria = None\n",
" self._request_test_notification = None\n",
" self._subscription_type = \"V2xMsgSubscription\"\n",
" if links is not None:\n",
" self.links = links\n",
" if callback_reference is not None:\n",
" self.callback_reference = callback_reference\n",
" if filter_criteria is not None:\n",
" self.filter_criteria = filter_criteria\n",
" if request_test_notification is not None:\n",
" self.request_test_notification = request_test_notification\n",
" @property\n",
" def links(self):\n",
" return self._links\n",
" @links.setter\n",
" def links(self, links):\n",
" self_.links = links\n",
" @property\n",
" def callback_reference(self):\n",
" return self._callback_reference\n",
" @callback_reference.setter\n",
" def callback_reference(self, callback_reference):\n",
" self._callback_reference = callback_reference\n",
" @property\n",
" def links(self):\n",
" return self._links\n",
" @links.setter\n",
" def links(self, links):\n",
" self._links = links\n",
" @property\n",
" def filter_criteria(self):\n",
" return self._filter_criteria\n",
" @filter_criteria.setter\n",
" def filter_criteria(self, filter_criteria):\n",
" self._filter_criteria = filter_criteria\n",
" @property\n",
" def request_test_notification(self):\n",
" return self._request_test_notification\n",
" @request_test_notification.setter\n",
" def request_test_notification(self, request_test_notification):\n",
" self._request_test_notification = request_test_notification\n",
" @property\n",
" def subscription_type(self):\n",
" return self._subscription_type\n",
" def to_dict(self):\n",
" result = {}\n",
" for attr, _ in six.iteritems(self.swagger_types):\n",
" value = getattr(self, attr)\n",
" if isinstance(value, list):\n",
" result[attr] = list(map(\n",
" lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
" value\n",
" ))\n",
" elif hasattr(value, 'to_dict'):\n",
" result[attr] = value.to_dict()\n",
" elif isinstance(value, dict):\n",
" result[attr] = dict(map(\n",
" lambda item: (item[0], item[1].to_dict())\n",
" if hasattr(item[1], 'to_dict') else item,\n",
" value.items()\n",
" ))\n",
" else:\n",
" result[attr] = value\n",
" if issubclass(V2xMsgSubscription, dict):\n",
" for key, value in self.items():\n",
" result[key] = value\n",
" return result\n",
" def to_str(self):\n",
" return pprint.pformat(self.to_dict())\n",
" def __repr__(self):\n",
" return self.to_str()\n",
" def __eq__(self, other):\n",
" if not isinstance(other, V2xMsgSubscription):\n",
" return False\n",
" return self.__dict__ == other.__dict__\n",
" def __ne__(self, other):\n",
" return not self == other\n",
"\n",
"class V2xMsgSubscriptionFilterCriteria(object):\n",
" swagger_types = {'msg_type': 'list[str]', 'std_organization': 'str'}\n",
" attribute_map = {'msg_type': 'msgType', 'std_organization': 'stdOrganization'}\n",
" def __init__(self, msg_type, std_organization): # noqa: E501\n",
" self._msg_type = None\n",
" self._std_organization = None\n",
" self.msg_type = msg_type\n",
" self.std_organization = std_organization\n",
" @property\n",
" def msg_type(self):\n",
" return self._msg_type\n",
" @msg_type.setter\n",
" def msg_type(self, msg_type):\n",
" self._msg_type = msg_type\n",
" @property\n",
" def std_organization(self):\n",
" return self._std_organization\n",
" @std_organization.setter\n",
" def std_organization(self, std_organization):\n",
" self._std_organization = std_organization\n",
" def to_dict(self):\n",
" result = {}\n",
" for attr, _ in six.iteritems(self.swagger_types):\n",
" value = getattr(self, attr)\n",
" if isinstance(value, list):\n",
" result[attr] = list(map(\n",
" lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
" value\n",
" ))\n",
" elif hasattr(value, 'to_dict'):\n",
" result[attr] = value.to_dict()\n",
" elif isinstance(value, dict):\n",
" result[attr] = dict(map(\n",
" lambda item: (item[0], item[1].to_dict())\n",
" if hasattr(item[1], 'to_dict') else item,\n",
" value.items()\n",
" ))\n",
" else:\n",
" result[attr] = value\n",
" if issubclass(V2xMsgSubscriptionFilterCriteria, dict):\n",
" for key, value in self.items():\n",
" result[key] = value\n",
" return result\n",
" def to_str(self):\n",
" return pprint.pformat(self.to_dict())\n",
" def __repr__(self):\n",
" return self.to_str()\n",
" def __eq__(self, other):\n",
" if not isinstance(other, V2xMsgSubscriptionFilterCriteria):\n",
" return False\n",
" return self.__dict__ == other.__dict__\n",
" def __ne__(self, other):\n",
" return not self == other"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The function below subscribes to V2X messages. The HTTP request body contains a JSON-serialized instance of the V2xMsgSubscription class.\n",
"\n",
"Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def subscribe_v2x_message(sandbox_name: str, v2xMsgSubscription: V2xMsgSubscription) -> object:\n",
" \"\"\"\n",
" Subscribe to V2X messages coming from different vehicle OEMs or operators.\n",
" :param sandbox_name: The MEC Sandbox instance to use\n",
" :param v2xMsgSubscription: The V2xMsgSubscription instance describing the subscription to create\n",
" :return: The HTTP response, the HTTP response status, the subscription identifier and the subscription URL on success\n",
" :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability\n",
" \"\"\"\n",
" global MEC_PLTF, logger, service_api\n",
" logger.debug('>>> subscribe_v2x_message: v2xMsgSubscription: ' + str(v2xMsgSubscription))\n",
" url = '/{sandbox_name}/{mec_pltf}/vis/v2/subscriptions'\n",
" logger.debug('subscribe_v2x_message: url: ' + url)\n",
" path_params = {}\n",
" path_params['sandbox_name'] = sandbox_name\n",
" path_params['mec_pltf'] = MEC_PLTF\n",
" header_params = {}\n",
" # HTTP header `Accept`\n",
" header_params['Accept'] = 'application/json' # noqa: E501\n",
" # HTTP header `Content-Type`\n",
" header_params['Content-Type'] = 'application/json' # noqa: E501\n",
" (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=v2xMsgSubscription, async_req=False)\n",
" return (result, status, extract_sub_id(headers['Location']), headers['Location'])\n",