Commit d34ee10f authored by Antonio Gines Buendia Lopez's avatar Antonio Gines Buendia Lopez
Browse files

WIP

parent a6a0bd60
Loading
Loading
Loading
Loading
+118 −0
Original line number Original line Diff line number Diff line
import logging
from typing import Any, Dict

from pydantic import BaseModel

from pathcompextended.connector.api_client import APIClient
from pathcompextended.model.health import HealthCheckResponse

LOGGER = logging.getLogger(__name__)


# ============================================================================
# HEALTH CHECK MODELS
# ============================================================================


class HealthCheckGetResponse(HealthCheckResponse):
    """Response model for GET /health."""

    pass


# ============================================================================
# EXTERNAL KPIs MODELS
# ============================================================================


class ExternalKpisPostRequest(BaseModel):
    """Request model for POST /external-kpis."""

    # Accepts arbitrary JSON structure
    data: Dict[str, Any]


class ExternalKpisPostResponse(BaseModel):
    """Response model for POST /external-kpis."""

    message: str


# ============================================================================
# GENERAL HRAT ENDPOINTS CONNECTOR
# ============================================================================


class GeneralAPI:
    """
    Connector for general H-RAT endpoints (health, external KPIs, etc.).
    """

    def __init__(self, api_client: APIClient, base_api_url: str) -> None:
        self._api = api_client
        self._base_api_url = base_api_url

    def health(self) -> bool:
        """
        Check H-RAT service health.

        Returns:
            True if H-RAT is healthy, False otherwise.
        """
        LOGGER.debug("HRAT health check")
        res = self._api.get("/health")

        if res.status_code == 200:
            try:
                health_response = HealthCheckGetResponse(**res.json())
                LOGGER.debug(
                    "HRAT health check result: %s - %s",
                    health_response.status,
                    health_response.timestamp,
                )
                return health_response.status == "OK"
            except Exception as exc:  # pragma: no cover - defensive
                LOGGER.warning("Failed to parse health response: %s", exc)
                # If status is 200, consider it healthy even if parsing failed
                return True

        LOGGER.warning("HRAT health check failed: %s", res.status_code)
        return False

    def post_external_kpis(self, kpis: Dict[str, Any]) -> ExternalKpisPostResponse:
        """
        Send external KPIs to H-RAT.

        Args:
            kpis: Dictionary containing KPI metrics.

        Returns:
            ExternalKpisPostResponse with confirmation message.
        """
        LOGGER.debug("Sending external KPIs to HRAT")

        endpoint = f"{self._base_api_url}/external-kpis"

        # Create request model
        request = ExternalKpisPostRequest(data=kpis)
        body = request.model_dump(by_alias=True, exclude_none=True)

        res = self._api.post(endpoint, body)

        if res.status_code != 200:
            LOGGER.warning(
                "Failed to send external KPIs: %s - %s", res.status_code, res.text
            )
            # Don't raise, KPIs are not critical
            return ExternalKpisPostResponse(message=f"Failed: {res.text}")

        LOGGER.debug(
            "Response from HRAT external-kpis: %s - %s", res.status_code, res.text
        )

        try:
            return ExternalKpisPostResponse(**res.json())
        except Exception as exc:  # pragma: no cover - defensive
            LOGGER.warning("Failed to parse KPIs response: %s", exc)
            return ExternalKpisPostResponse(message=res.text)
+84 −495

File changed.

Preview size limit exceeded, changes collapsed.

+158 −0
Original line number Original line Diff line number Diff line
import logging
from typing import List, Optional

from pydantic import BaseModel, Field

from pathcompextended.connector.api_client import APIClient
from pathcompextended.model.network_context import NetworkContextRequest, Topology

LOGGER = logging.getLogger(__name__)


# ============================================================================
# NETWORK CONTEXT MODELS
# ============================================================================


class NetworkContextPostRequest(NetworkContextRequest):
    """Request model for POST /network-context."""

    pass


class NetworkContextPostResponse(BaseModel):
    """Response model for POST /network-context."""

    message: str
    total_topologies: int = Field(..., alias="total-topologies")

    class Config:
        populate_by_name = True


