Commit 6edc4eea authored by Yann Garcia's avatar Yann Garcia
Browse files

Big fixed in V2X messages distribution

parent 6105b5a7
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@ set -e
set +x

docker pull golang
docker run --rm --expose 80 --expose 443 -p 80:80 -p 443:443 -it -v$PWD:/opt/local/etsi/demo6  -v$HOME/var:/opt/local/etsi/var golang bash -c "cd /opt/local/etsi/demo6/bin && ./demo6 ../app_instance.yaml"
docker run --rm --expose 80 --expose 443 --publish 80:80 --publish 443:443 -it -v$PWD:/opt/local/etsi/demo6  -v$HOME/var:/opt/local/etsi/var golang bash -c "cd /opt/local/etsi/demo6/bin && ./demo6 ../app_instance.yaml"

echo ""
echo ">>> Done"
+2 −2
Original line number Diff line number Diff line
%% Cell type:markdown id: tags:

# How to develop a MEC application using the MEC Sandbox HTTP REST API
This tutorial introduces the step by step procedure to create a basic MEC appcation following ETSI MEC standards.
It uses the ETSI MEC Sandbox simulator.

<div class="alert alert-block alert-danger">
    <b>Note:</b> These source code examples are simplified and ignore return codes and error checks to a large extent. We do this to highlight how to use the MEC Sandbox API and the different MEC satndards and reduce unrelated code.
A real-world application will of course properly check every return value and exit correctly at the first serious error.
</div>

%% Cell type:markdown id: tags:

## What is a MEC application

See [The Wiki MEC web site](https://www.etsi.org/technologies/multi-access-edge-computing)

%% Cell type:markdown id: tags:

## The basics of developing a MEC application

The developement of a MEC application follows a strict process in order to access the ETSI MEC services and provides valuable services to the customers.
Mainly, this process can be split in several steps:
1. Global initializations (constant, variables...)
2. Create a new instance of a MEC Sandbox (Note that using an existing one could be a solution too (see Annex A)
2. Create a new instance of a MEC Sandbox (Note that using an existing one could be a solution too (see Annex A))
3. Activate a network scenario in order to access the ETSI MEC services
4. Create a new application identifier
5. Register our MEC application and subscribe to service termination (see MEC 011)
6. Use MEC services in order to provide valuable services to the customers
    6.1. Apply MEC services required subscriptions (e.g. MEC 013 location subscription)
7. Terminate the MEC application
    7.1. Remove MEC services subscriptions
    7.2. Deactivate the current network scenario
    7.3. Delete the instance of the MEC Sandbox
8. Release all the MEC application resources

**Note:** Several application identifier can be created to address several MEC application.


## Use the MEC Sandbox HTTP REST API models and code

The MEC sandbox provides a piece of code (the python sub) that shall be used to develop the MEC application and interact with the MEC Sandbox. This piece of code mainly contains swagger models to serialize/deserialize JSON data structures and HTTP REST API call functions.
The openApi file is availabe [here](https://labs.etsi.org/rep/mec/etsi-mec-sandbox/-/blob/STF678_Task1_2_3_4/go-apps/meep-sandbox-api/api/swagger.yaml) and the [Swagger editor](https://editor-next.swagger.io/) is used to generate the python sub.
The openApi file is availabe [here](https://labs.etsi.org/rep/mec/etsi-mec-sandbox/-/blob/STF678_Task1_2_3_4/go-apps/meep-sandbox-api/api/swagger.yaml) and the [Swagger editor](https://editor-next.swagger.io/) is used to generate the python stub.

The project architecture is describe [here](images/project_arch.jpg).

The sandbox_api folder contains the python implementation of the HTTP REST API definitions introduced by the openApi [file](https://labs.etsi.org/rep/mec/etsi-mec-sandbox/-/blob/STF678_Task1_2_3_4/go-apps/meep-sandbox-api/api/swagger.yaml).
The model folder contains the python implementation of the data type definitions introduced by the openApi [file](https://labs.etsi.org/rep/mec/etsi-mec-sandbox/-/blob/STF678_Task1_2_3_4/go-apps/meep-sandbox-api/api/swagger.yaml).

%% Cell type:markdown id: tags:

Before going to create our MEC application skeleton, the following steps shall be done:
1) Change the working directory (see the project architecture)

%% Cell type:code id: tags:

``` python
import os
os.chdir(os.path.join(os.getcwd(), '../mecapp'))
print(os.getcwd())
```

%% Cell type:markdown id: tags:

2) Apply the python imports

%% Cell type:code id: tags:

``` python
from __future__ import division # Import floating-point division (1/4=0.25) instead of Euclidian division (1/4=0)

import os
import sys
import re
import logging
import threading
import time
import json
import uuid

import pprint

import six

import swagger_client
from swagger_client.rest import ApiException

from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer

try:
    import urllib3
except ImportError:
    raise ImportError('Swagger python client requires urllib3.')
```

%% Cell type:markdown id: tags:

3) Initialize of the global constants (cell 3)

%% Cell type:code id: tags:

``` python
MEC_SANDBOX_URL     = 'https://mec-platform2.etsi.org'                       # MEC Sandbox host/base URL
MEC_SANDBOX_API_URL = 'https://mec-platform2.etsi.org/sandbox-api/v1'        # MEC Sandbox API host/base URL
PROVIDER            = 'Jupyter2024'                                          # Login provider value - To skip authorization: 'github'
MEC_PLTF            = 'mep1'                                                 # MEC plateform name. Linked to the network scenario
LOGGER_FORMAT       = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Logging format
STABLE_TIME_OUT     = 10                                                     # Timer to wait for MEC Sndbox reaches its stable state (K8S pods in running state)
LOGIN_TIMEOUT       = 3 #30                                                  # Timer to wait for user to authorize from GITHUB
LISTENER_IP         = '0.0.0.0'                                              # Listener IPv4 address for notification callback calls
LISTENER_PORT       = 36001                                                  # Listener IPv4 port for notification callback calls
CALLBACK_URI        = 'http://yanngarcia.ddns.net:36001/sandbox/v1'
                                                                             #'https://yanngarcia.ddns.net:' + str(LISTENER_PORT) + '/jupyter/sandbox/demo6/v1/'
```

%% Cell type:markdown id: tags:

4) Setup the logger instance and the HTTP REST API (cell 4)

%% Cell type:code id: tags:

``` python
# Initialize the logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.basicConfig(filename='/tmp/' + time.strftime('%Y%m%d-%H%M%S') + '.log')
l = logging.StreamHandler()
l.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(l)

# Setup the HTTP REST API configuration to be used to send request to MEC Sandbox API
configuration               = swagger_client.Configuration()
configuration.host          = MEC_SANDBOX_API_URL
configuration.verify_ssl    = True
configuration.debug         = True
configuration.logger_format = LOGGER_FORMAT
# Create an instance of ApiClient
sandbox_api = swagger_client.ApiClient(configuration, 'Content-Type', 'application/json')

# Setup the HTTP REST API configuration to be used to send request to MEC Services
configuration1               = swagger_client.Configuration()
configuration1.host          = MEC_SANDBOX_URL
configuration1.verify_ssl    = True
configuration1.debug         = True
configuration1.logger_format = LOGGER_FORMAT
# Create an instance of ApiClient
service_api = swagger_client.ApiClient(configuration1, 'Content-Type', 'application/json')
```

%% Cell type:markdown id: tags:

5) Setup the global variables (cell 5)

%% Cell type:code id: tags:

``` python
# Initialize the global variables
nw_scenarios    = []   # The list of available network scenarios
nw_scenario_idx = -1   # The network scenario idx to activate (deactivate)
app_inst_id     = None # The requested application instance identifier
```

%% Cell type:markdown id: tags:

## Create our first MEC application

The first step to develop a MEC application is to create the application skeleton which contains the minimum steps below:

- Login to instanciate a MEC Sandbox
- Logout to delete a existing MEC Sandbox

%% Cell type:markdown id: tags:

### First steps: the login/logout

Here is the first squeleton with the following sequence:
- Login
- Print sandbox identifier
- Logout
- Check that logout is effective

%% Cell type:markdown id: tags:

#### The login function

To log to the MEC Sandbox,
the login process is done in two step. In step 1, a user code is requested to GITHUB. In step 2, the user has to enter this user code to https://github.com/login/device and proceed to the authorization.
Please, pay attention to the log '=======================> DO AUTHORIZATION WITH CODE :' which indicates you the user code to use for the authorization.

It uses the HTTP POST request with the URL 'POST /sandbox-sandbox_api/v1/login?provide=github' (see PROVIDER constant).

%% Cell type:code id: tags:

``` python
# Login
def process_login() -> str:
    """
    Authenticate and create a new MEC Sandbox instance.
    :return: The sandbox instance identifier on success, None otherwise
    """
    global PROVIDER, logger

    logger.debug('>>> process_login')

    try:
        auth = swagger_client.AuthorizationApi(sandbox_api)
        oauth = auth.login(PROVIDER, async_req = False)
        logger.debug('process_login (step1): oauth: ' + str(oauth))
        # Wait for the MEC Sandbox is running
        logger.debug('=======================> DO AUTHORIZATION WITH CODE : ' + oauth.user_code)
        logger.debug('=======================> DO AUTHORIZATION HERE :      ' + oauth.verification_uri)
        if oauth.verification_uri == "":
            time.sleep(LOGIN_TIMEOUT) # Skip scecurity, wait for a few seconds
        else:
            time.sleep(10 * LOGIN_TIMEOUT) # Wait for Authirization from user side
        namespace = auth.get_namespace(oauth.user_code)
        logger.debug('process_login (step2): result: ' + str(namespace))
        return namespace.sandbox_name
    except ApiException as e:
        logger.error('Exception when calling AuthorizationApi->login: %s\n' % e)

    return None
    # End of function process_login
```

%% Cell type:markdown id: tags:

#### The logout function

It uses the HTTP POST request with the URL 'POST /sandbox-sandbox_api/v1/logout?sandbox_name={sandbox_name}'.

%% Cell type:code id: tags:

``` python
# Logout
def process_logout(sandbox_name: str) -> int:
    """
    Delete the specified MEC Sandbox instance.
    :param sandbox_name: The MEC Sandbox to delete
    :return: 0 on success, -1 otherwise
    """
    global logger

    logger.debug('>>> process_logout: sandbox=' + sandbox_name)

    try:
        auth = swagger_client.AuthorizationApi(sandbox_api)
        result = auth.logout(sandbox_name, async_req = False)  # noqa: E501
        return 0
    except ApiException as e:
        logger.error('Exception when calling AuthorizationApi->logout: %s\n' % e)
    return -1
    # End of function process_logout
```

%% Cell type:markdown id: tags:

Now, let put in action our Login/Logout functions:

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the skeleton of our MEC application:
        - Login
        - Print sandbox identifier
        - Logout
        - Check that logout is effective
    This skeleton will be the bas of the next sprint in order to achieve a full implementation of a MEC application
    """
    global logger

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Login
    sandbox = process_login()
    if sandbox is None:
        return

    # Print sandbox identifier
    logger.info('Sandbox created: ' + sandbox)
    # Wait for the MEC Sandbox is running
    time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Logout
    process_logout(sandbox)

    # Check that logout is effective
    logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

### Second step: Retrieve the list of network scenarios

Let's go futhur and see how we can retrieve the list of the network scenarios available in order to activate one of them and access the MEC services exposed such as MEC 013 or MEC 030.

The sequence will be:
- Login
- Print sandbox identifier
- Print available network scenarios
- Logout
- Check that logout is effective

The login and logout functions are described in cell 3 and 4.

To retrieve the list of the network scenarios, let's create a new function called 'get_network_scenarios'. It uses the HTTP GET request with the URL '/sandbox-sandbox_api/v1/sandboxNetworkScenarios?sandbox_name={sandbox_name}'.

%% Cell type:code id: tags:

``` python
def get_network_scenarios(sandbox_name: str) -> list:
    """
    Retrieve the list of the available network scenarios.
    :param sandbox_name: The MEC Sandbox instance to use
    :return: The list of the available network scenarios on success, None otherwise
    """
    global PROVIDER, logger, sandbox_api, configuration

    logger.debug('>>> get_network_scenarios: sandbox=' + sandbox_name)

    try:
        nw = swagger_client.SandboxNetworkScenariosApi(sandbox_api)
        result = nw.sandbox_network_scenarios_get(sandbox_name, async_req = False)  # noqa: E501
        logger.debug('get_network_scenarios: result: ' + str(result))
        return result
    except ApiException as e:
        logger.error('Exception when calling SandboxNetworkScenariosApi->sandbox_network_scenarios_get: %s\n' % e)

    return None
    # End of function get_network_scenarios
```

%% Cell type:markdown id: tags:

Putting everything together:

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the first sprint of our skeleton of our MEC application:
        - Login
        - Print sandbox identifier
        - Print available network scenarios
        - Logout
        - Check that logout is effective
    """
    global logger, nw_scenarios

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Login
    sandbox = process_login()
    if sandbox is None:
        logger.error('Failed to instanciate a MEC Sandbox')
        return

    # Print sandbox identifier
    logger.info('Sandbox created: ' + sandbox)
    # Wait for the MEC Sandbox is running
    time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Print available network scenarios
    nw_scenarios = get_network_scenarios(sandbox)
    if nw_scenarios is None:
        logger.error('Failed to retrieve the list of network scenarios')
    elif len(nw_scenarios) != 0:
        logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))
        logger.info('nw_scenarios: %s', str(nw_scenarios))
    else:
        logger.info('nw_scenarios: No scenario available')

    # Logout
    process_logout(sandbox)

    # Check that logout is effective
    logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

