Commit 04677a22 authored by George Papathanail's avatar George Papathanail
Browse files

Merge branch 'fix/zone-subscription' into 'main'

Add zone subscription endpoints and some extra fixes

See merge request !15
parents a0d5c0b0 a4dbc0eb
Loading
Loading
Loading
Loading
Loading
+10 −9
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ class Configuration(BaseSettings):
    FEDERATION_MANAGER_HOST=os.getenv("FEDERATION_MANAGER_HOST")
    TOKEN_ENDPOINT = os.getenv('TOKEN_ENDPOINT')
    PARTNER_API_ROOT = os.getenv('PARTNER_API_ROOT')
    AVAIL_ZONE_NOTIF_LINK = os.getenv('AVAIL_ZONE_NOTIF_LINK')


config = Configuration()
+271 −35
Original line number Diff line number Diff line
@@ -4,7 +4,8 @@ from edge_cloud_management_api.managers.log_manager import logger
from edge_cloud_management_api.services.edge_cloud_services import PiEdgeAPIClientFactory
from edge_cloud_management_api.services.federation_services import FederationManagerClientFactory
from edge_cloud_management_api.services.storage_service import get_zone
from edge_cloud_management_api.services.storage_service import get_fed
from edge_cloud_management_api.services.storage_service import insert_zones
from edge_cloud_management_api.services.storage_service import get_fed, get_all_feds
import json
import re
import uuid
@@ -150,6 +151,61 @@ def delete_app(appId, x_correlator=None):
        pi_edge_factory = PiEdgeAPIClientFactory()
        api_client = pi_edge_factory.create_pi_edge_api_client()
        response = api_client.delete_app(appId=appId)
        if isinstance(response, dict) and int(response.get("status_code", 500)) >= 400:
            logger.info("SRM app delete failed, attempting federation cleanup")
        else:
            return response

        feds = get_all_feds()
        if feds:
            app_provider_id = None
            app_response = api_client.get_app(appId)
            if isinstance(app_response, dict):
                app_provider_id = app_response.get("appProvider") or app_response.get("appProviderId")
                manifest = app_response.get("appManifest")
                if isinstance(manifest, dict) and not app_provider_id:
                    app_provider_id = manifest.get("appProvider") or manifest.get("appProviderId")

            for fed in feds:
                fed_token = fed.get("token")
                federation_context_id = fed.get("_id")
                if not federation_context_id or not fed_token:
                    continue
                if app_provider_id:
                    fed_instances, fed_code = federation_client.get_all_app_instances(
                        federation_context_id=federation_context_id,
                        app_id=appId,
                        app_provider_id=app_provider_id,
                        token=fed_token,
                    )
                    if fed_code == 200 and isinstance(fed_instances, list):
                        for zone_info in fed_instances:
                            if not isinstance(zone_info, dict):
                                continue
                            zone_id = zone_info.get("zoneId")
                            instances_list = zone_info.get("appInstanceInfo", [])
                            if not zone_id or not isinstance(instances_list, list):
                                continue
                            for instance in instances_list:
                                if not isinstance(instance, dict):
                                    continue
                                instance_id = instance.get("appInstIdentifier")
                                if not instance_id:
                                    continue
                                federation_client.remove_app_instance(
                                    federation_context_id=federation_context_id,
                                    app_id=appId,
                                    app_instance_id=instance_id,
                                    zone_id=zone_id,
                                    token=fed_token,
                                )

                remove_response, remove_status = federation_client.delete_onboarded_app(
                    federation_context_id, appId, fed_token
                )
                if remove_status in (200, 202, 204):
                    return jsonify(remove_response), remove_status

        return response

    except NotFound404Exception:
