Compare revisions

Target project: tfs/controller
Showing 802 additions and 149 deletions
@@ -15,17 +15,18 @@
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.proto.context_pb2 import Empty
from common.proto.kpi_manager_pb2 import KpiId
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiAlarms
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class KpiValueApiClient:
@@ -34,8 +35,8 @@ class KpiValueApiClient:
if not port: port = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
@@ -46,18 +47,25 @@ class KpiValueApiClient:
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
self.stub = None
@RETRY_DECORATOR
def StoreKpiValues(self, request: KpiValueList) -> Empty:
def StoreKpiValues(self, request: KpiValueList) -> Empty: # type: ignore
LOGGER.debug('StoreKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.StoreKpiValues(request)
LOGGER.debug('StoreKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList:
def SelectKpiValues(self, request: KpiValueFilter) -> KpiValueList: # type: ignore
LOGGER.debug('SelectKpiValues: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SelectKpiValues(request)
LOGGER.debug('SelectKpiValues result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetKpiAlarms(self, request: KpiId) -> KpiAlarms: # type: ignore
LOGGER.debug('GetKpiAlarms: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetKpiAlarms(request)
LOGGER.debug('GetKpiAlarms result: {:s}'.format(grpc_message_to_json_string(response)))
return response
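A minimal usage sketch for the extended client, assuming GetKpiAlarms is a server-streaming RPC (the disabled test further down iterates over its result) and using a purely illustrative KPI UUID:

from common.proto.kpi_manager_pb2 import KpiId
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient

client = KpiValueApiClient()                                  # resolves host/port from Settings
kpi_id = KpiId()
kpi_id.kpi_id.uuid = '6e22f180-ba28-4641-b190-2287bf448888'   # illustrative UUID, not a real KPI
for alarms in client.GetKpiAlarms(kpi_id):                    # consume the alarm stream
    print(alarms)
client.close()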
@@ -14,4 +14,5 @@
confluent-kafka==2.3.*
requests==2.27.*
prometheus-api-client==0.5.3
\ No newline at end of file
prometheus-api-client==0.5.3
apscheduler==3.10.1
@@ -12,18 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc, json
from datetime import datetime
import logging, grpc, json, queue
from typing import Dict
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from confluent_kafka import KafkaError
from common.proto.context_pb2 import Empty
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_value_api_pb2_grpc import KpiValueAPIServiceServicer
from common.proto.kpi_value_api_pb2 import KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from common.proto.kpi_value_api_pb2 import KpiAlarms, KpiValueList, KpiValueFilter, KpiValue, KpiValueType
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from confluent_kafka import Producer as KafkaProducer
from confluent_kafka import Consumer as KafkaConsumer
from prometheus_api_client import PrometheusConnect
from prometheus_api_client.utils import parse_datetime
@@ -37,8 +41,14 @@ PROM_URL = "http://prometheus-k8s.monitoring.svc.cluster.local:9090" # TO
class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
def __init__(self):
LOGGER.debug('Init KpiValueApiService')
self.listener_topic = KafkaTopic.ALARMS.value
self.result_queue = queue.Queue()
self.scheduler = BackgroundScheduler()
self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : 'kpi-value-api-frontend',
'auto.offset.reset' : 'latest'})
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def StoreKpiValues(self, request: KpiValueList, grpc_context: grpc.ServicerContext
) -> Empty:
@@ -109,17 +119,14 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
kpi_value = KpiValue()
kpi_value.kpi_id.kpi_id = record['metric']['__name__'],
kpi_value.timestamp = value[0],
kpi_value.kpi_value_type = self.ConverValueToKpiValueType(value[1])
kpi_value.kpi_value_type.CopyFrom(self.ConverValueToKpiValueType(value['kpi_value']))
response.kpi_value_list.append(kpi_value)
return response
def GetKpiSampleType(self, kpi_value: str, kpi_manager_client):
print("--- START -----")
kpi_id = KpiId()
kpi_id.kpi_id.uuid = kpi_value.kpi_id.kpi_id.uuid
# print("KpiId generated: {:}".format(kpi_id))
try:
kpi_descriptor_object = KpiDescriptor()
kpi_descriptor_object = kpi_manager_client.GetKpiDescriptor(kpi_id)
@@ -135,26 +142,90 @@ class KpiValueApiServiceServicerImpl(KpiValueAPIServiceServicer):
LOGGER.info("Unable to get KpiDescriptor. Error: {:}".format(e))
print ("Unable to get KpiDescriptor. Error: {:}".format(e))
def ConverValueToKpiValueType(self, value):
# Check if the value is an integer (int64)
try:
int_value = int(value)
return KpiValueType(int64Val=int_value)
except (ValueError, TypeError):
pass
# Check if the value is a float
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetKpiAlarms(self, request: KpiId, grpc_context: grpc.ServicerContext) -> KpiAlarms: # type: ignore
"""
Get Alarms from Kafka and return them periodically.
"""
LOGGER.debug('GetKpiAlarms: {:}'.format(request))
response = KpiAlarms()
for alarm_key, value in self.StartResponseListener(request.kpi_id.uuid):
response.start_timestamp.timestamp = datetime.strptime(
value["window_start"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
response.kpi_id.kpi_id.uuid = value['kpi_id']
for key, threshold in value.items():
if key not in ['kpi_id', 'window']:
response.alarms[key] = threshold
yield response
def StartResponseListener(self, filter_key=None):
"""
Start the Kafka response listener with APScheduler and return key-value pairs periodically.
"""
LOGGER.info("Starting StartResponseListener")
# Schedule the ResponseListener at fixed intervals
self.scheduler.add_job(
self.response_listener,
trigger=IntervalTrigger(seconds=5),
args=[filter_key],
id=f"response_listener_{self.listener_topic}",
replace_existing=True
)
self.scheduler.start()
LOGGER.info(f"Started Kafka listener for topic {self.listener_topic}...")
try:
float_value = float(value)
return KpiValueType(floatVal=float_value)
except (ValueError, TypeError):
pass
# Check if the value is a boolean
if value.lower() in ['true', 'false']:
bool_value = value.lower() == 'true'
return KpiValueType(boolVal=bool_value)
# If none of the above, treat it as a string
return KpiValueType(stringVal=value)
while True:
LOGGER.info("entering while...")
key, value = self.result_queue.get() # Wait until a result is available
LOGGER.info("In while true ...")
yield key, value # Yield the result to the calling function
except Exception as e:
LOGGER.warning("Listener stopped. Error: {:}".format(e))
finally:
self.scheduler.shutdown()
def response_listener(self, filter_key=None):
"""
Poll Kafka messages and put key-value pairs into the queue.
"""
LOGGER.info(f"Polling Kafka topic {self.listener_topic}...")
consumer = self.kafka_consumer
consumer.subscribe([self.listener_topic])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
elif msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
LOGGER.error(f"Kafka error: {msg.error()}")
break
try:
key = msg.key().decode('utf-8') if msg.key() else None
if filter_key is not None and key == filter_key:
value = json.loads(msg.value().decode('utf-8'))
LOGGER.info(f"Received key: {key}, value: {value}")
self.result_queue.put((key, value))
else:
LOGGER.warning(f"Skipping message with unmatched key: {key} - {filter_key}")
except Exception as e:
LOGGER.error(f"Error processing Kafka message: {e}")
def delivery_callback(self, err, msg):
if err: LOGGER.debug('Message delivery failed: {:}'.format(err))
else: LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
def ConverValueToKpiValueType(self, value):
kpi_value_type = KpiValueType()
if isinstance(value, int):
kpi_value_type.int32Val = value
elif isinstance(value, float):
kpi_value_type.floatVal = value
elif isinstance(value, str):
kpi_value_type.stringVal = value
elif isinstance(value, bool):
kpi_value_type.boolVal = value
# Add other checks for different types as needed
return kpi_value_type
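The alarm path added above couples an APScheduler job to a queue that a generator drains. A self-contained sketch of that pattern, with a dummy poller standing in for the Kafka consumer:

import queue, time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

def poll_once(result_queue):
    # stand-in for consumer.poll(): enqueue one fake key/value pair per run
    result_queue.put(('demo-kpi', {'ts': time.time()}))

def stream_results():
    result_queue = queue.Queue()
    scheduler = BackgroundScheduler()
    scheduler.add_job(
        poll_once, trigger=IntervalTrigger(seconds=5), args=[result_queue],
        id='response_listener_demo', replace_existing=True)
    scheduler.start()
    try:
        while True:
            yield result_queue.get()    # block until the scheduled job produced something
    finally:
        scheduler.shutdown()

for key, value in stream_results():
    print(key, value)
    break

Unlike this sketch, the response_listener above loops over consumer.poll() inside the scheduled job itself, so a single scheduler run keeps polling Kafka continuously.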
@@ -13,9 +13,16 @@
# limitations under the License.
import uuid, time
from common.proto import kpi_manager_pb2
from common.proto.kpi_value_api_pb2 import KpiValue, KpiValueList
def create_kpi_id_request():
_create_kpi_id = kpi_manager_pb2.KpiId()
_create_kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
# _create_kpi_id.kpi_id.uuid = str(uuid.uuid4())
return _create_kpi_id
def create_kpi_value_list():
_create_kpi_value_list = KpiValueList()
# To run this experiment successfully, add an existing UUID of a KPI Descriptor from the KPI DB.
......
@@ -21,8 +21,8 @@ from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc)
from kpi_value_api.service.KpiValueApiService import KpiValueApiService
from kpi_value_api.client.KpiValueApiClient import KpiValueApiClient
from kpi_value_api.tests.messages import create_kpi_value_list
from kpi_value_api.tests.messages import create_kpi_value_list, create_kpi_id_request
from common.proto.kpi_value_api_pb2 import KpiAlarms
LOCAL_HOST = '127.0.0.1'
KPIVALUEAPI_SERVICE_PORT = get_service_port_grpc(ServiceNameEnum.KPIVALUEAPI) # type: ignore
@@ -78,7 +78,14 @@ def test_validate_kafka_topics():
response = KafkaTopic.create_all_topics()
assert isinstance(response, bool)
def test_store_kpi_values(kpi_value_api_client):
LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
assert isinstance(response, Empty)
# def test_GetKpiAlarms(kpi_value_api_client):
# LOGGER.debug(" >>> test_GetKpiAlarms")
# stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request())
# for response in stream:
# LOGGER.debug(str(response))
# assert isinstance(response, KpiAlarms)
# def test_store_kpi_values(kpi_value_api_client):
# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
# response = kpi_value_api_client.StoreKpiValues(create_kpi_value_list())
# assert isinstance(response, Empty)
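If the alarm stream needs to be exercised, the disabled test above could be activated along these lines (a sketch; it blocks until an alarm reaches the ALARMS topic, hence the early break):

def test_GetKpiAlarms(kpi_value_api_client):
    LOGGER.debug(" >>> test_GetKpiAlarms: START <<< ")
    stream = kpi_value_api_client.GetKpiAlarms(create_kpi_id_request())
    for response in stream:
        LOGGER.debug(str(response))
        assert isinstance(response, KpiAlarms)
        break  # stop after the first alarm so the test terminates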
@@ -59,9 +59,11 @@ unit_test kpi-value-writer:
- docker pull "bitnami/kafka:latest"
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
--env ALLOW_ANONYMOUS_LOGIN=yes
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- docker run --name kafka -d --network=teraflowbridge -p 9092:9092
- >
docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
@@ -76,6 +78,8 @@ unit_test kpi-value-writer:
$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs zookeeper
- docker logs kafka
- docker logs $IMAGE_NAME
- >
docker exec -i $IMAGE_NAME bash -c
@@ -84,8 +88,8 @@ unit_test kpi-value-writer:
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f $IMAGE_NAME
- docker rm -f zookeeper
- docker rm -f kafka
- docker rm -f zookeeper
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
......
@@ -15,7 +15,6 @@
# read Kafka stream from Kafka topic
import logging
from typing import Dict
from prometheus_client import Gauge
from common.proto.kpi_sample_types_pb2 import KpiSampleType
@@ -45,13 +44,13 @@ class MetricWriterToPrometheus:
'slice_id' : kpi_descriptor.slice_id.slice_uuid.uuid,
'connection_id' : kpi_descriptor.connection_id.connection_uuid.uuid,
'link_id' : kpi_descriptor.link_id.link_uuid.uuid,
'time_stamp' : kpi_value['timestamp'],
'kpi_value' : kpi_value['kpi_value_type']
'time_stamp' : kpi_value.timestamp.timestamp,
'kpi_value' : kpi_value.kpi_value_type.floatVal
}
# LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
LOGGER.debug("Cooked Kpi: {:}".format(cooked_kpi))
return cooked_kpi
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: Dict):
def create_and_expose_cooked_kpi(self, kpi_descriptor: KpiDescriptor, kpi_value: KpiValue):
# merge both gRPC messages into a single variable.
cooked_kpi = self.merge_kpi_descriptor_and_kpi_value(kpi_descriptor, kpi_value)
tags_to_exclude = {'kpi_description', 'kpi_sample_type', 'kpi_value'}
@@ -74,7 +73,7 @@ class MetricWriterToPrometheus:
connection_id = cooked_kpi['connection_id'],
link_id = cooked_kpi['link_id'],
time_stamp = cooked_kpi['time_stamp'],
).set(cooked_kpi['kpi_value'])
).set(float(cooked_kpi['kpi_value']))
LOGGER.debug("Metric pushed to the endpoints: {:}".format(PROM_METRICS[metric_name]))
except ValueError as e:
......
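For context on the .set(float(...)) call above, a small sketch of a labelled prometheus_client Gauge analogous to the cooked-KPI fields (metric and label names here are illustrative, not the ones registered by the component):

from prometheus_client import Gauge

demo_kpi_gauge = Gauge(
    'tfs_demo_kpi', 'Illustrative KPI gauge',
    ['device_id', 'endpoint_id', 'time_stamp'])
demo_kpi_gauge.labels(
    device_id='dev-1', endpoint_id='eth-1/0/1', time_stamp='1718000000.0'
).set(float('42.0'))

Keeping the timestamp as a label mirrors the diff, although it produces a new time series per sample.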
@@ -29,5 +29,5 @@ def test_validate_kafka_topics():
def test_KafkaConsumer():
LOGGER.debug(" --->>> test_kafka_consumer: START <<<--- ")
kpi_value_writer = KpiValueWriter()
kpi_value_writer.RunKafkaConsumer()
# kpi_value_writer = KpiValueWriter()
# kpi_value_writer.RunKafkaConsumer()
@@ -16,9 +16,24 @@ FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
apt-get --yes --quiet --quiet install wget g++ git build-essential cmake libpcre2-dev python3-dev python3-cffi && \
rm -rf /var/lib/apt/lists/*
# Download, build and install libyang. Note that APT package is outdated
# - Ref: https://github.com/CESNET/libyang
# - Ref: https://github.com/CESNET/libyang-python/
RUN mkdir -p /var/libyang
RUN git clone https://github.com/CESNET/libyang.git /var/libyang
WORKDIR /var/libyang
RUN git fetch
RUN git checkout v2.1.148
RUN mkdir -p /var/libyang/build
WORKDIR /var/libyang/build
RUN cmake -D CMAKE_BUILD_TYPE:String="Release" ..
RUN make
RUN make install
RUN ldconfig
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
@@ -53,24 +68,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Download, build and install libyang. Note that APT package is outdated
# - Ref: https://github.com/CESNET/libyang
# - Ref: https://github.com/CESNET/libyang-python/
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install build-essential cmake libpcre2-dev python3-dev python3-cffi && \
rm -rf /var/lib/apt/lists/*
RUN mkdir -p /var/libyang
RUN git clone https://github.com/CESNET/libyang.git /var/libyang
WORKDIR /var/libyang
RUN git fetch
RUN git checkout v2.1.148
RUN mkdir -p /var/libyang/build
WORKDIR /var/libyang/build
RUN cmake -D CMAKE_BUILD_TYPE:String="Release" ..
RUN make
RUN make install
RUN ldconfig
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/nbi
WORKDIR /var/teraflow/nbi
@@ -89,6 +86,8 @@ COPY src/service/__init__.py service/__init__.py
COPY src/service/client/. service/client/
COPY src/slice/__init__.py slice/__init__.py
COPY src/slice/client/. slice/client/
COPY src/qkd_app/__init__.py qkd_app/__init__.py
COPY src/qkd_app/client/. qkd_app/client/
RUN mkdir -p /var/teraflow/tests/tools
COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/
......
@@ -16,9 +16,10 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables)
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables
)
from .NbiService import NbiService
from .rest_server.RestServer import RestServer
from .rest_server.nbi_plugins.etsi_bwm import register_etsi_bwm_api
@@ -28,6 +29,7 @@ from .rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn
from .rest_server.nbi_plugins.ietf_network import register_ietf_network
from .rest_server.nbi_plugins.ietf_network_slice import register_ietf_nss
from .rest_server.nbi_plugins.ietf_acl import register_ietf_acl
from .rest_server.nbi_plugins.qkd_app import register_qkd_app
from .rest_server.nbi_plugins.tfs_api import register_tfs_api
terminate = threading.Event()
@@ -47,8 +49,10 @@ def main():
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SLICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
@@ -72,6 +76,7 @@ def main():
register_ietf_network(rest_server)
register_ietf_nss(rest_server) # Registering NSS entrypoint
register_ietf_acl(rest_server)
register_qkd_app(rest_server)
register_tfs_api(rest_server)
rest_server.start()
......
@@ -19,7 +19,7 @@ class NameMappings:
def __init__(self) -> None:
self._device_uuid_to_name : Dict[str, str] = dict()
self._endpoint_uuid_to_name : Dict[Tuple[str, str], str] = dict()
def store_device_name(self, device : Device) -> None:
device_uuid = device.device_id.device_uuid.uuid
device_name = device.name
......
@@ -12,19 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging
import enum, json, logging
import pyangbind.lib.pybindJSON as pybindJSON
from flask import request
from flask.json import jsonify
from flask_restful import Resource
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.Settings import get_setting
from common.proto.context_pb2 import ContextId, Empty
from common.tools.context_queries.Topology import get_topology_details
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from nbi.service.rest_server.nbi_plugins.tools.Authentication import HTTP_AUTH
from nbi.service.rest_server.nbi_plugins.tools.HttpStatusCodes import HTTP_OK, HTTP_SERVERERROR
from .bindings import ietf_network
from .ComposeNetwork import compose_network
from .ManualFixes import manual_fixes
from .YangHandler import YangHandler
LOGGER = logging.getLogger(__name__)
@@ -33,6 +37,14 @@ TE_TOPOLOGY_NAMES = [
'providerId-10-clientId-0-topologyId-2'
]
class Renderer(enum.Enum):
LIBYANG = 'LIBYANG'
PYANGBIND = 'PYANGBIND'
DEFAULT_RENDERER = Renderer.LIBYANG
USE_RENDERER = get_setting('IETF_NETWORK_RENDERER', default=DEFAULT_RENDERER.value)
class Networks(Resource):
@HTTP_AUTH.login_required
def get(self):
@@ -40,31 +52,59 @@ class Networks(Resource):
topology_id = ''
try:
context_client = ContextClient()
#target = get_slice_by_uuid(context_client, vpn_id, rw_copy=True)
#if target is None:
# raise Exception('VPN({:s}) not found in database'.format(str(vpn_id)))
ietf_nets = ietf_network()
if USE_RENDERER == Renderer.PYANGBIND.value:
#target = get_slice_by_uuid(context_client, vpn_id, rw_copy=True)
#if target is None:
# raise Exception('VPN({:s}) not found in database'.format(str(vpn_id)))
ietf_nets = ietf_network()
topology_details = get_topology_details(
context_client, DEFAULT_TOPOLOGY_NAME, context_uuid=DEFAULT_CONTEXT_NAME,
#rw_copy=True
)
if topology_details is None:
MSG = 'Topology({:s}/{:s}) not found'
raise Exception(MSG.format(DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME))
topology_details = get_topology_details(
context_client, DEFAULT_TOPOLOGY_NAME, context_uuid=DEFAULT_CONTEXT_NAME, #rw_copy=True
)
if topology_details is None:
MSG = 'Topology({:s}/{:s}) not found'
raise Exception(MSG.format(DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME))
for te_topology_name in TE_TOPOLOGY_NAMES:
ietf_net = ietf_nets.networks.network.add(te_topology_name)
compose_network(ietf_net, te_topology_name, topology_details)
for te_topology_name in TE_TOPOLOGY_NAMES:
ietf_net = ietf_nets.networks.network.add(te_topology_name)
compose_network(ietf_net, te_topology_name, topology_details)
# TODO: improve these workarounds to enhance performance
json_response = json.loads(pybindJSON.dumps(ietf_nets, mode='ietf'))
# Workaround; pyangbind does not allow to set otn_topology / eth-tran-topology
manual_fixes(json_response)
elif USE_RENDERER == Renderer.LIBYANG.value:
yang_handler = YangHandler()
json_response = []
# TODO: improve these workarounds to enhance performance
json_response = json.loads(pybindJSON.dumps(ietf_nets, mode='ietf'))
# Workaround; pyangbind does not allow to set otn_topology / eth-tran-topology
manual_fixes(json_response)
contexts = context_client.ListContexts(Empty()).contexts
context_names = [context.name for context in contexts]
LOGGER.info(f'Contexts detected: {context_names}')
for context_name in context_names:
topologies = context_client.ListTopologies(ContextId(**json_context_id(context_name))).topologies
topology_names = [topology.name for topology in topologies]
LOGGER.info(f'Topologies detected for context {context_name}: {topology_names}')
for topology_name in topology_names:
topology_details = get_topology_details(context_client, topology_name, context_name)
if topology_details is None:
raise Exception(f'Topology({context_name}/{topology_name}) not found')
network_reply = yang_handler.compose_network(topology_name, topology_details)
json_response.append(network_reply)
yang_handler.destroy()
else:
raise Exception('Unsupported Renderer: {:s}'.format(str(USE_RENDERER)))
response = jsonify(json_response)
response.status_code = HTTP_OK
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Something went wrong Retrieving Topology({:s})'.format(str(topology_id)))
response = jsonify({'error': str(e)})
......
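The renderer branch above is driven by the IETF_NETWORK_RENDERER setting; a reduced sketch of that dispatch, reading the variable through os.environ instead of the project's get_setting helper:

import enum, os

class Renderer(enum.Enum):
    LIBYANG   = 'LIBYANG'
    PYANGBIND = 'PYANGBIND'

USE_RENDERER = os.environ.get('IETF_NETWORK_RENDERER', Renderer.LIBYANG.value)

if USE_RENDERER == Renderer.PYANGBIND.value:
    print('rendering topologies with pyangbind')
elif USE_RENDERER == Renderer.LIBYANG.value:
    print('rendering topologies with libyang')
else:
    raise Exception('Unsupported Renderer: {:s}'.format(str(USE_RENDERER)))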
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import libyang, logging, os
from typing import Any
from common.proto.context_pb2 import TopologyDetails, Device, Link
from .NameMapping import NameMappings
from context.client.ContextClient import ContextClient
from common.tools.object_factory.Device import json_device_id
from common.proto.context_pb2 import DeviceId
LOGGER = logging.getLogger(__name__)
YANG_DIR = os.path.join(os.path.dirname(__file__), 'yang')
YANG_MODULES = ['ietf-network', 'ietf-network-topology', 'ietf-l3-unicast-topology']
class YangHandler:
def __init__(self) -> None:
self._yang_context = libyang.Context(YANG_DIR)
for yang_module_name in YANG_MODULES:
LOGGER.info('Loading module: {:s}'.format(str(yang_module_name)))
self._yang_context.load_module(yang_module_name).feature_enable_all()
def compose_network(self, te_topology_name: str, topology_details: TopologyDetails) -> dict:
networks = self._yang_context.create_data_path('/ietf-network:networks')
network = networks.create_path(f'network[network-id="{te_topology_name}"]')
network.create_path('network-id', te_topology_name)
network_types = network.create_path('network-types')
network_types.create_path('ietf-l3-unicast-topology:l3-unicast-topology')
name_mappings = NameMappings()
for device in topology_details.devices:
self.compose_node(device, name_mappings, network)
for link in topology_details.links:
self.compose_link(link, name_mappings, network)
return json.loads(networks.print_mem('json'))
def compose_node(self, dev: Device, name_mappings: NameMappings, network: Any) -> None:
device_name = dev.name
name_mappings.store_device_name(dev)
node = network.create_path(f'node[node-id="{device_name}"]')
node.create_path('node-id', device_name)
node_attributes = node.create_path('ietf-l3-unicast-topology:l3-node-attributes')
node_attributes.create_path('name', device_name)
context_client = ContextClient()
device = context_client.GetDevice(DeviceId(**json_device_id(device_name)))
for endpoint in device.device_endpoints:
name_mappings.store_endpoint_name(dev, endpoint)
self._process_device_config(device, node)
def _process_device_config(self, device: Device, node: Any) -> None:
for config in device.device_config.config_rules:
if config.WhichOneof('config_rule') != 'custom' or '/interface[' not in config.custom.resource_key:
continue
for endpoint in device.device_endpoints:
endpoint_name = endpoint.name
if f'/interface[{endpoint_name}]' in config.custom.resource_key or f'/interface[{endpoint_name}.' in config.custom.resource_key:
interface_name = config.custom.resource_key.split('interface[')[1].split(']')[0]
self._create_termination_point(node, interface_name, endpoint_name, config.custom.resource_value)
def _create_termination_point(self, node: Any, interface_name: str, endpoint_name: str, resource_value: str) -> None:
ip_addresses = self._extract_ip_addresses(json.loads(resource_value))
if ip_addresses:
tp = node.create_path(f'ietf-network-topology:termination-point[tp-id="{interface_name}"]')
tp.create_path('tp-id', interface_name)
tp_attributes = tp.create_path('ietf-l3-unicast-topology:l3-termination-point-attributes')
for ip in ip_addresses:
tp_attributes.create_path('ip-address', ip)
tp_attributes.create_path('interface-name', endpoint_name)
@staticmethod
def _extract_ip_addresses(resource_value: dict) -> list:
ip_addresses = []
if 'address_ip' in resource_value:
ip_addresses.append(resource_value['address_ip'])
if 'address_ipv6' in resource_value:
ip_addresses.append(resource_value['address_ipv6'])
return ip_addresses
def compose_link(self, link_specs: Link, name_mappings: NameMappings, network: Any) -> None:
link_name = link_specs.name
links = network.create_path(f'ietf-network-topology:link[link-id="{link_name}"]')
links.create_path('link-id', link_name)
self._create_link_endpoint(links, 'source', link_specs.link_endpoint_ids[0], name_mappings)
self._create_link_endpoint(links, 'destination', link_specs.link_endpoint_ids[-1], name_mappings)
def _create_link_endpoint(self, links: Any, endpoint_type: str, endpoint_id: Any, name_mappings: NameMappings) -> None:
endpoint = links.create_path(endpoint_type)
if endpoint_type == 'destination': endpoint_type = 'dest'
endpoint.create_path(f'{endpoint_type}-node', name_mappings.get_device_name(endpoint_id.device_id))
endpoint.create_path(f'{endpoint_type}-tp', name_mappings.get_endpoint_name(endpoint_id))
def destroy(self) -> None:
self._yang_context.destroy()
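A sketch of how the new YangHandler might be driven outside the REST resource, reusing the same helpers Networks.py imports (the import path for YangHandler is assumed from the plugin layout):

from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.tools.context_queries.Topology import get_topology_details
from context.client.ContextClient import ContextClient
from nbi.service.rest_server.nbi_plugins.ietf_network.YangHandler import YangHandler

context_client = ContextClient()
topology_details = get_topology_details(
    context_client, DEFAULT_TOPOLOGY_NAME, context_uuid=DEFAULT_CONTEXT_NAME)
if topology_details is not None:
    yang_handler = YangHandler()
    try:
        network_json = yang_handler.compose_network(DEFAULT_TOPOLOGY_NAME, topology_details)
        print(network_json)   # dict parsed from libyang's JSON serialization
    finally:
        yang_handler.destroy()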
module ietf-l3-unicast-topology {
yang-version 1.1;
namespace
"urn:ietf:params:xml:ns:yang:ietf-l3-unicast-topology";
prefix "l3t";
import ietf-network {
prefix "nw";
}
import ietf-network-topology {
prefix "nt";
}
import ietf-inet-types {
prefix "inet";
}
import ietf-routing-types {
prefix "rt-types";
}
organization
"IETF I2RS (Interface to the Routing System) Working Group";
contact
"WG Web: <https://datatracker.ietf.org/wg/i2rs/>
WG List: <mailto:i2rs@ietf.org>
Editor: Alexander Clemm
<mailto:ludwig@clemm.org>
Editor: Jan Medved
<mailto:jmedved@cisco.com>
Editor: Robert Varga
<mailto:robert.varga@pantheon.tech>
Editor: Xufeng Liu
<mailto:xufeng.liu.ietf@gmail.com>
Editor: Nitin Bahadur
<mailto:nitin_bahadur@yahoo.com>
Editor: Hariharan Ananthakrishnan
<mailto:hari@packetdesign.com>";
description
"This module defines a model for Layer 3 Unicast
topologies.
Copyright (c) 2018 IETF Trust and the persons identified as
authors of the code. All rights reserved.
Redistribution and use in source and binary forms, with or
without modification, is permitted pursuant to, and subject
to the license terms contained in, the Simplified BSD License
set forth in Section 4.c of the IETF Trust's Legal Provisions
Relating to IETF Documents
(https://trustee.ietf.org/license-info).
This version of this YANG module is part of
RFC 8346; see the RFC itself for full legal notices.";
revision "2018-02-26" {
description
"Initial revision.";
reference
"RFC 8346: A YANG Data Model for Layer 3 Topologies";
}
identity flag-identity {
description "Base type for flags";
}
typedef l3-event-type {
type enumeration {
enum "add" {
description
"A Layer 3 node, link, prefix, or termination point has
been added";
}
enum "remove" {
description
"A Layer 3 node, link, prefix, or termination point has
been removed";
}
enum "update" {
description
"A Layer 3 node, link, prefix, or termination point has
been updated";
}
}
description "Layer 3 event type for notifications";
}
typedef prefix-flag-type {
type identityref {
base "flag-identity";
}
description "Prefix flag attributes";
}
typedef node-flag-type {
type identityref {
base "flag-identity";
}
description "Node flag attributes";
}
typedef link-flag-type {
type identityref {
base "flag-identity";
}
description "Link flag attributes";
}
typedef l3-flag-type {
type identityref {
base "flag-identity";
}
description "L3 flag attributes";
}
grouping l3-prefix-attributes {
description
"L3 prefix attributes";
leaf prefix {
type inet:ip-prefix;
description
"IP prefix value";
}
leaf metric {
type uint32;
description
"Prefix metric";
}
leaf-list flag {
type prefix-flag-type;
description
"Prefix flags";
}
}
grouping l3-unicast-topology-type {
description "Identifies the topology type to be L3 Unicast.";
container l3-unicast-topology {
presence "indicates L3 Unicast topology";
description
"The presence of the container node indicates L3 Unicast
topology";
}
}
grouping l3-topology-attributes {
description "Topology scope attributes";
container l3-topology-attributes {
description "Contains topology attributes";
leaf name {
type string;
description
"Name of the topology";
}
leaf-list flag {
type l3-flag-type;
description
"Topology flags";
}
}
}
grouping l3-node-attributes {
description "L3 node scope attributes";
container l3-node-attributes {
description
"Contains node attributes";
leaf name {
type inet:domain-name;
description
"Node name";
}
leaf-list flag {
type node-flag-type;
description
"Node flags";
}
leaf-list router-id {
type rt-types:router-id;
description
"Router-id for the node";
}
list prefix {
key "prefix";
description
"A list of prefixes along with their attributes";
uses l3-prefix-attributes;
}
}
}
grouping l3-link-attributes {
description
"L3 link scope attributes";
container l3-link-attributes {
description
"Contains link attributes";
leaf name {
type string;
description
"Link Name";
}
leaf-list flag {
type link-flag-type;
description
"Link flags";
}
leaf metric1 {
type uint64;
description
"Link Metric 1";
}
leaf metric2 {
type uint64;
description
"Link Metric 2";
}
}
}
grouping l3-termination-point-attributes {
description "L3 termination point scope attributes";
container l3-termination-point-attributes {
description
"Contains termination point attributes";
choice termination-point-type {
description
"Indicates the termination point type";
case ip {
leaf-list ip-address {
type inet:ip-address;
description
"IPv4 or IPv6 address.";
}
}
case unnumbered {
leaf unnumbered-id {
type uint32;
description
"Unnumbered interface identifier.
The identifier will correspond to the ifIndex value
of the interface, i.e., the ifIndex value of the
ifEntry that represents the interface in
implementations where the Interfaces Group MIB
(RFC 2863) is supported.";
reference
"RFC 2863: The Interfaces Group MIB";
}
}
case interface-name {
leaf interface-name {
type string;
description
"Name of the interface. The name can (but does not
have to) correspond to an interface reference of a
containing node's interface, i.e., the path name of a
corresponding interface data node on the containing
node reminiscent of data type interface-ref defined
in RFC 8343. It should be noted that data type
interface-ref of RFC 8343 cannot be used directly,
as this data type is used to reference an interface
in a datastore of a single node in the network, not
to uniquely reference interfaces across a network.";
reference
"RFC 8343: A YANG Data Model for Interface Management";
}
}
}
}
}
augment "/nw:networks/nw:network/nw:network-types" {
description
"Introduces new network type for L3 Unicast topology";
uses l3-unicast-topology-type;
}
augment "/nw:networks/nw:network" {
when "nw:network-types/l3t:l3-unicast-topology" {
description
"Augmentation parameters apply only for networks with
L3 Unicast topology";
}
description
"L3 Unicast for the network as a whole";
uses l3-topology-attributes;
}
augment "/nw:networks/nw:network/nw:node" {
when "../nw:network-types/l3t:l3-unicast-topology" {
description
"Augmentation parameters apply only for networks with
L3 Unicast topology";
}
description
"L3 Unicast node-level attributes ";
uses l3-node-attributes;
}
augment "/nw:networks/nw:network/nt:link" {
when "../nw:network-types/l3t:l3-unicast-topology" {
description
"Augmentation parameters apply only for networks with
L3 Unicast topology";
}
description
"Augments topology link attributes";
uses l3-link-attributes;
}
augment "/nw:networks/nw:network/nw:node/"
+"nt:termination-point" {
when "../../nw:network-types/l3t:l3-unicast-topology" {
description
"Augmentation parameters apply only for networks with
L3 Unicast topology";
}
description "Augments topology termination point configuration";
uses l3-termination-point-attributes;
}
notification l3-node-event {
description
"Notification event for L3 node";
leaf l3-event-type {
type l3-event-type;
description
"Event type";
}
uses nw:node-ref;
uses l3-unicast-topology-type;
uses l3-node-attributes;
}
notification l3-link-event {
description
"Notification event for L3 link";
leaf l3-event-type {
type l3-event-type;
description
"Event type";
}
uses nt:link-ref;
uses l3-unicast-topology-type;
uses l3-link-attributes;
}
notification l3-prefix-event {
description
"Notification event for L3 prefix";
leaf l3-event-type {
type l3-event-type;
description
"Event type";
}
uses nw:node-ref;
uses l3-unicast-topology-type;
container prefix {
description
"Contains L3 prefix attributes";
uses l3-prefix-attributes;
}
}
notification termination-point-event {
description
"Notification event for L3 termination point";
leaf l3-event-type {
type l3-event-type;
description
"Event type";
}
uses nt:tp-ref;
uses l3-unicast-topology-type;
uses l3-termination-point-attributes;
}
}
@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid, json
import uuid
import json
from flask import request
from flask_restful import Resource
from common.proto.context_pb2 import Empty
@@ -21,7 +22,6 @@ from common.Constants import DEFAULT_CONTEXT_NAME
from context.client.ContextClient import ContextClient
from qkd_app.client.QKDAppClient import QKDAppClient
class _Resource(Resource):
def __init__(self) -> None:
super().__init__()
@@ -32,18 +32,56 @@ class Index(_Resource):
def get(self):
return {'hello': 'world'}
class ListDevices(_Resource):
def get(self):
"""
List devices together with the apps associated with them.
"""
devices = self.context_client.ListDevices(Empty()).devices
for device in devices:
# Fetch apps associated with this device
device.apps = self.get_apps_for_device(device.device_id.device_uuid.uuid)
return {'devices': [self.format_device(device) for device in devices]}
def get_apps_for_device(self, device_uuid):
"""
Fetch the apps associated with a given device UUID.
"""
try:
# Call the AppService to get the list of apps
apps_list = self.qkd_app_client.ListApps(Empty())
# Filter apps for this specific device
device_apps = []
for app in apps_list.apps:
if app.local_device_id.device_uuid.uuid == device_uuid or \
app.remote_device_id.device_uuid.uuid == device_uuid:
device_apps.append(app)
return device_apps
except Exception as e:
print(f"Error fetching apps for device {device_uuid}: {e}")
return []
def format_device(self, device):
"""
Formats a device object to include the associated apps in the response.
"""
return {
'device_uuid': device.device_id.device_uuid.uuid,
'name': device.name,
'type': device.device_type,
'status': device.device_operational_status,
'apps': [{'app_id': app.app_id.app_uuid.uuid, 'app_status': app.app_status, 'app_type': app.app_type} for app in device.apps]
}
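Illustrative shape of the JSON this resource returns (all values below are placeholders):

example_response = {
    'devices': [{
        'device_uuid': '00000001-0000-0000-0000-000000000000',
        'name'       : 'qkd-node-1',
        'type'       : 'qkd-node',
        'status'     : 1,
        'apps'       : [{
            'app_id'    : '00000002-0000-0000-0000-000000000000',
            'app_status': 'ON',
            'app_type'  : 'internal',
        }],
    }],
}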
class CreateQKDApp(_Resource):
# Optare: Post request for the QKD Node to call the TeraflowSDN. Example of requests below
def post(self):
app = request.get_json()['app']
devices = self.context_client.ListDevices(Empty())
devices = devices.devices
devices = self.context_client.ListDevices(Empty()).devices
local_device = None
# This for-loop won't be necessary if we can garantee Device ID is the same as QKDN Id
# This for-loop won't be necessary if Device ID is guaranteed to be the same as QKDN Id
for device in devices:
for config_rule in device.device_config.config_rules:
if config_rule.custom.resource_key == '__node__':
@@ -53,15 +91,6 @@ class CreateQKDApp(_Resource):
local_device = device
break
# Optare: Todo: Verify that a service is present for this app
'''
requests.post('http://10.211.36.220/app/create_qkd_app', json={'app': {'server_app_id':'1', 'client_app_id':[], 'app_status':'ON', 'local_qkdn_id':'00000001-0000-0000-0000-000000000000', 'backing_qkdl_id':['00000003-0002-0000-0000-000000000000']}})
requests.post('http://10.211.36.220/app/create_qkd_app', json={'app': {'server_app_id':'1', 'client_app_id':[], 'app_status':'ON', 'local_qkdn_id':'00000003-0000-0000-0000-000000000000', 'backing_qkdl_id':['00000003-0002-0000-0000-000000000000']}})
'''
if local_device is None:
return {"status": "fail"}
@@ -76,11 +105,7 @@ class CreateQKDApp(_Resource):
'remote_device_id': {'device_uuid': {'uuid': ''}},
}
# Optare: This will call our internal RegisterApp which supports the creation of both internal and external app.
# Optare the verification for knowing if two parties are requesting the same app is done inside RegisterApp's function
self.qkd_app_client.RegisterApp(App(**external_app_src_dst))
# Optare: Todo: Communicate by SBI with both Nodes of the new App
return {"status": "success"}
@@ -4,7 +4,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -12,19 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from qkd_app.service.rest_server.RestServer import RestServer
from .Resources import (
CreateQKDApp, Index)
from nbi.service.rest_server.RestServer import RestServer
from .Resources import CreateQKDApp, Index
URL_PREFIX = '/qkd_app'
# Use the 'path' type since some identifiers may contain the '/' character, which Flask does not accept in 'string' URL parameters.
RESOURCES = [
# (endpoint_name, resource_class, resource_url)
('api.index', Index, '/'),
('api.register_qkd_app', CreateQKDApp, '/create_qkd_app'),
('api.index', Index, '/'),
('api.register_qkd_app', CreateQKDApp, '/create_qkd_app'),
]
def register_qkd_app(app_server : RestServer):
def register_qkd_app(rest_server : RestServer):
for endpoint_name, resource_class, resource_url in RESOURCES:
app_server.add_resource(resource_class, URL_PREFIX + resource_url, endpoint=endpoint_name)
rest_server.add_resource(resource_class, URL_PREFIX + resource_url, endpoint=endpoint_name)
Two additional file diffs are collapsed and not shown here.
@@ -37,6 +37,7 @@ FROM registry.access.redhat.com/ubi8/ubi-minimal:8.4 AS release
ARG JAVA_PACKAGE=java-11-openjdk-headless
ARG RUN_JAVA_VERSION=1.3.8
ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
# Install java and the run-java script
# Also set up permissions for user `1001`
RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
......