### Third step: Activate and deactivate a network scenario

Having a list of network scenarion, the next step is to actvate (and deactivate) a network scenario. This step is mandatory to create a new application instance id and access the MEC services.

In this section, we will arbitrary activate the network scenario called '4g-5g-macro-v2x', which is at the index 0 of the nw_scenarios.

%% Cell type:code id: tags:

``` python
def select_network_scenario_based_on_criteria(criterias_list: list) -> int:
    """
    Select the network scenario to activate based of the provided list of criterias.
    :param criterias_list: The list of criterias to select the correct network scenario
    :return: 0 on success, -1 otherwise
    """

    return 0 # The index of the '4g-5g-macro-v2x' network scenario - Hard coded
```

%% Cell type:markdown id: tags:

#### The activate function

The process to activate a scenario is based on an HTTP POST request with the URL '/sandboxNetworkScenarios/{sandbox_name}?network_scenario_id={network_scenario_id}'.

%% Cell type:code id: tags:

``` python
def activate_network_scenario(sandbox_name: str) -> int:
    """
    Activate the specified network scenario.
    :param sandbox_name: The MEC Sandbox instance to use
    :return: 0 on success, -1 otherwise
    """
    global logger, sandbox_api, nw_scenarios, nw_scenario_idx

    logger.debug('>>> activate_network_scenario: ' + sandbox_name)

    nw_scenario_idx = select_network_scenario_based_on_criteria([])
    if nw_scenario_idx == -1:
        logger.error('activate_network_scenario: Failed to select a network scenarion')
        return -1

    try:
        nw = swagger_client.SandboxNetworkScenariosApi(sandbox_api)
        nw.sandbox_network_scenario_post(sandbox_name, nw_scenarios[nw_scenario_idx].id, async_req = False)  # noqa: E501
        return 0
    except ApiException as e:
        logger.error('Exception when calling SandboxNetworkScenariosApi->activate_network_scenario: %s\n' % e)

    return -1
    # End of function activate_network_scenario
```

%% Cell type:markdown id: tags:

#### The deactivate function

The process to deactivate a scenario is based on an HTTP DELETE request with the URL '/sandboxNetworkScenarios/{sandbox_name}?network_scenario_id={network_scenario_id}'.

%% Cell type:code id: tags:

``` python
def deactivate_network_scenario(sandbox: str) -> int:
    """
    Deactivate the current network scenario.
    :param sandbox: The MEC Sandbox instance to use
    :return: 0 on success, -1 otherwise
    """
    global logger, sandbox_api, nw_scenarios, nw_scenario_idx

    logger.debug('>>> deactivate_network_scenario: ' + sandbox)

    try:
        nw = swagger_client.SandboxNetworkScenariosApi(sandbox_api)
        nw.sandbox_network_scenario_delete(sandbox, nw_scenarios[nw_scenario_idx].id, async_req = False)  # noqa: E501
        return 0
    except ApiException as e:
        logger.error('Exception when calling SandboxNetworkScenariosApi->deactivate_network_scenario: %s\n' % e)

    return -1
    # End of function deactivate_network_scenario
```

%% Cell type:markdown id: tags:

Now, it is time to create the second iteration of our MEC application.

The sequence is the following:
- Login
- Print sandbox identifier
- Print available network scenarios
- Activate a network scenario
- Check that the network scenario is activated and the MEC services are running
- Deactivate a network scenario
- Logout
- Check that logout is effective


%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Login
        - Print sandbox identifier
        - Print available network scenarios
        - Activate a network scenario
        - Check that the network scenario is activated and the MEC services are running
        - Deactivate a network scenario
        - Logout
        - Check that logout is effective
    """
    global logger, nw_scenarios, nw_scenario_idx

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Login
    sandbox = process_login()
    if sandbox is None:
        logger.error('Failed to instanciate a MEC Sandbox')
        return

    # Print sandbox identifier
    logger.info('Sandbox created: ' + sandbox)
    # Wait for the MEC Sandbox is running
    time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Print available network scenarios
    nw_scenarios = get_network_scenarios(sandbox)
    if nw_scenarios is None:
        logger.error('Failed to retrieve the list of network scenarios')
    elif len(nw_scenarios) != 0:
        logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))
        logger.info('nw_scenarios: %s', str(nw_scenarios))
        # Wait for the MEC Sandbox is running
        time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running
    else:
        logger.info('nw_scenarios: No scenario available')

    # Activate a network scenario based on a list of criterias (hard coded!!!)
    if activate_network_scenario(sandbox) == -1:
        logger.error('Failed to activate network scenario')
    else:
        logger.info('Network scenario activated: ' + nw_scenarios[nw_scenario_idx].id)
        # Wait for the MEC services are running
        time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Check that the network scenario is activated and the MEC services are running
    logger.info('To check that the network scenario is activated, verify on the MEC Sandbox server that the MEC services are running (kubectl get pods -A)')
    time.sleep(30) # Sleep for 30 seconds

    # Deactivate a network scenario based on a list of criterias (hard coded!!!)
    if deactivate_network_scenario(sandbox) == -1:
        logger.error('Failed to deactivate network scenario')
    else:
        logger.info('Network scenario deactivated: ' + nw_scenarios[nw_scenario_idx].id)
        # Wait for the MEC services are terminated
        time.sleep(2 * STABLE_TIME_OUT)

    # Logout
    process_logout(sandbox)

    # Check that logout is effective
    logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

### Fourth step: Create and delete an appliction instance id

To enable our MEC application to be part of the activated network scenario, we need to request the MEC sandbox to create a new application instance identifier. Our MEC application will use this identifier to register to the MEC Sandbox according to MEC 011.

Reference: ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.2 MEC application start-up

#### The appliction instance id creation function

It is like the MEC application was instanciated by the MEC platform and it is executed locally.

%% Cell type:code id: tags:

``` python
def request_application_instance_id(sandbox_name: str) -> swagger_client.models.ApplicationInfo:
    """
    Request the creation of a new MEC application instance identifier.
    It is like the MEC application was instanciated by the MEC platform and it is executed locally.
    :param sandbox_name: The MEC Sandbox instance to use
    :return: The MEC application instance identifier on success, None otherwise
    :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.2 MEC application start-up
    """
    global MEC_PLTF, logger, sandbox_api, configuration

    logger.debug('>>> request_application_instance_id: ' + sandbox_name)

    # Create a instance of our MEC application
    try:
        a = swagger_client.models.ApplicationInfo(id=str(uuid.uuid4()), name='JupyterMecApp', node_name=MEC_PLTF, type='USER')  # noqa: E501
        nw = swagger_client.SandboxAppInstancesApi(sandbox_api)
        result = nw.sandbox_app_instances_post(a, sandbox_name, async_req = False)  # noqa: E501
        logger.debug('request_application_instance_id: result: ' + str(result))
        return result
    except ApiException as e:
        logger.error('Exception when calling SandboxAppInstancesApi->sandbox_app_instances_post: %s\n' % e)

    return None
    # End of function request_application_instance_id
```

%% Cell type:markdown id: tags:

#### The appliction instance id deletion function

%% Cell type:code id: tags:

``` python
def delete_application_instance_id(sandbox_name: str, app_inst_id: str) -> int:
    """
    Request the deletion of a MEC application.
    :param sandbox: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :return: 0 on success, -1 otherwise
    """
    global logger, sandbox_api, configuration

    logger.debug('>>> delete_application_instance_id: ' + sandbox_name)
    logger.debug('>>> delete_application_instance_id: ' + app_inst_id)

    try:
        nw = swagger_client.SandboxAppInstancesApi(sandbox_api)
        nw.sandbox_app_instances_delete(sandbox_name, app_inst_id, async_req = False)  # noqa: E501
        return 0
    except ApiException as e:
        logger.error('Exception when calling SandboxAppInstancesApi->sandbox_app_instances_delete: %s\n' % e)

    return -1
    # End of function deletet_application_instance_id
```

%% Cell type:markdown id: tags:

#### Getting the list of applications

%% Cell type:code id: tags:

``` python
def get_applications_list(sandbox_name: str) -> list:
    """
    Request the list of the MEC application available on the MEC Platform.
    :param sandbox: The MEC Sandbox instance to use
    :return: 0 on success, -1 otherwise
    """
    global logger, sandbox_api, configuration

    logger.debug('>>> get_applications_list: ' + sandbox_name)

    try:
        nw = swagger_client.SandboxAppInstancesApi(sandbox_api)
        result = nw.sandbox_app_instances_get(sandbox_name, async_req = False)  # noqa: E501
        logger.debug('get_applications_list: result: ' + str(result))
        return result
    except ApiException as e:
        logger.error('Exception when calling SandboxAppInstancesApi->get_applications_list: %s\n' % e)
    return None
    # End of function delete_application_instance_id
```

%% Cell type:markdown id: tags:

It is time now to create the our third iteration of our MEC application.

The sequence is the following:
- Login
- Print sandbox identifier
- Print available network scenarios
- Activate a network scenario
- Request for a new application instance identifier
- Retrieve the list of the applications instance identifier
- Check the demo application is present in the list of applications
- Delete our application instance identifier
- Deactivate a network scenario
- Logout
- Check that logout is effective

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Login
        - Print sandbox identifier
        - Print available network scenarios
        - Activate a network scenario
        - Request for a new application instance identifier
        - Retrieve the list of the applications instance identifier
        - Check the demo application is present in the list of applications
        - Deactivate a network scenario
        - Logout
        - Check that logout is effective
    """
    global logger, nw_scenarios, nw_scenario_idx

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Login
    sandbox = process_login()
    if sandbox is None:
        logger.error('Failed to instanciate a MEC Sandbox')
        return

    # Print sandbox identifier
    logger.info('Sandbox created: ' + sandbox)
    # Wait for the MEC Sandbox is running
    time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Print available network scenarios
    nw_scenarios = get_network_scenarios(sandbox)
    if nw_scenarios is None:
        logger.error('Failed to retrieve the list of network scenarios')
    elif len(nw_scenarios) != 0:
        logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))
        logger.info('nw_scenarios: %s', str(nw_scenarios))
        # Wait for the MEC Sandbox is running
        time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running
    else:
        logger.info('nw_scenarios: No scenario available')

    # Activate a network scenario based on a list of criterias (hard coded!!!)
    if activate_network_scenario(sandbox) == -1:
        logger.error('Failed to activate network scenario')
    else:
        logger.info('Network scenario activated: ' + nw_scenarios[nw_scenario_idx].id)
        # Wait for the MEC services are running
        time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Request for a new application instance identifier
    app_inst_id = request_application_instance_id(sandbox)
    if app_inst_id == None:
        logger.error('Failed to request an application instance identifier')
    else:
        logger.info('app_inst_id: %s', str(type(app_inst_id)))
        logger.info('app_inst_id: %s', str(app_inst_id))

    # Check the demo application is present in the list of applications
    app_list = get_applications_list(sandbox)
    if app_list is None:
        logger.error('Failed to request the list of applications')
    else:
        logger.info('app_list: %s', str(type(app_list)))
        logger.info('app_list: %s', str(app_list))
        # Check if our application is present in the list of applications
        found = False
        for item in app_list:
            if item.id == app_inst_id.id:
                found = True
                break
        if not found:
            logger.error('Failed to retrieve our application instance identifier')

    # Delete the application instance identifier
    if delete_application_instance_id(sandbox, app_inst_id.id) == -1:
        logger.error('Failed to delete the application instance identifier')
    else:
        logger.info('app_inst_id deleted: ' + app_inst_id.id)

    # Deactivate a network scenario based on a list of criterias (hard coded!!!)
    if deactivate_network_scenario(sandbox) == -1:
        logger.error('Failed to deactivate network scenario')
    else:
        logger.info('Network scenario deactivated: ' + nw_scenarios[nw_scenario_idx].id)
        # Wait for the MEC services are terminated
        time.sleep(2 * STABLE_TIME_OUT)

    # Logout
    process_logout(sandbox)

    # Check that logout is effective
    logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

## MEC Registration and the READY confirmation

Having an application instance identifier allows us to register with the MEC Sandbox and interact with it (e.g. to send service queries, to subscribe to events and to recieve notifications...).

The standard MEC 011 Clause 5.2.2 MEC application start-up describes the start up process. Basically, our MEC application has to:
1. Indicates that it is running by sending a Confirm Ready message
2. Retrieve the list of MEC services

To do so, a MEC application needs to be able to send requests but also to receive notifications (POST requests) and to reply to them.

%% Cell type:markdown id: tags:

### Fifth step: Send the READY confirmation

The MEC application instance confirms towards the MEC platform that it is up and running. It  corresponds to step 4c described inETSI GS MEC 011 V3.2.1 (2024-04)11 Clause 5.2.2 MEC application start-up.

%% Cell type:code id: tags:

``` python
def send_ready_confirmation(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> int:
    """
    Send the ready_confirmation to indicate that the MEC application is active.
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :return: 0 on success, -1 otherwise
    :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.2 MEC application start-up - Step 4c
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> send_ready_confirmation: ' + app_inst_id.id)
    try:
        url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/confirm_ready'
        logger.debug('send_ready_confirmation: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        path_params['app_inst_id'] = app_inst_id.id
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        # JSON indication READY
        dict_body = {}
        dict_body['indication'] = 'READY'
        service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)
        return 0
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return -1
    # End of function send_ready_confirmation
```

