Commit 88c5b30d authored by Adriana Fernández-Fernández's avatar Adriana Fernández-Fernández
Browse files

Replaces i2edge client by the SDK

parent 33139876
Loading
Loading
Loading
Loading

src/clients/i2edge.py

deleted100644 → 0
+0 −335
Original line number Diff line number Diff line
# -------------------------------------------------------------------------- #
# Copyright 2025-present, Federation Manager, by Software Networks, i2CAT    #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

import requests
from configparser import ConfigParser
import json
import base64
import os
from requests.models import Response
import util

CONFIG = ConfigParser()
CONFIG.read("conf/config.cfg")
HOST = CONFIG.get("i2edge", "host")
PORT = int(CONFIG.get("i2edge", "port"))
TIMEOUT_REQUESTS = 20


def get_zones_list():

    url = f"http://{HOST}:{PORT}/zones/list"
    response = requests.get(url, timeout=TIMEOUT_REQUESTS)

    if response.status_code != 200:
        return None

    data = response.json()

    return data


def get_zone_by_zone_id(zone_id):

    url = f"http://{HOST}:{PORT}/zone/{zone_id}"
    response = requests.get(url, timeout=TIMEOUT_REQUESTS)

    if response.status_code != 200:
        return None

    data = response.json()

    return data


def get_zones():

  url = f"http://{HOST}:{PORT}/zones"
  response = requests.get(url, timeout=TIMEOUT_REQUESTS)

  if response.status_code != 200:
    return None

  data = response.json()

  return data

def onboarding_artefact(body):
    posted = True
    params = {}
    files = {}
    file_path = ""

    if body.repo_type == util.RepoType.UPLOAD.value:
        base64_data = body.artefact_file

        # Decode base64_data to obtain file
        decoded_data = base64.b64decode(base64_data)

        # Define the path to save the decoded file
        file_path = f'/tmp/{body.artefact_name}.tgz'

        # Save the decoded bytes to a file
        with open(file_path, 'wb') as file:
             file.write(decoded_data)

        # Define the tar.gz file to be uploaded
        files = {
            'artefact_file': (f'{body.artefact_id}-{body.artefact_version_info}.tgz', open(file_path, 'rb'), 'application/x-compressed-tar')
        }

        params = {
            'artefact_id': body.artefact_id,
            'name': body.artefact_name,
            'repo_type': util.RepoType.UPLOAD.value
        }

    if body.repo_type == util.RepoType.PUBLIC.value:
        params = {
            'artefact_id': body.artefact_id,
            'name': body.artefact_name,
            'repo_url': body.artefact_repo_location.repo_url,
            'repo_type': util.RepoType.PUBLIC.value
        }

    if body.repo_type == util.RepoType.PRIVATE.value:
        params = {
            'artefact_id': body.artefact_id,
            'name': body.artefact_name,
            'repo_url': body.artefact_repo_location.repo_url,
            'repo_user_name': body.artefact_repo_location.user_name,
            'repo_password': body.artefact_repo_location.password,
            'repo_token': body.artefact_repo_location.token,
            'repo_type': util.RepoType.PRIVATE.value
        }

    url = f"http://{HOST}:{PORT}/artefact"

    headers = {
        'Accept': 'application/json'
    }

    # Make the POST request
    try:
        if body.repo_type == util.RepoType.UPLOAD.value:
            response = requests.post(url, headers=headers, files=files, data=params, timeout=TIMEOUT_REQUESTS)
            # Close the file handles and clean up temporary files
            files['artefact_file'][1].close()
            os.remove(file_path)
        else:
            response = requests.post(url, headers=headers, data=params, timeout=TIMEOUT_REQUESTS)
    except Exception as error:
        if body.repo_type == util.RepoType.UPLOAD.value:
            try:
                # Close the file handles and clean up temporary files
                files['artefact_file'][1].close()
                os.remove(file_path)
            except:
                pass
        definition = f"Unable to upload artefact. Reason: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 422
        response = create_manual_response(content, status_code)
        return response

    return response

def get_helm_chart(artefact_name):
    helm_chart = None

    url = f"http://{HOST}:{PORT}/helmcharts/{artefact_name}"

    response = requests.get(url)

    if response.status_code != 200:
        return None

    helm_chart = response.json()

    return helm_chart[0]

def delete_helm_chart(artefact_name, version):
    deleted = True

    url = f"http://{HOST}:{PORT}/helmchart/{artefact_name}/{version}"

    response = requests.delete(url)

    if response.status_code != 200:
        return False

    return deleted

def delete_artefact(artefact_id):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/artefact/{artefact_id}"
        response = requests.delete(url)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response

def post_onboarding(onboarding_data):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/application/onboarding/"
        headers = {"Content-Type": "application/json; accept=application/json"}
        response = requests.post(url, headers=headers, json=onboarding_data)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response


