Commit 3d5413fb authored by Laskaratos Dimitris's avatar Laskaratos Dimitris
Browse files

Minor fixes in K8s adapter

parent 68595234
Loading
Loading
Loading
Loading
+31 −31
Original line number Diff line number Diff line
@@ -15,15 +15,15 @@ from sunrise6g_opensdk.edgecloud.adapters.i2edge.client import (
from sunrise6g_opensdk.edgecloud.adapters.kubernetes.client import (
    EdgeApplicationManager as kubernetesClient,
)
from sunrise6g_opensdk.network.adapters.oai.client import (
    NetworkManager as OaiCoreClient,
)
from sunrise6g_opensdk.network.adapters.open5gcore.client import (
    NetworkManager as Open5GCoreClient,
)
from sunrise6g_opensdk.network.adapters.open5gs.client import (
    NetworkManager as Open5GSClient,
)
# from sunrise6g_opensdk.network.adapters.oai.client import (
#     NetworkManager as OaiCoreClient,
# )
# from sunrise6g_opensdk.network.adapters.open5gcore.client import (
#     NetworkManager as Open5GCoreClient,
# )
# from sunrise6g_opensdk.network.adapters.open5gs.client import (
#     NetworkManager as Open5GSClient,
# )


def _edgecloud_adapters_factory(client_name: str, base_url: str, **kwargs):
@@ -44,28 +44,28 @@ def _edgecloud_adapters_factory(client_name: str, base_url: str, **kwargs):
        )


def _network_adapters_factory(client_name: str, base_url: str, **kwargs):
    if "scs_as_id" not in kwargs:
        raise ValueError("Missing required 'scs_as_id' for network adapters.")
    scs_as_id = kwargs.pop("scs_as_id")
# def _network_adapters_factory(client_name: str, base_url: str, **kwargs):
#     if "scs_as_id" not in kwargs:
#         raise ValueError("Missing required 'scs_as_id' for network adapters.")
#     scs_as_id = kwargs.pop("scs_as_id")

    network_factory = {
        "open5gs": lambda url, scs_id, **kw: Open5GSClient(
            base_url=url, scs_as_id=scs_id, **kw
        ),
        "oai": lambda url, scs_id, **kw: OaiCoreClient(
            base_url=url, scs_as_id=scs_id, **kw
        ),
        "open5gcore": lambda url, scs_id, **kw: Open5GCoreClient(
            base_url=url, scs_as_id=scs_id, **kw
        ),
    }
    try:
        return network_factory[client_name](base_url, scs_as_id, **kwargs)
    except KeyError:
        raise ValueError(
            f"Invalid network client '{client_name}'. Available: {list(network_factory)}"
        )
#     network_factory = {
#         "open5gs": lambda url, scs_id, **kw: Open5GSClient(
#             base_url=url, scs_as_id=scs_id, **kw
#         ),
#         "oai": lambda url, scs_id, **kw: OaiCoreClient(
#             base_url=url, scs_as_id=scs_id, **kw
#         ),
#         "open5gcore": lambda url, scs_id, **kw: Open5GCoreClient(
#             base_url=url, scs_as_id=scs_id, **kw
#         ),
#     }
#     try:
#         return network_factory[client_name](base_url, scs_as_id, **kwargs)
#     except KeyError:
#         raise ValueError(
#             f"Invalid network client '{client_name}'. Available: {list(network_factory)}"
#         )


# def _oran_adapters_factory(client_name: str, base_url: str):
@@ -75,7 +75,7 @@ def _network_adapters_factory(client_name: str, base_url: str, **kwargs):
class AdaptersFactory:
    _domain_factories = {
        "edgecloud": _edgecloud_adapters_factory,
        "network": _network_adapters_factory,
        # "network": _network_adapters_factory,
        # "oran": _oran_adapters_factory,
    }

+6 −3
Original line number Diff line number Diff line
@@ -36,12 +36,14 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        kubernetes_port = kwargs.get("KUBERNETES_MASTER_PORT")
        storage_uri = kwargs.get("EMP_STORAGE_URI")
        username = kwargs.get("KUBERNETES_USERNAME")
        namespace = kwargs.get('K8S_NAMESPACE')
        if base_url is not None and base_url != "":
            self.k8s_connector = KubernetesConnector(
                ip=self.kubernetes_host,
                port=kubernetes_port,
                token=kubernetes_token,
                username=username,
                namespace=namespace
            )
        if storage_uri is not None:
            self.connector_db = ConnectorDB(storage_uri)
@@ -126,6 +128,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
                connector_db=self.connector_db,
                kubernetes_connector=self.k8s_connector,
            )
           
        if type(result) is V1Deployment:
            response = {}
            response["name"] = body.get("name")