%% Cell type:markdown id: tags:

In addition, our MEC application is registering to AppTerminationNotificationSubscription and it needs to delete its subscription when terminating.

At this stage, it is important to note that all subscription deletion use the same format: <subscription URL>/<subscription identifier> (see ETSI MEC GS 003 [16]).
In this case, it the AppTerminationNotificationSubscription is 'sub-1234', the URIs to do the susbscription and to delete it are:
- MEC_SANDBOX_URL + '/' + sandbox_name + '/' + MEC_PLTF + '/mec_app_support/v2/applications/' + app_inst_id + '/subscriptions'
- MEC_SANDBOX_URL + '/' + sandbox_name + '/' + MEC_PLTF + '/mec_app_support/v2/applications/' + app_inst_id + '/subscriptions/sub-1234'

So, it will be usefull to create a small function to extract the subscription identifier from either the HTTP Location header or from the Link field found into the reponse body data structure.

%% Cell type:markdown id: tags:

#### Subscribing to application termination

The purpose is to create a new subscription to
the MEC application termination notification as describe in ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.6b Receiving event notifications on MEC application instance
terminations

%% Cell type:code id: tags:

``` python
def send_subscribe_termination(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> object:
    """
    Subscribe to the MEC application termination notifications.
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :return: The HTTP respone, the subscription ID and the resource URL on success, None otherwise
    :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.6b Receiving event notifications on MEC application instance termination
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> send_subscribe_termination: ' + app_inst_id.id)
    try:
        url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/subscriptions'
        logger.debug('send_subscribe_termination: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        path_params['app_inst_id'] = app_inst_id.id
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        # Body
        dict_body = {}
        dict_body['subscriptionType'] = 'AppTerminationNotificationSubscription'
        dict_body['callbackReference'] = 'http://yanngarcia.ddns.net/mec011/v2/termination' # FIXME To be parameterized
        dict_body['appInstanceId'] = app_inst_id.id
        (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)
        return (result, extract_sub_id(headers['Location']), headers['Location'])
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return None
    # End of function send_subscribe_termination
```

%% Cell type:markdown id: tags:

#### Extracting subscription identifier

This helper function extracts the subscription identifier from any subscription URL.

%% Cell type:code id: tags:

``` python
def extract_sub_id(resource_url: str) -> str:
    """
    Extract the subscription identifier from the specified subscription URL.
    :param resource_url: The subscription URL
    :return: The subscription identifier on success, None otherwise
    """
    global logger

    logger.debug('>>> extract_sub_id: resource_url: ' + resource_url)

    res = urllib3.util.parse_url(resource_url)
    if res is not None and res.path is not None and res.path != '':
        id = res.path.rsplit('/', 1)[-1]
        if id is not None:
            return id
    return None
    # End of function extract_sub_id
```

%% Cell type:markdown id: tags:

#### Delete subscription to application termination

%% Cell type:code id: tags:

``` python
def delete_subscribe_termination(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo, sub_id: str) -> int:
    """
    Delete the subscrition to the AppTermination notification.
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :param sub_id: The subscription identifier
    :return: 0 on success, -1 otherwise
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> delete_subscribe_termination: ' + app_inst_id.id)
    try:
        url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/subscriptions/{sub_id}'
        logger.debug('delete_subscribe_termination: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        path_params['app_inst_id'] = app_inst_id.id
        path_params['sub_id'] = sub_id
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        service_api.call_api(url, 'DELETE', header_params=header_params, path_params = path_params, async_req=False)
        return 0
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return -1
    # End of function delete_subscribe_termination
```

%% Cell type:markdown id: tags:

When the MEC application instance is notified to gracefully terminate, it provides to the MEC platform
that the application has completed its application level related terminate/stop actiono

Reference: ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.3 MEC application graceful termination/stopp.

%% Cell type:code id: tags:

``` python
def send_termination_confirmation(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> int:
    """
    Send the confirm_termination to indicate that the MEC application is terminating gracefully.
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :return: 0 on success, -1 otherwise
    :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.3 MEC application graceful termination/stop
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> send_termination_confirmation: ' + app_inst_id.id)
    try:
        url = '/{sandbox_name}/{mec_pltf}/mec_app_support/v2/applications/{app_inst_id}/confirm_termination'
        logger.debug('send_termination_confirmation: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        path_params['app_inst_id'] = app_inst_id.id
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        # JSON indication READY
        dict_body = {}
        dict_body['operationAction'] = 'TERMINATING'
        service_api.call_api(url, 'POST', header_params=header_params, path_params = path_params, body=dict_body, async_req=False)
        return 0
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return -1
    # End of function send_termination_confirmation
```

%% Cell type:markdown id: tags:

Now, it is time now to create the our fifth iteration of our MEC application.

The sequence is the following:
- Login
- Print sandbox identifier
- Print available network scenarios
- Activate a network scenario
- Request for a new application instance identifier
- Send READY confirmation
- Subscribe to AppTerminationNotificationSubscription
- Check list of services
- Delete AppTerminationNotification subscription
- Delete our application instance identifier
- Deactivate a network scenario
- Logout
- Check that logout is effective

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Login
        - Print sandbox identifier
        - Print available network scenarios
        - Activate a network scenario
        - Request for a new application instance identifier
        - Send READY confirmation

        - Subscribe to AppTermination Notification
        - Send Termination
        - Delete AppTerminationNotification subscription
        - Delete our application instance identifier
        - Deactivate a network scenario
        - Logout
        - Check that logout is effective
    """
    global logger, nw_scenarios, nw_scenario_idx

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Login
    sandbox = process_login()
    if sandbox is None:
        logger.error('Failed to instanciate a MEC Sandbox')
        return

    # Print sandbox identifier
    logger.info('Sandbox created: ' + sandbox)
    # Wait for the MEC Sandbox is running
    time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Print available network scenarios
    nw_scenarios = get_network_scenarios(sandbox)
    if nw_scenarios is None:
        logger.error('Failed to retrieve the list of network scenarios')
    elif len(nw_scenarios) != 0:
        logger.info('nw_scenarios: %s', str(type(nw_scenarios[0])))
        logger.info('nw_scenarios: %s', str(nw_scenarios))
        # Wait for the MEC Sandbox is running
        time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running
    else:
        logger.info('nw_scenarios: No scenario available')

    # Activate a network scenario based on a list of criterias (hard coded!!!)
    if activate_network_scenario(sandbox) == -1:
        logger.error('Failed to activate network scenario')
    else:
        logger.info('Network scenario activated: ' + nw_scenarios[nw_scenario_idx].id)
        # Wait for the MEC services are running
        time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Request for a new application instance identifier
    app_inst_id = request_application_instance_id(sandbox)
    if app_inst_id == None:
        logger.error('Failed to request an application instance identifier')
    else:
        logger.info('app_inst_id: %s', str(app_inst_id))
        time.sleep(STABLE_TIME_OUT)

    # Send READY confirmation
    sub_id = None
    if send_ready_confirmation(sandbox, app_inst_id) == -1:
        logger.error('Failed to send confirm_ready')
    else:
        # Subscribe to AppTerminationNotificationSubscription
        result, sub_id, res_url = send_subscribe_termination(sandbox, app_inst_id)
        if sub_id == None:
            logger.error('Failed to do the subscription')
        else:
            logger.info('result: ' + str(result))
            logger.info('sub_id: %s', sub_id)
            data = json.loads(result.data)
            logger.info('data: ' + str(data))

    # Any processing here
    time.sleep(STABLE_TIME_OUT)

    # Delete AppTerminationNotification subscription
    if sub_id is not None:
        if delete_subscribe_termination(sandbox, app_inst_id, sub_id) == -1:
            logger.error('Failed to delete the application instance identifier')
        else:
            logger.info('app_inst_id deleted: ' + app_inst_id.id)

    # Delete the application instance identifier
    if delete_application_instance_id(sandbox, app_inst_id.id) == -1:
        logger.error('Failed to delete the application instance identifier')
    else:
        logger.info('app_inst_id deleted: ' + app_inst_id.id)

    # Deactivate a network scenario based on a list of criterias (hard coded!!!)
    if deactivate_network_scenario(sandbox) == -1:
        logger.error('Failed to deactivate network scenario')
    else:
        logger.info('Network scenario deactivated: ' + nw_scenarios[nw_scenario_idx].id)
        # Wait for the MEC services are terminated
        time.sleep(2 * STABLE_TIME_OUT)

    # Logout
    process_logout(sandbox)

    # Check that logout is effective
    logger.debug('To check that logout is effective, verify on the MEC Sandbox server that the MEC Sandbox is removed (kubectl get pods -A)')

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

### Conclusion: Create two procedures for the setup and the termination of our MEC application

%% Cell type:markdown id: tags:

#### The procedure for the setup of a MEC application

This function provides the steps to setup a MEC application and to be ready to use the MEC service exposed by the created MEC Sandbox.

%% Cell type:code id: tags:

``` python
def mec_app_setup():
    """
    This function provides the steps to setup a MEC application:
        - Login
        - Print sandbox identifier
        - Print available network scenarios
        - Activate a network scenario
        - Request for a new application instance identifier
        - Send READY confirmation
        - Subscribe to AppTermination Notification
    :return The MEC Sandbox instance, the MEC application instance identifier and the subscription identifier on success, None otherwise
    """
    global logger, nw_scenarios

    # Login
    sandbox = process_login()
    if sandbox is None:
        logger.error('Failed to instanciate a MEC Sandbox')
        return None
    # Wait for the MEC Sandbox is running
    time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Print available network scenarios
    nw_scenarios = get_network_scenarios(sandbox)
    if nw_scenarios is None:
        logger.error('Failed to retrieve the list of network scenarios')
    elif len(nw_scenarios) != 0:
        # Wait for the MEC Sandbox is running
        time.sleep(STABLE_TIME_OUT) # Wait for k8s pods up and running
    else:
        logger.info('nw_scenarios: No scenario available')

    # Activate a network scenario based on a list of criterias (hard coded!!!)
    if activate_network_scenario(sandbox) == -1:
        logger.error('Failed to activate network scenario')
    else:
        # Wait for the MEC services are running
        time.sleep(2 * STABLE_TIME_OUT) # Wait for k8s pods up and running

    # Request for a new application instance identifier
    app_inst_id = request_application_instance_id(sandbox)
    if app_inst_id == None:
        logger.error('Failed to request an application instance identifier')
    else:
        # Wait for the MEC services are terminated
        time.sleep(STABLE_TIME_OUT)

    # Send READY confirmation
    sub_id = None
    if send_ready_confirmation(sandbox, app_inst_id) == -1:
        logger.error('Failed to send confirm_ready')
    else:
        # Subscribe to AppTerminationNotificationSubscription
        result, sub_id, res_url = send_subscribe_termination(sandbox, app_inst_id)
        if sub_id == None:
            logger.error('Failed to do the subscription')

    return (sandbox, app_inst_id, sub_id)
    # End of function mec_app_setup
```

%% Cell type:markdown id: tags:

#### The procedure for the termination of a MEC application

This function provides the steps to terminate a MEC application.

**Note:** All subscriptions done outside of the mec_app_setup function are not deleted.

%% Cell type:code id: tags:

``` python
def mec_app_termination(sandbox_name: str, app_inst_id:swagger_client.models.ApplicationInfo, sub_id: str):
    """
    This function provides the steps to setup a MEC application:
        - Login
        - Print sandbox identifier
        - Print available network scenarios
        - Activate a network scenario
        - Request for a new application instance identifier
        - Send READY confirmation
        - Subscribe to AppTermination Notification
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :param sub_id: The subscription identifier
    """
    global logger

    # Delete AppTerminationNotification subscription
    if sub_id is not None:
        if delete_subscribe_termination(sandbox_name, app_inst_id, sub_id) == -1:
            logger.error('Failed to delete the application instance identifier')

    # Delete the application instance identifier
    if delete_application_instance_id(sandbox_name, app_inst_id.id) == -1:
        logger.error('Failed to delete the application instance identifier')
    else:
        # Wait for the MEC services are terminated
        time.sleep(STABLE_TIME_OUT)

    # Deactivate a network scenario based on a list of criterias (hard coded!!!)
    if deactivate_network_scenario(sandbox_name) == -1:
        logger.error('Failed to deactivate network scenario')
    else:
        # Wait for the MEC services are terminated
        time.sleep(2 * STABLE_TIME_OUT)

    # Logout
    process_logout(sandbox_name)
    # End of function mec_app_termination
```

%% Cell type:markdown id: tags:

The following cell describes the new basic MEC application architecture. It will be used in the rest of this titorial.

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Mec application setup
        - Get UU unicast provisioning information
        - Mec application termination
    """
    global logger

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Setup the MEC application
    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()

    # Any processing here
    logger.info('sandbox_name: ' + sandbox_name)
    logger.info('app_inst_id: ' + app_inst_id.id)
    if sub_id is not None:
        logger.info('sub_id: ' + sub_id)
    time.sleep(STABLE_TIME_OUT)

    # Terminate the MEC application
    mec_app_termination(sandbox_name, app_inst_id, sub_id)

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

## Create our second MEC application: how to use MEC Location Service