def update_onboarding(app_id, onboarding_data):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/application/onboarding/{app_id}"
        headers = {"Content-Type": "application/json; accept=application/json"}
        response = requests.patch(url, headers=headers, json=onboarding_data)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response


def delete_onboarding(app_id):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/application/onboarding/{app_id}"
        response = requests.delete(url)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response


def get_onboarding(app_id):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/application/onboarding/{app_id}"
        response = requests.get(url)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response


def post_app_command(app_command_data):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/app/"
        headers = {"Content-Type": "application/json; accept=application/json"}
        response = requests.post(url, headers=headers, json=app_command_data)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response


def get_app_by_zone(zone_id):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/apps/{zone_id}"
        response = requests.get(url)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response


def get_app_by_zone_ap_name(zone_id, app_name):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/app/{zone_id}/{app_name}"
        response = requests.get(url)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response


def delete_app(app_name):
    response = None

    try:
        url = f"http://{HOST}:{PORT}/app/{app_name}"
        response = requests.delete(url)
    except Exception as error:
        definition = f"Unexpected Error: {error}"
        dict = {"message": definition}
        content = json.dumps(dict)
        status_code = 500
        response = create_manual_response(content, status_code)

    return response

def create_manual_response(content, status_code):
    # Create a new Response object
    response = Response()

    # Set the status code
    response.status_code = status_code

    # Set the content (must be bytes)
    response._content = content.encode('utf-8')

    # Optionally, set other attributes like headers
    response.headers['Content-Type'] = 'application/json'

    return response
+243 −0
Original line number Diff line number Diff line
# -------------------------------------------------------------------------- #
# Copyright 2025-present, Federation Manager, by Software Networks, i2CAT    #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #
from configparser import ConfigParser
from sunrise6g_opensdk.common.sdk import Sdk as sdkclient
import logging

logger = logging.getLogger(__name__)

CONFIG = ConfigParser()
CONFIG.read("conf/config.cfg")
HOST = CONFIG.get("edge_cloud_platform", "host")
PORT = int(CONFIG.get("edge_cloud_platform", "port"))
CLIENT_NAME = CONFIG.get("edge_cloud_platform", "client_name")
FLAVOUR_ID = CONFIG.get("edge_cloud_platform", "flavour_id")