@@ -146,7 +149,7 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        app_instance_id: Optional[str] = None,
        region: Optional[str] = None,
    ) -> List[Dict]:
        logging.info("Retreiving all deployed apps in the edge cloud platform")
        logging.info("Retrieving all deployed apps in the edge cloud platform")
        deployments = self.k8s_connector.get_deployed_service_functions(
            self.connector_db
        )
@@ -154,9 +157,9 @@ class EdgeApplicationManager(EdgeCloudManagementInterface):
        for deployment in deployments:
            item = {}
            item["name"] = deployment.get("service_function_catalogue_name")
            item["appId"] = deployment.get("id")
            item["appId"] = deployment.get("appId")
            item["appProvider"] = deployment.get("appProvider")
            item["appInstanceId"] = deployment.get("uid")
            item["appInstanceId"] = deployment.get("appInstanceId")
            item["status"] = deployment.get("status")
            interfaces = []
            for port in deployment.get("ports"):
+1 −1
Original line number Diff line number Diff line
@@ -220,7 +220,7 @@ def deploy_service_function(
            "Node is selected by the K8s scheduler"
        )
    if type(response) is V1Deployment:
        deployed_service_function_db["uid"] = response.metadata.uid
        deployed_service_function_db["_id"] = response.metadata.uid
        connector_db.insert_document_deployed_service_function(
            document=deployed_service_function_db
        )
+1 −1
Original line number Diff line number Diff line
@@ -72,7 +72,7 @@ class ConnectorDB:
        # raise Exception("Already Registered: PaaS name", document["paas_name"])
        try:
            insert_doc = {}
            insert_doc["_id"] = document["id"]
            insert_doc["_id"] = document["_id"]
            insert_doc["name"] = document["service_function_name"]
            # insert_doc["type"] = document["paas_type"]
            insert_doc["location"] = document["location"]
+31 −83
Original line number Diff line number Diff line
@@ -15,10 +15,11 @@ configuration = client.Configuration()


class KubernetesConnector:
    def __init__(self, ip, port, token, username):
    def __init__(self, ip, port, token, username, namespace):
        master_node_ip = ip
        master_node_port = port
        username = username
        self.namespace = 'default' if namespace is None else namespace
        self.token_k8s = token
        if port is None:
            self.host = master_node_ip
@@ -227,16 +228,16 @@ class KubernetesConnector:

    def delete_service_function(self, connector_db: ConnectorDB, service_function_name):
        self.api_instance_appsv1.delete_namespaced_deployment(
            name=service_function_name, namespace="sunrise6g"
            name=service_function_name, namespace=self.namespace
        )

        self.v1.delete_namespaced_service(
            name=service_function_name, namespace="sunrise6g"
            name=service_function_name, namespace=self.namespace
        )

        hpa_list = (
            self.api_instance_v1autoscale.list_namespaced_horizontal_pod_autoscaler(
                "sunrise6g"
                self.namespace
            )
        )

@@ -245,18 +246,18 @@ class KubernetesConnector:
        for hpa in hpa_list.items:
            if hpa.metadata.name == service_function_name:
                self.api_instance_v1autoscale.delete_namespaced_horizontal_pod_autoscaler(
                    name=service_function_name, namespace="sunrise6g"
                    name=service_function_name, namespace=self.namespace
                )
                break
        # deletevolume
        volume_list = self.v1.list_namespaced_persistent_volume_claim("sunrise6g")
        volume_list = self.v1.list_namespaced_persistent_volume_claim(self.namespace)
        for volume in volume_list.items:
            name_v = service_function_name + str("-")
            if name_v in volume.metadata.name:
                self.v1.delete_persistent_volume(name=volume.spec.volume_name)

                self.v1.delete_namespaced_persistent_volume_claim(
                    name=volume.metadata.name, namespace="sunrise6g"
                    name=volume.metadata.name, namespace=self.namespace
                )

        doc = {}