After doing the logging, network scenario activation, MEC application instance creation steps, we are ready to exploit the MEC services exposed by the MEC Sandbox.

In this clause, we use the following functionalities provided by MEC-013 LocationAPIs:
- Getting UE location lookup (ETSI GS MEC 013 Clause 5.3.2 UE Location Lookup)
- Subscribe to the UE Location changes (ETSI GS MEC 030 Clause 5.3.4)
- Delete subscription

### First step: Getting UE location lookup

First of all, just create a function to request the location of a specific UE. The UE identifier depends of the network scenario which was activated. In our example, we are using the 4g-5g-macro-v2x network scenario and we will run our code with the high velocity UE (i.e., the cars such as 10.100.0.1).

%% Cell type:code id: tags:

``` python
def get_ue_location(sandbox_name: str, ue: str) -> object:
    """
    To retrieves the location information of an UE identified by its IPv4 address (e.g. 10.100.0.1)
    :param sandbox_name: The MEC Sandbox instance to use
    :return The HTTP response and the response status on success, None otherwise
    :see ETSI GS MEC 013 V3.1.1 (2023-01) Clause 5.3.2 UE Location Lookup
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> get_ue_location: ' + ue)
    try:
        url = '/{sandbox_name}/{mec_pltf}/location/v3/queries/users'
        logger.debug('get_ue_location: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        query_params = []
        query_params.append(('address', ue))
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        (result, status) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, query_params=query_params, async_req=False)
        return (result, status)
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return None
    # End of function get_ue_location
```

%% Cell type:markdown id: tags:

Let's create our new MEC application. The expected result looks like:
```json
{
  "userList": {
    "resourceURL": "https://mec-platform2.etsi.org/xxx/mep1/location/v3/queries/users",
    "user": [
      {
        "address": "10.100.0.1",
        "accessPointId": "4g-macro-cell-6",
        "zoneId": "zone03",
        "resourceURL": "https://mec-platform.etsi.org/xxx/mep1/location/v3/queries/users?address=10.100.0.1",
        "timestamp": {
          "nanoSeconds": 0,
          "seconds": 1729245754
        },
        "locationInfo": {
          "latitude": [
            43.73707
          ],
          "longitude": [
            7.422555
          ],
          "shape": 2
        },
        "civicInfo": {
          "country": "MC"
        },
        "relativeLocationInfo": {
          "X": 630.37036,
          "Y": 261.92648,
          "mapInfo": {
            "mapId": "324561243",
            "origin": {
              "latitude": 43.7314,
              "longitude": 7.4202
            }
          }
        }
      }
    ]
  }
}
```

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Getting UE location lookup (ETSI GS MEC 013 Clause 5.3.2)
    """
    global logger, nw_scenarios

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Setup the MEC application
    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()

    # Getting UE location lookup
    result, status = get_ue_location(sandbox_name, '10.100.0.1')
    logger.info('UE location information: status: %s', str(status))
    if status != 200:
        logger.error('Failed to get UE location information')
    else:
        logger.info('UE location information: %s', str(result.data))

    # Any processing comes here
    logger.info('body: ' + str(result.data))
    data = json.loads(result.data)
    logger.info('UserList: ' + str(data))

    # Terminate the MEC application
    mec_app_termination(sandbox_name, app_inst_id, sub_id)

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

### Second step: Adding a subscription


%% Cell type:markdown id: tags:

## Create our third MEC application: how to use V2X MEC Services

After doing the logging, network scenario activation, MEC application instance creation steps, we are ready to exploit the MEC services exposed by the MEC Sandbox.

In this clause, we use the following functionalities provided by MEC-030:
- Getting UU unicast provisioning information (ETSI GS MEC 030 Clause 5.5.1)
- Subscribe to the V2X message distribution server (ETSI GS MEC 030 Clause 5.5.7)
- Delete subscription

%% Cell type:markdown id: tags:

### Getting UU unicast provisioning information

The purpose is to query provisioning information for V2X communication over Uu unicast.

%% Cell type:code id: tags:

``` python
def send_uu_unicast_provisioning_info(sandbox_name: str, ecgi: str) -> object:
    """
    Request for V2X communication over Uu unicast information
    :param sandbox_name: The MEC Sandbox instance to use
    :param ecgi: Comma separated list of locations to identify a cell of a base station or a particular geographical area
    :return The Uu unicast provisioning information on success, None otherwise
    :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.1 Sending a request for provisioning information for V2X communication over Uu unicast
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> send_uu_unicast_provisioning_info: ' + ecgi)
    try:
        url = '/{sandbox_name}/{mec_pltf}/vis/v2/queries/uu_unicast_provisioning_info'
        logger.debug('send_uu_unicast_provisioning_info: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        query_params = []
        query_params.append(('location_info', 'ecgi,' + ecgi))
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        (result, status, header) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, query_params=query_params, async_req=False)
        return (result, status, header)
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return None
    # End of function send_uu_unicast_provisioning_info
```

%% Cell type:markdown id: tags:

Let's create the our second MEC application.
The sequence is the following:
- Mec application setup
- Get UU unicast provisioning information
- Mec application termination

Note that the UU unicast provisioning information is returned as a JSON string. To de-serialized it into a Python data structure, please refer to clause [Subscribing to V2X message distribution server](#subscribing_to_v2x_message_distribution_server).

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Mec application setup
        - Get UU unicast provisioning information
        - Mec application termination
    """
    global logger, nw_scenarios

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Setup the MEC application
    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()

    # Get UU unicast provisioning information
    ecgi = "268708941961,268711972264" # List of ecgi spearated by a ','
    result, status, header = send_uu_unicast_provisioning_info(sandbox_name, ecgi)
    logger.info('UU unicast provisioning information: status: %s', str(status))
    if status != 200:
        logger.error('Failed to get UU unicast provisioning information')
    else:
        logger.info('UU unicast provisioning information: %s', str(result.data))

    # Any processing comes here
    logger.info('body: ' + str(result.data))
    data = json.loads(result.data)
    logger.info('data: ' + str(data))

    # Terminate the MEC application
    mec_app_termination(sandbox_name, app_inst_id, sub_id)

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

### Subscribing to V2X message distribution server

Here, we need to come back to the MEC 030 standard to create the type V2xMsgSubscription. It involves the creation of a set of basic types described below.

These new type shall be 'JSON' serializable. It means that they have to implement the following methods:

- to_dict()
- to_str()
- \_\_repr\_\_()
- \_\_eq\_\_()
- \_\_ne\_\_()

**Reference:** ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.5.13 Type: LinkType

%% Cell type:code id: tags:

``` python
class LinkType(object):
    swagger_types = {'href': 'str'}
    attribute_map = {'href': 'href'}
    def __init__(self, href=None):  # noqa: E501
        self._href = None
        if href is not None:
            self._href = href
    @property
    def href(self):
        return self._href
    @href.setter
    def href(self, href):
        self._href = href
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(LinkType, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, LinkType):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class Links(object):
    swagger_types = {'self': 'LinkType'}
    attribute_map = {'self': 'self'}
    def __init__(self, self_=None):  # noqa: E501
        self._self = None
        if self_ is not None:
            self._self = self_
    @property
    def self_(self):
        return self._self
    @self_.setter
    def self_(self, self_):
        self._self = self_
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(Links, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, Links):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class TimeStamp(object):
    swagger_types = {'seconds': 'int', 'nano_seconds': 'int'}
    attribute_map = {'seconds': 'seconds', 'nano_seconds': 'nanoSeconds'}
    def __init__(self, seconds=None, nano_seconds=None):  # noqa: E501
        self._seconds = None
        self._nano_seconds = None
        if seconds is not None:
            self._seconds = seconds
        if nano_seconds is not None:
            self._nano_seconds = nano_seconds
    @property
    def seconds(self):
        return self._seconds
    @seconds.setter
    def seconds(self, seconds):
        self._seconds = seconds
    @property
    def nano_seconds(self):
        return self._nano_seconds
    @nano_seconds.setter
    def nano_seconds(self, nano_seconds):
        self._nano_seconds = nano_seconds
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(TimeStamp, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, TimeStamp):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other
```

%% Cell type:markdown id: tags:

### Subscribing to V2X message distribution server

The cell below implements the V2xMsgSubscription data structure."}
Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.3.5 Type: V2xMsgSubscription



%% Cell type:code id: tags:

``` python
class V2xMsgSubscription(object):
    swagger_types = {'links': 'Links', 'callback_reference': 'str', 'filter_criteria': 'V2xMsgSubscriptionFilterCriteria', 'request_test_notification': 'bool', 'subscription_type': 'str'}
    attribute_map = {'links': 'Links', 'callback_reference': 'callbackReference', 'filter_criteria': 'filterCriteria', 'request_test_notification': 'requestTestNotification', 'subscription_type': 'subscriptionType'}
    def __init__(self, links=None, callback_reference=None, filter_criteria=None, request_test_notification=None):  # noqa: E501
        self._links = None
        self._callback_reference = None
        self._filter_criteria = None
        self._request_test_notification = None
        self._subscription_type = "V2xMsgSubscription"
        if links is not None:
            self.links = links
        if callback_reference is not None:
            self.callback_reference = callback_reference
        if filter_criteria is not None:
            self.filter_criteria = filter_criteria
        if request_test_notification is not None:
            self.request_test_notification = request_test_notification
    @property
    def links(self):
        return self._links
    @links.setter
    def links(self, links):
        self_.links = links
    @property
    def callback_reference(self):
        return self._callback_reference
    @callback_reference.setter
    def callback_reference(self, callback_reference):
        self._callback_reference = callback_reference
    @property
    def links(self):
        return self._links
    @links.setter
    def links(self, links):
        self._links = links
    @property
    def filter_criteria(self):
        return self._filter_criteria
    @filter_criteria.setter
    def filter_criteria(self, filter_criteria):
        self._filter_criteria = filter_criteria
    @property
    def request_test_notification(self):
        return self._request_test_notification
    @request_test_notification.setter
    def request_test_notification(self, request_test_notification):
        self._request_test_notification = request_test_notification
    @property
    def subscription_type(self):
        return self._subscription_type
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(V2xMsgSubscription, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, V2xMsgSubscription):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class V2xMsgSubscriptionFilterCriteria(object):
    swagger_types = {'msg_type': 'list[str]', 'std_organization': 'str'}
    attribute_map = {'msg_type': 'MsgType', 'std_organization': 'stdOrganization'}
    def __init__(self, msg_type, std_organization):  # noqa: E501
        self._msg_type = None
        self._std_organization = None
        self.msg_type = msg_type
        self.std_organization = std_organization
    @property
    def msg_type(self):
        return self._msg_type
    @msg_type.setter
    def msg_type(self, msg_type):
        self._msg_type = msg_type
    @property
    def std_organization(self):
        return self._std_organization
    @std_organization.setter
    def std_organization(self, std_organization):
        self._std_organization = std_organization
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(V2xMsgSubscriptionFilterCriteria, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, V2xMsgSubscriptionFilterCriteria):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other
```

%% Cell type:markdown id: tags:

Here is the V2X message subscription function. The HTTP Request message body contains a 'JSON' serialized instance of the class V2xMsgSubscription.

Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability

%% Cell type:code id: tags:

``` python
def subscribe_v2x_message(sandbox_name: str, v2xMsgSubscription: V2xMsgSubscription) -> object:
    """
    Request to subscribe the V2X messages which come from different vehicle OEMs or operators
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :param sub_id: The subscription identifier
    :return The HTTP response, the HTTP response status, the subscription identifier and the subscription URL on success, None otherwise
    :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> subscribe_v2x_message: v2xMsgSubscription: ' + str(v2xMsgSubscription))
    try:
        url = '/{sandbox_name}/{mec_pltf}/vis/v2/subscriptions'
        logger.debug('subscribe_v2x_message: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=v2xMsgSubscription, async_req=False)
        return (result, status, extract_sub_id(headers['Location']), headers['Location'])
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return None
    # End of function subscribe_v2x_message
```

%% Cell type:markdown id: tags:

Here is a generic function to delete any MEC service subscription based on the subscription resource URL provided in the Location header of the subscription creation response.

%% Cell type:code id: tags:

``` python
def delete_mec_subscription(resource_url: str) -> int:
    """
    Delete any existing MEC subscription
    :param resource_url: The subscription URL
    :return 0 on success, -1 otherwise
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> delete_mec_subscription: resource_url: ' + resource_url)
    try:
        res = urllib3.util.parse_url(resource_url)
        if res is None:
            logger.error('delete_mec_subscription: Failed to paerse URL')
            return -1
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        service_api.call_api(res.path, 'DELETE', header_params=header_params, async_req=False)
        return 0
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return -1
```

%% Cell type:markdown id: tags:

Finaly, here is how to implement the V2X message subscription:

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Mec application setup
        - Subscribe to V2XMessage
        - Delete subscription
        - Mec application termination
    """
    global MEC_PLTF, CALLBACK_URI, logger

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Setup the MEC application
    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()

    # Create a V2X message subscritpion
    filter_criteria = V2xMsgSubscriptionFilterCriteria(['1', '2'], 'ETSI')
    v2xMsgSubscription = V2xMsgSubscription(callback_reference = CALLBACK_URI + '/vis/v2/v2x_msg_notification', filter_criteria = filter_criteria)
    result, status, v2x_sub_id, v2x_resource = subscribe_v2x_message(sandbox_name, v2xMsgSubscription)
    if status != 201:
        logger.error('Failed to create subscription')

    # Any processing here
    logger.info('body: ' + str(result.data))
    data = json.loads(result.data)
    logger.info('data: %s', str(data))
    logger.info('app_inst_id: ' + app_inst_id.id)
    if sub_id is not None:
        logger.info('sub_id: ' + sub_id)
    time.sleep(STABLE_TIME_OUT)

    # Delete the V2X message subscritpion
    delete_mec_subscription(v2x_resource)

    # Terminate the MEC application
    mec_app_termination(sandbox_name, app_inst_id, sub_id)

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

### Notification support

To recieve notifcation, our MEC application is required to support an HTTP listenener to recieve POST requests from the MEC Sandbox and reply to them: this is the notification mechanism.

This minimalistic HTTP server will also be used to implement the endpoints provided by our MEC application service: see chapter [Our third MEC application: how to create a new MEC Services](#our_third_mec_application_how_to_create_a_new_mec_services).

The class HTTPRequestHandler (see cell below) provides the suport of such mechanism.

%% Cell type:code id: tags:

``` python
class HTTPServer_RequestHandler(BaseHTTPRequestHandler):
    """
    Minimal implementation of an HTTP server (http only).
    """

    def do_GET(self):
        logger.info('>>> do_GET: ' + self.path)

        ctype = self.headers.get('content-type')
        logger.info('do_GET: ' + ctype)

        # Send response status code
        self.send_response(HTTPStatus.OK)

        # Send message back to client
        message = bytes(str(self.headers) + "\n" +self.requestline +"\n", 'utf8')

        # Send headers
        self.send_header('Content-type','text/plain; charset=utf-8')
        self.send_header('Content-length', str(len(message)))
        self.end_headers()

        # Write content as utf-8 data
        self.wfile.write(message)
        return

    def do_POST(self):
        logger.info('>>> do_POST: ' + self.path)

        ctype = self.headers.get('content-type')
        logger.info('do_POST: ' + ctype)

        content_len = int(self.headers.get('Content-Length'))
        if content_len != 0:
            body = self.rfile.read(content_len).decode('utf8')
            logger.info('do_POST: body:' + str(type(body)))
            logger.info('do_POST: body:' + str(body))
            data = json.loads(str(body))
            logger.info('do_POST: data: %s', str(data))

        self.send_response(HTTPStatus.NOT_IMPLEMENTED)
        self.end_headers()

    def do_PUT(self):
        logger.info('>>> do_PUT: ' + self.path)

        ctype = self.headers.get('content-type')
        logger.info('do_PUT: ' + ctype)

        self.send_response(HTTPStatus.NOT_IMPLEMENTED)
        self.end_headers()

    def do_PATCH(self):
        logger.info('>>> do_PATCH: ' + self.path)

        ctype = self.headers.get('content-type')
        logger.info('do_PATCH: ' + ctype)

        self.send_response(HTTPStatus.NOT_IMPLEMENTED)
        self.end_headers()
    # End of class HTTPRequestHandler

    def do_DELETE(self):
        logger.info('>>> do_DELETE: ' + self.path)

        ctype = self.headers.get('content-type')
        logger.info('do_DELETE: ' + ctype)

        self.send_response(HTTPStatus.NOT_IMPLEMENTED)
        self.end_headers()
    # End of class HTTPRequestHandler