class NetworkContextGetResponse(BaseModel):
    """Response model for GET /network-context."""

    data: List[Topology]
    total: Optional[int] = None
    message: Optional[str] = None


class NetworkContextGetByControllerResponse(BaseModel):
    """Response model for GET /network-context/{controllerId}."""

    data: List[Topology]
    total: Optional[int] = None
    message: Optional[str] = None


class NetworkContextDeleteResponse(BaseModel):
    """Response model for DELETE /network-context/{controllerId}."""

    deleted_count: int = Field(..., alias="deleted-count")
    message: str

    class Config:
        populate_by_name = True


# ============================================================================
# NETWORK CONTEXT CONNECTOR
# ============================================================================


class NetworkContextAPI:
    """
    Connector for H-RAT Network Context endpoints.
    """

    def __init__(self, api_client: APIClient, base_api_url: str) -> None:
        self._api = api_client
        self._base_api_url = base_api_url

    @property
    def _base_path(self) -> str:
        return f"{self._base_api_url}/network-context"

    def post_network_context(
        self, request: NetworkContextPostRequest
    ) -> NetworkContextPostResponse:
        """
        Send network context to H-RAT.
        """
        LOGGER.debug("Sending network context to HRAT")

        endpoint = self._base_path
        body = request.model_dump(by_alias=True, exclude_none=True)

        res = self._api.post(endpoint, body)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to create network context: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return NetworkContextPostResponse(**res.json())

    def get_network_context(self) -> NetworkContextGetResponse:
        """
        Get all network contexts from H-RAT.
        """
        LOGGER.debug("Getting all network contexts from HRAT")

        endpoint = self._base_path
        res = self._api.get(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to get network contexts: %s - %s", res.status_code, res.text
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return NetworkContextGetResponse(**res.json())

    def get_network_context_by_controller(
        self, controller_id: str
    ) -> NetworkContextGetByControllerResponse:
        """
        Get network contexts by controller ID.
        """
        LOGGER.debug("Getting network contexts for controller %s", controller_id)

        endpoint = f"{self._base_path}/{controller_id}"
        res = self._api.get(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to get network contexts: %s - %s", res.status_code, res.text
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return NetworkContextGetByControllerResponse(**res.json())

    def delete_network_context_by_controller(
        self, controller_id: str
    ) -> NetworkContextDeleteResponse:
        """
        Delete network contexts by controller ID.
        """
        LOGGER.debug("Deleting network contexts for controller %s", controller_id)

        endpoint = f"{self._base_path}/{controller_id}"
        res = self._api.delete(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to delete network contexts: %s - %s", res.status_code, res.text
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return NetworkContextDeleteResponse(**res.json())
+208 −0
Original line number Original line Diff line number Diff line
import logging
from typing import List

from pydantic import BaseModel, Field

from pathcompextended.connector.api_client import APIClient
from pathcompextended.model.transport_network_slice_l3 import (
    TransportNetworkSliceL3Request,
    TransportNetworkSliceL3Response,
)

LOGGER = logging.getLogger(__name__)


# ============================================================================
# TRANSPORT NETWORK SLICE L3 MODELS
# ============================================================================


class TransportNetworkSliceL3PostRequest(TransportNetworkSliceL3Request):
    """Request model for POST /transport-network-slice-l3."""

    pass


class TransportNetworkSliceL3PostResponse(BaseModel):
    """Response model for POST /transport-network-slice-l3 (returns UUID string)."""

    # The API returns a UUID string directly, but we'll parse it
    transport_network_slice_l3_uuid: str


class TransportNetworkSliceL3GetResponse(BaseModel):
    """Response model for GET /transport-network-slice-l3/{uuid}."""

    data: TransportNetworkSliceL3Response
    message: str


class TransportNetworkSliceL3GetAllResponse(BaseModel):
    """Response model for GET /transport-network-slice-l3."""

    total: int
    data: List[TransportNetworkSliceL3Response]
    message: str


class TransportNetworkSliceL3DeleteResponse(BaseModel):
    """Response model for DELETE /transport-network-slice-l3/{uuid}."""

    deleted_count: int = Field(..., alias="deleted-count")
    deleted_transport_network_slice_l3_uuids: List[str] = Field(
        ..., alias="deleted-transport-network-slice-l3-uuids"
    )
    message: str

    class Config:
        populate_by_name = True


class TransportNetworkSliceL3DeleteAllResponse(BaseModel):
    """Response model for DELETE /transport-network-slice-l3."""

    deleted_count: int = Field(..., alias="deleted-count")
    deleted_transport_network_slice_l3_uuids: List[str] = Field(
        ..., alias="deleted-transport-network-slice-l3-uuids"
    )
    message: str

    class Config:
        populate_by_name = True


# ============================================================================
# TRANSPORT NETWORK SLICE L3 CONNECTOR
# ============================================================================


class TransportNetworkSliceL3API:
    """
    Connector for H-RAT Transport Network Slice L3 endpoints.
    """

    def __init__(self, api_client: APIClient, base_api_url: str) -> None:
        self._api = api_client
        self._base_api_url = base_api_url

    @property
    def _base_path(self) -> str:
        return f"{self._base_api_url}/transport-network-slice-l3"

    def post_transport_network_slice_l3(
        self, request: TransportNetworkSliceL3PostRequest
    ) -> TransportNetworkSliceL3PostResponse:
        """
        Request transport network slice L3 computation from H-RAT.
        """
        LOGGER.debug("Requesting transport network slice L3 to HRAT")

        endpoint = self._base_path
        body = request.model_dump(by_alias=True, exclude_none=True)

        res = self._api.post(endpoint, body)

        if res.status_code not in [200, 409]:
            LOGGER.error(
                "Failed to create transport network slice L3: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)

        # The API returns a UUID string directly
        uuid_response = res.text.strip('"')
        return TransportNetworkSliceL3PostResponse(
            transport_network_slice_l3_uuid=uuid_response
        )

    def get_transport_network_slice_l3(
        self, uuid: str
    ) -> TransportNetworkSliceL3GetResponse:
        """
        Get transport network slice L3 by UUID.
        """
        LOGGER.debug("Getting transport network slice L3 %s", uuid)

        endpoint = f"{self._base_path}/{uuid}"
        res = self._api.get(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to get transport network slice L3: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportNetworkSliceL3GetResponse(**res.json())

    def get_all_transport_network_slices_l3(
        self,
    ) -> TransportNetworkSliceL3GetAllResponse:
        """
        Get all transport network slices L3.
        """
        LOGGER.debug("Getting all transport network slices L3")

        endpoint = self._base_path
        res = self._api.get(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to get transport network slices L3: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportNetworkSliceL3GetAllResponse(**res.json())

    def delete_transport_network_slice_l3(
        self, uuid: str
    ) -> TransportNetworkSliceL3DeleteResponse:
        """
        Delete transport network slice L3 by UUID.
        """
        LOGGER.debug("Deleting transport network slice L3 %s", uuid)

        endpoint = f"{self._base_path}/{uuid}"
        res = self._api.delete(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to delete transport network slice L3: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportNetworkSliceL3DeleteResponse(**res.json())

    def delete_all_transport_network_slices_l3(
        self,
    ) -> TransportNetworkSliceL3DeleteAllResponse:
        """
        Delete all transport network slices L3.
        """
        LOGGER.debug("Deleting all transport network slices L3")

        endpoint = self._base_path
        res = self._api.delete(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to delete all transport network slices L3: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportNetworkSliceL3DeleteAllResponse(**res.json())
+206 −0
Original line number Original line Diff line number Diff line
import logging
from typing import List

from pydantic import BaseModel, Field

from pathcompextended.connector.api_client import APIClient
from pathcompextended.model.transport_optical_slice import (
    TransportOpticalSliceRequest,
    TransportOpticalSliceResponse,
)

LOGGER = logging.getLogger(__name__)


# ============================================================================
# TRANSPORT OPTICAL SLICE MODELS
# ============================================================================


class TransportOpticalSlicePostRequest(TransportOpticalSliceRequest):
    """Request model for POST /transport-optical-slice."""

    pass


class TransportOpticalSlicePostResponse(BaseModel):
    """Response model for POST /transport-optical-slice (returns UUID string)."""

    # The API returns a UUID string directly, but we'll parse it
    optical_slice_uuid: str


class TransportOpticalSliceGetResponse(BaseModel):
    """Response model for GET /transport-optical-slice/{uuid}."""

    data: TransportOpticalSliceResponse
    message: str


class TransportOpticalSliceGetAllResponse(BaseModel):
    """Response model for GET /transport-optical-slice."""

    total: int
    data: List[TransportOpticalSliceResponse]
    message: str


class TransportOpticalSliceDeleteResponse(BaseModel):
    """Response model for DELETE /transport-optical-slice/{uuid}."""

    deleted_optical_slice_uuid: str = Field(..., alias="deleted-optical-slice-uuid")
    deleted_transport_network_slice_l3_uuids: List[str] = Field(
        ..., alias="deleted-transport-network-slice-l3-uuids"
    )
    message: str

    class Config:
        populate_by_name = True


class TransportOpticalSliceDeleteAllResponse(BaseModel):
    """Response model for DELETE /transport-optical-slice."""

    deleted_optical_slice_uuids: List[str] = Field(
        ..., alias="deleted-optical-slice-uuids"
    )
    deleted_transport_network_slice_l3_uuids: List[str] = Field(
        ..., alias="deleted-transport-network-slice-l3-uuids"
    )
    message: str

    class Config:
        populate_by_name = True


# ============================================================================
# TRANSPORT OPTICAL SLICE CONNECTOR
# ============================================================================


class TransportOpticalSliceAPI:
    """
    Connector for H-RAT Transport Optical Slice endpoints.
    """

    def __init__(self, api_client: APIClient, base_api_url: str) -> None:
        self._api = api_client
        self._base_api_url = base_api_url

    @property
    def _base_path(self) -> str:
        return f"{self._base_api_url}/transport-optical-slice"

    def post_transport_optical_slice(
        self, request: TransportOpticalSlicePostRequest
    ) -> TransportOpticalSlicePostResponse:
        """
        Request transport optical slice computation from H-RAT.
        """
        LOGGER.debug("Requesting transport optical slice to HRAT")

        endpoint = self._base_path
        body = request.model_dump(by_alias=True, exclude_none=True)

        res = self._api.post(endpoint, body)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to create transport optical slice: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)

        # The API returns a UUID string directly
        uuid_response = res.text.strip('"')
        return TransportOpticalSlicePostResponse(optical_slice_uuid=uuid_response)

    def get_transport_optical_slice(
        self, uuid: str
    ) -> TransportOpticalSliceGetResponse:
        """
        Get transport optical slice by UUID.
        """
        LOGGER.debug("Getting transport optical slice %s", uuid)

        endpoint = f"{self._base_path}/{uuid}"
        res = self._api.get(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to get transport optical slice: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportOpticalSliceGetResponse(**res.json())

    def get_all_transport_optical_slices(self) -> TransportOpticalSliceGetAllResponse:
        """
        Get all transport optical slices.
        """
        LOGGER.debug("Getting all transport optical slices")

        endpoint = self._base_path
        res = self._api.get(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to get transport optical slices: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportOpticalSliceGetAllResponse(**res.json())

    def delete_transport_optical_slice(
        self, uuid: str
    ) -> TransportOpticalSliceDeleteResponse:
        """
        Delete transport optical slice by UUID.
        """
        LOGGER.debug("Deleting transport optical slice %s", uuid)

        endpoint = f"{self._base_path}/{uuid}"
        res = self._api.delete(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to delete transport optical slice: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportOpticalSliceDeleteResponse(**res.json())

    def delete_all_transport_optical_slices(
        self,
    ) -> TransportOpticalSliceDeleteAllResponse:
        """
        Delete all transport optical slices.
        """
        LOGGER.debug("Deleting all transport optical slices")

        endpoint = self._base_path
        res = self._api.delete(endpoint)

        if res.status_code != 200:
            LOGGER.error(
                "Failed to delete all transport optical slices: %s - %s",
                res.status_code,
                res.text,
            )
            res.raise_for_status()

        LOGGER.debug("Response from HRAT: %s - %s", res.status_code, res.text)
        return TransportOpticalSliceDeleteAllResponse(**res.json())