class EdgeCloudClient:
    """
    EdgeCloud Client using sunrise6g_opensdk with _gsma functions for FM operations
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(EdgeCloudClient, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, host=HOST, port=PORT, client_name=CLIENT_NAME):
        if self._initialized:
            return

        self.host = host
        self.port = port
        self.base_url = f"http://{host}:{port}"
        self.client_name = client_name

        client_specs = {
            "edgecloud": {
                "client_name": client_name,
                "base_url": self.base_url,
                "flavour_id": FLAVOUR_ID
            }
        }

        try:
            clients = sdkclient.create_adapters_from(client_specs)
            self.edgecloud_client = clients.get("edgecloud")
            self._initialized = True
            logger.info(f"EdgeCloud client initialized with base_url: {self.base_url}")
        except Exception as e:
            logger.error(f"Failed to initialize EdgeCloud client: {e}")
            raise

    # Zone Management Functions
    def get_list_zones(self):  # --> OK
        """Get list of all available zones using GSMA-compliant API"""
        try:
            response = self.edgecloud_client.get_edge_cloud_zones_list_gsma()
            return response.json()
        except Exception as e:
            logger.error(f"Error getting zones: {e}")
            raise

    def get_zones(self):  # --> OK
        """Get detailed info of all available zones using GSMA-compliant API"""
        try:
            response = self.edgecloud_client.get_edge_cloud_zones_gsma()
            return response.json()
        except Exception as e:
            logger.error(f"Error getting zones: {e}")
            raise

    def get_zone_by_zone_id(self, zone_id):  # --> OK
        """Get zone details by zone ID using GSMA-compliant API"""
        try:
            response = self.edgecloud_client.get_edge_cloud_zone_details_gsma(zone_id)
            return response.json()
        except Exception as e:
            logger.error(f"Error getting zone {zone_id}: {e}")
            raise

    # Artefact Management Functions
    def onboarding_artefact_gsma(self, artefact_data):  # --> NOK
        """Upload artefact using GSMA-compliant API"""
        try:
            return self.edgecloud_client.create_artefact_gsma(artefact_data)
        except Exception as e:
            logger.error(f"Error uploading artefact: {e}")
            raise

    def delete_artefact_gsma(self, artefact_id):  # --> OK
        """Delete artefact using GSMA-compliant API"""
        try:
            return self.edgecloud_client.delete_artefact_gsma(artefact_id)
        except Exception as e:
            logger.error(f"Error deleting artefact {artefact_id}: {e}")
            raise

    # Application Onboarding Functions
    def get_onboarding(self, app_id):  # --> OK
        """Get application onboarding details using GSMA-compliant API"""
        try:
            return self.edgecloud_client.get_onboarded_app_gsma(app_id)
        except Exception as e:
            logger.error(f"Error getting onboarding for app {app_id}: {e}")
            raise

    def post_onboarding(self, onboarding_data):  # --> OK
        """Create application onboarding using GSMA-compliant API"""
        try:
            return self.edgecloud_client.onboard_app_gsma(onboarding_data)
        except Exception as e:
            logger.error(f"Error creating onboarding: {e}")
            raise

    def update_onboarding(self, app_id, onboarding_data):  # --> NOK
        """Update application onboarding using GSMA-compliant API"""
        try:
            return self.edgecloud_client.patch_onboarded_app_gsma(app_id, onboarding_data)
        except Exception as e:
            logger.error(f"Error updating onboarding for app {app_id}: {e}")
            raise

    def delete_onboarding(self, app_id):  # --> OK
        """Delete application onboarding using GSMA-compliant API"""
        try:
            return self.edgecloud_client.delete_onboarded_app_gsma(app_id)
        except Exception as e:
            logger.error(f"Error deleting onboarding for app {app_id}: {e}")
            raise

    # Application Deployment Functions
    def post_app_command(self, deployment_data):  # --> OK
        """Deploy application using GSMA-compliant API"""
        try:
            return self.edgecloud_client.deploy_app_gsma(deployment_data)
        except Exception as e:
            logger.error(f"Error deploying app: {e}")
            raise

    def get_app_by_zone_app_instance_id(self, app_id, app_instance_id, zone_id):
        """Get application instance details using GSMA-compliant API"""
        try:
            return self.edgecloud_client.get_deployed_app_gsma(app_id, app_instance_id, zone_id)
        except Exception as e:
            logger.error(f"Error getting app instance {app_instance_id} in zone {zone_id}: {e}")
            raise

    def delete_app(self, app_id, app_instance_id, zone_id):  # --> OK
        """Delete application instance using GSMA-compliant API"""
        try:
            return self.edgecloud_client.undeploy_app_gsma(app_id, app_instance_id, zone_id)
        except Exception as e:
            logger.error(f"Error deleting app instance {app_instance_id}: {e}")
            raise


# Global client instance
_client_instance = None


def get_client():
    """Get the singleton EdgeCloud client instance"""
    global _client_instance
    if _client_instance is None:
        _client_instance = EdgeCloudClient()
    return _client_instance


# Convenience functions for direct usage
def get_list_zones():
    """Get all available zones"""
    return get_client().get_list_zones()


def get_zones():
    """Get all available zones"""
    return get_client().get_zones()


def get_zone_by_zone_id(zone_id):
    """Get zone details by zone ID"""
    return get_client().get_zone_by_zone_id(zone_id)


def onboarding_artefact(artefact_data):
    """Upload artefact"""
    return get_client().onboarding_artefact_gsma(artefact_data)


def delete_artefact(artefact_id):
    """Delete artefact"""
    return get_client().delete_artefact_gsma(artefact_id)


def get_onboarding(app_id):
    """Get application onboarding details"""
    return get_client().get_onboarding(app_id)


def post_onboarding(onboarding_data):
    """Create application onboarding"""
    return get_client().post_onboarding(onboarding_data)


def update_onboarding(app_id, onboarding_data):
    """Update application onboarding"""
    return get_client().update_onboarding(app_id, onboarding_data)


def delete_onboarding(app_id):
    """Delete application onboarding"""
    return get_client().delete_onboarding(app_id)


def post_app_command(deployment_data):
    """Deploy application"""
    return get_client().post_app_command(deployment_data)


def get_app_by_zone_app_instance_id(app_id, app_instance_id, zone_id):
    """Get application instance details"""
    return get_client().get_app_by_zone_app_instance_id(app_id, app_instance_id, zone_id)


def delete_app(app_id, app_instance_id, zone_id):
    """Delete application instance"""
    return get_client().delete_app(app_id, app_instance_id, zone_id)
+6 −4
Original line number Diff line number Diff line
@@ -18,10 +18,6 @@ protocol = http
host = mongodb
port = 27017

[i2edge]
host = 192.168.123.237
port = 30769

[op_data]
partnerOPFederationId = i2cat
partnerOPCountryCode = ES
@@ -37,3 +33,9 @@ lcmServiceEndPoint_port = 8989
lcmServiceEndPoint_fqdn =
lcmServiceEndPoint_ipv4Addresses = 127.0.0.1
lcmServiceEndPoint_ipv6Addresses =

[edge_cloud_platform]
host = 192.168.123.237
port = 30769
client_name = i2edge
flavour_id = 67f3a0b0e3184a85952e174d