Commit 22bb16c8 authored by Antonio Gines Buendia Lopez's avatar Antonio Gines Buendia Lopez
Browse files

Base API connection for H-RAT

parent 5495ba0b
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -13,3 +13,5 @@
# limitations under the License.



_old_component/
 No newline at end of file
+31 −5
Original line number Diff line number Diff line
@@ -41,16 +41,42 @@ BACKEND_URL = '{:s}://{:s}:{:d}{:s}'.format(


SETTING_NAME_ENABLE_FORECASTER = 'ENABLE_FORECASTER'
SETTING_NAME_ENABLE_HRAT = 'ENABLE_HRAT'
TRUE_VALUES = {'Y', 'YES', 'TRUE', 'T', 'E', 'ENABLE', 'ENABLED'}

def is_hrat_enabled() -> bool:
    pass
    # Get the configuration from the config


def is_forecaster_enabled() -> bool:
    if not is_deployed_forecaster(): return False
    is_enabled = get_setting(SETTING_NAME_ENABLE_FORECASTER, default=None)
    if is_enabled is None: return False
    str_is_enabled = str(is_enabled).upper()
    return str_is_enabled in TRUE_VALUES


# ------------------------------------------------------------ #
# --- HRAT Configuration ------------------------------------- #
# ------------------------------------------------------------ #

DEFAULT_HRAT_HOST  = '10.0.2.10'
DEFAULT_HRAT_PORT  = 9090
DEFAULT_HRAT_SCHEME = 'http'
DEFAULT_HRAT_BASEURL = '/api/resource-allocation'

HRAT_HOST  = str(os.environ.get('HRAT_HOST',  DEFAULT_HRAT_HOST ))
HRAT_PORT  = int(os.environ.get('HRAT_PORT',  DEFAULT_HRAT_PORT ))
HRAT_SCHEME = str(os.environ.get('HRAT_SCHEME',  DEFAULT_HRAT_SCHEME ))
HRAT_BASEURL = str(os.environ.get('HRAT_BASEURL', DEFAULT_HRAT_BASEURL))

HRAT_URL = '{:s}://{:s}:{:d}{:s}'.format(
    HRAT_SCHEME, HRAT_HOST, HRAT_PORT, HRAT_BASEURL)

def is_hrat_enabled() -> bool:
    """
    Check if H-RAT integration is enabled via configuration setting.
    
    Returns:
        True if H-RAT is enabled, False otherwise
    """
    is_enabled = get_setting(SETTING_NAME_ENABLE_HRAT, default=None)
    if is_enabled is None: return False
    str_is_enabled = str(is_enabled).upper()
    return str_is_enabled in TRUE_VALUES
+115 −7
Original line number Diff line number Diff line
@@ -13,9 +13,14 @@
# limitations under the License.

import grpc, logging
from typing import Iterator
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.pathcompextended_pb2 import PathCompReply, PathCompRequest
from common.proto.context_pb2 import Empty
from common.proto.pathcompextended_pb2 import (
    IetfNetworkSlice, LivenessProbe, NetworkContext, NetworkTopology,
    TransportNetworkSliceL3, TransportOpticalSlice, UUID
)
from common.proto.pathcompextended_pb2_grpc import PathCompExtendedServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
@@ -46,19 +51,122 @@ class PathCompExtendedClient:
        self.stub = None


    # ------------------------------------------------------------------------ #
    # -- Health checks ------------------------------------------------------- #
    # ------------------------------------------------------------------------ #

    @RETRY_DECORATOR
    def HealthCheck(self, request: Empty = None) -> LivenessProbe:
        if request is None:
            request = Empty()
        LOGGER.debug('HealthCheck request')
        response = self.stub.HealthCheck(request)
        LOGGER.debug('HealthCheck result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    # ------------------------------------------------------------------------ #
    # -- Network Context Operations ------------------------------------------ #
    # ------------------------------------------------------------------------ #

    @RETRY_DECORATOR
    def CreateNetworkContext(self, request: NetworkContext) -> Empty:
        LOGGER.debug('CreateNetworkContext request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.CreateNetworkContext(request)
        LOGGER.debug('CreateNetworkContext result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def DeleteNetworkContext(self, request: UUID) -> Empty:
        LOGGER.debug('DeleteNetworkContext request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.DeleteNetworkContext(request)
        LOGGER.debug('DeleteNetworkContext result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetNetworkContext(self, request: Empty = None) -> NetworkContext:
        if request is None:
            request = Empty()
        LOGGER.debug('GetNetworkContext request')
        response = self.stub.GetNetworkContext(request)
        LOGGER.debug('GetNetworkContext result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetSpecificNetworkContext(self, request: UUID) -> NetworkTopology:
        LOGGER.debug('GetSpecificNetworkContext request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetSpecificNetworkContext(request)
        LOGGER.debug('GetSpecificNetworkContext result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    # ------------------------------------------------------------------------ #
    # -- Transport Optical Slice Operations ---------------------------------- #
    # ------------------------------------------------------------------------ #

    @RETRY_DECORATOR
    def HealthCheck(self):
    def CreateTransportOpticalSlice(self, request: IetfNetworkSlice) -> TransportOpticalSlice:
        LOGGER.debug('CreateTransportOpticalSlice request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.CreateTransportOpticalSlice(request)
        LOGGER.debug('CreateTransportOpticalSlice result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def DeleteTransportOpticalSlice(self, request: UUID) -> Empty:
        LOGGER.debug('DeleteTransportOpticalSlice request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.DeleteTransportOpticalSlice(request)
        LOGGER.debug('DeleteTransportOpticalSlice result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetTransportOpticalSlices(self, request: Empty = None) -> Iterator[TransportOpticalSlice]:
        if request is None:
            request = Empty()
        LOGGER.debug('GetTransportOpticalSlices request')
        response = self.stub.GetTransportOpticalSlices(request)
        LOGGER.debug('GetTransportOpticalSlices result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetTransportOpticalSlice(self, request: UUID) -> TransportOpticalSlice:
        LOGGER.debug('GetTransportOpticalSlice request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetTransportOpticalSlice(request)
        LOGGER.debug('GetTransportOpticalSlice result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    # ------------------------------------------------------------------------ #
    # -- Transport Network Slice L3 Operations ------------------------------ #
    # ------------------------------------------------------------------------ #

    @RETRY_DECORATOR
    def CreateTransportNetworkSliceL3(self, request: IetfNetworkSlice) -> TransportNetworkSliceL3:
        LOGGER.debug('CreateTransportNetworkSliceL3 request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.CreateTransportNetworkSliceL3(request)
        LOGGER.debug('CreateTransportNetworkSliceL3 result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def DeleteTransportNetworkSliceL3(self, request: UUID) -> Empty:
        LOGGER.debug('DeleteTransportNetworkSliceL3 request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.DeleteTransportNetworkSliceL3(request)
        LOGGER.debug('DeleteTransportNetworkSliceL3 result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def GetTransportNetworkSlicesL3(self, request: Empty = None) -> Iterator[TransportNetworkSliceL3]:
        if request is None:
            request = Empty()
        LOGGER.debug('GetTransportNetworkSlicesL3 request')
        response = self.stub.GetTransportNetworkSlicesL3(request)
        LOGGER.debug('GetTransportNetworkSlicesL3 result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def Compute(self, request : PathCompRequest) -> PathCompReply:
        LOGGER.debug('Compute request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.Compute(request)
        LOGGER.debug('Compute result: {:s}'.format(grpc_message_to_json_string(response)))
    def GetTransportNetworkSliceL3(self, request: UUID) -> TransportNetworkSliceL3:
        LOGGER.debug('GetTransportNetworkSliceL3 request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.GetTransportNetworkSliceL3(request)
        LOGGER.debug('GetTransportNetworkSliceL3 result: {:s}'.format(grpc_message_to_json_string(response)))
        return response




+0 −1
Original line number Diff line number Diff line
@@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+102 −0
Original line number Diff line number Diff line
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import requests
import logging

LOGGER = logging.getLogger(__name__)

class APIClient:
    def __init__(self, base_url):
        self.base_url = base_url

    @staticmethod
    def build_headers(headers, **kwargs):
        headers = headers or {}

        if kwargs.get("jwt", "") != "":
            headers["Authorization"] = f"Bearer {kwargs['jwt']}"

        if headers.get("body", "") != "":
            headers["Content-Type"] = "application/json"

        return headers

    def get(self, endpoint, body=None, headers=None, jwt: str = None):
        url = f"{self.base_url}{endpoint}"
        LOGGER.info(f"GET request to {url}")

        headers = APIClient.build_headers(headers, jwt=jwt, body=body)

        LOGGER.debug(f"Headers: {headers}")
        LOGGER.debug(f"Body: {body}")

        if body:
            response = requests.get(url, headers=headers, json=body)
        else:
            response = requests.get(url, headers=headers)

        LOGGER.info(f"Response: {response.status_code}")
        return response

    def post(self, endpoint, body, headers=None, jwt: str = None):
        url = f"{self.base_url}{endpoint}"
        LOGGER.info(f"POST request to {url}")

        headers = APIClient.build_headers(headers, jwt=jwt, body=body)

        LOGGER.debug(f"Headers: {headers}")
        LOGGER.debug(f"Body: {body}")

        if body:
            response = requests.post(url, headers=headers, json=body)
        else:
            response = requests.post(url, headers=headers)

        LOGGER.info(f"Response: {response.status_code}")
        return response

    def patch(self, endpoint, body, headers=None, jwt: str = None):
        url = f"{self.base_url}{endpoint}"
        LOGGER.info(f"PATCH request to {url}")

        headers = APIClient.build_headers(headers, jwt=jwt, body=body)

        LOGGER.debug(f"Headers: {headers}")
        LOGGER.debug(f"Body: {body}")

        if body:
            response = requests.patch(url, headers=headers, json=body)
        else:
            response = requests.patch(url, headers=headers)

        LOGGER.info(f"Response: {response.status_code}")
        return response

    def delete(self, endpoint, body=None, headers=None, jwt: str = None):
        url = f"{self.base_url}{endpoint}"
        LOGGER.info(f"DELETE request to {url}")

        headers = APIClient.build_headers(headers, jwt=jwt, body=body)

        LOGGER.debug(f"Headers: {headers}")
        LOGGER.debug(f"Body: {body}")

        if body:
            response = requests.delete(url, headers=headers, json=body)
        else:
            response = requests.delete(url, headers=headers)

        LOGGER.info(f"Response: {response.status_code}")
        return response
Loading