def start_notification_server() -> HTTPServer:
    """
    Start the notification server
    :return The instance of the HTTP server
    """
    global LISTENER_PORT

    server_address = ('', LISTENER_PORT)
    httpd = HTTPServer(server_address, HTTPServer_RequestHandler)
    # Start notification server in a daemonized thread
    notification_server = threading.Thread(target = httpd.serve_forever, name='notification_server')
    notification_server.daemon = True
    notification_server.start()
    return httpd
    # End of function HTTPRequestHandler

def stop_notification_server(httpd: HTTPServer):
    """
    Stop the notification server
    :param The instance of the HTTP server
    """
    httpd.server_close()
    httpd=None
    # End of function HTTPRequestHandler
```

%% Cell type:markdown id: tags:

### Put all together

let's add a subscription the our previous MEC application.
The sequence is the following:
- Mec application setup
- Start the notification server
- Get UU unicast provisioning information
- Add subscription
- Stop the notification server
- Mec application termination

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Mec application setup
        - Start the notification server
        - Get UU unicast provisioning information
        - Add subscription
        - Stop the notification server
        - Mec application termination
    """
    global CALLBACK_URI, logger

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Setup the MEC application
    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()

    # Get UU unicast provisioning information
    ecgi = "268708941961,268711972264" # List of ecgi spearated by a ','
    result = send_uu_unicast_provisioning_info(sandbox_name, ecgi)
    if result is None:
        logger.error('Failed to get UU unicast provisioning information')
    else:
        logger.info('UU unicast provisioning information: ' + str(result))

    # Start notification server in a daemonized thread
    httpd = start_notification_server()

    # Create a V2X message subscritpion
    filter_criteria = V2xMsgSubscriptionFilterCriteria(['1', '2'], 'ETSI')
    v2xMsgSubscription = V2xMsgSubscription(callback_reference = CALLBACK_URI + '/vis/v2/v2x_msg_notification', filter_criteria = filter_criteria)
    result, status, v2x_sub_id, v2x_resource = subscribe_v2x_message(sandbox_name, v2xMsgSubscription)
    if status != 201:
        logger.error('Failed to create subscription')

    # Any processing here
    logger.info('body: ' + str(result.data))
    data = json.loads(result.data)
    logger.info('data: %s', str(data))
    logger.info('v2x_resource: ' + v2x_resource)
    if sub_id is not None:
        logger.info('sub_id: ' + sub_id)
    time.sleep(STABLE_TIME_OUT)

    # Stop notification server
    stop_notification_server(httpd)

    # Delete the V2X message subscritpion
    delete_mec_subscription(v2x_resource)

    # Terminate the MEC application
    mec_app_termination(sandbox_name, app_inst_id, sub_id)

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

## Create our fourth MEC application: how to use V2X QoS Prediction

The MEC Sanbox V2X QoS Prediction is based on a grid Map of Monaco City where areas are categorized into residential, commercial and coastal.
PoAs (Point Of Access) are categorized depending on where they lie in each grid.
Each category has its own traffic load patterns which are pre-determin. The V2X QoS Prediction) will give more accurate values of RSRP and RSRQ based on the diurnal traffic patterns for each. The network scenario named "4g-5g-v2x-macro" must be used to get access to the V2X QoS Prediction feature.

**Note:** The MEC Sanbox V2X QoS Prediction is enabled when the PredictedQos.routes.routeInfo.time attribute is present in the request (see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.2.6 Type: Preditecd QoS)

Limitations:
* The Location Granularity is currently not being validated as RSRP/RSRP calculations are done at the exact location provided by the user.
* Time Granularity is currently not supported by the Prediction Function (design limitations of the minimal, emulated, pre-determined traffic prediction)
* Upper limit on the number of elements (10 each) in the routes and routeInfo structures (arrays) to not affect user experience and respoy


%% Cell type:markdown id: tags:

The table below describes the excepted Qos with and without the prediction model in deiffrent area and at different time.

|     Location        |               Time                  |      PoA         |  Category   |    Status     | QoS without Prediction Model | QoS with Prediction Model | Expected |
| 	              | (Unix time in sec) | Standard (GMT) |                  |             |               |    RSRP        |   RSRQ      |    RSRP     |   RSRQ      |          |
| ------------------- | -----------        | -------------- | ---------------- | ----------- | ------------- | -------------- | ----------- | ----------- | ----------- | -------- |
| 43.729416,7.414853  |     1653295620     |    08:47:00    | 4g-macro-cell-2  | Residential | Congested     |     63         |     21      |     60      |     20      |   Yes    |
| 43.732456,7.418417  |     1653299220     |    09:47:00    | 4g-macro-cell-3  | Residential | Not Congested |     55         |     13      |     55      |     13      |   Yes    |
| 43.73692,7.4209256  |     1653302820     |    10:47:00    | 4g-macro-cell-6  | Coastal     | Not Congested |     68         |     26      |     68      |     26      |   Yes    |
| 43.738007,7.4230533 |     1653305220     |    11:27:00    | 4g-macro-cell-6  | Coastal     | Not Congested |     55         |     13      |     55      |     13      |   Yes    |
| 43.739685,7.424881  |     1653308820     |    12:27:00    | 4g-macro-cell-7  | Commercial  | Congested     |     63         |     21      |     40      |     13      |   Yes    |
| 43.74103,7.425759   |     1653312600     |    13:30:00    | 4g-macro-cell-7  | Commercial  | Congested     |     56         |     14      |     40      |     8       |   Yes    |
| 43.74258,7.4277945  |     1653315900     |    14:25:00    | 4g-macro-cell-8  | Coastal     | Congested     |     59         |     17      |     47      |     13      |   Yes    |
| 43.744972,7.4295254 |     1653318900     |    15:15:00    | 4g-macro-cell-8  | Coastal     | Congested     |     53         |     11      |     40      |     5       |   Yes    |
| 43.74773,7.4320855  |     1653322500     |    16:15:00    | 5g-small-cell-14 | Commercial  | Congested     |     78         |     69      |     60      |     53      |   Yes    |
| 43.749264,7.435894  |     1653329700     |    18:15:00    | 5g-small-cell-20 | Commercial  | Not Congested |     84         |     72      |     84      |     72      |  Yes    |	72	84	72	Yes


%% Cell type:markdown id: tags:

The image below illustrate the table above: [here](images/V2X Predicted QoS.jpg).

Here is an example of a basic V2X predicted QoS request based on two point in path at 8am in Residential area:

%% Cell type:markdown id: tags:

```json
{
  "predictionTarget": "SINGLE_UE_PREDICTION",
  "timeGranularity": null,
  "locationGranularity": "30",
  "routes": [
      {
        "routeInfo": [
          {
            "location": {
              "geoArea": {
                "latitude": 43.729416,
                "longitude": 7.414853
              }
            },
            "time": {
              "nanoSeconds": 0,
              "seconds": 1653295620
            }
          },
          {
              "location": {
                "geoArea": {
                  "latitude": 43.732456,
                  "longitude": 7.418417
                }
              },
            "time": {
              "nanoSeconds": 0,
              "seconds": 1653299220
        ]
      }
    ]
}

%% Cell type:markdown id: tags:

Let first create the required types before to prepare a V2X Predicted QoS request based on the JSON above.

%% Cell type:code id: tags:

``` python
class Routes(object):
    swagger_types = {'_route_info': 'list[RouteInfo]'}
    attribute_map = {'_route_info': 'routeInfo'}
    def __init__(self, route_info:list):  # noqa: E501
        self._route_info = None
        self.route_info = route_info
    @property
    def route_info(self):
        return self._route_info
    @route_info.setter
    def route_info(self, route_info):
        if route_info is None:
            raise ValueError("Invalid value for `route_info`, must not be `None`")  # noqa: E501
        self._route_info = route_info
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(Routes, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, Routes):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class LocationInfo(object):
    swagger_types = {'_ecgi': 'Ecgi', '_geo_area': 'LocationInfoGeoArea'}
    attribute_map = {'_ecgi': 'ecgi', '_geo_area': 'geoArea'}
    def __init__(self, ecgi=None, geo_area=None):  # noqa: E501
        self._ecgi = None
        self._geo_area = None
        self.discriminator = None
        if ecgi is not None:
            self.ecgi = ecgi
        if geo_area is not None:
            self.geo_area = geo_area
    @property
    def ecgi(self):
        return self._ecgi
    @ecgi.setter
    def ecgi(self, ecgi):
        self._ecgi = ecgi
    @property
    def geo_area(self):
        return self._geo_area
    @geo_area.setter
    def geo_area(self, geo_area):
        self._geo_area = geo_area
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(LocationInfo, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, LocationInfo):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class RouteInfo(object):
    swagger_types = {'_location': 'LocationInfo', '_time_stamp': 'TimeStamp'}
    attribute_map = {'_location': 'location', '_time_stamp': 'time'}
    def __init__(self, location:LocationInfo, time_stamp=None):  # noqa: E501
        self._location = None
        self.location = location
        self._time_stamp = None
        if time_stamp is not None:
            self.time_stamp = time_stamp
    @property
    def location(self):
        return self._location
    @location.setter
    def location(self, location):
        if location is None:
            raise ValueError("Invalid value for `location`, must not be `None`")  # noqa: E501
        self._location = location
    @property
    def time_stamp(self):
        return self._time_stamp
    @time_stamp.setter
    def time_stamp(self, time_stamp):
        self._time_stamp = time_stamp
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(RouteInfo, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, RouteInfo):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class LocationInfoGeoArea(object):
    swagger_types = {'_latitude': 'float', '_longitude': 'float'}
    attribute_map = {'_latitude': 'latitude', '_longitude': 'longitude'}
    def __init__(self, latitude, longitude):  # noqa: E501
        self._latitude = None
        self._longitude = None
        self.discriminator = None
        if latitude is not None:
            self.latitude = latitude
        if longitude is not None:
            self.longitude = longitude
    @property
    def latitude(self):
        return self._latitude
    @latitude.setter
    def latitude(self, latitude):
        if latitude is None:
            raise ValueError("Invalid value for `latitude`, must not be `None`")  # noqa: E501
        self._latitude = latitude
    @property
    def longitude(self):
        return self._longitude
    @longitude.setter
    def longitude(self, longitude):
        if longitude is None:
            raise ValueError("Invalid value for `longitude`, must not be `None`")  # noqa: E501
        self._longitude = longitude
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(LocationInfoGeoArea, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, LocationInfoGeoArea):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class Ecgi(object):
    swagger_types = {'_cellId': 'CellId', '_plmn': 'Plmn'}
    attribute_map = {'_cellId': 'cellId', '_plmn': 'plmn'}
    def __init__(self, cellId=None, plmn=None):  # noqa: E501
        self._cellId = None
        self._plmn = None
        self.discriminator = None
        if cellId is not None:
            self.cellId = cellId
        if plmn is not None:
            self.plmn = plmn
    @property
    def cellId(self):
        return self._cellId
    @cellId.setter
    def cellId(self, cellId):
        self._cellId = cellId
    @property
    def plmn(self):
        return self._plmn
    @plmn.setter
    def plmn(self, plmn):
        self._plmn = plmn
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(Ecgi, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, Ecgi):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class CellId(object):
    swagger_types = {'_cellId': 'str'}
    attribute_map = {'_cellId': 'cellId'}
    def __init__(self, cellId):  # noqa: E501
        self._cellId = None
        self.cellId = cellId
    @property
    def cellId(self):
        return self._cellId
    @cellId.setter
    def cellId(self, cellId):
        if cellId is None:
            raise ValueError("Invalid value for `cellId`, must not be `None`")  # noqa: E501
        self._cellId = cellId
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(CellId, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, CellId):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class Plmn(object):
    swagger_types = {'_mcc': 'str', '_mnc': 'str'}
    attribute_map = {'_mcc': 'mcc', '_mnc': 'mnc'}
    def __init__(self, mcc:str, mnc:str):  # noqa: E501
        self.discriminator = None
        self._mcc = None
        self._mnc = None
        self.mcc = mcc
        self.mnc = mnc
    @property
    def mcc(self):
        return self._mcc
    @mcc.setter
    def kpi_nmccame(self, mcc):
        if mcc is None:
            raise ValueError("Invalid value for `mcc`, must not be `None`")  # noqa: E501
        self._mcc = mcc
    @property
    def mnc(self):
        return self._mnc
    @mnc.setter
    def kpi_nmccame(self, mnc):
        if mnc is None:
            raise ValueError("Invalid value for `mnc`, must not be `None`")  # noqa: E501
        self._mnc = mnc
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(Plmn, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, Plmn):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class QosKpi(object):
    swagger_types = {'_kpi_name': 'str', '_kpi_value': 'str', '_confidence': 'str'}
    attribute_map = {'_kpi_name': 'kpiName', '_kpi_value': 'kpiValue', '_confidence': 'Confidence'}
    def __init__(self, kpi_name:str, kpi_value:str, confidence=None):  # noqa: E501
        self._kpi_name = None
        self._kpi_value = None
        self._confidence = None
        self.kpi_name = kpi_name
        self.kpi_value = kpi_value
        if confidence is not None:
            self.confidences = confidence
    @property
    def kpi_name(self):
        return self._kpi_name
    @kpi_name.setter
    def kpi_name(self, kpi_name):
        if kpi_name is None:
            raise ValueError("Invalid value for `kpi_name`, must not be `None`")  # noqa: E501
        self._kpi_name = kpi_name
    @property
    def kpi_value(self):
        return self._kpi_value
    @kpi_value.setter
    def kpi_value(self, kpi_value):
        if kpi_value is None:
            raise ValueError("Invalid value for `kpi_value`, must not be `None`")  # noqa: E501
        self._kpi_value = kpi_value
    @property
    def confidence(self):
        return self._confidence
    @confidence.setter
    def confidence(self, confidence):
        self._confidence = confidence
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(QosKpi, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, QosKpi):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class Stream(object):
    swagger_types = {'_stream_id': 'str', '_qos_kpi': 'list[QosKpi]'}
    attribute_map = {'_stream_id': 'streamId', '_qos_kpi': 'qosKpi'}
    def __init__(self, stream_id:str, qos_kpi:list):  # noqa: E501
        self._stream_id = None
        self._qos_kpi = None
        self.stream_id = stream_id
        self.qos_kpi = qos_kpi
    @property
    def stream_id(self):
        return self._stream_id
    @stream_id.setter
    def stream_id(self, stream_id):
        if stream_id is None:
            raise ValueError("Invalid value for `stream_id`, must not be `None`")  # noqa: E501
        self._stream_id = stream_id
    @property
    def qos_kpi(self):
        return self._qos_kpi
    @qos_kpi.setter
    def qos_kpi(self, qos_kpi):
        if qos_kpi is None:
            raise ValueError("Invalid value for `qos_kpi`, must not be `None`")  # noqa: E501
        self._qos_kpi = qos_kpi
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(Stream, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, Stream):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class Qos(object):
    swagger_types = {'_stream': 'list[Stream]'}
    attribute_map = {'_stream': 'stream'}
    def __init__(self, stream:list):  # noqa: E501
        self._stream = None
        self.stream = stream
    @property
    def stream(self):
        return self._stream
    @stream.setter
    def stream(self, stream):
        if stream is None:
            raise ValueError("Invalid value for `stream`, must not be `None`")  # noqa: E501
        self._stream = stream
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(Qos, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, Qos):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class PredictedQos(object):
    swagger_types = {'_location_granularity': 'str', '_notice_period': 'TimeStamp', '_prediction_area': 'PredictionArea', '_prediction_target': 'str', '_qos': 'Qos', '_routes': 'list[Routes]', '_time_granularity': 'TimeStamp'}
    attribute_map = {'_location_granularity': 'locationGranularity', '_notice_period': 'noticePeriod', '_prediction_area': 'predictionArea', '_prediction_target': 'predictionTarget', '_qos': 'qos', '_routes': 'routes', '_time_granularity': 'timeGranularity'}
    def __init__(self, prediction_target:str, location_granularity:str, notice_period=None, time_granularity=None, prediction_area=None, routes=None, qos=None):  # noqa: E501
        self._prediction_target = None
        self._time_granularity = None
        self._location_granularity = None
        self._notice_period = None
        self._prediction_area = None
        self._routes = None
        self._qos = None
        self._prediction_target = prediction_target
        if time_granularity is not None:
            self.time_granularity = time_granularity
        self.location_granularity = location_granularity
        if notice_period is not None:
            self.notice_period = notice_period
        if prediction_area is not None:
            self.prediction_area = prediction_area
        if routes is not None:
            self.routes = routes
        if qos is not None:
            self.qos = qos
    @property
    def prediction_target(self):
        return self._prediction_target
    @prediction_target.setter
    def prediction_target(self, prediction_target):
        if prediction_target is None:
            raise ValueError("Invalid value for `prediction_target`, must not be `None`")  # noqa: E501
        self._prediction_target = prediction_target
    @property
    def time_granularity(self):
        return self._time_granularity
    @time_granularity.setter
    def time_granularity(self, time_granularity):
        self._time_granularity = time_granularity
    @property
    def location_granularity(self):
        return self._location_granularity
    @location_granularity.setter
    def location_granularity(self, location_granularity):
        if location_granularity is None:
            raise ValueError("Invalid value for `location_granularity`, must not be `None`")  # noqa: E501
        self._location_granularity = location_granularity
    @property
    def notice_period(self):
        return self._notice_period
    @notice_period.setter
    def notice_period(self, notice_period):
        self._notice_period = notice_period
    @property
    def prediction_area(self):
        return self._prediction_area
    @prediction_area.setter
    def prediction_area(self, prediction_area):
        self._prediction_area = prediction_area
    @property
    def routes(self):
        return self._routes
    @routes.setter
    def routes(self, routes):
        self._routes = routes
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(PredictedQos, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, PredictedQos):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other
```

%% Cell type:markdown id: tags:

Here is the V2X Prediscted QoS function.

Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.5 Sending a request for journey-specific QoS predictions

%% Cell type:code id: tags:

``` python
def get_qos_prediction(sandbox_name: str) -> object:
    """
    Request to predictede QoS
    :param sandbox_name: The MEC Sandbox instance to use
    :return The HTTP response, the HTTP response status and the HTTP response headers on success, None otherwise
    :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.5 Sending a request for journey-specific QoS predictions
    """
    global MEC_PLTF, logger, service_api

    logger.debug('>>> get_qos_prediction: sandbox_name: ' + sandbox_name)
    try:
        url = '/{sandbox_name}/{mec_pltf}/vis/v2/provide_predicted_qos'
        logger.debug('send_uu_unicast_provisioning_info: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        # HTTP header `Accept`
        header_params = {}
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        # Body request
        loc1 = LocationInfo(geo_area=LocationInfoGeoArea(latitude=43.729416, longitude=7.414853))
        loc2 = LocationInfo(geo_area=LocationInfoGeoArea(latitude=43.732456, longitude=7.418417))
        routeInfo1 = RouteInfo(loc1, TimeStamp(nano_seconds=0, seconds=1653295620))
        routeInfo2 = RouteInfo(loc2, TimeStamp(nano_seconds=0, seconds=1653299220))
        routesInfo = [routeInfo1, routeInfo2]
        predictedQos = PredictedQos(prediction_target="SINGLE_UE_PREDICTION", location_granularity="30", routes=[Routes(routesInfo)])
        (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=predictedQos, async_req=False)
        return (result, status, headers)
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return None
    # End of function send_uu_unicast_provisioning_info
```

%% Cell type:markdown id: tags:

Grouping all together provides the process_main funtion.. The sequence is the following:
- Mec application setup
- V2X QoS request
- Mec application termination

The expected response should be:
- RSRP: 55
- RSRQ: 13

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Mec application setup
        - V2X QoS request
        - Mec application termination
    """
    global logger

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Setup the MEC application
    sandbox_name, app_inst_id, sub_id = mec_app_setup()

    # QoS Prediction
    (result, status, headers) = get_qos_prediction(sandbox_name)
    if status != 200:
        logger.error('Failed to get UU unicast provisioning information')
    else:
        logger.info('UU unicast provisioning information: result: %s', str(result.data))

    # Any processing here
    logger.info('body: ' + str(result.data))
    data = json.loads(result.data)
    logger.info('data: %s', str(data))
    time.sleep(STABLE_TIME_OUT)

    # Terminate the MEC application
    mec_app_termination(sandbox_name, app_inst_id, sub_id)

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

## Our third MEC application: how to create a new MEC Services

The purpose of this MEC Service application is to provide a custom MEC service that can be use by other MEC applications. For the purpose of this tutorial, our MEC service is simulating some complex calculation based on a set of data provided by the MEC use.
We will use a second MEC application to exploit the features of our new MEC services.

In this clause, we use the following functionalities provided by MEC-011:
- Register a new service
- Retrieve the list of the MEC services exposed by the MEC platform
- Check that our new MEC service is present in the list of the MEC platform services
- Execute a request to the MEC service
- Delete the newly created service

**Note:** We will use a second MEC application to exploit the features of our new MEC services.

**Reference:** ETSI GS MEC 011 V3.2.1 (2024-04) Clause 5.2.4 Service availability update and new service registration

%% Cell type:markdown id: tags:

### Bases of the creation of a MEC service

#### Introduction

From the user perspective, a MEC service provides a set of endpoints which describe the interface of the MEC service (see [HTTP REST APIs
concepts](https://blog.postman.com/rest-api-examples/)). These endpoints come usually with a set of data structures used by the one or more endpoints.

Our service is really basic: it provide one endpoint:
- GET /statistic/v1/quantity: it computes statistical quantities of a set of data (such as average, max, min, standard deviation)

The body of this GET method is a list of datas:
```json
{"time":20180124,"data1":"[1516752000,11590.6,11616.9,11590.4,11616.9,0.25202387,1516752060,11622.4,11651.7,11622.4,11644.6,1.03977764]"}
```

The response body is the list of statistical quantities:
```json
{"time":20180124,"avg": 0.0,"max": 0.0,"min": 0.0,"stddev": 0.0 }
```

#### MEC mechanisms to create a new service

As described in ETSI GS MEC 011 Clause 5.2.4 Service availability update and new service registration, to create a new MEC service, the following information is required:
- A MEC Aplication instance: this is the MEC application providing the new MEC service (ETSI GS MEC 011 V3.2.1 Clause 8.2.6.3.4 POST)
- A ServiceInfo instance which describe the MEC service (ETSI GS MEC 011 V3.2.1 Clause 8.1.2.2 Type: ServiceInfo)
- As part of the ServiceInfo instance, a TransportInfo (ETSI GS MEC 011 V3.2.1 Clause 8.1.2.3 Type: TransportInfo) instance descibes the endpoints to use the MEC service

When created and available, all the other MEC applications are notified about the existance of this MEC service.

%% Cell type:code id: tags:

``` python
class ServiceInfo(object):
    swagger_types = {'ser_instance_id': 'str','ser_name': 'str','ser_category': 'CategoryRef','version': 'str','state': 'str','transport_id': 'str','transport_info': 'TransportInfo','serializer': 'string','scope_of_locality': 'LocalityType','consumed_local_only': 'bool','is_local': 'bool','liveness_interval': 'int','links': 'ServiceInfoLinks'}
    attribute_map = {'ser_instance_id': 'serInstanceId','ser_name': 'serName','ser_category': 'serCategory','version': 'version','state': 'state','transport_id': 'transportId','transport_info': 'transportInfo','serializer': 'serializer','scope_of_locality': 'scopeOfLocality','consumed_local_only': 'consumedLocalOnly','is_local': 'isLocal','liveness_interval': 'livenessInterval','links': '_links'}
    def __init__(self, ser_instance_id=None, ser_name=None, ser_category=None, version=None, state=None, transport_id=None, transport_info=None, serializer=None, scope_of_locality=None, consumed_local_only=None, is_local=None, liveness_interval=None, links=None):  # noqa: E501
        self._ser_instance_id = None
        self._ser_name = None
        self._ser_category = None
        self._version = None
        self._state = None
        self._transport_id = None
        self._transport_info = None
        self._serializer = None
        self._scope_of_locality = None
        self._consumed_local_only = None
        self._is_local = None
        self._liveness_interval = None
        self._links = None
        self.discriminator = None
        if ser_instance_id is not None:
            self.ser_instance_id = ser_instance_id
        self.ser_name = ser_name
        if ser_category is not None:
            self.ser_category = ser_category
        self.version = version
        self.state = state
        if transport_id is not None:
            self.transport_id = transport_id
        self.transport_info = transport_info
        self.serializer = serializer
        if scope_of_locality is not None:
            self.scope_of_locality = scope_of_locality
        if consumed_local_only is not None:
            self.consumed_local_only = consumed_local_only
        if is_local is not None:
            self.is_local = is_local
        if liveness_interval is not None:
            self.liveness_interval = liveness_interval
        if links is not None:
            self.links = links
    @property
    def ser_instance_id(self):
        return self._ser_instance_id
    @ser_instance_id.setter
    def ser_instance_id(self, ser_instance_id):
        self._ser_instance_id = ser_instance_id
    @property
    def ser_name(self):
        return self._ser_name
    @ser_name.setter
    def ser_name(self, ser_name):
        if ser_name is None:
            raise ValueError("Invalid value for `ser_name`, must not be `None`")  # noqa: E501
        self._ser_name = ser_name
    @property
    def ser_category(self):
        return self._ser_category
    @ser_category.setter
    def ser_category(self, ser_category):
        self._ser_category = ser_category
    @property
    def version(self):
        return self._version
    @version.setter
    def version(self, version):
        if version is None:
            raise ValueError("Invalid value for `version`, must not be `None`")  # noqa: E501
        self._version = version
    @property
    def state(self):
        return self._state
    @state.setter
    def state(self, state):
        if state is None:
            raise ValueError("Invalid value for `state`, must not be `None`")  # noqa: E501
        self._state = state
    @property
    def transport_id(self):
        return self._transport_id
    @transport_id.setter
    def transport_id(self, transport_id):
        self._transport_id = transport_id
    @property
    def transport_info(self):
        return self._transport_info
    @transport_info.setter
    def transport_info(self, transport_info):
        if transport_info is None:
            raise ValueError("Invalid value for `transport_info`, must not be `None`")  # noqa: E501
        self._transport_info = transport_info
    @property
    def serializer(self):
        return self._serializer
    @serializer.setter
    def serializer(self, serializer):
        if serializer is None:
            raise ValueError("Invalid value for `serializer`, must not be `None`")  # noqa: E501
        self._serializer = serializer
    @property
    def scope_of_locality(self):
        return self._scope_of_locality
    @scope_of_locality.setter
    def scope_of_locality(self, scope_of_locality):
        self._scope_of_locality = scope_of_locality
    @property
    def consumed_local_only(self):
        return self._consumed_local_only
    @consumed_local_only.setter
    def consumed_local_only(self, consumed_local_only):
        self._consumed_local_only = consumed_local_only
    @property
    def is_local(self):
        return self._is_local
    @is_local.setter
    def is_local(self, is_local):
        self._is_local = is_local
    @property
    def liveness_interval(self):
        return self._liveness_interval
    @liveness_interval.setter
    def liveness_interval(self, liveness_interval):
        self._liveness_interval = liveness_interval
    @property
    def links(self):
        return self._links
    @links.setter
    def links(self, links):
        self._links = links
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
                    value
                ))
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], "to_dict") else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(ServiceInfo, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, ServiceInfo):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class CategoryRef(object):
    swagger_types = {'href': 'str','id': 'str','name': 'str','version': 'str'}
    attribute_map = {'href': 'href','id': 'id','name': 'name','version': 'version'}
    def __init__(self, href=None, id=None, name=None, version=None):  # noqa: E501
        self._href = None
        self._id = None
        self._name = None
        self._version = None
        self.discriminator = None
        self.href = href
        self.id = id
        self.name = name
        self.version = version
    @property
    def href(self):
        return self._href
    @href.setter
    def href(self, href):
        if href is None:
            raise ValueError("Invalid value for `href`, must not be `None`")  # noqa: E501
        self._href = href
    @property
    def id(self):
        return self._id
    @id.setter
    def id(self, id):
        if id is None:
            raise ValueError("Invalid value for `id`, must not be `None`")  # noqa: E501
        self._id = id
    @property
    def name(self):
        return self._name
    @name.setter
    def name(self, name):
        if name is None:
            raise ValueError("Invalid value for `name`, must not be `None`")  # noqa: E501
        self._name = name
    @property
    def version(self):
        return self._version
    @version.setter
    def version(self, version):
        if version is None:
            raise ValueError("Invalid value for `version`, must not be `None`")  # noqa: E501
        self._version = version
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
                    value
                ))
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], "to_dict") else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(CategoryRef, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, CategoryRef):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class TransportInfo(object):
    swagger_types = {
        'id': 'str','name': 'str','description': 'str','type': 'str','protocol': 'str','version': 'str','endpoint': 'OneOfTransportInfoEndpoint','security': 'SecurityInfo','impl_specific_info': 'str'}
    attribute_map = {'id': 'id','name': 'name','description': 'description','type': 'type','protocol': 'protocol','version': 'version','endpoint': 'endpoint','security': 'security','impl_specific_info': 'implSpecificInfo'}
    def __init__(self, id=None, name=None, description=None, type=None, protocol=None, version=None, endpoint=None, security=None, impl_specific_info=None):  # noqa: E501
        self._id = None
        self._name = None
        self._description = None
        self._type = None
        self._protocol = None
        self._version = None
        self._endpoint = None
        self._security = None
        self._impl_specific_info = None
        self.discriminator = None
        self.id = id
        self.name = name
        if description is not None:
            self.description = description
        self.type = type
        self.protocol = protocol
        self.version = version
        self.endpoint = endpoint
        self.security = security
        if impl_specific_info is not None:
            self.impl_specific_info = impl_specific_info
    @property
    def id(self):
        return self._id
    @id.setter
    def id(self, id):
        if id is None:
            raise ValueError("Invalid value for `id`, must not be `None`")  # noqa: E501
        self._id = id
    @property
    def name(self):
        return self._name
    @name.setter
    def name(self, name):
        if name is None:
            raise ValueError("Invalid value for `name`, must not be `None`")  # noqa: E501
        self._name = name
    @property
    def description(self):
        return self._description
    @description.setter
    def description(self, description):
        self._description = description
    @property
    def type(self):
        return self._type
    @type.setter
    def type(self, type):
        if type is None:
            raise ValueError("Invalid value for `type`, must not be `None`")  # noqa: E501
        self._type = type
    @property
    def protocol(self):
        return self._protocol
    @protocol.setter
    def protocol(self, protocol):
        if protocol is None:
            raise ValueError("Invalid value for `protocol`, must not be `None`")  # noqa: E501
        self._protocol = protocol
    @property
    def version(self):
        return self._version
    @version.setter
    def version(self, version):
        if version is None:
            raise ValueError("Invalid value for `version`, must not be `None`")  # noqa: E501
        self._version = version
    @property
    def endpoint(self):
        return self._endpoint
    @endpoint.setter
    def endpoint(self, endpoint):
        if endpoint is None:
            raise ValueError("Invalid value for `endpoint`, must not be `None`")  # noqa: E501
        self._endpoint = endpoint
    @property
    def security(self):
        return self._security
    @security.setter
    def security(self, security):
        if security is None:
            raise ValueError("Invalid value for `security`, must not be `None`")  # noqa: E501
        self._security = security
    @property
    def impl_specific_info(self):
        return self._impl_specific_info
    @impl_specific_info.setter
    def impl_specific_info(self, impl_specific_info):
        self._impl_specific_info = impl_specific_info
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
                    value
                ))
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], "to_dict") else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(TransportInfo, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, TransportInfo):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class SecurityInfo(object):
    swagger_types = {'o_auth2_info': 'SecurityInfoOAuth2Info'}
    attribute_map = {'o_auth2_info': 'oAuth2Info'}
    def __init__(self, o_auth2_info=None):  # noqa: E501
        self._o_auth2_info = None
        self.discriminator = None
        if o_auth2_info is not None:
            self.o_auth2_info = o_auth2_info
    @property
    def o_auth2_info(self):
        return self._o_auth2_info
    @o_auth2_info.setter
    def o_auth2_info(self, o_auth2_info):
        self._o_auth2_info = o_auth2_info
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
                    value
                ))
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], "to_dict") else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(SecurityInfo, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, SecurityInfo):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class SecurityInfoOAuth2Info(object):
    swagger_types = {'grant_types': 'list[str]','token_endpoint': 'str'}
    attribute_map = {'grant_types': 'grantTypes','token_endpoint': 'tokenEndpoint'}
    def __init__(self, grant_types=None, token_endpoint=None):  # noqa: E501
        self._grant_types = None
        self._token_endpoint = None
        self.discriminator = None
        self.grant_types = grant_types
        self.token_endpoint = token_endpoint
    @property
    def grant_types(self):
        return self._grant_types
    @grant_types.setter
    def grant_types(self, grant_types):
        if grant_types is None:
            raise ValueError("Invalid value for `grant_types`, must not be `None`")  # noqa: E501
        self._grant_types = grant_types
    @property
    def token_endpoint(self):
        return self._token_endpoint
    @token_endpoint.setter
    def token_endpoint(self, token_endpoint):
        if token_endpoint is None:
            raise ValueError("Invalid value for `token_endpoint`, must not be `None`")  # noqa: E501
        self._token_endpoint = token_endpoint
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
                    value
                ))
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], "to_dict") else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(SecurityInfoOAuth2Info, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, SecurityInfoOAuth2Info):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class OneOfTransportInfoEndpoint(object):
    swagger_types = {}
    attribute_map = {}
    def __init__(self):  # noqa: E501
        self.discriminator = None
    @property
    def uris(self):
        return self._uris
    @uris.setter
    def uris(self, uris):
        self._uris = uris
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
                    value
                ))
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], "to_dict") else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(OneOfappInstanceIdServicesBody, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, OneOfTransportInfoEndpoint):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class EndPointInfoUris(object):
    swagger_types = {'_uris': 'list[str]'}
    attribute_map = {'_uris': 'uris'}
    def __init__(self, uris:list):  # noqa: E501
        self._uris = None
        self.uris = uris
    @property
    def uris(self):
        return self._uris
    @uris.setter
    def uris(self, uris):
        self._uris = uris
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(EndPointInfoUris, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, EndPointInfoUris):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class EndPointInfoFqdn(object):
    swagger_types = {'_fqdn': 'list[str]'}
    attribute_map = {'_fqdn': 'fqdn'}
    def __init__(self, fqdn:list):  # noqa: E501
        self._fqdn = None
        self.fqdn = fqdn
    @property
    def fqdn(self):
        return self._fqdn
    @fqdn.setter
    def fqdn(self, fqdn):
        self._fqdn = fqdn
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(EndPointInfoFqdn, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, EndPointInfoFqdn):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class EndPointInfoAddress(object):
    swagger_types = {'_host': 'str', '_port': 'int'}
    attribute_map = {'_host': 'host', '_port': 'port'}
    def __init__(self, host:str, port:list):  # noqa: E501
        self._host = None
        self._port = None
        self.host = host
        self.port = port
    @property
    def host(self):
        return self.host
    @host.setter
    def host(self, host):
        self._host = host
    @property
    def port(self):
        return self._port
    @port.setter
    def port(self, port):
        self._port = qosport_kpi
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,
                    value
                ))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(EndPointInfoAddress, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, EndPointInfoAddress):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class EndPointInfoAddresses(object):
    swagger_types = {'_addresses': 'list[EndPointInfoAddress]'}
    attribute_map = {'_addresses': 'addresses'}
    def __init__(self, addresses:list):  # noqa: E501
        self._addresses = None
        self.addresses = addresses
    @property
    def addresses(self):
        return self._addresses
    @addresses.setter
    def addresses(self, addresses):
        self._addresses = addresses
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))
            elif hasattr(value, 'to_dict'):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], 'to_dict') else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(EndPointInfoAddresses, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, EndPointInfoAddresses):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other

class EndPointInfoAlternative(object):
    swagger_types = {'alternative': 'object'}
    attribute_map = {'alternative': 'alternative'}
    def __init__(self, alternative=None):  # noqa: E501
        self._alternative = None
        self.discriminator = None
        self.alternative = alternative
    @property
    def alternative(self):
        return self._alternative
    @alternative.setter
    def alternative(self, alternative):
        if alternative is None:
            raise ValueError("Invalid value for `alternative`, must not be `None`")  # noqa: E501
        self._alternative = alternative
    def to_dict(self):
        result = {}
        for attr, _ in six.iteritems(self.swagger_types):
            value = getattr(self, attr)
            if isinstance(value, list):
                result[attr] = list(map(
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
                    value
                ))
            elif hasattr(value, "to_dict"):
                result[attr] = value.to_dict()
            elif isinstance(value, dict):
                result[attr] = dict(map(
                    lambda item: (item[0], item[1].to_dict())
                    if hasattr(item[1], "to_dict") else item,
                    value.items()
                ))
            else:
                result[attr] = value
        if issubclass(EndPointInfoAlternative, dict):
            for key, value in self.items():
                result[key] = value
        return result
    def to_str(self):
        return pprint.pformat(self.to_dict())
    def __repr__(self):
        return self.to_str()
    def __eq__(self, other):
        if not isinstance(other, EndPointInfoAlternative):
            return False
        return self.__dict__ == other.__dict__
    def __ne__(self, other):
        return not self == other
```

%% Cell type:markdown id: tags:

The function below is creating an application MEC services

**Note:** This is call application MEC service in opposition of a standardized MEC service exposed by the MEC Sanbox such as MEC 013, MEC 030...

**Reference:** ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.6.3.4 POST

%% Cell type:code id: tags:

``` python
def create_mec_service(sandbox_name: str, app_inst_id: swagger_client.models.application_info.ApplicationInfo) -> object:
    """
    Request to create a new application MEC service
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :param sub_id: The subscription identifier
    :return The HTTP response, the response status and the headers on success, None otherwise
    :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.6.3.4 POST
    """
    global MEC_PLTF, CALLBACK_URI, logger, service_api

    logger.debug('>>> create_mec_service')
    try:
        url = '/{sandbox_name}/{mec_pltf}/mec_service_mgmt/v1/applications/{app_inst_id}/services'
        logger.debug('create_mec_service: url: ' + url)
        path_params = {}
        path_params['sandbox_name'] = sandbox_name
        path_params['mec_pltf'] = MEC_PLTF
        path_params['app_inst_id'] = app_inst_id.id
        # HTTP header `Accept`
        header_params = {}
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        # Body request
        callback = CALLBACK_URI + '/statistic/v1/quantity'
        transport_info = TransportInfo(id=str(uuid.uuid4()), name='HTTP REST API', type='REST_HTTP', protocol='HTTP', version='2.0', security=SecurityInfo(), endpoint=OneOfTransportInfoEndpoint())
        transport_info.endpoint.uris=[EndPointInfoUris(callback)]
        category_ref = CategoryRef(href=callback, id=str(uuid.uuid4()), name='Demo', version='1.0.0')
        appServiceInfo = ServiceInfo(ser_name='demo6 MEC Service', ser_category=category_ref, version='1.0.0', state='ACTIVE',transport_info=transport_info, serializer='JSON')
        (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=appServiceInfo, async_req=False)
        return (result, status, headers['Location'])
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return None
    # End of function create_mec_service
```

%% Cell type:markdown id: tags:

The function below is deleting an application MEC services.

**Reference:** ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.7.3.5 DELETE

%% Cell type:code id: tags:

``` python
def delete_mec_service(resource_url: str) -> int:
    """
    Request to create a new application MEC service
    :param sandbox_name: The MEC Sandbox instance to use
    :param app_inst_id: The MEC application instance identifier
    :param sub_id: The subscription identifier
    :return 0 on success, -1 otherwise
    :see ETSI GS MEC 011 V3.2.1 (2024-04) Clause 8.2.7.3.5 DELETE
    """
    global logger

    logger.debug('>>> delete_mec_subscription: resource_url: ' + resource_url)
    try:
        res = urllib3.util.parse_url(resource_url)
        if res is None:
            logger.error('delete_mec_subscription: Failed to paerse URL')
            return -1
        header_params = {}
        # HTTP header `Accept`
        header_params['Accept'] = 'application/json'  # noqa: E501
        # HTTP header `Content-Type`
        header_params['Content-Type'] = 'application/json'  # noqa: E501
        service_api.call_api(res.path, 'DELETE', header_params=header_params, async_req=False)
        return 0
    except ApiException as e:
        logger.error('Exception when calling call_api: %s\n' % e)
    return -1
    # End of function delete_mec_service
```

%% Cell type:code id: tags:

``` python
def process_main():
    """
    This is the second sprint of our skeleton of our MEC application:
        - Mec application setup
        - Create new MEC service
        - Send a request to our MEC service
        - Delete newly created MEC service
        - Mec application termination
    """
    global LISTENER_IP, LISTENER_PORT, CALLBACK_URI, logger

    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))
    logger.debug('\t pwd= ' + os.getcwd())

    # Start notification server in a daemonized thread
    httpd = start_notification_server()

    # Setup the MEC application
    sandbox_name, app_inst_id, sub_id = mec_app_setup()

    # Create the MEC service
    result, status, mec_service_resource = create_mec_service(sandbox_name, app_inst_id)
    if status != 201:
        logger.error('Failed to create MEC service')
    else:
        logger.info('mec_service_resource: %s', mec_service_resource)

    # Any processing here
    logger.info('body: ' + str(result.data))
    data = json.loads(result.data)
    logger.info('data: %s', str(data))
    logger.info('=============> Execute the command: curl %s/statistic/v1/quantity', CALLBACK_URI)
    time.sleep(30)

    # Stop notification server
    stop_notification_server(httpd)

    # Delete the MEC servce
    delete_mec_service(mec_service_resource)

    # Terminate the MEC application
    mec_app_termination(sandbox_name, app_inst_id, sub_id)

    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))
    # End of function process_main

if __name__ == '__main__':
    process_main()
```

%% Cell type:markdown id: tags:

# Annexes

## Annex A: How to use an existing MEC sandbox instance

This case is used when the MEC Sandbox API is not used. The procedure is the following:
- Log to the MEC Sandbox using a WEB browser
- Select a network scenario
- Create a new application instance

When it is done, the newly created application instance is used by your application when required. This application instance is usually passed to your application in the command line or using a configuration file

%% Cell type:markdown id: tags:

### Bibliography

1. ETSI GS MEC 002 (V2.2.1) (01-2022): "Multi-access Edge Computing (MEC); Phase 2: Use Cases and Requirements".
2. ETSI GS MEC 010-1 (V1.1.1) (10-2017): "Mobile Edge Computing (MEC); Mobile Edge Management; Part 1: System, host and platform management".
3. ETSI GS MEC 010-2 (V2.2.1) (02-2022): "Multi-access Edge Computing (MEC); MEC Management; Part 2: Application lifecycle, rules and requirements management".
4. ETSI GS MEC 011 (V3.1.1) (09-2022): "Multi-access Edge Computing (MEC); Edge Platform Application Enablement".
5. ETSI GS MEC 012 (V2.2.1) (02-2022): "Multi-access Edge Computing (MEC); Radio Network Information API".
6. ETSI GS MEC 013 (V2.2.1) (01-2022): "Multi-access Edge Computing (MEC); Location API".
7. ETSI GS MEC 014 (V2.1.1) (03-2021): "Multi-access Edge Computing (MEC); UE Identity API".
8. ETSI GS MEC 015 (V2.1.1) (06-2020): "Multi-Access Edge Computing (MEC); Traffic Management APIs".
9. ETSI GS MEC 016 (V2.2.1) (04-2020): "Multi-access Edge Computing (MEC); Device application interface".
10. ETSI GS MEC 021 (V2.2.1) (02-2022): "Multi-access Edge Computing (MEC); Application Mobility Service API".
11. ETSI GS MEC 028 (V2.3.1) (07-2022): "Multi-access Edge Computing (MEC); WLAN Access Information API".
12. ETSI GS MEC 029 (V2.2.1) (01-2022): "Multi-access Edge Computing (MEC); Fixed Access Information API".
13. ETSI GS MEC 030 (V3.2.1) (05-2022): "Multi-access Edge Computing (MEC); V2X Information Service API".
14. ETSI GR MEC-DEC 025 (V2.1.1) (06-2019): "Multi-access Edge Computing (MEC); MEC Testing Framework".
15. ETSI GR MEC 001 (V3.1.1) (01-2022): "Multi-access Edge Computing (MEC); Terminology".
16. ETSI GR MEC 003 (V3.1.1): Multi-access Edge Computing (MEC);
Framework and Reference Architecture
17. [The Wiki MEC web site](https://www.etsi.org/technologies/multi-access-edge-computing)
+2 −0
Original line number Diff line number Diff line
@@ -1331,6 +1331,8 @@ func fedNotify(msg string, systemId string) {
}

func sendNotification(notifyUrl string, notification SystemUpdateNotification) {
	log.Info(">>> sendNotification: ", notifyUrl)

	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
+9 −4
Original line number Diff line number Diff line
@@ -2135,7 +2135,7 @@ func processV2xMsgSubscription(bodyBytes []byte, link *Links, subsIdStr string,
	// Store subscription key in redis
	keyName := baseKey + "subscriptions:" + subsIdStr
	log.Info("processV2xMsgSubscription: keyName: ", keyName)
	log.Info("processV2xMsgSubscription: provChgUuUniSubscription: ", v2xSubscription)
	log.Info("processV2xMsgSubscription: v2xSubscription: ", v2xSubscription)
	err = rc.JSONSetEntry(keyName, ".", convertV2xMsgSubscriptionToJson(v2xSubscription))
	if err != nil {
		log.Error(err.Error())
@@ -3114,9 +3114,14 @@ func individualSubscriptionDelete(w http.ResponseWriter, r *http.Request) {
func v2xNotify(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32) {
	log.Debug(">>> v2xNotify: ", v2xMessage)

	locationInfoGeoArea := LocationInfoGeoArea{*latitude, *longitude}
	locationInfo := LocationInfo{nil, &locationInfoGeoArea}
	msgPropertiesValues := V2xMsgPropertiesValues{&locationInfo, msgProtocolVersion, string(v2xType), stdOrganization}
	var locationInfo *LocationInfo = nil
	if (longitude != nil) && (latitude != nil) {
		var locationInfoGeoArea = LocationInfoGeoArea{*latitude, *longitude}
		locationInfo = new(LocationInfo)
		locationInfo.Ecgi = nil
		locationInfo.GeoArea = &locationInfoGeoArea
	}
	msgPropertiesValues := V2xMsgPropertiesValues{locationInfo, msgProtocolVersion, string(v2xType), stdOrganization}
	v2xMsgNotification := V2xMsgNotification{
		Links:                   nil,
		MsgContent:              hex.EncodeToString(v2xMessage),
+31 −21
Original line number Diff line number Diff line
@@ -30,23 +30,27 @@ type message_broker_mqtt struct {
	running    bool
	opts       *mqtt.ClientOptions
	client     mqtt.Client
	v2x_notify func(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32)
}

// var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
// 	log.Info("Received message: ", msg.Payload(), "on topic ", msg.Topic())
// 	fmt.Println("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
// }
var _v2x_notify func(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32)

func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("Received message: ", msg.Payload(), "on topic ", msg.Topic())
		fmt.Println("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
		log.Info("onMessageReceived: Received message: ", msg.Payload(), " on topic ", msg.Topic())
		if _v2x_notify != nil {
			if msg.Topic() == "3gpp/v2x/obu/vam" { // FIXME FSCOM Need to manage how to extract message type & message version
				_v2x_notify(msg.Payload(), 16, 2, "ETSI", nil, nil)
			}
		} else {
			log.Info("onMessageReceived: null pointer for the callbacl")
		}
	}()
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	go func() {
		log.Info("Connected")
		fmt.Println("====> Connected")
		log.Info("mqtt.OnConnectHandler: Connected")
	}()
}

@@ -55,7 +59,7 @@ var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err
}

func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) {
	log.Debug(">>> message_broker_mqtt: Init")
	log.Debug(">>> Init")

	broker_mqtt.running = false

@@ -66,17 +70,18 @@ func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) {
	}

	broker_mqtt.opts = mqtt.NewClientOptions()
	broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived /*messagePubHandler*/)
	//broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
	broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived)
	broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
	broker_mqtt.opts.OnConnect = connectHandler
	broker_mqtt.opts.OnConnectionLost = connectLostHandler
	//broker_mqtt.opts.SetUsername("emqx")
	//broker_mqtt.opts.SetPassword("public")
	log.Info("Add brocker: ", fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
	broker_mqtt.opts.CleanSession = true
	broker_mqtt.opts.SetUsername("")
	broker_mqtt.opts.SetPassword("")
	log.Info("Init: Add brocker: ", fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
	broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
	broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)

	log.Info("Connect to MQTT server...")
	log.Info("Init: Connect to MQTT server...")
	token := broker_mqtt.client.Connect()
	if token.Error() != nil {
		log.Error(token.Error())
@@ -85,16 +90,21 @@ func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) {
	token.Wait()

	// Subscribe
	log.Info("Subscribe to: ", tm.topic)
	token = broker_mqtt.client.Subscribe(tm.topic, 0, nil) // qos:0
	log.Info("Init: Subscribe to: ", tm.topic+"/+")
	token = broker_mqtt.client.Subscribe(tm.topic+"/+", 0, nil) // qos:0
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	_v2x_notify = broker_mqtt.v2x_notify
	if _v2x_notify == nil {
		log.Error("Init: _v2x_notify is nil")
	}

	broker_mqtt.running = true
	log.Info("mqtt.Init: Client is connected")
	log.Info("Init: Client is connected")

	return nil
}
Loading