@@ -186,12 +242,37 @@ def create_app_instance():
        first_zone = app_zones[0] if isinstance(app_zones, list) and app_zones else {}
        if not isinstance(first_zone, dict):
            first_zone = {}
        edge_cloud_zone_id = (
            first_zone
            .get("EdgeCloudZone", {})
            .get("edgeCloudZoneId")
        zone_payload = (
            first_zone.get("EdgeCloudZone", {})
            if isinstance(first_zone.get("EdgeCloudZone", {}), dict)
            else first_zone
        )
        zone = get_zone(edge_cloud_zone_id)
        edge_cloud_zone_id = None
        if isinstance(zone_payload, dict):
            edge_cloud_zone_id = zone_payload.get("edgeCloudZoneId") or zone_payload.get("zoneId")

        zone = get_zone(edge_cloud_zone_id) if edge_cloud_zone_id else None
        if not zone and edge_cloud_zone_id:
            try:
                zones = pi_edge_client.edge_cloud_zones()
                if isinstance(zones, list):
                    for z in zones:
                        if isinstance(z, dict) and z.get("edgeCloudZoneId") == edge_cloud_zone_id:
                            z["_id"] = edge_cloud_zone_id
                            z["isLocal"] = "true"
                            insert_zones([z])
                            zone = z
                            break
            except Exception as exc:
                logger.info(f"Failed to refresh zones from SRM: {exc}")

        if not zone and edge_cloud_zone_id and isinstance(zone_payload, dict):
            zone_payload = dict(zone_payload)
            zone_payload["_id"] = edge_cloud_zone_id
            zone_payload.setdefault("isLocal", "true")
            insert_zones([zone_payload])
            zone = zone_payload

        if not zone:
            return jsonify({
                "error": "Edge Cloud Zone not found",
@@ -795,12 +876,84 @@ def get_app_instance(app_id=None, x_correlator=None, app_instance_id=None, regio
    Supports filtering by app_id, app_instance_id, and region.
    """
    try:
        instances = None
        instances = []
        pi_edge_client_factory = PiEdgeAPIClientFactory()
        pi_edge_client = pi_edge_client_factory.create_pi_edge_api_client()

        if app_id is None and app_instance_id is None:
            instances = pi_edge_client.get_app_instances()
        if app_instance_id is None:
            local_instances = pi_edge_client.get_app_instances()
            if isinstance(local_instances, list):
                instances.extend(local_instances)

            def resolve_app_provider(app_id_value, app_payload=None):
                if isinstance(app_payload, dict):
                    provider = app_payload.get("appProvider") or app_payload.get("appProviderId")
                    if provider:
                        return provider
                    manifest = app_payload.get("appManifest")
                    if isinstance(manifest, dict):
                        provider = manifest.get("appProvider") or manifest.get("appProviderId")
                        if provider:
                            return provider
                app_response = pi_edge_client.get_app(app_id_value)
                if isinstance(app_response, dict):
                    manifest = app_response.get("appManifest")
                    if isinstance(manifest, dict):
                        provider = manifest.get("appProvider") or manifest.get("appProviderId")
                        if provider:
                            return provider
                    provider = app_response.get("appProvider") or app_response.get("appProviderId")
                    if provider:
                        return provider
                return None

            feds = get_all_feds()
            if app_id:
                app_provider_id = resolve_app_provider(app_id)
                if app_provider_id:
                    for fed in feds:
                        fed_token = fed.get("token")
                        federation_context_id = fed.get("_id")
                        if not federation_context_id or not fed_token:
                            continue
                        fed_instances, fed_code = federation_client.get_all_app_instances(
                            federation_context_id=federation_context_id,
                            app_id=app_id,
                            app_provider_id=app_provider_id,
                            token=fed_token,
                        )
                        if fed_code == 200 and isinstance(fed_instances, list):
                            instances.extend(fed_instances)
                else:
                    logger.info("Skipping federated lookup; no appProviderId for appId=%s", app_id)
            else:
                apps = pi_edge_client.get_service_functions_catalogue()
                if isinstance(apps, list):
                    app_provider_map = {}
                    for app in apps:
                        if not isinstance(app, dict):
                            continue
                        app_id_value = app.get("appId") or app.get("id")
                        if not app_id_value:
                            continue
                        provider = resolve_app_provider(app_id_value, app_payload=app)
                        if provider:
                            app_provider_map[app_id_value] = provider

                    for app_id_value, app_provider_id in app_provider_map.items():
                        for fed in feds:
                            fed_token = fed.get("token")
                            federation_context_id = fed.get("_id")
                            if not federation_context_id or not fed_token:
                                continue
                            fed_instances, fed_code = federation_client.get_all_app_instances(
                                federation_context_id=federation_context_id,
                                app_id=app_id_value,
                                app_provider_id=app_provider_id,
                                token=fed_token,
                            )
                            if fed_code == 200 and isinstance(fed_instances, list):
                                instances.extend(fed_instances)

        if not instances:
            return jsonify({
@@ -831,14 +984,97 @@ def delete_app_instance(appInstanceId: str, x_correlator=None):
        pi_edge_client_factory = PiEdgeAPIClientFactory()
        pi_edge_client = pi_edge_client_factory.create_pi_edge_api_client()
        response = pi_edge_client.delete_app_instance(appInstanceId)
        if isinstance(response, dict):
        if isinstance(response, dict) and response.get("status_code") != 404:
            status_code = response.get("status_code", 500)
            return jsonify(response), status_code
        if not isinstance(response, dict) and response.status_code != 404:
            return jsonify({
                "result": response.text,
                "status": response.status_code
            }), response.status_code

        if not isinstance(response, dict):
            status_code = response.status_code
        else:
            status_code = response.get("status_code", 404)

        def resolve_app_provider(app_id_value, app_payload=None):
            if isinstance(app_payload, dict):
                provider = app_payload.get("appProvider") or app_payload.get("appProviderId")
                if provider:
                    return provider
                manifest = app_payload.get("appManifest")
                if isinstance(manifest, dict):
                    provider = manifest.get("appProvider") or manifest.get("appProviderId")
                    if provider:
                        return provider
            app_response = pi_edge_client.get_app(app_id_value)
            if isinstance(app_response, dict):
                manifest = app_response.get("appManifest")
                if isinstance(manifest, dict):
                    provider = manifest.get("appProvider") or manifest.get("appProviderId")
                    if provider:
                        return provider
                provider = app_response.get("appProvider") or app_response.get("appProviderId")
                if provider:
                    return provider
            return None

        apps = pi_edge_client.get_service_functions_catalogue()
        app_provider_map = {}
        if isinstance(apps, list):
            for app in apps:
                if not isinstance(app, dict):
                    continue
                app_id_value = app.get("appId") or app.get("id")
                if not app_id_value:
                    continue
                provider = resolve_app_provider(app_id_value, app_payload=app)
                if provider:
                    app_provider_map[app_id_value] = provider

        feds = get_all_feds()
        for fed in feds:
            fed_token = fed.get("token")
            federation_context_id = fed.get("_id")
            if not federation_context_id or not fed_token:
                continue
            for app_id_value, app_provider_id in app_provider_map.items():
                fed_instances, fed_code = federation_client.get_all_app_instances(
                    federation_context_id=federation_context_id,
                    app_id=app_id_value,
                    app_provider_id=app_provider_id,
                    token=fed_token,
                )
                if fed_code != 200 or not isinstance(fed_instances, list):
                    continue
                for zone_info in fed_instances:
                    if not isinstance(zone_info, dict):
                        continue
                    zone_id = zone_info.get("zoneId")
                    instances_list = zone_info.get("appInstanceInfo", [])
                    if not zone_id or not isinstance(instances_list, list):
                        continue
                    for instance in instances_list:
                        if not isinstance(instance, dict):
                            continue
                        instance_id = instance.get("appInstIdentifier") or instance.get("appInstanceId")
                        if instance_id != appInstanceId:
                            continue
                        remove_response, remove_status = federation_client.remove_app_instance(
                            federation_context_id=federation_context_id,
                            app_id=app_id_value,
                            app_instance_id=appInstanceId,
                            zone_id=zone_id,
                            token=fed_token,
                        )
                        return jsonify(remove_response), remove_status

        return jsonify({
            "error": response.get("error") if isinstance(response, dict) else response.text,
            "status_code": status_code
        }), status_code

    except Exception as e:
        return (
            jsonify({
+26 −5
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@ from typing import List
from edge_cloud_management_api.configs.env_config import config
from edge_cloud_management_api.managers.log_manager import logger
from edge_cloud_management_api.services.edge_cloud_services import PiEdgeAPIClientFactory
from edge_cloud_management_api.services.storage_service import insert_zones
from edge_cloud_management_api.services.storage_service import insert_zones, get_zones
from edge_cloud_management_api.services.federation_services import FederationManagerClientFactory


@@ -31,14 +31,14 @@ class EdgeCloudZone(BaseModel):
        pattern="^(active|inactive|unknown)$",
    )
    edgeCloudProvider: str = Field(..., description="Name of the Edge Cloud Provider")
    edgeCloudRegion: str | None = Field(..., description="Region of the Edge Cloud Zone")
    edgeCloudRegion: str | None = Field(None, description="Region of the Edge Cloud Zone")


class EdgeCloudQueryParams(BaseModel):
    x_correlator: str | None
    region: str | None
    status: str | None = Field(
        ...,
        None,
        description="Status of the Edge Cloud Zone",
        pattern="^(active|inactive|unknown)$",
    )
@@ -56,7 +56,9 @@ def get_local_zones() -> list[dict]:
        if isinstance(result, dict) and "error" in result:
            logger.error(f"SRM error: {result['error']}")
            return []
        if isinstance(result, list):
            return result
        return []

    except Exception as e:
        logger.exception("Unexpected error while retrieving local zones from SRM: %s", e)
@@ -67,6 +69,25 @@ def get_federated_zones() -> List[EdgeCloudZone]:
    """get partner/federated Operator Platform available zones from Federation Manager"""
    return []

def get_cached_zones() -> list[dict]:
    """Retrieve cached zones and merge with local SRM zones."""
    cached = []
    try:
        cached = get_zones()
    except Exception as e:
        logger.warning("Failed to read cached zones: %s", e)

    merged = list(cached or [])
    existing_ids = {
        zone.get("edgeCloudZoneId") for zone in merged if isinstance(zone, dict)
    }
    for zone in get_local_zones():
        zone_id = zone.get("edgeCloudZoneId") if isinstance(zone, dict) else None
        if zone_id and zone_id not in existing_ids:
            merged.append(zone)
            existing_ids.add(zone_id)
    return merged

def get_all_cloud_zones() -> List[EdgeCloudZone]:
    """Get all available zones from local and federated Operator Platforms"""

@@ -108,7 +129,7 @@ def get_edge_cloud_zones(x_correlator: str | None = None, region=None, status=No
        def query_status_matches(zone: EdgeCloudZone) -> bool:
            return query_params.status is None or zone.edgeCloudZoneStatus == query_params.status

        response = [EdgeCloudZone(**zone).model_dump() for zone in get_all_cloud_zones()]
        response = [EdgeCloudZone(**zone).model_dump() for zone in get_cached_zones()]
        return jsonify(response), 200

    except ValidationError as e:
+37 −19
Original line number Diff line number Diff line
@@ -27,7 +27,9 @@ def create_federation():
    """POST /partner - Create federation with partner OP."""

    body = request.get_json()
    token = requests.post(TOKEN_ENDPOINT, headers=token_headers, data=data).json().get('access_token')
    token = __get_token()
    if not token:
        return jsonify({"error": "Unable to obtain access token"}), 500
    response, code = federation_client.post_partner(body, token)
    fed = {'_id': response.get('federationContextId'), 'token': token}
    if code==200:
@@ -47,17 +49,24 @@ def create_federation():
        insert_zones(zones_to_insert)    
        insert_federation(fed)

        zone_ids = [zone.get('zoneId') for zone in av_zones]
        callback_url = body.get('availZoneNotifLink')  # Optional from request
        avail_zone_notif_link = config.AVAIL_ZONE_NOTIF_LINK or body.get("availZoneNotifLink")
        accepted_availability_zones = []
        for zone in av_zones or []:
            if isinstance(zone, dict) and zone.get("zoneId"):
                accepted_availability_zones.append({"zoneId": zone.get("zoneId")})
        if accepted_availability_zones:
            zone_response, zone_code = federation_client.subscribe_to_zones(
            response.get('federationContextId'),
            zone_ids,
                response.get("federationContextId"),
                accepted_availability_zones,
                token,
            callback_url
                avail_zone_notif_link,
            )
            if zone_code != 200:
            logger.warning(f"Zone subscription returned non-200: {zone_code} - {zone_response}")

                logger.warning(
                    "Zone subscription returned non-200: %s - %s",
                    zone_code,
                    zone_response,
                )
    return response, code

def get_federation(federationContextId):
@@ -119,8 +128,14 @@ def delete_onboarded_app(federationContextId, appId):
def request_zone_synch(federationContextId):
    token = __get_token()
    body = request.get_json()
    response = federation_client.request_zone_sync(federation_context_id=federationContextId, body=body, token=token)
    return jsonify(response)
    if not body:
        body = {}
    if not body.get("availZoneNotifLink"):
        body["availZoneNotifLink"] = config.AVAIL_ZONE_NOTIF_LINK
    response, code = federation_client.request_zone_sync(
        federation_context_id=federationContextId, body=body, token=token
    )
    return jsonify(response), code

def get_zone_resource_info(federationContextId, zoneId):
    token = __get_token()
@@ -133,8 +148,11 @@ def remove_zone_sync(federationContextId, zoneId):
    return jsonify(response)

def __get_token():
    bearer = connexion.request.headers['Authorization']
    token = bearer.split()[1]
    # __token = requests.post(TOKEN_ENDPOINT, headers=token_headers, data=data).json().get('access_token')
    return token
    bearer = connexion.request.headers.get('Authorization')
    if bearer:
        parts = bearer.split()
        if len(parts) == 2 and parts[0].lower() == "bearer":
            return parts[1]
    if TOKEN_ENDPOINT:
        return requests.post(TOKEN_ENDPOINT, headers=token_headers, data=data).json().get('access_token')
    return None
+101 −8

File changed.

Preview size limit exceeded, changes collapsed.

Loading