@@ -292,7 +293,7 @@ class KubernetesConnector:
                    try:
                        url = (
                            self.host
                            + "/api/v1/namespaces/sunrise6g/persistentvolumeclaims"
                            + "/api/v1/namespaces/"+self.namespace+"/persistentvolumeclaims"
                        )
                        body_volume = self.create_pvc_dict(
                            descriptor_service_function["name"], volume
@@ -304,7 +305,7 @@ class KubernetesConnector:
                    except requests.exceptions.HTTPError as e:
                        # logging.error(traceback.format_exc())
                        return (
                            "Exception when calling CoreV1Api->/api/v1/namespaces/sunrise6g/persistentvolumeclaims: %s\n"
                            "Exception when calling CoreV1Api->/api/v1/namespaces/"+self.namespace+"/persistentvolumeclaims: %s\n"
                            % e
                        )

@@ -315,16 +316,16 @@ class KubernetesConnector:
        try:
            api_response_deployment = (
                self.api_instance_appsv1.create_namespaced_deployment(
                    "sunrise6g", body_deployment
                    self.namespace, body_deployment
                )
            )
            # api_response_service = api_instance_apiregv1.create_api_service(body_service)
            self.v1.create_namespaced_service("sunrise6g", body_service)
            self.v1.create_namespaced_service(self.namespace, body_service)
            if "autoscaling_policies" in descriptor_service_function:
                # V1 AUTOSCALER
                body_hpa = self.create_hpa(descriptor_service_function)
                self.api_instance_v1autoscale.create_namespaced_horizontal_pod_autoscaler(
                    "sunrise6g", body_hpa
                    self.namespace, body_hpa
                )
                # V2beta1 AUTOSCALER
                # body_hpa = create_hpa(descriptor_paas)
@@ -341,7 +342,7 @@ class KubernetesConnector:

    def create_deployment(self, descriptor_service_function):
        metadata = client.V1ObjectMeta(name=descriptor_service_function["name"])
        dict_label = {"sunrise6g": descriptor_service_function["name"]}
        dict_label = {self.namespace: descriptor_service_function["name"]}
        selector = client.V1LabelSelector(match_labels=dict_label)
        metadata_spec = client.V1ObjectMeta(labels=dict_label)

@@ -507,7 +508,7 @@ class KubernetesConnector:

    def create_service(self, descriptor_service_function):
        dict_label = {}
        dict_label["sunrise6g"] = descriptor_service_function["name"]
        dict_label[self.namespace] = descriptor_service_function["name"]
        metadata = client.V1ObjectMeta(
            name=descriptor_service_function["name"], labels=dict_label
        )
@@ -558,10 +559,10 @@ class KubernetesConnector:

        return body

    def create_pvc(name, volumes):
    def create_pvc(self, name, volumes):
        dict_label = {}
        name_vol = name + str("-") + volumes["name"]
        dict_label["sunrise6g"] = name_vol
        dict_label[self.namespace] = name_vol
        # metadata = client.V1ObjectMeta(name=name_vol)
        metadata = client.V1ObjectMeta(name=name_vol, labels=dict_label)
        # api_version = ("v1",)
@@ -578,7 +579,7 @@ class KubernetesConnector:

        return body

    def create_pvc_dict(
    def create_pvc_dict(self,
        name, volumes, storage_class="microk8s-hostpath", volume_name=None
    ):
        name_vol = name + str("-") + volumes["name"]
@@ -590,7 +591,7 @@ class KubernetesConnector:
        body = {
            "api_version": "v1",
            "kind": "PersistentVolumeClaim",
            "metadata": {"labels": {"sunrise6g": name_vol}, "name": name_vol},
            "metadata": {"labels": {self.namespace: name_vol}, "name": name_vol},
            "spec": {
                "accessModes": ["ReadWriteOnce"],
                "resources": {"requests": {"storage": volumes["storage"]}},
@@ -603,7 +604,7 @@ class KubernetesConnector:

        return body

    def create_pv_dict(name, volumes, storage_class, node=None):
    def create_pv_dict(self, name, volumes, storage_class, node=None):
        name_vol = name + "-" + volumes["name"]

        body = {
@@ -612,7 +613,7 @@ class KubernetesConnector:
            "metadata": {
                "name": name_vol,
                "labels": {
                    "sunrise6g": name_vol,
                    self.namespace: name_vol,
                },
            },
            "spec": {
@@ -670,7 +671,7 @@ class KubernetesConnector:
                                desc_paas["autoscaling_policies"] = policies
                                body_hpa = self.create_hpa(desc_paas)
                                self.api_instance_v1autoscale.patch_namespaced_horizontal_pod_autoscaler(
                                    namespace="sunrise6g",
                                    namespace=self.namespace,
                                    name=desc_paas["name"],
                                    body=body_hpa,
                                )
@@ -711,67 +712,12 @@ class KubernetesConnector:
            spec=spec,
        )

        # V2BETA1 K8S API IMPLEMENTATION!!!!

        # dict_label = {}
        # dict_label['name'] = descriptor_paas["name"]
        # metadata = client.V1ObjectMeta(name=descriptor_paas["name"], labels=dict_label)
        #
        # #  spec
        #
        # scale_target = client.V2beta1CrossVersionObjectReference(api_version="extensions/v1beta1", kind="Deployment",
        #                                                     name=descriptor_paas["name"])
        #
        # metrics=[]
        #
        # for metric in descriptor_paas["autoscaling_policies"]:

        #     resource_=client.V2beta1ResourceMetricSource(name=metric["metric"],target_average_utilization=int(metric["util_percent"]))
        #     metric_=client.V2beta1MetricSpec(type="Resource", resource=resource_)
        #     metrics.append(metric_)
        #
        #
        # spec = client.V2beta1HorizontalPodAutoscalerSpec(max_replicas=descriptor_paas["count-max"],
        #                                             min_replicas=descriptor_paas["count-min"],
        #                                             metrics=metrics,
        #                                             scale_target_ref=scale_target)
        # body = client.V2beta1HorizontalPodAutoscaler(api_version="autoscaling/v2beta1", kind="HorizontalPodAutoscaler",
        #                                         metadata=metadata, spec=spec)

        # V2BETA2 K8S API IMPLEMENTATION!!!!

        # dict_label = {}
        # dict_label['name'] = descriptor_paas["name"]
        # metadata = client.V1ObjectMeta(name=descriptor_paas["name"], labels=dict_label)
        #
        # #  spec
        #
        # scale_target = client.V2beta2CrossVersionObjectReference(api_version="apps/v1", kind="Deployment",
        #                                                          name=descriptor_paas["name"])
        #
        # metrics = []
        #
        # for metric in descriptor_paas["autoscaling_policies"]:
        #
        #     target=client.V2beta2MetricTarget(average_utilization=int(metric["util_percent"]),type="Utilization")
        #     resource_ = client.V2beta2ResourceMetricSource(name=metric["metric"],
        #                                                   target=target)
        #     metric_ = client.V2beta2MetricSpec(type="Resource", resource=resource_)
        #     metrics.append(metric_)
        #
        # spec = client.V2beta2HorizontalPodAutoscalerSpec(max_replicas=descriptor_paas["count-max"],
        #                                                  min_replicas=descriptor_paas["count-min"],
        #                                                  metrics=metrics,
        #                                                  scale_target_ref=scale_target)
        # body = client.V2beta2HorizontalPodAutoscaler(api_version="autoscaling/v2beta2", kind="HorizontalPodAutoscaler",
        #                                              metadata=metadata, spec=spec)

        return body

    def get_deployed_dataspace_connector(self, instance_name):
        api_response = self.api_instance_appsv1.list_namespaced_deployment("sunrise6g")
        api_response = self.api_instance_appsv1.list_namespaced_deployment(self.namespace)

        api_response_service = self.v1.list_namespaced_service("sunrise6g")
        api_response_service = self.v1.list_namespaced_service(self.namespace)
        app_ = {}
        for app in api_response.items:
            metadata = app.metadata
@@ -826,9 +772,9 @@ class KubernetesConnector:

    def get_deployed_service_functions(self, connector_db: ConnectorDB):
        self.get_deployed_hpas(connector_db)
        api_response = self.api_instance_appsv1.list_namespaced_deployment("sunrise6g")
        api_response_service = self.v1.list_namespaced_service("sunrise6g")
        api_response_pvc = self.v1.list_namespaced_persistent_volume_claim("sunrise6g")
        api_response = self.api_instance_appsv1.list_namespaced_deployment(self.namespace)
        api_response_service = self.v1.list_namespaced_service(self.namespace)
        api_response_pvc = self.v1.list_namespaced_persistent_volume_claim(self.namespace)

        apps_col = connector_db.get_documents_from_collection(
            collection_input="service_functions"
@@ -871,6 +817,8 @@ class KubernetesConnector:
        for app_col in apps_col:
            if actual_name == app_col["name"]:
                app_["service_function_catalogue_name"] = app_col["name"]
                app_['appId'] = app_col['_id']
                app_['appProvider'] = app_col.get('appProvider')
                break

        # find volumes!
@@ -945,7 +893,7 @@ class KubernetesConnector:
        # APPV1 Implementation!
        api_response = (
            self.api_instance_v1autoscale.list_namespaced_horizontal_pod_autoscaler(
                "sunrise6g"
                self.namespace
            )
        )

@@ -1010,7 +958,7 @@ class KubernetesConnector:

    def is_job_completed(self, job_name):
        job = self.api_instance_batchv1.read_namespaced_job(
            name=job_name, namespace="sunrise6g"
            name=job_name, namespace=self.namespace
        )
        if job.status.succeeded is not None and job.status.succeeded > 0:
            return True
Loading