MEC application.ipynb 208 KiB
    "In this clause, we use the following functionalities provided by MEC-030:\n",
    "- Getting UU unicast provisioning information (ETSI GS MEC 030 Clause 5.5.1)\n",
    "- Subscribe to the V2X message distribution server (ETSI GS MEC 030 Clause 5.5.7)\n",
    "- Delete subscription\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Getting UU unicast provisioning information\n",
    "\n",
    "The purpose is to query provisioning information for V2X communication over Uu unicast."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def send_uu_unicast_provisioning_info(sandbox_name: str, ecgi: str) -> object:\n",
    "    \"\"\"\n",
    "    Request for V2X communication over Uu unicast information\n",
    "    :param sandbox_name: The MEC Sandbox instance to use\n",
    "    :param ecgi: Comma separated list of locations to identify a cell of a base station or a particular geographical area\n",
    "    :return The Uu unicast provisioning information on success, None otherwise\n",
    "    :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.1 Sending a request for provisioning information for V2X communication over Uu unicast\n",
    "    \"\"\"\n",
    "    global MEC_PLTF, logger, service_api\n",
    "    logger.debug('>>> send_uu_unicast_provisioning_info: ' + ecgi)\n",
    "        url = '/{sandbox_name}/{mec_pltf}/vis/v2/queries/uu_unicast_provisioning_info'\n",
    "        logger.debug('send_uu_unicast_provisioning_info: url: ' + url)\n",
    "        path_params = {}\n",
    "        path_params['sandbox_name'] = sandbox_name\n",
    "        path_params['mec_pltf'] = MEC_PLTF\n",
    "        query_params = []\n",
    "        query_params.append(('location_info', 'ecgi,' + ecgi))\n",
    "        header_params = {}\n",
    "        # HTTP header `Accept`\n",
    "        header_params['Accept'] = 'application/json'  # noqa: E501\n",
    "        # HTTP header `Content-Type`\n",
    "        header_params['Content-Type'] = 'application/json'  # noqa: E501\n",
    "        (result, status, header) = service_api.call_api(url, 'GET', header_params=header_params, path_params=path_params, query_params=query_params, async_req=False)\n",
    "        return (result, status, header)\n",
    "    except ApiException as e:\n",
    "        logger.error('Exception when calling call_api: %s\\n' % e)\n",
    "    return (None, status, None)\n",
    "    # End of function send_uu_unicast_provisioning_info"
   ]
  },
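As a rough illustration of the request built by the function above, the sketch below expands the path template and appends the `location_info` query parameter using only the standard library. The helper name `build_uu_unicast_query_url` and the sample values `sbx-demo` and `mep1` are hypothetical, and the generated `service_api` client may encode the query string differently (for instance by percent-encoding the commas).

```python
from urllib.parse import urlencode


def build_uu_unicast_query_url(sandbox_name: str, mec_pltf: str, ecgi: str) -> str:
    """Expand the path template and append the location_info query parameter."""
    path = '/{sandbox_name}/{mec_pltf}/vis/v2/queries/uu_unicast_provisioning_info'.format(
        sandbox_name=sandbox_name, mec_pltf=mec_pltf)
    # Same (name, value) pair as in the function above; safe=',' leaves commas unescaped
    query = urlencode([('location_info', 'ecgi,' + ecgi)], safe=',')
    return path + '?' + query


# Hypothetical sandbox and platform names, for illustration only
print(build_uu_unicast_query_url('sbx-demo', 'mep1', '268708941961,268711972264'))
```

The value of `location_info` starts with the literal keyword `ecgi`, followed by the comma-separated cell identifiers.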
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's create the our second MEC application.\n",
    "The sequence is the following:\n",
    "- Mec application setup\n",
    "- Get UU unicast provisioning information\n",
    "- Mec application termination\n",
    "\n",
    "Note that the UU unicast provisioning information is returned as a JSON string. To de-serialized it into a Python data structure, please refer to clause [Subscribing to V2X message distribution server](#subscribing_to_v2x_message_distribution_server)."
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Putting everything together"
   ]
  },
   "cell_type": "code",
Yann Garcia's avatar
Yann Garcia committed
   "source": [
Yann Garcia's avatar
Yann Garcia committed
    "%%script echo skipping\n",
    "# Uncomment the ;line above to skip execution of this cell\n",
Yann Garcia's avatar
Yann Garcia committed
    "def process_main():\n",
    "    \"\"\"\n",
    "    This is the second sprint of our skeleton of our MEC application:\n",
    "        - Mec application setup\n",
    "        - Get UU unicast provisioning information\n",
    "        - Mec application termination\n",
    "    \"\"\" \n",
    "    global logger, nw_scenarios\n",
    "\n",
    "    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
    "    logger.debug('\\t pwd= ' + os.getcwd())\n",
    "\n",
    "    # Setup the MEC application\n",
    "    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n",
    "\n",
    "    # Get UU unicast provisioning information\n",
    "    ecgi = \"268708941961,268711972264\" # List of ecgi spearated by a ','\n",
    "    result, status, header = send_uu_unicast_provisioning_info(sandbox_name, ecgi)\n",
    "    logger.info('UU unicast provisioning information: status: %s', str(status))\n",
    "    if status != 200:\n",
    "        logger.error('Failed to get UU unicast provisioning information')\n",
    "    else:\n",
    "        logger.info('UU unicast provisioning information: %s', str(result.data))\n",
    "\n",
    "    # Any processing comes here\n",
    "    logger.info('body: ' + str(result.data))\n",
    "    data = json.loads(result.data)\n",
    "    logger.info('data: ' + str(data))\n",
    "\n",
    "    # Terminate the MEC application\n",
    "    mec_app_termination(sandbox_name, app_inst_id, sub_id)\n",
    "\n",
    "    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
    "    # End of function process_main\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    process_main()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Subscribing to V2X message distribution server\n",
    "\n",
    "Here, we need to come back to the MEC 030 standard to create the type V2xMsgSubscription. It involves the creation of a set of basic types described below.\n",
    "\n",
    "These new type shall be 'JSON' serializable. It means that they have to implement the following methods:\n",
    "- to_dict()\n",
    "- to_str()\n",
    "- \\_\\_repr\\_\\_()\n",
    "- \\_\\_eq\\_\\_()\n",
    "- \\_\\_ne\\_\\_()\n",
    "**Reference:** ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.5.13 Type: LinkType\n"
Yann Garcia's avatar
Yann Garcia committed
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class LinkType(object):\n",
    "    swagger_types = {'href': 'str'}\n",
    "    attribute_map = {'href': 'href'}\n",
    "    def __init__(self, href=None):  # noqa: E501\n",
    "        self._href = None\n",
    "        if href is not None:\n",
    "            self._href = href\n",
    "    @property\n",
    "    def href(self):\n",
    "        return self._href\n",
    "    @href.setter\n",
    "    def href(self, href):\n",
    "        self._href = href\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(\n",
    "                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(LinkType, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, LinkType):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other\n",
    "\n",
    "class Links(object):\n",
    "    swagger_types = {'self': 'LinkType'}\n",
    "    attribute_map = {'self': 'self'}\n",
    "    def __init__(self, self_=None):  # noqa: E501\n",
    "        self._self = None\n",
    "        if self_ is not None:\n",
    "            self._self = self_\n",
    "    @property\n",
    "    def self_(self):\n",
    "        return self._self\n",
    "    @self_.setter\n",
    "    def self_(self, self_):\n",
    "        self._self = self_\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(\n",
    "                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(Links, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, Links):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other\n",
    "\n",
    "class TimeStamp(object):\n",
    "    swagger_types = {'seconds': 'int', 'nano_seconds': 'int'}\n",
    "    attribute_map = {'seconds': 'seconds', 'nano_seconds': 'nanoSeconds'}\n",
    "    def __init__(self, seconds=None, nano_seconds=None):  # noqa: E501\n",
    "        self._seconds = None\n",
    "        self._nano_seconds = None\n",
    "        if seconds is not None:\n",
    "            self._seconds = seconds\n",
    "        if nano_seconds is not None:\n",
    "            self._nano_seconds = nano_seconds\n",
    "    @property\n",
    "    def seconds(self):\n",
    "        return self._seconds\n",
    "    @seconds.setter\n",
    "    def seconds(self, seconds):\n",
    "        self._seconds = seconds\n",
    "    @property\n",
    "    def nano_seconds(self):\n",
    "        return self._nano_seconds\n",
    "    @nano_seconds.setter\n",
    "    def nano_seconds(self, nano_seconds):\n",
    "        self._nano_seconds = nano_seconds\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(\n",
    "                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(TimeStamp, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, TimeStamp):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other"
   ]
  },
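The classes above carry both Python attribute names (`swagger_types`) and JSON field names (`attribute_map`). How the `service_api` client turns an instance into a request body is not shown in this section, so the sketch below only illustrates one plausible way `attribute_map` could be used to produce the camelCase keys expected on the wire. `TimeStampSketch` and `to_wire_dict` are hypothetical names introduced for this example.

```python
import json


class TimeStampSketch:
    """Simplified stand-in for the TimeStamp class above (illustration only)."""
    attribute_map = {'seconds': 'seconds', 'nano_seconds': 'nanoSeconds'}

    def __init__(self, seconds=None, nano_seconds=None):
        self.seconds = seconds
        self.nano_seconds = nano_seconds


def to_wire_dict(obj) -> dict:
    """Map Python attribute names to JSON keys, skipping unset (None) values."""
    return {json_key: getattr(obj, attr)
            for attr, json_key in obj.attribute_map.items()
            if getattr(obj, attr) is not None}


body = json.dumps(to_wire_dict(TimeStampSketch(seconds=1653295620, nano_seconds=0)))
print(body)
```

Skipping `None` values keeps optional fields out of the serialized body; whether the real client does the same is an assumption.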
  {
   "cell_type": "markdown",
   "metadata": {},
    "The cell below implements the V2xMsgSubscription data structure.\"}\n",
    "Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.3.5 Type: V2xMsgSubscription\n"
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class V2xMsgSubscription(object):\n",
    "    swagger_types = {'links': 'Links', 'callback_reference': 'str', 'filter_criteria': 'V2xMsgSubscriptionFilterCriteria', 'request_test_notification': 'bool', 'subscription_type': 'str'}\n",
    "    attribute_map = {'links': 'Links', 'callback_reference': 'callbackReference', 'filter_criteria': 'filterCriteria', 'request_test_notification': 'requestTestNotification', 'subscription_type': 'subscriptionType'}\n",
    "    def __init__(self, links=None, callback_reference=None, filter_criteria=None, request_test_notification=None):  # noqa: E501\n",
    "        self._links = None\n",
    "        self._callback_reference = None\n",
    "        self._filter_criteria = None\n",
    "        self._request_test_notification = None\n",
    "        self._subscription_type = \"V2xMsgSubscription\"\n",
    "        if links is not None:\n",
    "            self.links = links\n",
    "        if callback_reference is not None:\n",
    "            self.callback_reference = callback_reference\n",
    "        if filter_criteria is not None:\n",
    "            self.filter_criteria = filter_criteria\n",
    "        if request_test_notification is not None:\n",
    "            self.request_test_notification = request_test_notification\n",
    "    @property\n",
    "    def links(self):\n",
    "        return self._links\n",
    "    @links.setter\n",
    "    def links(self, links):\n",
    "        self_.links = links\n",
    "    @property\n",
    "    def callback_reference(self):\n",
    "        return self._callback_reference\n",
    "    @callback_reference.setter\n",
    "    def callback_reference(self, callback_reference):\n",
    "        self._callback_reference = callback_reference\n",
    "    @property\n",
    "    def links(self):\n",
    "        return self._links\n",
    "    @links.setter\n",
    "    def links(self, links):\n",
    "        self._links = links\n",
    "    @property\n",
    "    def filter_criteria(self):\n",
    "        return self._filter_criteria\n",
    "    @filter_criteria.setter\n",
    "    def filter_criteria(self, filter_criteria):\n",
    "        self._filter_criteria = filter_criteria\n",
    "    @property\n",
    "    def request_test_notification(self):\n",
    "        return self._request_test_notification\n",
    "    @request_test_notification.setter\n",
    "    def request_test_notification(self, request_test_notification):\n",
    "        self._request_test_notification = request_test_notification\n",
    "    @property\n",
    "    def subscription_type(self):\n",
    "        return self._subscription_type\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(\n",
    "                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(V2xMsgSubscription, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, V2xMsgSubscription):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other\n",
    "\n",
    "class V2xMsgSubscriptionFilterCriteria(object):\n",
    "    swagger_types = {'msg_type': 'list[str]', 'std_organization': 'str'}\n",
    "    attribute_map = {'msg_type': 'MsgType', 'std_organization': 'stdOrganization'}\n",
    "    def __init__(self, msg_type, std_organization):  # noqa: E501\n",
    "        self._msg_type = None\n",
    "        self._std_organization = None\n",
    "        self.msg_type = msg_type\n",
    "        self.std_organization = std_organization\n",
    "    @property\n",
    "    def msg_type(self):\n",
    "        return self._msg_type\n",
    "    @msg_type.setter\n",
    "    def msg_type(self, msg_type):\n",
    "        self._msg_type = msg_type\n",
    "    @property\n",
    "    def std_organization(self):\n",
    "        return self._std_organization\n",
    "    @std_organization.setter\n",
    "    def std_organization(self, std_organization):\n",
    "        self._std_organization = std_organization\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(\n",
    "                    lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(V2xMsgSubscriptionFilterCriteria, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, V2xMsgSubscriptionFilterCriteria):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is the V2X message subscription function. The HTTP Request message body contains a 'JSON' serialized instance of the class V2xMsgSubscription.\n",
    "\n",
    "Reference: ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability\n"
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def subscribe_v2x_message(sandbox_name: str, v2xMsgSubscription: V2xMsgSubscription) -> object:\n",
    "    \"\"\"\n",
    "    Request to subscribe the V2X messages which come from different vehicle OEMs or operators\n",
    "    :param sandbox_name: The MEC Sandbox instance to use\n",
    "    :param app_inst_id: The MEC application instance identifier\n",
    "    :param sub_id: The subscription identifier\n",
    "    :return The HTTP response, the HTTP response status, the subscription identifier and the subscription URL on success, None otherwise\n",
    "    :see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 5.5.10 V2X message interoperability\n",
    "    \"\"\"\n",
    "    global MEC_PLTF, logger, service_api\n",
    "    logger.debug('>>> subscribe_v2x_message: v2xMsgSubscription: ' + str(v2xMsgSubscription))\n",
    "        url = '/{sandbox_name}/{mec_pltf}/vis/v2/subscriptions'\n",
    "        logger.debug('subscribe_v2x_message: url: ' + url)\n",
    "        path_params = {}\n",
    "        path_params['sandbox_name'] = sandbox_name\n",
    "        path_params['mec_pltf'] = MEC_PLTF\n",
    "        header_params = {}\n",
    "        # HTTP header `Accept`\n",
    "        header_params['Accept'] = 'application/json'  # noqa: E501\n",
    "        # HTTP header `Content-Type`\n",
    "        header_params['Content-Type'] = 'application/json'  # noqa: E501\n",
    "        (result, status, headers) = service_api.call_api(url, 'POST', header_params=header_params, path_params=path_params, body=v2xMsgSubscription, async_req=False)\n",
    "        return (result, status, extract_sub_id(headers['Location']), headers['Location'])\n",
    "    except ApiException as e:\n",
    "        logger.error('Exception when calling call_api: %s\\n' % e)\n",
    "    return (None, status, None)\n",
    "    # End of function subscribe_v2x_message"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is a generic function to delete any MEC service subscription based on the subscription resource URL provided in the Location header of the subscription creation response."
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def delete_mec_subscription(resource_url: str) -> int:\n",
    "    \"\"\"\n",
    "    Delete any existing MEC subscription\n",
    "    :param resource_url: The subscription URL\n",
    "    :return 0 on success, -1 otherwise\n",
    "    \"\"\"\n",
    "    global MEC_PLTF, logger, service_api\n",
    "\n",
    "    logger.debug('>>> delete_mec_subscription: resource_url: ' + resource_url)\n",
    "    try:\n",
    "        res = urllib3.util.parse_url(resource_url)\n",
    "        if res is None:\n",
    "            logger.error('delete_mec_subscription: Failed to paerse URL')\n",
    "            return -1\n",
    "        header_params = {}\n",
    "        # HTTP header `Accept`\n",
    "        header_params['Accept'] = 'application/json'  # noqa: E501\n",
    "        # HTTP header `Content-Type`\n",
    "        header_params['Content-Type'] = 'application/json'  # noqa: E501\n",
    "        service_api.call_api(res.path, 'DELETE', header_params=header_params, async_req=False)\n",
    "        return 0\n",
    "    except ApiException as e:\n",
    "        logger.error('Exception when calling call_api: %s\\n' % e)\n",
    "    return -1\n"
   ]
  },
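The deletion function above relies on the path component of the subscription resource URL. The sketch below shows, with the standard library only, how a `Location` header value could be split into that path and a trailing subscription identifier. The helper `split_subscription_location` and the sample URL are hypothetical; the notebook's own `extract_sub_id` helper is defined elsewhere and may behave differently.

```python
from urllib.parse import urlparse


def split_subscription_location(location: str):
    """Return (resource path, last path segment) for a subscription URL."""
    path = urlparse(location).path
    # The subscription identifier is assumed to be the last path segment
    sub_id = path.rstrip('/').rsplit('/', 1)[-1]
    return path, sub_id


# Hypothetical Location header value, for illustration only
path, sub_id = split_subscription_location(
    'https://example.com/sbx-demo/mep1/vis/v2/subscriptions/123')
print(path, sub_id)
```

Only the path is needed for the `DELETE` request because the client already knows the host it talks to.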
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finaly, here is how to implement the V2X message subscription:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%script echo skipping\n",
    "# Uncomment the line above to skip execution of this cell\n",
    "def process_main():\n",
    "    \"\"\"\n",
    "    This is the second sprint of our skeleton of our MEC application:\n",
    "        - Mec application setup\n",
    "        - Subscribe to V2XMessage\n",
    "        - Delete subscription\n",
    "        - Mec application termination\n",
    "    \"\"\" \n",
    "    global MEC_PLTF, CALLBACK_URI, logger\n",
    "\n",
    "    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
    "    logger.debug('\\t pwd= ' + os.getcwd())\n",
    "\n",
    "    # Setup the MEC application\n",
    "    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n",
    "    # Create a V2X message subscritpion\n",
    "    filter_criteria = V2xMsgSubscriptionFilterCriteria(['1', '2'], 'ETSI')\n",
    "    v2xMsgSubscription = V2xMsgSubscription(callback_reference = CALLBACK_URI + '/vis/v2/v2x_msg_notification', filter_criteria = filter_criteria)\n",
    "    result, status, v2x_sub_id, v2x_resource = subscribe_v2x_message(sandbox_name, v2xMsgSubscription)\n",
    "    if status != 201:\n",
    "        logger.error('Failed to create subscription')\n",
    "    # Any processing here\n",
Yann Garcia's avatar
Yann Garcia committed
    "    logger.info('body: ' + str(result.data))\n",
    "    data = json.loads(result.data)\n",
    "    logger.info('data: %s', str(data))\n",
    "    logger.info('app_inst_id: ' + app_inst_id.id)\n",
    "    if sub_id is not None:\n",
    "        logger.info('sub_id: ' + sub_id)\n",
    "    time.sleep(STABLE_TIME_OUT)\n",
    "\n",
    "    # Delete the V2X message subscritpion\n",
    "    delete_mec_subscription(v2x_resource)\n",
    "\n",
    "    # Terminate the MEC application\n",
    "    mec_app_termination(sandbox_name, app_inst_id, sub_id)\n",
    "\n",
    "    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
    "    # End of function process_main\n",
    "\n",
    "if __name__ == '__main__':\n",
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Putting everything together\n",
    "\n",
    "Let's add a subscription to our previous MEC application.\n",
    "The sequence is the following:\n",
    "- MEC application setup\n",
    "- Start the notification server\n",
    "- Get UU unicast provisioning information\n",
    "- Add subscription\n",
    "- Stop the notification server\n",
    "- MEC application termination"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%script echo skipping\n",
    "# Uncomment the line above to skip execution of this cell\n",
    "def process_main():\n",
    "    \"\"\"\n",
    "    This is the third sprint of our skeleton of our MEC application:\n",
    "        - Mec application setup\n",
    "        - Start the notification server\n",
    "        - Get UU unicast provisioning information\n",
    "        - Add subscription\n",
    "        - Stop the notification server\n",
    "        - Mec application termination\n",
    "    \"\"\" \n",
    "    global CALLBACK_URI, logger\n",
    "\n",
    "    logger.debug('Starting at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
    "    logger.debug('\\t pwd= ' + os.getcwd())\n",
    "\n",
    "    # Setup the MEC application\n",
    "    (sandbox_name, app_inst_id, sub_id) = mec_app_setup()\n",
    "\n",
    "    # Get UU unicast provisioning information\n",
    "    ecgi = \"268708941961,268711972264\" # List of ecgi spearated by a ','\n",
    "    result = send_uu_unicast_provisioning_info(sandbox_name, ecgi)\n",
    "    if result is None:\n",
    "        logger.error('Failed to get UU unicast provisioning information')\n",
    "    else:\n",
    "        logger.info('UU unicast provisioning information: ' + str(result))\n",
Yann Garcia's avatar
Yann Garcia committed
    "\n",
    "    # Start notification server in a daemonized thread\n",
    "    httpd = start_notification_server()\n",
    "\n",
    "    # Create a V2X message subscritpion\n",
    "    filter_criteria = V2xMsgSubscriptionFilterCriteria(['1', '2'], 'ETSI')\n",
    "    v2xMsgSubscription = V2xMsgSubscription(callback_reference = CALLBACK_URI + '/vis/v2/v2x_msg_notification', filter_criteria = filter_criteria)\n",
    "    result, status, v2x_sub_id, v2x_resource = subscribe_v2x_message(sandbox_name, v2xMsgSubscription)\n",
    "    if status != 201:\n",
    "        logger.error('Failed to create subscription')\n",
    "\n",
    "    # Any processing here\n",
    "    logger.info('body: ' + str(result.data))\n",
    "    data = json.loads(result.data)\n",
    "    logger.info('data: %s', str(data))\n",
    "    logger.info('v2x_resource: ' + v2x_resource)\n",
    "    if sub_id is not None:\n",
    "        logger.info('sub_id: ' + sub_id)\n",
    "    time.sleep(STABLE_TIME_OUT)\n",
    "\n",
    "    # Stop notification server\n",
    "    stop_notification_server(httpd)\n",
    "\n",
Yann Garcia's avatar
Yann Garcia committed
    "    # Delete the V2X message subscritpion\n",
    "    delete_mec_subscription(v2x_resource)\n",
    "\n",
    "    # Terminate the MEC application\n",
    "    mec_app_termination(sandbox_name, app_inst_id, sub_id)\n",
    "\n",
    "    logger.debug('Stopped at ' + time.strftime('%Y%m%d-%H%M%S'))\n",
    "    # End of function process_main\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    process_main()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create our fourth MEC application: how to use V2X QoS Prediction\n",
    "\n",
    "The MEC Sandbox V2X QoS Prediction is based on a grid map of Monaco City where areas are categorized as residential, commercial or coastal. \n",
    "PoAs (Points of Access) are categorized depending on where they lie in each grid. \n",
    "Each category has its own pre-determined traffic load patterns. The V2X QoS Prediction gives more accurate values of RSRP and RSRQ based on the diurnal traffic patterns of each category. The network scenario named \"4g-5g-v2x-macro\" must be used to get access to the V2X QoS Prediction feature.\n",
    "\n",
    "**Note:** The MEC Sandbox V2X QoS Prediction is enabled when the PredictedQos.routes.routeInfo.time attribute is present in the request (see ETSI GS MEC 030 V3.2.1 (2024-02) Clause 6.2.6 Type: PredictedQos)\n",
    "\n",
    "Limitations:\n",
    "* The Location Granularity is currently not validated, as RSRP/RSRQ calculations are done at the exact location provided by the user.\n",
    "* Time Granularity is currently not supported by the Prediction Function (design limitations of the minimal, emulated, pre-determined traffic prediction)\n",
    "* Upper limit on the number of elements (10 each) in the routes and routeInfo structures (arrays) to not affect user experience and responsiveness\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The table below describes the excepted Qos with and without the prediction model in deiffrent area and at different time.\n",
    "\n",
    "| Location            | Time (Unix time in sec) | Time (GMT) | PoA              | Category    | Status        | RSRP without Prediction Model | RSRQ without Prediction Model | RSRP with Prediction Model | RSRQ with Prediction Model | Expected |\n",
    "| ------------------- | ----------------------- | ---------- | ---------------- | ----------- | ------------- | ----------------------------- | ----------------------------- | -------------------------- | -------------------------- | -------- |\n",
    "| 43.729416,7.414853  | 1653295620              | 08:47:00   | 4g-macro-cell-2  | Residential | Congested     | 63                            | 21                            | 60                         | 20                         | Yes      |\n",
    "| 43.732456,7.418417  | 1653299220              | 09:47:00   | 4g-macro-cell-3  | Residential | Not Congested | 55                            | 13                            | 55                         | 13                         | Yes      |\n",
    "| 43.73692,7.4209256  | 1653302820              | 10:47:00   | 4g-macro-cell-6  | Coastal     | Not Congested | 68                            | 26                            | 68                         | 26                         | Yes      |\n",
    "| 43.738007,7.4230533 | 1653305220              | 11:27:00   | 4g-macro-cell-6  | Coastal     | Not Congested | 55                            | 13                            | 55                         | 13                         | Yes      |\n",
    "| 43.739685,7.424881  | 1653308820              | 12:27:00   | 4g-macro-cell-7  | Commercial  | Congested     | 63                            | 21                            | 40                         | 13                         | Yes      |\n",
    "| 43.74103,7.425759   | 1653312600              | 13:30:00   | 4g-macro-cell-7  | Commercial  | Congested     | 56                            | 14                            | 40                         | 8                          | Yes      |\n",
    "| 43.74258,7.4277945  | 1653315900              | 14:25:00   | 4g-macro-cell-8  | Coastal     | Congested     | 59                            | 17                            | 47                         | 13                         | Yes      |\n",
    "| 43.744972,7.4295254 | 1653318900              | 15:15:00   | 4g-macro-cell-8  | Coastal     | Congested     | 53                            | 11                            | 40                         | 5                          | Yes      |\n",
    "| 43.74773,7.4320855  | 1653322500              | 16:15:00   | 5g-small-cell-14 | Commercial  | Congested     | 78                            | 69                            | 60                         | 53                         | Yes      |\n",
    "| 43.749264,7.435894  | 1653329700              | 18:15:00   | 5g-small-cell-20 | Commercial  | Not Congested | 84                            | 72                            | 84                         | 72                         | Yes      |\n",
    "\n"
   ]
  },
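  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The two time columns of the table carry the same instant in two notations. As a quick check, the Unix time can be converted to GMT with the standard `datetime` module; `unix_to_gmt` is a hypothetical helper name used only for this illustration.\n",
    "\n",
    "```python\n",
    "from datetime import datetime, timezone\n",
    "\n",
    "def unix_to_gmt(seconds):\n",
    "    # Interpret the value as seconds since the epoch, in UTC (GMT)\n",
    "    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime('%H:%M:%S')\n",
    "\n",
    "print(unix_to_gmt(1653295620))  # first row of the table: 08:47:00\n",
    "```\n"
   ]
  },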
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The image [here](images/V2X Predicted QoS.jpg) illustrates the table above.\n",
    "\n",
    "Here is an example of a basic V2X predicted QoS request based on two points along a path, starting at about 8 am in a Residential area:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```json\n",
    "{\n",
    "  \"predictionTarget\": \"SINGLE_UE_PREDICTION\",\n",
    "  \"timeGranularity\": null,\n",
    "  \"locationGranularity\": \"30\",\n",
    "  \"routes\": [\n",
    "      {\n",
    "        \"routeInfo\": [\n",
    "          {\n",
    "            \"location\": {\n",
    "              \"geoArea\": {\n",
    "                \"latitude\": 43.729416,\n",
    "                \"longitude\": 7.414853\n",
    "              }\n",
    "            },\n",
    "            \"time\": {\n",
    "              \"nanoSeconds\": 0,\n",
    "              \"seconds\": 1653295620\n",
    "            }\n",
    "          },\n",
    "          {\n",
    "            \"location\": {\n",
    "              \"geoArea\": {\n",
    "                \"latitude\": 43.732456,\n",
    "                \"longitude\": 7.418417\n",
    "              }\n",
    "            },\n",
    "            \"time\": {\n",
    "              \"nanoSeconds\": 0,\n",
    "              \"seconds\": 1653299220\n",
    "            }\n",
    "          }\n",
    "        ]\n",
    "      }\n",
    "  ]\n",
    "}\n",
    "```"
   ]
  },
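  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For illustration only, the same request body can also be assembled as a plain Python dictionary and serialized with the standard `json` module. This is a minimal sketch: `build_predicted_qos_body` is a hypothetical helper that is not part of the MEC Sandbox client, and the field names and values are copied from the JSON example above.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "def build_predicted_qos_body(points):\n",
    "    # points: list of (latitude, longitude, unix_seconds) tuples\n",
    "    route_info = [\n",
    "        {\n",
    "            'location': {'geoArea': {'latitude': lat, 'longitude': lon}},\n",
    "            'time': {'nanoSeconds': 0, 'seconds': seconds},\n",
    "        }\n",
    "        for lat, lon, seconds in points\n",
    "    ]\n",
    "    return {\n",
    "        'predictionTarget': 'SINGLE_UE_PREDICTION',\n",
    "        'timeGranularity': None,  # serialized as JSON null\n",
    "        'locationGranularity': '30',\n",
    "        'routes': [{'routeInfo': route_info}],\n",
    "    }\n",
    "\n",
    "body = build_predicted_qos_body([\n",
    "    (43.729416, 7.414853, 1653295620),\n",
    "    (43.732456, 7.418417, 1653299220),\n",
    "])\n",
    "print(json.dumps(body, indent=2))\n",
    "```\n",
    "\n",
    "A dictionary is convenient for quick experiments; typed classes mirror the data model of the specification more closely.\n"
   ]
  },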
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's first create the required types before preparing a V2X Predicted QoS request based on the JSON above.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pprint  # used by to_str()\n",
    "import six  # used by to_dict()\n",
    "\n",
    "class Routes(object):\n",
    "    swagger_types = {'_route_info': 'list[RouteInfo]'}\n",
    "    attribute_map = {'_route_info': 'routeInfo'}\n",
    "    def __init__(self, route_info:list):  # noqa: E501\n",
    "        self._route_info = None\n",
    "        self.route_info = route_info\n",
    "    @property\n",
    "    def route_info(self):\n",
    "        return self._route_info\n",
    "    @route_info.setter\n",
    "    def route_info(self, route_info):\n",
    "        if route_info is None:\n",
    "            raise ValueError(\"Invalid value for `route_info`, must not be `None`\")  # noqa: E501\n",
    "        self._route_info = route_info\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(Routes, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, Routes):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other\n",
    "\n",
    "class LocationInfo(object):\n",
    "    swagger_types = {'_ecgi': 'Ecgi', '_geo_area': 'LocationInfoGeoArea'}\n",
    "    attribute_map = {'_ecgi': 'ecgi', '_geo_area': 'geoArea'}\n",
    "    def __init__(self, ecgi=None, geo_area=None):  # noqa: E501\n",
    "        self._ecgi = None\n",
    "        self._geo_area = None\n",
    "        self.discriminator = None\n",
    "        if ecgi is not None:\n",
    "            self.ecgi = ecgi\n",
    "        if geo_area is not None:\n",
    "            self.geo_area = geo_area\n",
    "    @property\n",
    "    def ecgi(self):\n",
    "        return self._ecgi\n",
    "    @ecgi.setter\n",
    "    def ecgi(self, ecgi):\n",
    "        self._ecgi = ecgi\n",
    "    @property\n",
    "    def geo_area(self):\n",
    "        return self._geo_area\n",
    "    @geo_area.setter\n",
    "    def geo_area(self, geo_area):\n",
    "        self._geo_area = geo_area\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(LocationInfo, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, LocationInfo):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other\n",
    "\n",
    "class RouteInfo(object):\n",
    "    swagger_types = {'_location': 'LocationInfo', '_time_stamp': 'TimeStamp'}\n",
    "    attribute_map = {'_location': 'location', '_time_stamp': 'time'}\n",
    "    def __init__(self, location:LocationInfo, time_stamp=None):  # noqa: E501\n",
    "        self._location = None\n",
    "        self.location = location\n",
    "        self._time_stamp = None\n",
    "        if time_stamp is not None:\n",
    "            self.time_stamp = time_stamp\n",
    "    @property\n",
    "    def location(self):\n",
    "        return self._location\n",
    "    @location.setter\n",
    "    def location(self, location):\n",
    "        if location is None:\n",
    "            raise ValueError(\"Invalid value for `location`, must not be `None`\")  # noqa: E501\n",
    "        self._location = location\n",
    "    @property\n",
    "    def time_stamp(self):\n",
    "        return self._time_stamp\n",
    "    @time_stamp.setter\n",
    "    def time_stamp(self, time_stamp):\n",
    "        self._time_stamp = time_stamp\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(RouteInfo, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, RouteInfo):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other\n",
    "\n",
    "class LocationInfoGeoArea(object):\n",
    "    swagger_types = {'_latitude': 'float', '_longitude': 'float'}\n",
    "    attribute_map = {'_latitude': 'latitude', '_longitude': 'longitude'}\n",
    "    def __init__(self, latitude, longitude):  # noqa: E501\n",
    "        self._latitude = None\n",
    "        self._longitude = None\n",
    "        self.discriminator = None\n",
    "        if latitude is not None:\n",
    "            self.latitude = latitude\n",
    "        if longitude is not None:\n",
    "            self.longitude = longitude\n",
    "    @property\n",
    "    def latitude(self):\n",
    "        return self._latitude\n",
    "    @latitude.setter\n",
    "    def latitude(self, latitude):\n",
    "        if latitude is None:\n",
    "            raise ValueError(\"Invalid value for `latitude`, must not be `None`\")  # noqa: E501\n",
    "        self._latitude = latitude\n",
    "    @property\n",
    "    def longitude(self):\n",
    "        return self._longitude\n",
    "    @longitude.setter\n",
    "    def longitude(self, longitude):\n",
    "        if longitude is None:\n",
    "            raise ValueError(\"Invalid value for `longitude`, must not be `None`\")  # noqa: E501\n",
    "        self._longitude = longitude\n",
    "    def to_dict(self):\n",
    "        result = {}\n",
    "        for attr, _ in six.iteritems(self.swagger_types):\n",
    "            value = getattr(self, attr)\n",
    "            if isinstance(value, list):\n",
    "                result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, 'to_dict') else x,value))\n",
    "            elif hasattr(value, 'to_dict'):\n",
    "                result[attr] = value.to_dict()\n",
    "            elif isinstance(value, dict):\n",
    "                result[attr] = dict(map(\n",
    "                    lambda item: (item[0], item[1].to_dict())\n",
    "                    if hasattr(item[1], 'to_dict') else item,\n",
    "                    value.items()\n",
    "                ))\n",
    "            else:\n",
    "                result[attr] = value\n",
    "        if issubclass(LocationInfoGeoArea, dict):\n",
    "            for key, value in self.items():\n",
    "                result[key] = value\n",
    "        return result\n",
    "    def to_str(self):\n",
    "        return pprint.pformat(self.to_dict())\n",
    "    def __repr__(self):\n",
    "        return self.to_str()\n",
    "    def __eq__(self, other):\n",
    "        if not isinstance(other, LocationInfoGeoArea):\n",
    "            return False\n",
    "        return self.__dict__ == other.__dict__\n",
    "    def __ne__(self, other):\n",
    "        return not self == other\n",
    "\n",
    "class Ecgi(object):\n",
    "    swagger_types = {'_cellId': 'CellId', '_plmn': 'Plmn'}\n",
    "    attribute_map = {'_cellId': 'cellId', '_plmn': 'plmn'}\n",
    "    def __init__(self, cellId=None, plmn=None):  # noqa: E501\n",
    "        self._cellId = None\n",
    "        self._plmn = None\n",
    "        self.discriminator = None\n",