{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# How to develop a MEC application using the MEC Sandbox HTTP REST API\n", "This tutorial introduces the step by step procedure to create a basic MEC application following ETSI MEC standards.\n", "It uses the ETSI MEC Sandbox simulator.\n", "\n", "
\n", " Note: These source code examples are simplified and ignore return codes and error checks to a large extent. We do this to highlight how to use the MEC Sandbox API and the different MEC satndards and reduce unrelated code.\n", "A real-world application will of course properly check every return value and exit correctly at the first serious error.\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What is a MEC application\n", "\n", "See [The Wiki MEC web site](https://www.etsi.org/technologies/multi-access-edge-computing)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The basics of developing a MEC application\n", "\n", "The developement of a MEC application follows a strict process in order to access the ETSI MEC services and provides valuable services to the customers.\n", "Mainly, this process can be split in several steps:\n", "1. Global initializations (constant, variables...)\n", "2. Create a new instance of a MEC Sandbox (Note that using an existing one could be a solution too (see Annex A))\n", "3. Activate a network scenario in order to access the ETSI MEC services\n", "4. Create a new application identifier\n", "5. Register our MEC application and subscribe to service termination (see MEC 011)\n", "6. Use MEC services in order to provide valuable services to the customers\n", " 6.1. Apply MEC services required subscriptions (e.g. MEC 013 location subscription)\n", "7. Terminate the MEC application\n", " 7.1. Remove MEC services subscriptions\n", " 7.2. Deactivate the current network scenario\n", " 7.3. Delete the instance of the MEC Sandbox\n", "8. Release all the MEC application resources\n", "\n", "**Note:** Several application identifier can be created to address several MEC application.\n", "\n", "\n", "## Use the MEC Sandbox HTTP REST API models and code\n", "\n", "The MEC sandbox provides a piece of code (the python sub) that shall be used to develop the MEC application and interact with the MEC Sandbox. This piece of code mainly contains swagger models to serialize/deserialize JSON data structures and HTTP REST API call functions.\n", "The openApi file is availabe [here](https://labs.etsi.org/rep/mec/etsi-mec-sandbox/-/blob/STF678_Task1_2_3_4/go-apps/meep-sandbox-api/api/swagger.yaml) and the [Swagger editor](https://editor-next.swagger.io/) is used to generate the python stub.\n", "\n", "The project architecture is describe [here](images/project_arch.jpg).\n", "\n", "The sandbox_api folder contains the python implementation of the HTTP REST API definitions introduced by the openApi [file](https://labs.etsi.org/rep/mec/etsi-mec-sandbox/-/blob/STF678_Task1_2_3_4/go-apps/meep-sandbox-api/api/swagger.yaml).\n", "The model folder contains the python implementation of the data type definitions introduced by the openApi [file](https://labs.etsi.org/rep/mec/etsi-mec-sandbox/-/blob/STF678_Task1_2_3_4/go-apps/meep-sandbox-api/api/swagger.yaml).\n", "\n", "
\n", " Note: The sub-paragraph 'Putting everything together' is a specific paragraph where all the newly features introduced in the main paragraph are put together to create an executable block of code. It is possible to skip this block of code by removing the comment character (#) on first line of this block of code.\n", "
\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before going to create our MEC application skeleton, the following steps shall be done:\n", "1) Change the working directory (see the project architecture)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import os\n", "os.chdir(os.path.join(os.getcwd(), '../mecapp'))\n", "print(os.getcwd())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2) Apply the python imports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from __future__ import division # Import floating-point division (1/4=0.25) instead of Euclidian division (1/4=0)\n", "\n", "import os\n", "import sys\n", "import re\n", "import logging\n", "import threading\n", "import time\n", "import json\n", "import uuid\n", "\n", "import pprint\n", "\n", "import six\n", "\n", "import swagger_client\n", "from swagger_client.rest import ApiException\n", "\n", "from http import HTTPStatus\n", "from http.server import BaseHTTPRequestHandler, HTTPServer\n", "\n", "try:\n", " import urllib3\n", "except ImportError:\n", " raise ImportError('Swagger python client requires urllib3.')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "3) Initialize of the global constants (cell 3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "MEC_SANDBOX_URL = 'https://mec-platform2.etsi.org' # MEC Sandbox host/base URL\n", "MEC_SANDBOX_API_URL = 'https://mec-platform2.etsi.org/sandbox-api/v1' # MEC Sandbox API host/base URL\n", "PROVIDER = 'Jupyter2024' # Login provider value - To skip authorization: 'github'\n", "MEC_PLTF = 'mep1' # MEC plateform name. Linked to the network scenario\n", "LOGGER_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Logging format\n", "STABLE_TIME_OUT = 10 # Timer to wait for MEC Sndbox reaches its stable state (K8S pods in running state)\n", "LOGIN_TIMEOUT = 3 #30 # Timer to wait for user to authorize from GITHUB\n", "LISTENER_IP = '0.0.0.0' # Listener IPv4 address for notification callback calls\n", "LISTENER_PORT = 31111 # Listener IPv4 port for notification callback calls. Default: 36001\n", "CALLBACK_URI = 'http://mec-platform2.etsi.org:31111/sandbox/v1'\n", " #'https://yanngarcia.ddns.net:' + str(LISTENER_PORT) + '/jupyter/sandbox/demo6/v1/'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "4) Setup the logger instance and the HTTP REST API (cell 4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize the logger\n", "logger = logging.getLogger(__name__)\n", "logger.setLevel(logging.DEBUG)\n", "logging.basicConfig(filename='/tmp/' + time.strftime('%Y%m%d-%H%M%S') + '.log')\n", "l = logging.StreamHandler()\n", "l.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))\n", "logger.addHandler(l)\n", "\n", "# Setup the HTTP REST API configuration to be used to send request to MEC Sandbox API \n", "configuration = swagger_client.Configuration()\n", "configuration.host = MEC_SANDBOX_API_URL\n", "configuration.verify_ssl = True\n", "configuration.debug = True\n", "configuration.logger_format = LOGGER_FORMAT\n", "# Create an instance of ApiClient\n", "sandbox_api = swagger_client.ApiClient(configuration, 'Content-Type', 'application/json')\n", "\n", "# Setup the HTTP REST API configuration to be used to send request to MEC Services\n", "configuration1 = swagger_client.Configuration()\n", "configuration1.host = MEC_SANDBOX_URL\n", "configuration1.verify_ssl = True\n", "configuration1.debug = True\n", "configuration1.logger_format = LOGGER_FORMAT\n", "# Create an instance of ApiClient\n", "service_api = swagger_client.ApiClient(configuration1, 'Content-Type', 'application/json')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "5) Setup the global variables (cell 5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize the global variables\n", "nw_scenarios = [] # The list of available network scenarios\n", "nw_scenario_idx = -1 # The network scenario idx to activate (deactivate)\n", "app_inst_id = None # The requested application instance identifier\n", "got_notification = False # Set to true if a POST notification is received" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create our first MEC application\n", "\n", "The first step to develop a MEC application is to create the application skeleton which contains the minimum steps below:\n", " \n", "- Login to instanciate a MEC Sandbox\n", "- Logout to delete a existing MEC Sandbox" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### First steps: the login/logout\n", "\n", "Here is the first squeleton with the following sequence:\n", "- Login\n", "- Print sandbox identifier\n", "- Logout\n", "- Check that logout is effective\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The login function\n", "\n", "To log to the MEC Sandbox, \n", "the login process is done in two step. In step 1, a user code is requested to GITHUB. In step 2, the user has to enter this user code to https://github.com/login/device and proceed to the authorization.\n", "Please, pay attention to the log '=======================> DO AUTHORIZATION WITH CODE :' which indicates you the user code to use for the authorization.\n", "\n", "It uses the HTTP POST request with the URL 'POST /sandbox-sandbox_api/v1/login?provide=github' (see PROVIDER constant).\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Login\n", "def process_login() -> str:\n", " \"\"\"\n", " Authenticate and create a new MEC Sandbox instance.\n", " :return: The sandbox instance identifier on success, None otherwise\n", " \"\"\" \n", " global PROVIDER, logger\n", "\n", " logger.debug('>>> process_login')\n", "\n", " try:\n", " auth = swagger_client.AuthorizationApi(sandbox_api)\n", " oauth = auth.login(PROVIDER, async_req = False)\n", " logger.debug('process_login (step1): oauth: ' + str(oauth))\n", " # Wait for the MEC Sandbox is running\n", " logger.debug('=======================> DO AUTHORIZATION WITH CODE : ' + oauth.user_code)\n", " logger.debug('=======================> DO AUTHORIZATION HERE : ' + oauth.verification_uri)\n", " if oauth.verification_uri == \"\":\n", " time.sleep(LOGIN_TIMEOUT) # Skip scecurity, wait for a few seconds\n", " else:\n", " time.sleep(10 * LOGIN_TIMEOUT) # Wait for Authirization from user side\n", " namespace = auth.get_namespace(oauth.user_code)\n", " logger.debug('process_login (step2): result: ' + str(namespace))\n", " return namespace.sandbox_name\n", " except ApiException as e:\n", " logger.error('Exception when calling AuthorizationApi->login: %s\\n' % e)\n", "\n", " return None\n", " # End of function process_login\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The logout function\n", "\n", "It uses the HTTP POST request with the URL 'POST /sandbox-sandbox_api/v1/logout?sandbox_name={sandbox_name}'.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Logout\n", "def process_logout(sandbox_name: str) -> int:\n", " \"\"\"\n", " Delete the specified MEC Sandbox instance.\n", " :param sandbox_name: The MEC Sandbox to delete\n", " :return: 0 on success, -1 otherwise\n", " \"\"\"\n", " global logger\n", "\n", " logger.debug('>>> process_logout: sandbox=' + sandbox_name)\n", "\n", " try:\n", " auth = swagger_client.AuthorizationApi(sandbox_api)\n", " result = auth.logout(sandbox_name, async_req = False) # noqa: E501\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling AuthorizationApi->logout: %s\\n' % e)\n", " return -1\n", " # End of function process_logout\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let put in action our Login/Logout functions:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the skeleton of our MEC application:\n", " - Login\n", " - Print sandbox identifier\n", " - Logout\n", " - Check that logout is effective\n", " This skeleton will be the bas of the next sprint in order to achieve a full implementation of a MEC application\n", " \"\"\" \n", " global logger\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Login\n", " sandbox = process_login()\n", " if sandbox is None:\n", " return\n", "\n", " # Print sandbox identifier\n", " logger.info('Sandbox created: ' + sandbox)\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Logout\n", " process_logout(sandbox)\n", "\n", " # Check that logout is effective\n", " logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')\n", " \n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Second step: Retrieve the list of network scenarios\n", "\n", "Let's go futhur and see how we can retrieve the list of the network scenarios available in order to activate one of them and access the MEC services exposed such as MEC 013 or MEC 030.\n", "\n", "The sequence will be:\n", "- Login\n", "- Print sandbox identifier\n", "- Print available network scenarios\n", "- Logout\n", "- Check that logout is effective\n", "\n", "The login and logout functions are described in cell 3 and 4.\n", "\n", "To retrieve the list of the network scenarios, let's create a new function called 'get_network_scenarios'. It uses the HTTP GET request with the URL '/sandbox-sandbox_api/v1/sandboxNetworkScenarios?sandbox_name={sandbox_name}'." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_network_scenarios(sandbox_name: str) -> list:\n", " \"\"\"\n", " Retrieve the list of the available network scenarios.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :return: The list of the available network scenarios on success, None otherwise\n", " \"\"\"\n", " global PROVIDER, logger, sandbox_api, configuration\n", "\n", " logger.debug('>>> get_network_scenarios: sandbox=' + sandbox_name)\n", "\n", " try:\n", " nw = swagger_client.SandboxNetworkScenariosApi(sandbox_api)\n", " result = nw.sandbox_network_scenarios_get(sandbox_name, async_req = False) # noqa: E501\n", " logger.debug('get_network_scenarios: result: ' + str(result))\n", " return result\n", " except ApiException as e:\n", " logger.error('Exception when calling SandboxNetworkScenariosApi->sandbox_network_scenarios_get: %s\\n' % e)\n", "\n", " return None\n", " # End of function get_network_scenarios\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Putting everything together\n", "\n", "Here the logic is:\n", "- Login\n", "- Print sandbox identifier\n", "- Print available network scenarios\n", "- Logout\n", "- Check that logout is effective\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the first sprint of our skeleton of our MEC application:\n", " - Login\n", " - Print sandbox identifier\n", " - Print available network scenarios\n", " - Logout\n", " - Check that logout is effective\n", " \"\"\" \n", " global logger, nw_scenarios \n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Login\n", " sandbox = process_login()\n", " if sandbox is None:\n", " logger.error('Failed to instanciate a MEC Sandbox')\n", " return\n", "\n", " # Print sandbox identifier\n", " logger.info('Sandbox created: ' + sandbox)\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Print available network scenarios\n", " nw_scenarios = get_network_scenarios(sandbox)\n", " if nw_scenarios is None:\n", " logger.error('Failed to retrieve the list of network scenarios')\n", " elif len(nw_scenarios) != 0:\n", " logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))\n", " logger.info('nw_scenarios: %s', str(nw_scenarios))\n", " else:\n", " logger.info('nw_scenarios: No scenario available')\n", "\n", " # Logout\n", " process_logout(sandbox)\n", "\n", " # Check that logout is effective\n", " logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')\n", " \n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Third step: Activate and deactivate a network scenario\n", "\n", "Having a list of network scenarion, the next step is to actvate (and deactivate) a network scenario. This step is mandatory to create a new application instance id and access the MEC services.\n", "\n", "In this section, we will arbitrary activate the network scenario called '4g-5g-macro-v2x', which is at the index 0 of the nw_scenarios. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def select_network_scenario_based_on_criteria(criterias_list: list) -> int:\n", " \"\"\"\n", " Select the network scenario to activate based of the provided list of criterias.\n", " :param criterias_list: The list of criterias to select the correct network scenario\n", " :return: 0 on success, -1 otherwise\n", " \"\"\"\n", "\n", " return 0 # The index of the '4g-5g-macro-v2x' network scenario - Hard coded" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The activate function\n", "\n", "The process to activate a scenario is based on an HTTP POST request with the URL '/sandboxNetworkScenarios/{sandbox_name}?network_scenario_id={network_scenario_id}'.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def activate_network_scenario(sandbox_name: str) -> int:\n", " \"\"\"\n", " Activate the specified network scenario.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :return: 0 on success, -1 otherwise\n", " \"\"\"\n", " global logger, sandbox_api, nw_scenarios, nw_scenario_idx\n", "\n", " logger.debug('>>> activate_network_scenario: ' + sandbox_name)\n", "\n", " nw_scenario_idx = select_network_scenario_based_on_criteria([])\n", " if nw_scenario_idx == -1:\n", " logger.error('activate_network_scenario: Failed to select a network scenarion')\n", " return -1\n", "\n", " try:\n", " nw = swagger_client.SandboxNetworkScenariosApi(sandbox_api)\n", " nw.sandbox_network_scenario_post(sandbox_name, nw_scenarios[nw_scenario_idx].id, async_req = False) # noqa: E501\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling SandboxNetworkScenariosApi->activate_network_scenario: %s\\n' % e)\n", "\n", " return -1\n", " # End of function activate_network_scenario\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The deactivate function\n", "\n", "The process to deactivate a scenario is based on an HTTP DELETE request with the URL '/sandboxNetworkScenarios/{sandbox_name}?network_scenario_id={network_scenario_id}'.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def deactivate_network_scenario(sandbox: str) -> int:\n", " \"\"\"\n", " Deactivate the current network scenario.\n", " :param sandbox: The MEC Sandbox instance to use\n", " :return: 0 on success, -1 otherwise\n", " \"\"\"\n", " global logger, sandbox_api, nw_scenarios, nw_scenario_idx\n", "\n", " logger.debug('>>> deactivate_network_scenario: ' + sandbox)\n", "\n", " try:\n", " nw = swagger_client.SandboxNetworkScenariosApi(sandbox_api)\n", " nw.sandbox_network_scenario_delete(sandbox, nw_scenarios[nw_scenario_idx].id, async_req = False) # noqa: E501\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling SandboxNetworkScenariosApi->deactivate_network_scenario: %s\\n' % e)\n", "\n", " return -1\n", " # End of function deactivate_network_scenario\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Putting everything together\n", "\n", "Now, it is time to create the second iteration of our MEC application.\n", "\n", "The sequence is the following:\n", "- Login\n", "- Print sandbox identifier\n", "- Print available network scenarios\n", "- Activate a network scenario\n", "- Check that the network scenario is activated and the MEC services are running\n", "- Deactivate a network scenario\n", "- Logout\n", "- Check that logout is effective\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Login\n", " - Print sandbox identifier\n", " - Print available network scenarios\n", " - Activate a network scenario\n", " - Check that the network scenario is activated and the MEC services are running\n", " - Deactivate a network scenario\n", " - Logout\n", " - Check that logout is effective\n", " \"\"\" \n", " global logger, nw_scenarios, nw_scenario_idx\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Login\n", " sandbox = process_login()\n", " if sandbox is None:\n", " logger.error('Failed to instanciate a MEC Sandbox')\n", " return\n", "\n", " # Print sandbox identifier\n", " logger.info('Sandbox created: ' + sandbox)\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Print available network scenarios\n", " nw_scenarios = get_network_scenarios(sandbox)\n", " if nw_scenarios is None:\n", " logger.error('Failed to retrieve the list of network scenarios')\n", " elif len(nw_scenarios) != 0:\n", " logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))\n", " logger.info('nw_scenarios: %s', str(nw_scenarios))\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", " else:\n", " logger.info('nw_scenarios: No scenario available')\n", "\n", " # Activate a network scenario based on a list of criterias (hard coded!!!)\n", " if activate_network_scenario(sandbox) == -1:\n", " logger.error('Failed to activate network scenario')\n", " else:\n", " logger.info('Network scenario activated: ' + nw_scenarios[nw_scenario_idx].id)\n", " # Wait for the MEC services are running\n", " time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Check that the network scenario is activated and the MEC services are running \n", " logger.info('To check that the network scenario is activated, verify on the MEC Sandbox server that the MEC services are running (kubectl get pods -A)')\n", " time.sleep(30) # Sleep for 30 seconds\n", "\n", " # Deactivate a network scenario based on a list of criterias (hard coded!!!)\n", " if deactivate_network_scenario(sandbox) == -1:\n", " logger.error('Failed to deactivate network scenario')\n", " else:\n", " logger.info('Network scenario deactivated: ' + nw_scenarios[nw_scenario_idx].id)\n", " # Wait for the MEC services are terminated\n", " time.sleep(2 * STABLE_TIME_OUT)\n", "\n", " # Logout\n", " process_logout(sandbox)\n", "\n", " # Check that logout is effective\n", " logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')\n", " \n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fourth step: Create and delete an appliction instance id\n", "\n", "To enable our MEC application to be part of the activated network scenario, we need to request the MEC sandbox to create a new application instance identifier. Our MEC application will use this identifier to register to the MEC Sandbox according to MEC 011.\n", "\n", "Reference: ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.2 MEC application start-up\n", "\n", "#### The appliction instance id creation function\n", "\n", "It is like the MEC application was instanciated by the MEC platform and it is executed locally.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def request_application_instance_id(sandbox_name: str) -> swagger_client.models.ApplicationInfo:\n", " \"\"\"\n", " Request the creation of a new MEC application instance identifier.\n", " It is like the MEC application was instanciated by the MEC platform and it is executed locally.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :return: The MEC application instance identifier on success, None otherwise\n", " :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.2 MEC application start-up\n", " \"\"\"\n", " global MEC_PLTF, logger, sandbox_api, configuration\n", "\n", " logger.debug('>>> request_application_instance_id: ' + sandbox_name)\n", "\n", " # Create a instance of our MEC application\n", " try:\n", " a = swagger_client.models.ApplicationInfo(id=str(uuid.uuid4()), name='JupyterMecApp', node_name=MEC_PLTF, type='USER') # noqa: E501\n", " nw = swagger_client.SandboxAppInstancesApi(sandbox_api)\n", " result = nw.sandbox_app_instances_post(a, sandbox_name, async_req = False) # noqa: E501\n", " logger.debug('request_application_instance_id: result: ' + str(result))\n", " return result\n", " except ApiException as e:\n", " logger.error('Exception when calling SandboxAppInstancesApi->sandbox_app_instances_post: %s\\n' % e)\n", "\n", " return None\n", " # End of function request_application_instance_id" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The appliction instance id deletion function" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def delete_application_instance_id(sandbox_name: str, app_inst_id: str) -> int:\n", " \"\"\"\n", " Request the deletion of a MEC application.\n", " :param sandbox: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :return: 0 on success, -1 otherwise\n", " \"\"\"\n", " global logger, sandbox_api, configuration\n", "\n", " logger.debug('>>> delete_application_instance_id: ' + sandbox_name)\n", " logger.debug('>>> delete_application_instance_id: ' + app_inst_id)\n", "\n", " try:\n", " nw = swagger_client.SandboxAppInstancesApi(sandbox_api)\n", " nw.sandbox_app_instances_delete(sandbox_name, app_inst_id, async_req = False) # noqa: E501\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling SandboxAppInstancesApi->sandbox_app_instances_delete: %s\\n' % e)\n", "\n", " return -1\n", " # End of function deletet_application_instance_id" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Getting the list of applications" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_applications_list(sandbox_name: str) -> list:\n", " \"\"\"\n", " Request the list of the MEC application available on the MEC Platform.\n", " :param sandbox: The MEC Sandbox instance to use\n", " :return: 0 on success, -1 otherwise\n", " \"\"\"\n", " global logger, sandbox_api, configuration\n", "\n", " logger.debug('>>> get_applications_list: ' + sandbox_name)\n", "\n", " try:\n", " nw = swagger_client.SandboxAppInstancesApi(sandbox_api)\n", " result = nw.sandbox_app_instances_get(sandbox_name, async_req = False) # noqa: E501\n", " logger.debug('get_applications_list: result: ' + str(result))\n", " return result\n", " except ApiException as e:\n", " logger.error('Exception when calling SandboxAppInstancesApi->get_applications_list: %s\\n' % e)\n", " return None \n", " # End of function delete_application_instance_id" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Putting everything together\n", "\n", "It is time now to create the our third iteration of our MEC application.\n", "\n", "The sequence is the following:\n", "- Login\n", "- Print sandbox identifier\n", "- Print available network scenarios\n", "- Activate a network scenario\n", "- Request for a new application instance identifier\n", "- Retrieve the list of the applications instance identifier\n", "- Check the demo application is present in the list of applications\n", "- Delete our application instance identifier\n", "- Deactivate a network scenario\n", "- Logout\n", "- Check that logout is effective\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Login\n", " - Print sandbox identifier\n", " - Print available network scenarios\n", " - Activate a network scenario\n", " - Request for a new application instance identifier\n", " - Retrieve the list of the applications instance identifier\n", " - Check the demo application is present in the list of applications\n", " - Deactivate a network scenario\n", " - Logout\n", " - Check that logout is effective\n", " \"\"\" \n", " global logger, nw_scenarios, nw_scenario_idx\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Login\n", " sandbox = process_login()\n", " if sandbox is None:\n", " logger.error('Failed to instanciate a MEC Sandbox')\n", " return\n", "\n", " # Print sandbox identifier\n", " logger.info('Sandbox created: ' + sandbox)\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Print available network scenarios\n", " nw_scenarios = get_network_scenarios(sandbox)\n", " if nw_scenarios is None:\n", " logger.error('Failed to retrieve the list of network scenarios')\n", " elif len(nw_scenarios) != 0:\n", " logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))\n", " logger.info('nw_scenarios: %s', str(nw_scenarios))\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", " else:\n", " logger.info('nw_scenarios: No scenario available')\n", "\n", " # Activate a network scenario based on a list of criterias (hard coded!!!)\n", " if activate_network_scenario(sandbox) == -1:\n", " logger.error('Failed to activate network scenario')\n", " else:\n", " logger.info('Network scenario activated: ' + nw_scenarios[nw_scenario_idx].id)\n", " # Wait for the MEC services are running\n", " time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Request for a new application instance identifier\n", " app_inst_id = request_application_instance_id(sandbox)\n", " if app_inst_id == None:\n", " logger.error('Failed to request an application instance identifier')\n", " else:\n", " logger.info('app_inst_id: %s', str(type(app_inst_id)))\n", " logger.info('app_inst_id: %s', str(app_inst_id))\n", "\n", " # Check the demo application is present in the list of applications\n", " app_list = get_applications_list(sandbox)\n", " if app_list is None:\n", " logger.error('Failed to request the list of applications')\n", " else:\n", " logger.info('app_list: %s', str(type(app_list)))\n", " logger.info('app_list: %s', str(app_list))\n", " # Check if our application is present in the list of applications\n", " found = False\n", " for item in app_list:\n", " if item.id == app_inst_id.id:\n", " found = True\n", " break\n", " if not found:\n", " logger.error('Failed to retrieve our application instance identifier')\n", "\n", " # Delete the application instance identifier\n", " if delete_application_instance_id(sandbox, app_inst_id.id) == -1:\n", " logger.error('Failed to delete the application instance identifier')\n", " else:\n", " logger.info('app_inst_id deleted: ' + app_inst_id.id)\n", "\n", " # Deactivate a network scenario based on a list of criterias (hard coded!!!)\n", " if deactivate_network_scenario(sandbox) == -1:\n", " logger.error('Failed to deactivate network scenario')\n", " else:\n", " logger.info('Network scenario deactivated: ' + nw_scenarios[nw_scenario_idx].id)\n", " # Wait for the MEC services are terminated\n", " time.sleep(2 * STABLE_TIME_OUT)\n", "\n", " # Logout\n", " process_logout(sandbox)\n", "\n", " # Check that logout is effective\n", " logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')\n", " \n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## MEC Registration and the READY confirmation\n", "\n", "Having an application instance identifier allows us to register with the MEC Sandbox and interact with it (e.g. to send service queries, to subscribe to events and to recieve notifications...).\n", "\n", "The standard MEC 011 Clause 5.2.2 MEC application start-up describes the start up process. Basically, our MEC application has to:\n", "1. Indicates that it is running by sending a Confirm Ready message\n", "2. Retrieve the list of MEC services \n", "\n", "To do so, a MEC application needs to be able to send requests but also to receive notifications (POST requests) and to reply to them." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fifth step: Send the READY confirmation\n", "\n", "The MEC application instance confirms towards the MEC platform that it is up and running. It corresponds to step 4c described inETSI GS MEC 011 V3.2.1 (2024-04)11 Clause 5.2.2 MEC application start-up.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def send_ready_confirmation(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> int:\n", " \"\"\"\n", " Send the ready_confirmation to indicate that the MEC application is active.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :return: 0 on success, -1 otherwise\n", " :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.2 MEC application start-up - Step 4c\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> send_ready_confirmation: ' + app_inst_id.id)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/confirm_ready'\n", " logger.debug('send_ready_confirmation: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " path_params['app_inst_id'] = app_inst_id.id\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " # JSON indication READY\n", " dict_body = {}\n", " dict_body['indication'] = 'READY'\n", " service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return -1\n", " # End of function send_ready_confirmation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In addition, our MEC application is registering to AppTerminationNotificationSubscription and it needs to delete its subscription when terminating.\n", "\n", "At this stage, it is important to note that all subscription deletion use the same format: / (see ETSI MEC GS 003 [16]). \n", "In this case, it the AppTerminationNotificationSubscription is 'sub-1234', the URIs to do the susbscription and to delete it are:\n", "- MEC_SANDBOX_URL + '/' + sandbox_name + '/' + MEC_PLTF + '/mec_app_support/v2/applications/' + app_inst_id + '/subscriptions'\n", "- MEC_SANDBOX_URL + '/' + sandbox_name + '/' + MEC_PLTF + '/mec_app_support/v2/applications/' + app_inst_id + '/subscriptions/sub-1234'\n", "\n", "So, it will be usefull to create a small function to extract the subscription identifier from either the HTTP Location header or from the Link field found into the reponse body data structure. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Subscribing to application termination\n", "\n", "The purpose is to create a new subscription to \n", "the MEC application termination notification as describe in ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.6b Receiving event notifications on MEC application instance \n", "terminations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def send_subscribe_termination(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> object:\n", " \"\"\"\n", " Subscribe to the MEC application termination notifications.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :return: The HTTP respone, the subscription ID and the resource URL on success, None otherwise\n", " :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.6b Receiving event notifications on MEC application instance termination\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> send_subscribe_termination: ' + app_inst_id.id)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/subscriptions'\n", " logger.debug('send_subscribe_termination: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " path_params['app_inst_id'] = app_inst_id.id\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " # Body\n", " dict_body = {}\n", " dict_body['subscriptionType'] = 'AppTerminationNotificationSubscription'\n", " dict_body['callbackReference'] = CALLBACK_URI + '/mec011/v2/termination' # FIXME To be parameterized\n", " dict_body['appInstanceId'] = app_inst_id.id\n", " (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)\n", " return (result, extract_sub_id(headers['Location']), headers['Location'])\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status, None)\n", " # End of function send_subscribe_termination" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Extracting subscription identifier\n", "\n", "This helper function extracts the subscription identifier from any subscription URL." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def extract_sub_id(resource_url: str) -> str:\n", " \"\"\"\n", " Extract the subscription identifier from the specified subscription URL.\n", " :param resource_url: The subscription URL\n", " :return: The subscription identifier on success, None otherwise\n", " \"\"\"\n", " global logger\n", "\n", " logger.debug('>>> extract_sub_id: resource_url: ' + resource_url)\n", "\n", " res = urllib3.util.parse_url(resource_url)\n", " if res is not None and res.path is not None and res.path != '':\n", " id = res.path.rsplit('/', 1)[-1]\n", " if id is not None:\n", " return id\n", " return None\n", " # End of function extract_sub_id" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Delete subscription to application termination" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def delete_subscribe_termination(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo, sub_id: str) -> int:\n", " \"\"\"\n", " Delete the subscrition to the AppTermination notification.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :param sub_id: The subscription identifier\n", " :return: 0 on success, -1 otherwise\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> delete_subscribe_termination: ' + app_inst_id.id)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/subscriptions/{sub_id}'\n", " logger.debug('delete_subscribe_termination: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " path_params['app_inst_id'] = app_inst_id.id\n", " path_params['sub_id'] = sub_id\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " service_api.call_api(url, 'DELETE', header_params=header_params, path_params = path_params, async_req=False)\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return -1\n", " # End of function delete_subscribe_termination" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When the MEC application instance is notified to gracefully terminate, it provides to the MEC platform \n", "that the application has completed its application level related terminate/stop actiono\n", "\n", "Reference: ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.3 MEC application graceful termination/stopp." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def send_termination_confirmation(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> int:\n", " \"\"\"\n", " Send the confirm_termination to indicate that the MEC application is terminating gracefully.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :return: 0 on success, -1 otherwise\n", " :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.3 MEC application graceful termination/stop\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> send_termination_confirmation: ' + app_inst_id.id)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/confirm_termination'\n", " logger.debug('send_termination_confirmation: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " path_params['app_inst_id'] = app_inst_id.id\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " # JSON indication READY\n", " dict_body = {}\n", " dict_body['operationAction'] = 'TERMINATING'\n", " service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return -1\n", " # End of function send_termination_confirmation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Putting everythig together\n", "Now, it is time now to create the our fifth iteration of our MEC application.\n", "\n", "The sequence is the following:\n", "- Login\n", "- Print sandbox identifier\n", "- Print available network scenarios\n", "- Activate a network scenario\n", "- Request for a new application instance identifier\n", "- Send READY confirmation\n", "- Subscribe to AppTerminationNotificationSubscription\n", "- Check list of services\n", "- Delete AppTerminationNotification subscription\n", "- Delete our application instance identifier\n", "- Deactivate a network scenario\n", "- Logout\n", "- Check that logout is effective\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Login\n", " - Print sandbox identifier\n", " - Print available network scenarios\n", " - Activate a network scenario\n", " - Request for a new application instance identifier\n", " - Send READY confirmation\n", " \n", " - Subscribe to AppTermination Notification\n", " - Send Termination\n", " - Delete AppTerminationNotification subscription\n", " - Delete our application instance identifier\n", " - Deactivate a network scenario\n", " - Logout\n", " - Check that logout is effective\n", " \"\"\" \n", " global logger, nw_scenarios, nw_scenario_idx\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Login\n", " sandbox = process_login()\n", " if sandbox is None:\n", " logger.error('Failed to instanciate a MEC Sandbox')\n", " return\n", "\n", " # Print sandbox identifier\n", " logger.info('Sandbox created: ' + sandbox)\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Print available network scenarios\n", " nw_scenarios = get_network_scenarios(sandbox)\n", " if nw_scenarios is None:\n", " logger.error('Failed to retrieve the list of network scenarios')\n", " elif len(nw_scenarios) != 0:\n", " logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))\n", " logger.info('nw_scenarios: %s', str(nw_scenarios))\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", " else:\n", " logger.info('nw_scenarios: No scenario available')\n", "\n", " # Activate a network scenario based on a list of criterias (hard coded!!!)\n", " if activate_network_scenario(sandbox) == -1:\n", " logger.error('Failed to activate network scenario')\n", " else:\n", " logger.info('Network scenario activated: ' + nw_scenarios[nw_scenario_idx].id)\n", " # Wait for the MEC services are running\n", " time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Request for a new application instance identifier\n", " app_inst_id = request_application_instance_id(sandbox)\n", " if app_inst_id == None:\n", " logger.error('Failed to request an application instance identifier')\n", " else:\n", " logger.info('app_inst_id: %s', str(app_inst_id))\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Send READY confirmation\n", " sub_id = None\n", " if send_ready_confirmation(sandbox, app_inst_id) == -1:\n", " logger.error('Failed to send confirm_ready')\n", " else:\n", " # Subscribe to AppTerminationNotificationSubscription\n", " result, sub_id, res_url = send_subscribe_termination(sandbox, app_inst_id)\n", " if sub_id == None:\n", " logger.error('Failed to do the subscription')\n", " else:\n", " logger.info('result: ' + str(result))\n", " logger.info('sub_id: %s', sub_id)\n", " data = json.loads(result.data)\n", " logger.info('data: ' + str(data))\n", "\n", " # Any processing here\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Delete AppTerminationNotification subscription\n", " if sub_id is not None:\n", " if delete_subscribe_termination(sandbox, app_inst_id, sub_id) == -1:\n", " logger.error('Failed to delete the application instance identifier')\n", " else:\n", " logger.info('app_inst_id deleted: ' + app_inst_id.id)\n", "\n", " # Delete the application instance identifier\n", " if delete_application_instance_id(sandbox, app_inst_id.id) == -1:\n", " logger.error('Failed to delete the application instance identifier')\n", " else:\n", " logger.info('app_inst_id deleted: ' + app_inst_id.id)\n", "\n", " # Deactivate a network scenario based on a list of criterias (hard coded!!!)\n", " if deactivate_network_scenario(sandbox) == -1:\n", " logger.error('Failed to deactivate network scenario')\n", " else:\n", " logger.info('Network scenario deactivated: ' + nw_scenarios[nw_scenario_idx].id)\n", " # Wait for the MEC services are terminated\n", " time.sleep(2 * STABLE_TIME_OUT)\n", "\n", " # Logout\n", " process_logout(sandbox)\n", "\n", " # Check that logout is effective\n", " logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')\n", " \n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Conclusion: Create two procedures for the setup and the termination of our MEC application\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The procedure for the setup of a MEC application\n", "\n", "This function provides the steps to setup a MEC application and to be ready to use the MEC service exposed by the created MEC Sandbox.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def mec_app_setup():\n", " \"\"\"\n", " This function provides the steps to setup a MEC application:\n", " - Login\n", " - Print sandbox identifier\n", " - Print available network scenarios\n", " - Activate a network scenario\n", " - Request for a new application instance identifier\n", " - Send READY confirmation\n", " - Subscribe to AppTermination Notification\n", " :return The MEC Sandbox instance, the MEC application instance identifier and the subscription identifier on success, None otherwise\n", " \"\"\"\n", " global logger, nw_scenarios\n", "\n", " # Login\n", " sandbox = process_login()\n", " if sandbox is None:\n", " logger.error('Failed to instanciate a MEC Sandbox')\n", " return None\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Print available network scenarios\n", " nw_scenarios = get_network_scenarios(sandbox)\n", " if nw_scenarios is None:\n", " logger.error('Failed to retrieve the list of network scenarios')\n", " elif len(nw_scenarios) != 0:\n", " # Wait for the MEC Sandbox is running\n", " time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running\n", " else:\n", " logger.info('nw_scenarios: No scenario available')\n", "\n", " # Activate a network scenario based on a list of criterias (hard coded!!!)\n", " if activate_network_scenario(sandbox) == -1:\n", " logger.error('Failed to activate network scenario')\n", " else:\n", " # Wait for the MEC services are running\n", " time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running\n", "\n", " # Request for a new application instance identifier\n", " app_inst_id = request_application_instance_id(sandbox)\n", " if app_inst_id == None:\n", " logger.error('Failed to request an application instance identifier')\n", " else:\n", " # Wait for the MEC services are terminated\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Send READY confirmation\n", " sub_id = None\n", " if send_ready_confirmation(sandbox, app_inst_id) == -1:\n", " logger.error('Failed to send confirm_ready')\n", " else:\n", " # Subscribe to AppTerminationNotificationSubscription\n", " result, sub_id, res_url = send_subscribe_termination(sandbox, app_inst_id)\n", " if sub_id == None:\n", " logger.error('Failed to do the subscription')\n", "\n", " return (sandbox, app_inst_id, sub_id)\n", " # End of function mec_app_setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The procedure for the termination of a MEC application\n", "\n", "This function provides the steps to terminate a MEC application.\n", "\n", "**Note:** All subscriptions done outside of the mec_app_setup function are not deleted." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def mec_app_termination(sandbox_name: str, app_inst_id:swagger_client.models.ApplicationInfo, sub_id: str):\n", " \"\"\"\n", " This function provides the steps to setup a MEC application:\n", " - Login\n", " - Print sandbox identifier\n", " - Print available network scenarios\n", " - Activate a network scenario\n", " - Request for a new application instance identifier\n", " - Send READY confirmation\n", " - Subscribe to AppTermination Notification\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :param sub_id: The subscription identifier\n", " \"\"\"\n", " global logger\n", "\n", " # Delete AppTerminationNotification subscription\n", " if sub_id is not None:\n", " if delete_subscribe_termination(sandbox_name, app_inst_id, sub_id) == -1:\n", " logger.error('Failed to delete the application instance identifier')\n", "\n", " # Delete the application instance identifier\n", " if delete_application_instance_id(sandbox_name, app_inst_id.id) == -1:\n", " logger.error('Failed to delete the application instance identifier')\n", " else:\n", " # Wait for the MEC services are terminated\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Deactivate a network scenario based on a list of criterias (hard coded!!!)\n", " if deactivate_network_scenario(sandbox_name) == -1:\n", " logger.error('Failed to deactivate network scenario')\n", " else:\n", " # Wait for the MEC services are terminated\n", " time.sleep(2 * STABLE_TIME_OUT)\n", "\n", " # Logout\n", " process_logout(sandbox_name)\n", " # End of function mec_app_termination" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following cell describes the new basic MEC application architecture. It will be used in the rest of this titorial." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Mec application setup\n", " - Get UU unicast provisioning information\n", " - Mec application termination\n", " \"\"\" \n", " global logger\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Setup the MEC application\n", " (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n", "\n", " # Any processing here\n", " logger.info('sandbox_name: ' + sandbox_name)\n", " logger.info('app_inst_id: ' + app_inst_id.id)\n", " if sub_id is not None:\n", " logger.info('sub_id: ' + sub_id)\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create our second MEC application: how to use MEC Location Service\n", "\n", "After doing the logging, network scenario activation, MEC application instance creation steps, we are ready to exploit the MEC services exposed by the MEC Sandbox.\n", "\n", "In this clause, we use the following functionalities provided by MEC-013 LocationAPIs:\n", "- Getting UE location lookup (ETSI GS MEC 013 Clause 5.3.2 UE Location Lookup)\n", "- Subscribe to the UE Location changes (ETSI GS MEC 030 Clause 5.3.4)\n", "- Delete subscription\n", "\n", "### First step: Getting UE location lookup\n", "\n", "First of all, just create a function to request the location of a specific UE. The UE identifier depends of the network scenario which was activated. In our example, we are using the 4g-5g-macro-v2x network scenario and we will run our code with the high velocity UE (i.e., the cars such as 10.100.0.1).\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_ue_location(sandbox_name: str, ue: str) -> object:\n", " \"\"\"\n", " To retrieves the location information of an UE identified by its IPv4 address (e.g. 10.100.0.1)\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :return The HTTP response and the response status on success, None otherwise\n", " :see ETSI GS MEC 013 V3.1.1 (2023-01) Clause 5.3.2 UE Location Lookup\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> get_ue_location: ' + ue)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/location/v3/queries/users'\n", " logger.debug('get_ue_location: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " query_params = []\n", " query_params.append(('address', ue))\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " (result, status, headers) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, query_params=query_params, async_req=False)\n", " return (result, status)\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status)\n", " # End of function get_ue_location" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's create our new MEC application. The expected result looks like:\n", "```json\n", "{\n", " \"userList\": {\n", " \"resourceURL\": \"https://mec-platform2.etsi.org/xxx/mep1/location/v3/queries/users\",\n", " \"user\": [\n", " {\n", " \"address\": \"10.100.0.1\",\n", " \"accessPointId\": \"4g-macro-cell-6\",\n", " \"zoneId\": \"zone03\",\n", " \"resourceURL\": \"https://mec-platform.etsi.org/xxx/mep1/location/v3/queries/users?address=10.100.0.1\",\n", " \"timestamp\": {\n", " \"nanoSeconds\": 0,\n", " \"seconds\": 1729245754\n", " },\n", " \"locationInfo\": {\n", " \"latitude\": [\n", " 43.73707\n", " ],\n", " \"longitude\": [\n", " 7.422555\n", " ],\n", " \"shape\": 2\n", " },\n", " \"civicInfo\": {\n", " \"country\": \"MC\"\n", " },\n", " \"relativeLocationInfo\": {\n", " \"X\": 630.37036,\n", " \"Y\": 261.92648,\n", " \"mapInfo\": {\n", " \"mapId\": \"324561243\",\n", " \"origin\": {\n", " \"latitude\": 43.7314,\n", " \"longitude\": 7.4202\n", " }\n", " }\n", " }\n", " }\n", " ]\n", " }\n", "}\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Putting everything together\n", "\n", "It is time now to create the our third iteration of our MEC application.\n", "\n", "The sequence is the following:\n", "- Login\n", "- Activate a network scenario\n", "- Getting UE location lookup (ETSI GS MEC 013 Clause 5.3.2)\n", "- Delete our application instance identifier\n", "- Deactivate a network scenario\n", "- Logout" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Login\n", " - Activate a network scenario\n", " - Getting UE location lookup (ETSI GS MEC 013 Clause 5.3.2)\n", " - Delete our application instance identifier\n", " - Deactivate a network scenario\n", " - Logout\n", " \"\"\" \n", " global logger, nw_scenarios\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Setup the MEC application\n", " (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n", "\n", " # Getting UE location lookup\n", " result, status = get_ue_location(sandbox_name, '10.100.0.1')\n", " logger.info('UE location information: status: %s', str(status))\n", " if status != 200:\n", " logger.error('Failed to get UE location information')\n", " else:\n", " logger.info('UE location information: %s', str(result.data))\n", "\n", " # Any processing comes here\n", " logger.info('body: ' + str(result.data))\n", " data = json.loads(result.data)\n", " logger.info('UserList: ' + str(data))\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Second step: Adding a subscription\n", "\n", "The purpose here is to create a subscritpion when the UE (e.g. 10.100.0.1) is entering or leaving a PoA area (PoA is tanding for Piont of Access).\n", "Accorcding to ETSI GS MEC 013 V3.1.1 Clause 7.5.3.4 POST, the 'request method is a POST and the endpoint is '/location/v3/subscriptions/users/'.\n", "\n", "The cell below provides the code to create our subscription." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Notification support\n", "\n", "To recieve notification, our MEC application is required to support an HTTP listenener to recieve POST requests from the MEC Sandbox and reply to them: this is the notification mechanism.\n", "\n", "This minimalistic HTTP server will also be used to implement the endpoints provided by our MEC application service: see chapter [Our third MEC application: how to create a new MEC Services](#our_third_mec_application_how_to_create_a_new_mec_services).\n", "\n", "The class HTTPRequestHandler (see cell below) provides the suport of such mechanism.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class HTTPServer_RequestHandler(BaseHTTPRequestHandler):\n", " \"\"\"\n", " Minimal implementation of an HTTP server (http only).\n", " \"\"\"\n", "\n", " def do_GET(self):\n", " logger.info('>>> do_GET: ' + self.path)\n", "\n", " ctype = self.headers.get('content-type')\n", " logger.info('do_GET: ' + ctype)\n", "\n", " message = ''\n", " if self.path == '/sandbox/v1/statistic/v1/quantity':\n", " logger.info('do_GET: Computing statistic quantities for application MEC service')\n", " # TODO Add logit to our MEC service\n", " message = '{\"time\":20180124,\"avg\": 0.0,\"max\": 0.0,\"min\": 0.0,\"stddev\": 0.0 }'\n", " else:\n", " # Send error message\n", " message = '{\"title\":\"Unknown URI\",\"type\":\"do_GET.parser\",\"status\":404 }'\n", " logger.info('do_GET: message: ' + message)\n", " \n", " # Send response status code\n", " self.send_response(HTTPStatus.OK)\n", "\n", " # Send headers\n", " self.send_header('Content-type','text/plain; charset=utf-8')\n", " self.send_header('Content-length', str(len(message)))\n", " self.end_headers()\n", "\n", " # Write content as utf-8 data\n", " self.wfile.write(message.encode('utf8'))\n", " return\n", " # End of function do_GET\n", "\n", " def do_POST(self):\n", " global got_notification\n", "\n", " logger.info('>>> do_POST: ' + self.path)\n", "\n", " ctype = self.headers.get('content-type')\n", " logger.info('do_POST: ' + ctype)\n", "\n", " content_len = int(self.headers.get('Content-Length'))\n", " if content_len != 0:\n", " body = self.rfile.read(content_len).decode('utf8')\n", " logger.info('do_POST: body:' + str(type(body)))\n", " logger.info('do_POST: body:' + str(body))\n", " data = json.loads(str(body))\n", " logger.info('do_POST: data: %s', str(data))\n", "\n", " self.send_response(HTTPStatus.NOT_IMPLEMENTED)\n", " self.end_headers()\n", " got_notification = True\n", " return\n", " # End of function do_POST\n", "\n", " def do_PUT(self):\n", " logger.info('>>> do_PUT: ' + self.path)\n", "\n", " ctype = self.headers.get('content-type')\n", " logger.info('do_PUT: ' + ctype)\n", "\n", " self.send_response(HTTPStatus.NOT_IMPLEMENTED)\n", " self.end_headers()\n", " return\n", " # End of function do_PUT\n", "\n", " def do_PATCH(self):\n", " logger.info('>>> do_PATCH: ' + self.path)\n", "\n", " ctype = self.headers.get('content-type')\n", " logger.info('do_PATCH: ' + ctype)\n", " \n", " self.send_response(HTTPStatus.NOT_IMPLEMENTED)\n", " self.end_headers()\n", " return\n", " # End of function do_PATCH\n", " # End of class HTTPRequestHandler\n", "\n", " def do_DELETE(self):\n", " logger.info('>>> do_DELETE: ' + self.path)\n", "\n", " ctype = self.headers.get('content-type')\n", " logger.info('do_DELETE: ' + ctype)\n", " \n", " self.send_response(HTTPStatus.NOT_IMPLEMENTED)\n", " self.end_headers()\n", " return\n", " # End of function do_DELETE\n", " # End of class HTTPRequestHandler\n", "\n", "def start_notification_server() -> HTTPServer:\n", " \"\"\"\n", " Start the notification server\n", " :return The instance of the HTTP server\n", " \"\"\"\n", " global LISTENER_PORT, got_notification, logger\n", "\n", " logger.debug('>>> start_notification_server on port ' + str(LISTENER_PORT))\n", " got_notification = False\n", " server_address = ('', LISTENER_PORT)\n", " httpd = HTTPServer(server_address, HTTPServer_RequestHandler)\n", " # Start notification server in a daemonized thread\n", " notification_server = threading.Thread(target = httpd.serve_forever, name='notification_server')\n", " notification_server.daemon = True\n", " notification_server.start()\n", " return httpd\n", " # End of function HTTPRequestHandler\n", "\n", "def stop_notification_server(httpd: HTTPServer):\n", " \"\"\"\n", " Stop the notification server\n", " :param The instance of the HTTP server\n", " \"\"\"\n", " global logger\n", "\n", " logger.debug('>>> stop_notification_server')\n", " httpd.server_close()\n", " httpd=None\n", " # End of function HTTPRequestHandler\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def subscribe_for_user_events(sandbox_name: str) -> object:\n", " \"\"\"\n", " Subscriptions for notifications related to location.\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :return: The HTTP respone, the subscription ID and the resource URL on success, None otherwise\n", " :see ETSI GS MEC 013 V3.1.1 Clause 7.5 Resource: user_subscriptions\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> subscribe_for_user_events: ' + sandbox_name)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}//location/v3/subscriptions/users'\n", " logger.debug('subscribe_for_user_events: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " # Body\n", " dict_body = {}\n", " dict_body['subscriptionType'] = 'UserLocationEventSubscription'\n", " dict_body['callbackReference'] = CALLBACK_URI + '/mec013/v3/location' # FIXME To be parameterized\n", " dict_body['address'] = '10.100.0.1' # FIXME To be parameterized\n", " dict_body['clientCorrelator'] = \"12345\"\n", " dict_body['locationEventCriteria'] = [ \"ENTERING_AREA_EVENT\", \"LEAVING_AREA_EVENT\"]\n", " m = {}\n", " m[\"userLocationEventSubscription\"] = dict_body\n", " (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=m, async_req=False)\n", " return (result, extract_sub_id(headers['Location']), headers['Location'])\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, None, None)\n", " # End of function subscribe_for_user_are_event" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Putting everything together\n", "\n", "It is time now to create the our third iteration of our MEC application.\n", "\n", "The sequence is the following:\n", "- Login\n", "- Activate a network scenario\n", "- Create subscription\n", "- Wait for notification\n", "- Delete our application instance identifier\n", "- Deactivate a network scenario\n", "- Logout" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Login\n", " - Activate a network scenario\n", " - Create subscription\n", " - Wait for notification\n", " - Delete our application instance identifier\n", " - Deactivate a network scenario\n", " - Logout\n", " \"\"\" \n", " global logger, nw_scenarios, got_notification\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Start notification server in a daemonized thread\n", " httpd = start_notification_server()\n", "\n", " # Setup the MEC application\n", " (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n", "\n", " # Create subscription\n", " result, status, headers = subscribe_for_user_events(sandbox_name)\n", " logger.info('UE location information: status: %s', str(status))\n", " if status != 200:\n", " logger.error('Failed to get UE location information')\n", " else:\n", " logger.info('UE location information: ' + str(result.data))\n", " \n", " # Getting UE location lookup\n", " result, status = get_ue_location(sandbox_name, '10.100.0.1')\n", " logger.info('UE location information: status: ' + str(status))\n", " if status != 200:\n", " logger.error('Failed to get UE location information')\n", " else:\n", " logger.info('UE location information: ' + str(result.data))\n", "\n", " # Wait for the notification\n", " counter = 0\n", " while not got_notification and counter < 30:\n", " logger.info('Waiting for subscription...')\n", " time.sleep(STABLE_TIME_OUT)\n", " counter += 1\n", " # End of 'while' statement\n", "\n", " # Stop notification server\n", " stop_notification_server(httpd)\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create our third MEC application: how to use V2X MEC Services\n", "\n", "After doing the logging, network scenario activation, MEC application instance creation steps, we are ready to exploit the MEC services exposed by the MEC Sandbox.\n", "\n", "In this clause, we use the following functionalities provided by MEC-030:\n", "- Getting UU unicast provisioning information (ETSI GS MEC 030 Clause 5.5.1)\n", "- Subscribe to the V2X message distribution server (ETSI GS MEC 030 Clause 5.5.7)\n", "- Delete subscription\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Getting UU unicast provisioning information\n", "\n", "The purpose is to query provisioning information for V2X communication over Uu unicast." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def send_uu_unicast_provisioning_info(sandbox_name: str, ecgi: str) -> object:\n", " \"\"\"\n", " Request for V2X communication over Uu unicast information\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param ecgi: Comma separated list of locations to identify a cell of a base station or a particular geographical area\n", " :return The Uu unicast provisioning information on success, None otherwise\n", " :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.1 Sending a request for provisioning information for V2X communication over Uu unicast\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> send_uu_unicast_provisioning_info: ' + ecgi)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/vis/v2/queries/uu_unicast_provisioning_info'\n", " logger.debug('send_uu_unicast_provisioning_info: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " query_params = []\n", " query_params.append(('location_info', 'ecgi,' + ecgi))\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " (result, status, header) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, query_params=query_params, async_req=False)\n", " return (result, status, header)\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status, None)\n", " # End of function send_uu_unicast_provisioning_info" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's create the our second MEC application.\n", "The sequence is the following:\n", "- Mec application setup\n", "- Get UU unicast provisioning information\n", "- Mec application termination\n", "\n", "Note that the UU unicast provisioning information is returned as a JSON string. To de-serialized it into a Python data structure, please refer to clause [Subscribing to V2X message distribution server](#subscribing_to_v2x_message_distribution_server)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Putting everything together" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Mec application setup\n", " - Get UU unicast provisioning information\n", " - Mec application termination\n", " \"\"\" \n", " global logger, nw_scenarios\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Setup the MEC application\n", " (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n", "\n", " # Get UU unicast provisioning information\n", " ecgi = \"268708941961,268711972264\" # List of ecgi spearated by a ','\n", " result, status, header = send_uu_unicast_provisioning_info(sandbox_name, ecgi)\n", " logger.info('UU unicast provisioning information: status: %s', str(status))\n", " if status != 200:\n", " logger.error('Failed to get UU unicast provisioning information')\n", " else:\n", " logger.info('UU unicast provisioning information: %s', str(result.data))\n", "\n", " # Any processing comes here\n", " logger.info('body: ' + str(result.data))\n", " data = json.loads(result.data)\n", " logger.info('data: ' + str(data))\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Subscribing to V2X message distribution server\n", "\n", "Here, we need to come back to the MEC 030 standard to create the type V2xMsgSubscription. It involves the creation of a set of basic types described below.\n", "\n", "These new type shall be 'JSON' serializable. It means that they have to implement the following methods:\n", "\n", "- to_dict()\n", "- to_str()\n", "- \\_\\_repr\\_\\_()\n", "- \\_\\_eq\\_\\_()\n", "- \\_\\_ne\\_\\_()\n", "\n", "**Reference:** ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.5.13 Type: LinkType\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class LinkType(object):\n", " swagger_types = {'href': 'str'}\n", " attribute_map = {'href': 'href'}\n", " def __init__(self, href=None): # noqa: E501\n", " self._href = None\n", " if href is not None:\n", " self._href = href\n", " @property\n", " def href(self):\n", " return self._href\n", " @href.setter\n", " def href(self, href):\n", " self._href = href\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(LinkType, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, LinkType):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class Links(object):\n", " swagger_types = {'self': 'LinkType'}\n", " attribute_map = {'self': 'self'}\n", " def __init__(self, self_=None): # noqa: E501\n", " self._self = None\n", " if self_ is not None:\n", " self._self = self_\n", " @property\n", " def self_(self):\n", " return self._self\n", " @self_.setter\n", " def self_(self, self_):\n", " self._self = self_\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(Links, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, Links):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class TimeStamp(object):\n", " swagger_types = {'seconds': 'int', 'nano_seconds': 'int'}\n", " attribute_map = {'seconds': 'seconds', 'nano_seconds': 'nanoSeconds'}\n", " def __init__(self, seconds=None, nano_seconds=None): # noqa: E501\n", " self._seconds = None\n", " self._nano_seconds = None\n", " if seconds is not None:\n", " self._seconds = seconds\n", " if nano_seconds is not None:\n", " self._nano_seconds = nano_seconds\n", " @property\n", " def seconds(self):\n", " return self._seconds\n", " @seconds.setter\n", " def seconds(self, seconds):\n", " self._seconds = seconds\n", " @property\n", " def nano_seconds(self):\n", " return self._nano_seconds\n", " @nano_seconds.setter\n", " def nano_seconds(self, nano_seconds):\n", " self._nano_seconds = nano_seconds\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(TimeStamp, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, TimeStamp):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The cell below implements the V2xMsgSubscription data structure.\"}\n", "Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.3.5 Type: V2xMsgSubscription\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class V2xMsgSubscription(object):\n", " swagger_types = {'links': 'Links', 'callback_reference': 'str', 'filter_criteria': 'V2xMsgSubscriptionFilterCriteria', 'request_test_notification': 'bool', 'subscription_type': 'str'}\n", " attribute_map = {'links': 'Links', 'callback_reference': 'callbackReference', 'filter_criteria': 'filterCriteria', 'request_test_notification': 'requestTestNotification', 'subscription_type': 'subscriptionType'}\n", " def __init__(self, links=None, callback_reference=None, filter_criteria=None, request_test_notification=None): # noqa: E501\n", " self._links = None\n", " self._callback_reference = None\n", " self._filter_criteria = None\n", " self._request_test_notification = None\n", " self._subscription_type = \"V2xMsgSubscription\"\n", " if links is not None:\n", " self.links = links\n", " if callback_reference is not None:\n", " self.callback_reference = callback_reference\n", " if filter_criteria is not None:\n", " self.filter_criteria = filter_criteria\n", " if request_test_notification is not None:\n", " self.request_test_notification = request_test_notification\n", " @property\n", " def links(self):\n", " return self._links\n", " @links.setter\n", " def links(self, links):\n", " self_.links = links\n", " @property\n", " def callback_reference(self):\n", " return self._callback_reference\n", " @callback_reference.setter\n", " def callback_reference(self, callback_reference):\n", " self._callback_reference = callback_reference\n", " @property\n", " def links(self):\n", " return self._links\n", " @links.setter\n", " def links(self, links):\n", " self._links = links\n", " @property\n", " def filter_criteria(self):\n", " return self._filter_criteria\n", " @filter_criteria.setter\n", " def filter_criteria(self, filter_criteria):\n", " self._filter_criteria = filter_criteria\n", " @property\n", " def request_test_notification(self):\n", " return self._request_test_notification\n", " @request_test_notification.setter\n", " def request_test_notification(self, request_test_notification):\n", " self._request_test_notification = request_test_notification\n", " @property\n", " def subscription_type(self):\n", " return self._subscription_type\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(V2xMsgSubscription, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, V2xMsgSubscription):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class V2xMsgSubscriptionFilterCriteria(object):\n", " swagger_types = {'msg_type': 'list[str]', 'std_organization': 'str'}\n", " attribute_map = {'msg_type': 'MsgType', 'std_organization': 'stdOrganization'}\n", " def __init__(self, msg_type, std_organization): # noqa: E501\n", " self._msg_type = None\n", " self._std_organization = None\n", " self.msg_type = msg_type\n", " self.std_organization = std_organization\n", " @property\n", " def msg_type(self):\n", " return self._msg_type\n", " @msg_type.setter\n", " def msg_type(self, msg_type):\n", " self._msg_type = msg_type\n", " @property\n", " def std_organization(self):\n", " return self._std_organization\n", " @std_organization.setter\n", " def std_organization(self, std_organization):\n", " self._std_organization = std_organization\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(V2xMsgSubscriptionFilterCriteria, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, V2xMsgSubscriptionFilterCriteria):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is the V2X message subscription function. The HTTP Request message body contains a 'JSON' serialized instance of the class V2xMsgSubscription.\n", "\n", "Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def subscribe_v2x_message(sandbox_name: str, v2xMsgSubscription: V2xMsgSubscription) -> object:\n", " \"\"\"\n", " Request to subscribe the V2X messages which come from different vehicle OEMs or operators\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :param sub_id: The subscription identifier\n", " :return The HTTP response, the HTTP response status, the subscription identifier and the subscription URL on success, None otherwise\n", " :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> subscribe_v2x_message: v2xMsgSubscription: ' + str(v2xMsgSubscription))\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/vis/v2/subscriptions'\n", " logger.debug('subscribe_v2x_message: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=v2xMsgSubscription, async_req=False)\n", " return (result, status, extract_sub_id(headers['Location']), headers['Location'])\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status, None)\n", " # End of function subscribe_v2x_message" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is a generic function to delete any MEC service subscription based on the subscription resource URL provided in the Location header of the subscription creation response." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def delete_mec_subscription(resource_url: str) -> int:\n", " \"\"\"\n", " Delete any existing MEC subscription\n", " :param resource_url: The subscription URL\n", " :return 0 on success, -1 otherwise\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> delete_mec_subscription: resource_url: ' + resource_url)\n", " try:\n", " res = urllib3.util.parse_url(resource_url)\n", " if res is None:\n", " logger.error('delete_mec_subscription: Failed to paerse URL')\n", " return -1\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " service_api.call_api(res.path, 'DELETE', header_params=header_params, async_req=False)\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return -1\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finaly, here is how to implement the V2X message subscription:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the second sprint of our skeleton of our MEC application:\n", " - Mec application setup\n", " - Subscribe to V2XMessage\n", " - Delete subscription\n", " - Mec application termination\n", " \"\"\" \n", " global MEC_PLTF, CALLBACK_URI, logger\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Setup the MEC application\n", " (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n", "\n", " # Create a V2X message subscritpion\n", " filter_criteria = V2xMsgSubscriptionFilterCriteria(['1', '2'], 'ETSI')\n", " v2xMsgSubscription = V2xMsgSubscription(callback_reference = CALLBACK_URI + '/vis/v2/v2x_msg_notification', filter_criteria = filter_criteria)\n", " result, status, v2x_sub_id, v2x_resource = subscribe_v2x_message(sandbox_name, v2xMsgSubscription)\n", " if status != 201:\n", " logger.error('Failed to create subscription')\n", "\n", " # Any processing here\n", " logger.info('body: ' + str(result.data))\n", " data = json.loads(result.data)\n", " logger.info('data: %s', str(data))\n", " logger.info('app_inst_id: ' + app_inst_id.id)\n", " if sub_id is not None:\n", " logger.info('sub_id: ' + sub_id)\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Delete the V2X message subscritpion\n", " delete_mec_subscription(v2x_resource)\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Putting everything together\n", "\n", "let's add a subscription the our previous MEC application.\n", "The sequence is the following:\n", "- Mec application setup\n", "- Start the notification server\n", "- Get UU unicast provisioning information\n", "- Add subscription\n", "- Stop the notification server\n", "- Mec application termination" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the third sprint of our skeleton of our MEC application:\n", " - Mec application setup\n", " - Start the notification server\n", " - Get UU unicast provisioning information\n", " - Add subscription\n", " - Stop the notification server\n", " - Mec application termination\n", " \"\"\" \n", " global CALLBACK_URI, logger\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Setup the MEC application\n", " (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n", "\n", " # Get UU unicast provisioning information\n", " ecgi = \"268708941961,268711972264\" # List of ecgi spearated by a ','\n", " result = send_uu_unicast_provisioning_info(sandbox_name, ecgi)\n", " if result is None:\n", " logger.error('Failed to get UU unicast provisioning information')\n", " else:\n", " logger.info('UU unicast provisioning information: ' + str(result))\n", "\n", " # Start notification server in a daemonized thread\n", " httpd = start_notification_server()\n", "\n", " # Create a V2X message subscritpion\n", " filter_criteria = V2xMsgSubscriptionFilterCriteria(['1', '2'], 'ETSI')\n", " v2xMsgSubscription = V2xMsgSubscription(callback_reference = CALLBACK_URI + '/vis/v2/v2x_msg_notification', filter_criteria = filter_criteria)\n", " result, status, v2x_sub_id, v2x_resource = subscribe_v2x_message(sandbox_name, v2xMsgSubscription)\n", " if status != 201:\n", " logger.error('Failed to create subscription')\n", "\n", " # Any processing here\n", " logger.info('body: ' + str(result.data))\n", " data = json.loads(result.data)\n", " logger.info('data: %s', str(data))\n", " logger.info('v2x_resource: ' + v2x_resource)\n", " if sub_id is not None:\n", " logger.info('sub_id: ' + sub_id)\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Stop notification server\n", " stop_notification_server(httpd)\n", "\n", " # Delete the V2X message subscritpion\n", " delete_mec_subscription(v2x_resource)\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create our fourth MEC application: how to use V2X QoS Prediction\n", "\n", "The MEC Sanbox V2X QoS Prediction is based on a grid Map of Monaco City where areas are categorized into residential, commercial and coastal. \n", "PoAs (Point Of Access) are categorized depending on where they lie in each grid. \n", "Each category has its own traffic load patterns which are pre-determin. The V2X QoS Prediction) will give more accurate values of RSRP and RSRQ based on the diurnal traffic patterns for each. The network scenario named \"4g-5g-v2x-macro\" must be used to get access to the V2X QoS Prediction feature.\n", "\n", "**Note:** The MEC Sanbox V2X QoS Prediction is enabled when the PredictedQos.routes.routeInfo.time attribute is present in the request (see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.2.6 Type: Preditecd QoS)\n", "\n", "Limitations:\n", "* The Location Granularity is currently not being validated as RSRP/RSRP calculations are done at the exact location provided by the user.\n", "* Time Granularity is currently not supported by the Prediction Function (design limitations of the minimal, emulated, pre-determined traffic prediction)\n", "* Upper limit on the number of elements (10 each) in the routes and routeInfo structures (arrays) to not affect user experience and respoy\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The table below describes the excepted Qos with and without the prediction model in deiffrent area and at different time.\n", "\n", "| Location | Time | PoA | Category | Status | QoS without Prediction Model | QoS with Prediction Model | Expected |\n", "| \t | (Unix time in sec) | Standard (GMT) | | | | RSRP | RSRQ | RSRP | RSRQ | |\n", "| ------------------- | ----------- | -------------- | ---------------- | ----------- | ------------- | -------------- | ----------- | ----------- | ----------- | -------- |\n", "| 43.729416,7.414853 | 1653295620 | 08:47:00 | 4g-macro-cell-2 | Residential | Congested | 63 | 21 | 60 | 20 | Yes |\n", "| 43.732456,7.418417 | 1653299220 | 09:47:00 | 4g-macro-cell-3 | Residential | Not Congested | 55 | 13 | 55 | 13 | Yes |\n", "| 43.73692,7.4209256 | 1653302820 | 10:47:00 | 4g-macro-cell-6 | Coastal | Not Congested | 68 | 26 | 68 | 26 | Yes |\n", "| 43.738007,7.4230533 | 1653305220 | 11:27:00 | 4g-macro-cell-6 | Coastal | Not Congested | 55 | 13 | 55 | 13 | Yes |\n", "| 43.739685,7.424881 | 1653308820 | 12:27:00 | 4g-macro-cell-7 | Commercial | Congested | 63 | 21 | 40 | 13 | Yes |\n", "| 43.74103,7.425759 | 1653312600 | 13:30:00 | 4g-macro-cell-7 | Commercial | Congested | 56 | 14 | 40 | 8 | Yes |\n", "| 43.74258,7.4277945 | 1653315900 | 14:25:00 | 4g-macro-cell-8 | Coastal | Congested | 59 | 17 | 47 | 13 | Yes |\n", "| 43.744972,7.4295254 | 1653318900 | 15:15:00 | 4g-macro-cell-8 | Coastal | Congested | 53 | 11 | 40 | 5 | Yes |\n", "| 43.74773,7.4320855 | 1653322500 | 16:15:00 | 5g-small-cell-14 | Commercial | Congested | 78 | 69 | 60 | 53 | Yes |\n", "| 43.749264,7.435894 | 1653329700 | 18:15:00 | 5g-small-cell-20 | Commercial | Not Congested | 84 | 72 | 84 | 72 | Yes |\t72\t84\t72\tYes\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The image below illustrate the table above: [here](images/V2X Predicted QoS.jpg).\n", "\n", "Here is an example of a basic V2X predicted QoS request based on two point in path at 8am in Residential area:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```json\n", "{\n", " \"predictionTarget\": \"SINGLE_UE_PREDICTION\",\n", " \"timeGranularity\": null,\n", " \"locationGranularity\": \"30\",\n", " \"routes\": [\n", " {\n", " \"routeInfo\": [\n", " {\n", " \"location\": {\n", " \"geoArea\": {\n", " \"latitude\": 43.729416,\n", " \"longitude\": 7.414853\n", " }\n", " },\n", " \"time\": {\n", " \"nanoSeconds\": 0,\n", " \"seconds\": 1653295620\n", " }\n", " },\n", " {\n", " \"location\": {\n", " \"geoArea\": {\n", " \"latitude\": 43.732456,\n", " \"longitude\": 7.418417\n", " }\n", " },\n", " \"time\": {\n", " \"nanoSeconds\": 0,\n", " \"seconds\": 1653299220\n", " ]\n", " }\n", " ]\n", "}\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let first create the required types before to prepare a V2X Predicted QoS request based on the JSON above.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Routes(object):\n", " swagger_types = {'_route_info': 'list[RouteInfo]'}\n", " attribute_map = {'_route_info': 'routeInfo'}\n", " def __init__(self, route_info:list): # noqa: E501\n", " self._route_info = None\n", " self.route_info = route_info\n", " @property\n", " def route_info(self):\n", " return self._route_info\n", " @route_info.setter\n", " def route_info(self, route_info):\n", " if route_info is None:\n", " raise ValueError(\"Invalid value for `route_info`, must not be `None`\") # noqa: E501\n", " self._route_info = route_info\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(Routes, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, Routes):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class LocationInfo(object):\n", " swagger_types = {'_ecgi': 'Ecgi', '_geo_area': 'LocationInfoGeoArea'}\n", " attribute_map = {'_ecgi': 'ecgi', '_geo_area': 'geoArea'}\n", " def __init__(self, ecgi=None, geo_area=None): # noqa: E501\n", " self._ecgi = None\n", " self._geo_area = None\n", " self.discriminator = None\n", " if ecgi is not None:\n", " self.ecgi = ecgi\n", " if geo_area is not None:\n", " self.geo_area = geo_area\n", " @property\n", " def ecgi(self):\n", " return self._ecgi\n", " @ecgi.setter\n", " def ecgi(self, ecgi):\n", " self._ecgi = ecgi\n", " @property\n", " def geo_area(self):\n", " return self._geo_area\n", " @geo_area.setter\n", " def geo_area(self, geo_area):\n", " self._geo_area = geo_area\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(LocationInfo, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, LocationInfo):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class RouteInfo(object):\n", " swagger_types = {'_location': 'LocationInfo', '_time_stamp': 'TimeStamp'}\n", " attribute_map = {'_location': 'location', '_time_stamp': 'time'}\n", " def __init__(self, location:LocationInfo, time_stamp=None): # noqa: E501\n", " self._location = None\n", " self.location = location\n", " self._time_stamp = None\n", " if time_stamp is not None:\n", " self.time_stamp = time_stamp\n", " @property\n", " def location(self):\n", " return self._location\n", " @location.setter\n", " def location(self, location):\n", " if location is None:\n", " raise ValueError(\"Invalid value for `location`, must not be `None`\") # noqa: E501\n", " self._location = location\n", " @property\n", " def time_stamp(self):\n", " return self._time_stamp\n", " @time_stamp.setter\n", " def time_stamp(self, time_stamp):\n", " self._time_stamp = time_stamp\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(RouteInfo, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, RouteInfo):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class LocationInfoGeoArea(object):\n", " swagger_types = {'_latitude': 'float', '_longitude': 'float'}\n", " attribute_map = {'_latitude': 'latitude', '_longitude': 'longitude'}\n", " def __init__(self, latitude, longitude): # noqa: E501\n", " self._latitude = None\n", " self._longitude = None\n", " self.discriminator = None\n", " if latitude is not None:\n", " self.latitude = latitude\n", " if longitude is not None:\n", " self.longitude = longitude\n", " @property\n", " def latitude(self):\n", " return self._latitude\n", " @latitude.setter\n", " def latitude(self, latitude):\n", " if latitude is None:\n", " raise ValueError(\"Invalid value for `latitude`, must not be `None`\") # noqa: E501\n", " self._latitude = latitude\n", " @property\n", " def longitude(self):\n", " return self._longitude\n", " @longitude.setter\n", " def longitude(self, longitude):\n", " if longitude is None:\n", " raise ValueError(\"Invalid value for `longitude`, must not be `None`\") # noqa: E501\n", " self._longitude = longitude\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(LocationInfoGeoArea, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, LocationInfoGeoArea):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class Ecgi(object):\n", " swagger_types = {'_cellId': 'CellId', '_plmn': 'Plmn'}\n", " attribute_map = {'_cellId': 'cellId', '_plmn': 'plmn'}\n", " def __init__(self, cellId=None, plmn=None): # noqa: E501\n", " self._cellId = None\n", " self._plmn = None\n", " self.discriminator = None\n", " if cellId is not None:\n", " self.cellId = cellId\n", " if plmn is not None:\n", " self.plmn = plmn\n", " @property\n", " def cellId(self):\n", " return self._cellId\n", " @cellId.setter\n", " def cellId(self, cellId):\n", " self._cellId = cellId\n", " @property\n", " def plmn(self):\n", " return self._plmn\n", " @plmn.setter\n", " def plmn(self, plmn):\n", " self._plmn = plmn\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(Ecgi, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, Ecgi):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class CellId(object):\n", " swagger_types = {'_cellId': 'str'}\n", " attribute_map = {'_cellId': 'cellId'}\n", " def __init__(self, cellId): # noqa: E501\n", " self._cellId = None\n", " self.cellId = cellId\n", " @property\n", " def cellId(self):\n", " return self._cellId\n", " @cellId.setter\n", " def cellId(self, cellId):\n", " if cellId is None:\n", " raise ValueError(\"Invalid value for `cellId`, must not be `None`\") # noqa: E501\n", " self._cellId = cellId\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(CellId, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, CellId):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class Plmn(object):\n", " swagger_types = {'_mcc': 'str', '_mnc': 'str'}\n", " attribute_map = {'_mcc': 'mcc', '_mnc': 'mnc'}\n", " def __init__(self, mcc:str, mnc:str): # noqa: E501\n", " self.discriminator = None\n", " self._mcc = None\n", " self._mnc = None\n", " self.mcc = mcc\n", " self.mnc = mnc\n", " @property\n", " def mcc(self):\n", " return self._mcc\n", " @mcc.setter\n", " def kpi_nmccame(self, mcc):\n", " if mcc is None:\n", " raise ValueError(\"Invalid value for `mcc`, must not be `None`\") # noqa: E501\n", " self._mcc = mcc\n", " @property\n", " def mnc(self):\n", " return self._mnc\n", " @mnc.setter\n", " def kpi_nmccame(self, mnc):\n", " if mnc is None:\n", " raise ValueError(\"Invalid value for `mnc`, must not be `None`\") # noqa: E501\n", " self._mnc = mnc\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(Plmn, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, Plmn):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class QosKpi(object):\n", " swagger_types = {'_kpi_name': 'str', '_kpi_value': 'str', '_confidence': 'str'}\n", " attribute_map = {'_kpi_name': 'kpiName', '_kpi_value': 'kpiValue', '_confidence': 'Confidence'}\n", " def __init__(self, kpi_name:str, kpi_value:str, confidence=None): # noqa: E501\n", " self._kpi_name = None\n", " self._kpi_value = None\n", " self._confidence = None\n", " self.kpi_name = kpi_name\n", " self.kpi_value = kpi_value\n", " if confidence is not None:\n", " self.confidences = confidence\n", " @property\n", " def kpi_name(self):\n", " return self._kpi_name\n", " @kpi_name.setter\n", " def kpi_name(self, kpi_name):\n", " if kpi_name is None:\n", " raise ValueError(\"Invalid value for `kpi_name`, must not be `None`\") # noqa: E501\n", " self._kpi_name = kpi_name\n", " @property\n", " def kpi_value(self):\n", " return self._kpi_value\n", " @kpi_value.setter\n", " def kpi_value(self, kpi_value):\n", " if kpi_value is None:\n", " raise ValueError(\"Invalid value for `kpi_value`, must not be `None`\") # noqa: E501\n", " self._kpi_value = kpi_value\n", " @property\n", " def confidence(self):\n", " return self._confidence\n", " @confidence.setter\n", " def confidence(self, confidence):\n", " self._confidence = confidence\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(QosKpi, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, QosKpi):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class Stream(object):\n", " swagger_types = {'_stream_id': 'str', '_qos_kpi': 'list[QosKpi]'}\n", " attribute_map = {'_stream_id': 'streamId', '_qos_kpi': 'qosKpi'}\n", " def __init__(self, stream_id:str, qos_kpi:list): # noqa: E501\n", " self._stream_id = None\n", " self._qos_kpi = None\n", " self.stream_id = stream_id\n", " self.qos_kpi = qos_kpi\n", " @property\n", " def stream_id(self):\n", " return self._stream_id\n", " @stream_id.setter\n", " def stream_id(self, stream_id):\n", " if stream_id is None:\n", " raise ValueError(\"Invalid value for `stream_id`, must not be `None`\") # noqa: E501\n", " self._stream_id = stream_id\n", " @property\n", " def qos_kpi(self):\n", " return self._qos_kpi\n", " @qos_kpi.setter\n", " def qos_kpi(self, qos_kpi):\n", " if qos_kpi is None:\n", " raise ValueError(\"Invalid value for `qos_kpi`, must not be `None`\") # noqa: E501\n", " self._qos_kpi = qos_kpi\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(Stream, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, Stream):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class Qos(object):\n", " swagger_types = {'_stream': 'list[Stream]'}\n", " attribute_map = {'_stream': 'stream'}\n", " def __init__(self, stream:list): # noqa: E501\n", " self._stream = None\n", " self.stream = stream\n", " @property\n", " def stream(self):\n", " return self._stream\n", " @stream.setter\n", " def stream(self, stream):\n", " if stream is None:\n", " raise ValueError(\"Invalid value for `stream`, must not be `None`\") # noqa: E501\n", " self._stream = stream\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(Qos, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, Qos):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class PredictedQos(object):\n", " swagger_types = {'_location_granularity': 'str', '_notice_period': 'TimeStamp', '_prediction_area': 'PredictionArea', '_prediction_target': 'str', '_qos': 'Qos', '_routes': 'list[Routes]', '_time_granularity': 'TimeStamp'}\n", " attribute_map = {'_location_granularity': 'locationGranularity', '_notice_period': 'noticePeriod', '_prediction_area': 'predictionArea', '_prediction_target': 'predictionTarget', '_qos': 'qos', '_routes': 'routes', '_time_granularity': 'timeGranularity'}\n", " def __init__(self, prediction_target:str, location_granularity:str, notice_period=None, time_granularity=None, prediction_area=None, routes=None, qos=None): # noqa: E501\n", " self._prediction_target = None\n", " self._time_granularity = None\n", " self._location_granularity = None\n", " self._notice_period = None\n", " self._prediction_area = None\n", " self._routes = None\n", " self._qos = None\n", " self._prediction_target = prediction_target\n", " if time_granularity is not None:\n", " self.time_granularity = time_granularity\n", " self.location_granularity = location_granularity\n", " if notice_period is not None:\n", " self.notice_period = notice_period\n", " if prediction_area is not None:\n", " self.prediction_area = prediction_area\n", " if routes is not None:\n", " self.routes = routes\n", " if qos is not None:\n", " self.qos = qos\n", " @property\n", " def prediction_target(self):\n", " return self._prediction_target\n", " @prediction_target.setter\n", " def prediction_target(self, prediction_target):\n", " if prediction_target is None:\n", " raise ValueError(\"Invalid value for `prediction_target`, must not be `None`\") # noqa: E501\n", " self._prediction_target = prediction_target\n", " @property\n", " def time_granularity(self):\n", " return self._time_granularity\n", " @time_granularity.setter\n", " def time_granularity(self, time_granularity):\n", " self._time_granularity = time_granularity\n", " @property\n", " def location_granularity(self):\n", " return self._location_granularity\n", " @location_granularity.setter\n", " def location_granularity(self, location_granularity):\n", " if location_granularity is None:\n", " raise ValueError(\"Invalid value for `location_granularity`, must not be `None`\") # noqa: E501\n", " self._location_granularity = location_granularity\n", " @property\n", " def notice_period(self):\n", " return self._notice_period\n", " @notice_period.setter\n", " def notice_period(self, notice_period):\n", " self._notice_period = notice_period\n", " @property\n", " def prediction_area(self):\n", " return self._prediction_area\n", " @prediction_area.setter\n", " def prediction_area(self, prediction_area):\n", " self._prediction_area = prediction_area\n", " @property\n", " def routes(self):\n", " return self._routes\n", " @routes.setter\n", " def routes(self, routes):\n", " self._routes = routes\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(PredictedQos, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, PredictedQos):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is the V2X Prediscted QoS function.\n", "\n", "Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.5 Sending a request for journey-specific QoS predictions\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_qos_prediction(sandbox_name: str) -> object:\n", " \"\"\"\n", " Request to predictede QoS\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :return The HTTP response, the HTTP response status and the HTTP response headers on success, None otherwise\n", " :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.5 Sending a request for journey-specific QoS predictions\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> get_qos_prediction: sandbox_name: ' + sandbox_name)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/vis/v2/provide_predicted_qos'\n", " logger.debug('send_uu_unicast_provisioning_info: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " # HTTP header `Accept`\n", " header_params = {}\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " # Body request\n", " loc1 = LocationInfo(geo_area=LocationInfoGeoArea(latitude=43.729416, longitude=7.414853))\n", " loc2 = LocationInfo(geo_area=LocationInfoGeoArea(latitude=43.732456, longitude=7.418417))\n", " routeInfo1 = RouteInfo(loc1, TimeStamp(nano_seconds=0, seconds=1653295620))\n", " routeInfo2 = RouteInfo(loc2, TimeStamp(nano_seconds=0, seconds=1653299220))\n", " routesInfo = [routeInfo1, routeInfo2]\n", " predictedQos = PredictedQos(prediction_target=\"SINGLE_UE_PREDICTION\", location_granularity=\"30\", routes=[Routes(routesInfo)])\n", " (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=predictedQos, async_req=False)\n", " return (result, status, headers)\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status, None)\n", " # End of function send_uu_unicast_provisioning_info" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Grouping all together provides the process_main funtion.. The sequence is the following:\n", "- Mec application setup\n", "- V2X QoS request\n", "- Mec application termination\n", "\n", "The expected response should be:\n", "- RSRP: 55\n", "- RSRQ: 13" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the fourth sprint of our skeleton of our MEC application:\n", " - Mec application setup\n", " - V2X QoS request\n", " - Mec application termination\n", " \"\"\" \n", " global logger\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Setup the MEC application\n", " sandbox_name, app_inst_id, sub_id = mec_app_setup()\n", "\n", " # QoS Prediction\n", " (result, status, headers) = get_qos_prediction(sandbox_name)\n", " if status != 200:\n", " logger.error('Failed to get UU unicast provisioning information')\n", " else:\n", " logger.info('UU unicast provisioning information: result: %s', str(result.data))\n", " \n", " # Any processing here\n", " logger.info('body: ' + str(result.data))\n", " data = json.loads(result.data)\n", " logger.info('data: %s', str(data))\n", " time.sleep(STABLE_TIME_OUT)\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Our fith MEC application: how to create a new MEC Services\n", "\n", "The purpose of this MEC Service application is to provide a custom MEC service that can be use by other MEC applications. For the purpose of this tutorial, our MEC service is simulating some complex calculation based on a set of data provided by the MEC use. \n", "We will use a second MEC application to exploit the features of our new MEC services.\n", "\n", "In this clause, we use the following functionalities provided by MEC-011:\n", "- Register a new service\n", "- Retrieve the list of the MEC services exposed by the MEC platform\n", "- Check that our new MEC service is present in the list of the MEC platform services\n", "- Execute a request to the MEC service\n", "- Delete the newly created service\n", "\n", "**Note:** We will use a second MEC application to exploit the features of our new MEC services.\n", "\n", "**Reference:** ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.4 Service availability update and new service registration\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Bases of the creation of a MEC service\n", "\n", "#### Introduction\n", "\n", "From the user perspective, a MEC service provides a set of endpoints which describe the interface of the MEC service (see [HTTP REST APIs \n", "concepts](https://blog.postman.com/rest-api-examples/)). These endpoints come usually with a set of data structures used by the one or more endpoints.\n", "\n", "Our service is really basic: it provide one endpoint:\n", "- GET /statistic/v1/quantity: it computes statistical quantities of a set of data (such as average, max, min, standard deviation)\n", "\n", "The body of this GET method is a list of datas:\n", "```json\n", "{\"time\":20180124,\"data1\":\"[1516752000,11590.6,11616.9,11590.4,11616.9,0.25202387,1516752060,11622.4,11651.7,11622.4,11644.6,1.03977764]\"}\n", "```\n", "\n", "The response body is the list of statistical quantities:\n", "```json\n", "{\"time\":20180124,\"avg\": 0.0,\"max\": 0.0,\"min\": 0.0,\"stddev\": 0.0 }\n", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### MEC mechanisms to create a new service\n", "\n", "As described in ETSI GS MEC 011 Clause 5.2.4 Service availability update and new service registration, to create a new MEC service, the following information is required:\n", "- A MEC Aplication instance: this is the MEC application providing the new MEC service (ETSI GS MEC 011 V3.2.1 Clause 8.2.6.3.4 POST)\n", "- A ServiceInfo instance which describe the MEC service (ETSI GS MEC 011 V3.2.1 Clause 8.1.2.2 Type: ServiceInfo)\n", "- As part of the ServiceInfo instance, a TransportInfo (ETSI GS MEC 011 V3.2.1 Clause 8.1.2.3 Type: TransportInfo) instance descibes the endpoints to use the MEC service\n", "\n", "When created and available, all the other MEC applications are notified about the existance of this MEC service." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### ServiceInfo data type\n", "\n", "The cell below describes the ServiceInfo data structure and its dependencies. It will be used to create our MEC servie.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class ServiceInfo(object):\n", " swagger_types = {'ser_instance_id': 'str','ser_name': 'str','ser_category': 'CategoryRef','version': 'str','state': 'str','transport_id': 'str','transport_info': 'TransportInfo','serializer': 'string','scope_of_locality': 'LocalityType','consumed_local_only': 'bool','is_local': 'bool','liveness_interval': 'int','links': 'ServiceInfoLinks'}\n", " attribute_map = {'ser_instance_id': 'serInstanceId','ser_name': 'serName','ser_category': 'serCategory','version': 'version','state': 'state','transport_id': 'transportId','transport_info': 'transportInfo','serializer': 'serializer','scope_of_locality': 'scopeOfLocality','consumed_local_only': 'consumedLocalOnly','is_local': 'isLocal','liveness_interval': 'livenessInterval','links': '_links'}\n", " def __init__(self, ser_instance_id=None, ser_name=None, ser_category=None, version=None, state=None, transport_id=None, transport_info=None, serializer=None, scope_of_locality=None, consumed_local_only=None, is_local=None, liveness_interval=None, links=None): # noqa: E501\n", " self._ser_instance_id = None\n", " self._ser_name = None\n", " self._ser_category = None\n", " self._version = None\n", " self._state = None\n", " self._transport_id = None\n", " self._transport_info = None\n", " self._serializer = None\n", " self._scope_of_locality = None\n", " self._consumed_local_only = None\n", " self._is_local = None\n", " self._liveness_interval = None\n", " self._links = None\n", " self.discriminator = None\n", " if ser_instance_id is not None:\n", " self.ser_instance_id = ser_instance_id\n", " self.ser_name = ser_name\n", " if ser_category is not None:\n", " self.ser_category = ser_category\n", " self.version = version\n", " self.state = state\n", " if transport_id is not None:\n", " self.transport_id = transport_id\n", " self.transport_info = transport_info\n", " self.serializer = serializer\n", " if scope_of_locality is not None:\n", " self.scope_of_locality = scope_of_locality\n", " if consumed_local_only is not None:\n", " self.consumed_local_only = consumed_local_only\n", " if is_local is not None:\n", " self.is_local = is_local\n", " if liveness_interval is not None:\n", " self.liveness_interval = liveness_interval\n", " if links is not None:\n", " self.links = links\n", " @property\n", " def ser_instance_id(self):\n", " return self._ser_instance_id\n", " @ser_instance_id.setter\n", " def ser_instance_id(self, ser_instance_id):\n", " self._ser_instance_id = ser_instance_id\n", " @property\n", " def ser_name(self):\n", " return self._ser_name\n", " @ser_name.setter\n", " def ser_name(self, ser_name):\n", " if ser_name is None:\n", " raise ValueError(\"Invalid value for `ser_name`, must not be `None`\") # noqa: E501\n", " self._ser_name = ser_name\n", " @property\n", " def ser_category(self):\n", " return self._ser_category\n", " @ser_category.setter\n", " def ser_category(self, ser_category):\n", " self._ser_category = ser_category\n", " @property\n", " def version(self):\n", " return self._version\n", " @version.setter\n", " def version(self, version):\n", " if version is None:\n", " raise ValueError(\"Invalid value for `version`, must not be `None`\") # noqa: E501\n", " self._version = version\n", " @property\n", " def state(self):\n", " return self._state\n", " @state.setter\n", " def state(self, state):\n", " if state is None:\n", " raise ValueError(\"Invalid value for `state`, must not be `None`\") # noqa: E501\n", " self._state = state\n", " @property\n", " def transport_id(self):\n", " return self._transport_id\n", " @transport_id.setter\n", " def transport_id(self, transport_id):\n", " self._transport_id = transport_id\n", " @property\n", " def transport_info(self):\n", " return self._transport_info\n", " @transport_info.setter\n", " def transport_info(self, transport_info):\n", " if transport_info is None:\n", " raise ValueError(\"Invalid value for `transport_info`, must not be `None`\") # noqa: E501\n", " self._transport_info = transport_info\n", " @property\n", " def serializer(self):\n", " return self._serializer\n", " @serializer.setter\n", " def serializer(self, serializer):\n", " if serializer is None:\n", " raise ValueError(\"Invalid value for `serializer`, must not be `None`\") # noqa: E501\n", " self._serializer = serializer\n", " @property\n", " def scope_of_locality(self):\n", " return self._scope_of_locality\n", " @scope_of_locality.setter\n", " def scope_of_locality(self, scope_of_locality):\n", " self._scope_of_locality = scope_of_locality\n", " @property\n", " def consumed_local_only(self):\n", " return self._consumed_local_only\n", " @consumed_local_only.setter\n", " def consumed_local_only(self, consumed_local_only):\n", " self._consumed_local_only = consumed_local_only\n", " @property\n", " def is_local(self):\n", " return self._is_local\n", " @is_local.setter\n", " def is_local(self, is_local):\n", " self._is_local = is_local\n", " @property\n", " def liveness_interval(self):\n", " return self._liveness_interval\n", " @liveness_interval.setter\n", " def liveness_interval(self, liveness_interval):\n", " self._liveness_interval = liveness_interval\n", " @property\n", " def links(self):\n", " return self._links\n", " @links.setter\n", " def links(self, links):\n", " self._links = links\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, \"to_dict\") else x,\n", " value\n", " ))\n", " elif hasattr(value, \"to_dict\"):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], \"to_dict\") else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(ServiceInfo, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, ServiceInfo):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class CategoryRef(object):\n", " swagger_types = {'href': 'str','id': 'str','name': 'str','version': 'str'}\n", " attribute_map = {'href': 'href','id': 'id','name': 'name','version': 'version'}\n", " def __init__(self, href=None, id=None, name=None, version=None): # noqa: E501\n", " self._href = None\n", " self._id = None\n", " self._name = None\n", " self._version = None\n", " self.discriminator = None\n", " self.href = href\n", " self.id = id\n", " self.name = name\n", " self.version = version\n", " @property\n", " def href(self):\n", " return self._href\n", " @href.setter\n", " def href(self, href):\n", " if href is None:\n", " raise ValueError(\"Invalid value for `href`, must not be `None`\") # noqa: E501\n", " self._href = href\n", " @property\n", " def id(self):\n", " return self._id\n", " @id.setter\n", " def id(self, id):\n", " if id is None:\n", " raise ValueError(\"Invalid value for `id`, must not be `None`\") # noqa: E501\n", " self._id = id\n", " @property\n", " def name(self):\n", " return self._name\n", " @name.setter\n", " def name(self, name):\n", " if name is None:\n", " raise ValueError(\"Invalid value for `name`, must not be `None`\") # noqa: E501\n", " self._name = name\n", " @property\n", " def version(self):\n", " return self._version\n", " @version.setter\n", " def version(self, version):\n", " if version is None:\n", " raise ValueError(\"Invalid value for `version`, must not be `None`\") # noqa: E501\n", " self._version = version\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, \"to_dict\") else x,\n", " value\n", " ))\n", " elif hasattr(value, \"to_dict\"):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], \"to_dict\") else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(CategoryRef, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, CategoryRef):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class TransportInfo(object):\n", " swagger_types = {\n", " 'id': 'str','name': 'str','description': 'str','type': 'str','protocol': 'str','version': 'str','endpoint': 'OneOfTransportInfoEndpoint','security': 'SecurityInfo','impl_specific_info': 'str'}\n", " attribute_map = {'id': 'id','name': 'name','description': 'description','type': 'type','protocol': 'protocol','version': 'version','endpoint': 'endpoint','security': 'security','impl_specific_info': 'implSpecificInfo'}\n", " def __init__(self, id=None, name=None, description=None, type=None, protocol=None, version=None, endpoint=None, security=None, impl_specific_info=None): # noqa: E501\n", " self._id = None\n", " self._name = None\n", " self._description = None\n", " self._type = None\n", " self._protocol = None\n", " self._version = None\n", " self._endpoint = None\n", " self._security = None\n", " self._impl_specific_info = None\n", " self.discriminator = None\n", " self.id = id\n", " self.name = name\n", " if description is not None:\n", " self.description = description\n", " self.type = type\n", " self.protocol = protocol\n", " self.version = version\n", " self.endpoint = endpoint\n", " self.security = security\n", " if impl_specific_info is not None:\n", " self.impl_specific_info = impl_specific_info\n", " @property\n", " def id(self):\n", " return self._id\n", " @id.setter\n", " def id(self, id):\n", " if id is None:\n", " raise ValueError(\"Invalid value for `id`, must not be `None`\") # noqa: E501\n", " self._id = id\n", " @property\n", " def name(self):\n", " return self._name\n", " @name.setter\n", " def name(self, name):\n", " if name is None:\n", " raise ValueError(\"Invalid value for `name`, must not be `None`\") # noqa: E501\n", " self._name = name\n", " @property\n", " def description(self):\n", " return self._description\n", " @description.setter\n", " def description(self, description):\n", " self._description = description\n", " @property\n", " def type(self):\n", " return self._type\n", " @type.setter\n", " def type(self, type):\n", " if type is None:\n", " raise ValueError(\"Invalid value for `type`, must not be `None`\") # noqa: E501\n", " self._type = type\n", " @property\n", " def protocol(self):\n", " return self._protocol\n", " @protocol.setter\n", " def protocol(self, protocol):\n", " if protocol is None:\n", " raise ValueError(\"Invalid value for `protocol`, must not be `None`\") # noqa: E501\n", " self._protocol = protocol\n", " @property\n", " def version(self):\n", " return self._version\n", " @version.setter\n", " def version(self, version):\n", " if version is None:\n", " raise ValueError(\"Invalid value for `version`, must not be `None`\") # noqa: E501\n", " self._version = version\n", " @property\n", " def endpoint(self):\n", " return self._endpoint\n", " @endpoint.setter\n", " def endpoint(self, endpoint):\n", " if endpoint is None:\n", " raise ValueError(\"Invalid value for `endpoint`, must not be `None`\") # noqa: E501\n", " self._endpoint = endpoint\n", " @property\n", " def security(self):\n", " return self._security\n", " @security.setter\n", " def security(self, security):\n", " if security is None:\n", " raise ValueError(\"Invalid value for `security`, must not be `None`\") # noqa: E501\n", " self._security = security\n", " @property\n", " def impl_specific_info(self):\n", " return self._impl_specific_info\n", " @impl_specific_info.setter\n", " def impl_specific_info(self, impl_specific_info):\n", " self._impl_specific_info = impl_specific_info\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, \"to_dict\") else x,\n", " value\n", " ))\n", " elif hasattr(value, \"to_dict\"):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], \"to_dict\") else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(TransportInfo, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, TransportInfo):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class SecurityInfo(object):\n", " swagger_types = {'o_auth2_info': 'SecurityInfoOAuth2Info'}\n", " attribute_map = {'o_auth2_info': 'oAuth2Info'}\n", " def __init__(self, o_auth2_info=None): # noqa: E501\n", " self._o_auth2_info = None\n", " self.discriminator = None\n", " if o_auth2_info is not None:\n", " self.o_auth2_info = o_auth2_info\n", " @property\n", " def o_auth2_info(self):\n", " return self._o_auth2_info\n", " @o_auth2_info.setter\n", " def o_auth2_info(self, o_auth2_info):\n", " self._o_auth2_info = o_auth2_info\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, \"to_dict\") else x,\n", " value\n", " ))\n", " elif hasattr(value, \"to_dict\"):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], \"to_dict\") else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(SecurityInfo, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, SecurityInfo):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class SecurityInfoOAuth2Info(object):\n", " swagger_types = {'grant_types': 'list[str]','token_endpoint': 'str'}\n", " attribute_map = {'grant_types': 'grantTypes','token_endpoint': 'tokenEndpoint'}\n", " def __init__(self, grant_types=None, token_endpoint=None): # noqa: E501\n", " self._grant_types = None\n", " self._token_endpoint = None\n", " self.discriminator = None\n", " self.grant_types = grant_types\n", " self.token_endpoint = token_endpoint\n", " @property\n", " def grant_types(self):\n", " return self._grant_types\n", " @grant_types.setter\n", " def grant_types(self, grant_types):\n", " if grant_types is None:\n", " raise ValueError(\"Invalid value for `grant_types`, must not be `None`\") # noqa: E501\n", " self._grant_types = grant_types\n", " @property\n", " def token_endpoint(self):\n", " return self._token_endpoint\n", " @token_endpoint.setter\n", " def token_endpoint(self, token_endpoint):\n", " if token_endpoint is None:\n", " raise ValueError(\"Invalid value for `token_endpoint`, must not be `None`\") # noqa: E501\n", " self._token_endpoint = token_endpoint\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, \"to_dict\") else x,\n", " value\n", " ))\n", " elif hasattr(value, \"to_dict\"):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], \"to_dict\") else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(SecurityInfoOAuth2Info, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, SecurityInfoOAuth2Info):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class OneOfTransportInfoEndpoint(object):\n", " swagger_types = {}\n", " attribute_map = {}\n", " def __init__(self): # noqa: E501\n", " self.discriminator = None\n", " @property\n", " def uris(self):\n", " return self._uris\n", " @uris.setter\n", " def uris(self, uris):\n", " self._uris = uris\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, \"to_dict\") else x,\n", " value\n", " ))\n", " elif hasattr(value, \"to_dict\"):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], \"to_dict\") else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(OneOfappInstanceIdServicesBody, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, OneOfTransportInfoEndpoint):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class EndPointInfoUris(object):\n", " swagger_types = {'_uris': 'list[str]'}\n", " attribute_map = {'_uris': 'uris'}\n", " def __init__(self, uris:list): # noqa: E501\n", " self._uris = None\n", " self.uris = uris\n", " @property\n", " def uris(self):\n", " return self._uris\n", " @uris.setter\n", " def uris(self, uris):\n", " self._uris = uris\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(EndPointInfoUris, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, EndPointInfoUris):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class EndPointInfoFqdn(object):\n", " swagger_types = {'_fqdn': 'list[str]'}\n", " attribute_map = {'_fqdn': 'fqdn'}\n", " def __init__(self, fqdn:list): # noqa: E501\n", " self._fqdn = None\n", " self.fqdn = fqdn\n", " @property\n", " def fqdn(self):\n", " return self._fqdn\n", " @fqdn.setter\n", " def fqdn(self, fqdn):\n", " self._fqdn = fqdn\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(EndPointInfoFqdn, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, EndPointInfoFqdn):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class EndPointInfoAddress(object):\n", " swagger_types = {'_host': 'str', '_port': 'int'}\n", " attribute_map = {'_host': 'host', '_port': 'port'}\n", " def __init__(self, host:str, port:list): # noqa: E501\n", " self._host = None\n", " self._port = None\n", " self.host = host\n", " self.port = port\n", " @property\n", " def host(self):\n", " return self.host\n", " @host.setter\n", " def host(self, host):\n", " self._host = host\n", " @property\n", " def port(self):\n", " return self._port\n", " @port.setter\n", " def port(self, port):\n", " self._port = qosport_kpi\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n", " value\n", " ))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(EndPointInfoAddress, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, EndPointInfoAddress):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class EndPointInfoAddresses(object):\n", " swagger_types = {'_addresses': 'list[EndPointInfoAddress]'}\n", " attribute_map = {'_addresses': 'addresses'}\n", " def __init__(self, addresses:list): # noqa: E501\n", " self._addresses = None\n", " self.addresses = addresses\n", " @property\n", " def addresses(self):\n", " return self._addresses\n", " @addresses.setter\n", " def addresses(self, addresses):\n", " self._addresses = addresses\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n", " elif hasattr(value, 'to_dict'):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], 'to_dict') else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(EndPointInfoAddresses, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, EndPointInfoAddresses):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n", "\n", "class EndPointInfoAlternative(object):\n", " swagger_types = {'alternative': 'object'}\n", " attribute_map = {'alternative': 'alternative'}\n", " def __init__(self, alternative=None): # noqa: E501\n", " self._alternative = None\n", " self.discriminator = None\n", " self.alternative = alternative\n", " @property\n", " def alternative(self):\n", " return self._alternative\n", " @alternative.setter\n", " def alternative(self, alternative):\n", " if alternative is None:\n", " raise ValueError(\"Invalid value for `alternative`, must not be `None`\") # noqa: E501\n", " self._alternative = alternative\n", " def to_dict(self):\n", " result = {}\n", " for attr, _ in six.iteritems(self.swagger_types):\n", " value = getattr(self, attr)\n", " if isinstance(value, list):\n", " result[attr] = list(map(\n", " lambda x: x.to_dict() if hasattr(x, \"to_dict\") else x,\n", " value\n", " ))\n", " elif hasattr(value, \"to_dict\"):\n", " result[attr] = value.to_dict()\n", " elif isinstance(value, dict):\n", " result[attr] = dict(map(\n", " lambda item: (item[0], item[1].to_dict())\n", " if hasattr(item[1], \"to_dict\") else item,\n", " value.items()\n", " ))\n", " else:\n", " result[attr] = value\n", " if issubclass(EndPointInfoAlternative, dict):\n", " for key, value in self.items():\n", " result[key] = value\n", " return result\n", " def to_str(self):\n", " return pprint.pformat(self.to_dict())\n", " def __repr__(self):\n", " return self.to_str()\n", " def __eq__(self, other):\n", " if not isinstance(other, EndPointInfoAlternative):\n", " return False\n", " return self.__dict__ == other.__dict__\n", " def __ne__(self, other):\n", " return not self == other\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an application MEC service\n", "\n", "The function below is creating an application MEC services\n", "\n", "**Note:** This is call application MEC service in opposition of a standardized MEC service exposed by the MEC Sanbox such as MEC 013, MEC 030...\n", "\n", "**Reference:** ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.6.3.4 POST" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_mec_service(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> object:\n", " \"\"\"\n", " Request to create a new application MEC service\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :param sub_id: The subscription identifier\n", " :return The HTTP response, the response status and the headers on success, None otherwise\n", " :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.6.3.4 POST\n", " \"\"\"\n", " global MEC_PLTF, CALLBACK_URI, logger, service_api\n", "\n", " logger.debug('>>> create_mec_service')\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/mec_service_mgmt/v1/applications/{app_inst_id}/services'\n", " logger.debug('create_mec_service: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " path_params['app_inst_id'] = app_inst_id.id\n", " # HTTP header `Accept`\n", " header_params = {}\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " # Body request\n", " callback = CALLBACK_URI + '/statistic/v1/quantity'\n", " transport_info = TransportInfo(id=str(uuid.uuid4()), name='HTTP REST API', type='REST_HTTP', protocol='HTTP', version='2.0', security=SecurityInfo(), endpoint=OneOfTransportInfoEndpoint())\n", " transport_info.endpoint.uris=[EndPointInfoUris(callback)]\n", " category_ref = CategoryRef(href=callback, id=str(uuid.uuid4()), name='Demo', version='1.0.0')\n", " appServiceInfo = ServiceInfo(ser_name='demo6 MEC Service', ser_category=category_ref, version='1.0.0', state='ACTIVE',transport_info=transport_info, serializer='JSON')\n", " (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=appServiceInfo, async_req=False)\n", " return (result, status, headers['Location'])\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status, None)\n", " # End of function create_mec_service" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Delete an application MEC service\n", "\n", "The function below is deleting an application MEC services.\n", "\n", "**Reference:** ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.7.3.5 DELETE" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def delete_mec_service(resource_url: str) -> int:\n", " \"\"\"\n", " Request to create a new application MEC service\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param app_inst_id: The MEC application instance identifier\n", " :param sub_id: The subscription identifier\n", " :return 0 on success, -1 otherwise\n", " :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.7.3.5 DELETE\n", " \"\"\"\n", " global logger\n", "\n", " logger.debug('>>> delete_mec_subscription: resource_url: ' + resource_url)\n", " try:\n", " res = urllib3.util.parse_url(resource_url)\n", " if res is None:\n", " logger.error('delete_mec_subscription: Failed to paerse URL')\n", " return -1\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " service_api.call_api(res.path, 'DELETE', header_params=header_params, async_req=False)\n", " return 0\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return -1\n", " # End of function delete_mec_service" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Putting everything together\n", "\n", "The sequence is the following:\n", "- Mec application setup\n", "- Create new MEC service\n", "- Send a request to our MEC service\n", "- Delete newly created MEC service\n", "- Mec application termination\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This is the fith sprint of our skeleton of our MEC application:\n", " - Mec application setup\n", " - Create new MEC service\n", " - Send a request to our MEC service\n", " - Delete newly created MEC service\n", " - Mec application termination\n", " \"\"\" \n", " global LISTENER_IP, LISTENER_PORT, CALLBACK_URI, logger\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Start notification server in a daemonized thread\n", " httpd = start_notification_server()\n", "\n", " # Setup the MEC application\n", " sandbox_name, app_inst_id, sub_id = mec_app_setup()\n", "\n", " # Create the MEC service\n", " result, status, mec_service_resource = create_mec_service(sandbox_name, app_inst_id)\n", " if status != 201:\n", " logger.error('Failed to create MEC service')\n", " else:\n", " logger.info('mec_service_resource: %s', mec_service_resource)\n", "\n", " # Send a request to our MEC service\n", " logger.info('body: ' + str(result.data))\n", " data = json.loads(result.data)\n", " logger.info('data: %s', str(data))\n", " logger.info('=============> Execute the command: curl --request GET %s/statistic/v1/quantity --header \"Accept: application/json\" --data \\'{\"time\":20180124,\"data1\":\"[1516752000,11590.6,11616.9,11590.4,11616.9,0.25202387,1516752060,11622.4,11651.7,11622.4,11644.6,1.03977764]\"}\\'', CALLBACK_URI)\n", " time.sleep(60)\n", "\n", " # Stop notification server\n", " stop_notification_server(httpd)\n", "\n", " # Delete the MEC servce\n", " delete_mec_service(mec_service_resource)\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Support of CAPIF (3GPP TS 29.222: 3rd Generation Partnership Project; Technical Specification Group Core Network and Terminals; Common API Framework for 3GPP Northbound APIs)\n", "\n", "MEC-CAPIF support is described in ETSI GS MEC 011 (V3.2.1) Clause 9 [4]\n", "\n", "The sample code below illustrates the usage of MEC-CAPI endpoints:\n", "- /service-apis/v1/allServiceAPIs\n", "- /published-apis/v1/{apfId}/service-apis\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Getting all MEC services\n", "\n", "The code below illustrates how to use CAPIF '/service-apis/v1/allServiceAPIs' endpoint to retrieve the complete list of available MEC services.\n", "\n", "**Reference:** ETSI GS MEC 011 (V3.2.1) Clause 9.2.3 Resource: All published service APIs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def capif_get_all_mec_services(sandbox_name: str):\n", " \"\"\"\n", " To retrieves the MEC services using CAPIF endpoint\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :return The HTTP response and the response status on success, None otherwise\n", " :see ETSI GS MEC 011 (V3.2.1) Clause 9\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> capif_get_all_mec_services: ' + sandbox_name)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/service-apis/v1/allServiceAPIs'\n", " logger.debug('capif_get_all_mec_services: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " (result, status, headers) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, async_req=False)\n", " return (result, status)\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status)\n", " # End of capif_get_all_mec_services function" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Getting MEC services for a specified MEC application instance ID\n", "\n", "The code below illustrates how to use CAPIF '/published-apis/v1/{apfId}/service-apis' endpoint to retrieve the complete list of MEC services for a specified MEC application instance ID (apfid).\n", "\n", "**Reference:** ETSI GS MEC 011 (V3.2.1) Table 9.2.4.2-1: Profiling of the URI variables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def capif_get_mec_services(sandbox_name: str, apfId: str):\n", " \"\"\"\n", " To retrieves the MEC services using CAPIF endpoint\n", " :param sandbox_name: The MEC Sandbox instance to use\n", " :param apfId: The identifier of the entity that registers the service API\n", " :return The HTTP response and the response status on success, None otherwise\n", " :see ETSI GS MEC 011 (V3.2.1) Clause 9\n", " \"\"\"\n", " global MEC_PLTF, logger, service_api\n", "\n", " logger.debug('>>> capif_get_all_mec_services: ' + sandbox_name)\n", " try:\n", " url = '/{sandbox_name}/{mec_pltf}/published-apis/v1/{apfId}/service-apis'\n", " logger.debug('capif_get_mec_services: url: ' + url)\n", " path_params = {}\n", " path_params['sandbox_name'] = sandbox_name\n", " path_params['mec_pltf'] = MEC_PLTF\n", " path_params['apfId'] = apfId\n", " header_params = {}\n", " # HTTP header `Accept`\n", " header_params['Accept'] = 'application/json' # noqa: E501\n", " # HTTP header `Content-Type`\n", " header_params['Content-Type'] = 'application/json' # noqa: E501\n", " (result, status, headers) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, async_req=False)\n", " return (result, status)\n", " except ApiException as e:\n", " logger.error('Exception when calling call_api: %s\\n' % e)\n", " return (None, status)\n", " # End of capif_get_mec_services function" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Putting evrythong together\n", "\n", "It is time now to create the our third iteration of our MEC application.\n", "\n", "The sequence is the following:\n", "\n", "- Login\n", "- Activate a network scenario\n", "- Create an application instance identifier\n", "- Retrieve all MEC services (/service-apis/v1/allServiceAPIs)\n", "- Retrieve MEC services for the newly created application instance identifier\n", "- Delete our application instance identifier\n", "- Deactivate a network scenario\n", "- Logout" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%script echo skipping\n", "# Uncomment the ;line above to skip execution of this cell\n", "def process_main():\n", " \"\"\"\n", " This code illustrates the usage of MEC-CAPI endpoints:\n", " - Login\n", " - Activate a network scenario\n", " - Create an application instance identifier\n", " - Retrieve all MEC services (/service-apis/v1/allServiceAPIs)\n", " - Retrieve MEC services for the newly created application instance identifier\n", " - Delete our application instance identifier\n", " - Deactivate a network scenario\n", " - Logout\n", " \"\"\" \n", " global LISTENER_IP, LISTENER_PORT, CALLBACK_URI, logger\n", "\n", " logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " logger.debug('\\t pwd= ' + os.getcwd())\n", "\n", " # Setup the MEC application\n", " sandbox_name, app_inst_id, sub_id = mec_app_setup()\n", "\n", " # Get the list of the MEC sevices using CAPIF endpoints\n", " result, status = capif_get_all_mec_services(sandbox_name)\n", " if status != 200:\n", " logger.error('Failed to get the list of MEC services using CAPIF endpoint')\n", " else:\n", " logger.info('capif_get_all_mec_services: ' + str(result.data))\n", "\n", " # Get the list of the MEC sevices for our AppInstanceId using CAPIF endpoints\n", " result, status = capif_get_mec_services(sandbox_name, app_inst_id.id)\n", " if status != 200:\n", " logger.error('Failed to get the list of MEC services for our AppInstanceId using CAPIF endpoint')\n", " else:\n", " logger.info('capif_get_mec_services: ' + str(result.data))\n", "\n", " # In the previous request, the list of services for our AppInstanceId is empty.\n", " # Let's create a new MEC service\n", " result, status, mec_service_resource = create_mec_service(sandbox_name, app_inst_id)\n", " if status != 201:\n", " logger.error('Failed to create MEC service')\n", " else:\n", " logger.info('mec_service_resource: %s', mec_service_resource)\n", "\n", " # Now, we can send a new request for services for our MEC application\n", " # Get the list of the MEC sevices for our AppInstanceId using CAPIF endpoints\n", " result, status = capif_get_mec_services(sandbox_name, app_inst_id.id)\n", " if status != 200:\n", " logger.error('Failed to get the list of MEC services for our AppInstanceId using CAPIF endpoint')\n", " else:\n", " logger.info('capif_get_mec_services: ' + str(result.data))\n", " data = json.loads(result.data)\n", " logger.info('serviceAPIDescriptions: ' + str(data))\n", " # b'{\"serviceAPIDescriptions\":[{\"apiName\":\"demo6 MEC Service\",\"apiId\":\"6172de7d-ed42-4ba2-947a-a0db7cc8915e\",\"aefProfiles\":[{\"aefId\":\"3daa9b79-ba81-4a74-9668-df710dc68020\",\"versions\":[\"1.0.0\"],\"interfaceDescriptions\":{\"uris\":null,\"fqdn\":null,\"addresses\":null,\"alternative\":null},\"vendorSpecific-urn:etsi:mec:capifext:transport-info\":{\"name\":\"HTTP REST API\",\"type\":\"REST_HTTP\",\"protocol\":\"HTTP\",\"version\":\"2.0\",\"security\":{}}}],\"vendorSpecific-urn:etsi:mec:capifext:service-info\":{\"serializer\":\"JSON\",\"state\":\"ACTIVE\",\"scopeOfLocality\":\"MEC_HOST\",\"consumedLocalOnly\":true,\"isLocal\":true,\"category\":{\"href\":\"http://mec-platform2.etsi.org:31111/sandbox/v1/statistic/v1/quantity\",\"id\":\"98a5039d-2b7c-49b1-b3b1-45a6ebad2ea4\",\"name\":\"Demo\",\"version\":\"1.0.0\"}}}]}'\n", " logger.info('apiId: ' + str(data['serviceAPIDescriptions'][0]['apiId'] + ' matching MEC serInstanceID'))\n", " logger.info('aefId: ' + str(data['serviceAPIDescriptions'][0]['aefProfiles'][0]['aefId'] + ' matching MEC TransportInfo.Id\"'))\n", "\n", " # Delete the MEC servce\n", " delete_mec_service(mec_service_resource)\n", "\n", " # Terminate the MEC application\n", " mec_app_termination(sandbox_name, app_inst_id, sub_id)\n", "\n", " logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n", " # End of function process_main\n", "\n", "if __name__ == '__main__':\n", " process_main()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Annexes\n", "\n", "## Annex A: How to use an existing MEC sandbox instance\n", "\n", "This case is used when the MEC Sandbox API is not used. The procedure is the following:\n", "- Log to the MEC Sandbox using a WEB browser\n", "- Select a network scenario\n", "- Create a new application instance\n", "\n", "When it is done, the newly created application instance is used by your application when required. This application instance is usually passed to your application in the command line or using a configuration file\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Bibliography\n", "\n", "1. ETSI GS MEC 002 (V2.2.1) (01-2022): \"Multi-access Edge Computing (MEC); Phase 2: Use Cases and Requirements\".\n", "2. ETSI GS MEC 010-1 (V1.1.1) (10-2017): \"Mobile Edge Computing (MEC); Mobile Edge Management; Part 1: System, host and platform management\".\n", "3. ETSI GS MEC 010-2 (V2.2.1) (02-2022): \"Multi-access Edge Computing (MEC); MEC Management; Part 2: Application lifecycle, rules and requirements management\".\n", "4. ETSI GS MEC 011 (V3.2.1) (09-2022): \"Multi-access Edge Computing (MEC); Edge Platform Application Enablement\".\n", "5. ETSI GS MEC 012 (V2.2.1) (02-2022): \"Multi-access Edge Computing (MEC); Radio Network Information API\".\n", "6. ETSI GS MEC 013 (V2.2.1) (01-2022): \"Multi-access Edge Computing (MEC); Location API\".\n", "7. ETSI GS MEC 014 (V2.1.1) (03-2021): \"Multi-access Edge Computing (MEC); UE Identity API\".\n", "8. ETSI GS MEC 015 (V2.1.1) (06-2020): \"Multi-Access Edge Computing (MEC); Traffic Management APIs\".\n", "9. ETSI GS MEC 016 (V2.2.1) (04-2020): \"Multi-access Edge Computing (MEC); Device application interface\".\n", "10. ETSI GS MEC 021 (V2.2.1) (02-2022): \"Multi-access Edge Computing (MEC); Application Mobility Service API\".\n", "11. ETSI GS MEC 028 (V2.3.1) (07-2022): \"Multi-access Edge Computing (MEC); WLAN Access Information API\".\n", "12. ETSI GS MEC 029 (V2.2.1) (01-2022): \"Multi-access Edge Computing (MEC); Fixed Access Information API\".\n", "13. ETSI GS MEC 030 (V3.2.1) (05-2022): \"Multi-access Edge Computing (MEC); V2X Information Service API\".\n", "14. ETSI GR MEC-DEC 025 (V2.1.1) (06-2019): \"Multi-access Edge Computing (MEC); MEC Testing Framework\".\n", "15. ETSI GR MEC 001 (V3.1.1) (01-2022): \"Multi-access Edge Computing (MEC); Terminology\".\n", "16. ETSI GR MEC 003 (V3.1.1): Multi-access Edge Computing (MEC);\n", "17. 3GPP TS 29.222: 3rd Generation Partnership Project; Technical Specification Group Core Network and Terminals; Common API Framework for 3GPP Northbound APIs\n", "Framework and Reference Architecture\n", "18. [The Wiki MEC web site](https://www.etsi.org/technologies/multi-access-edge-computing)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.10" } }, "nbformat": 4, "nbformat_minor": 4 }