Commit 36ce58db authored by Lluis Gifre Renom

Merge branch 'feat/266-cttc-e2e-orchestrator-subscription-on-device-added-using-socketio' into 'feat/216-cttc-implement-integration-test-between-e2e-ip-optical-sdn-controllers'

Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components"

See merge request !328
parents 36adfc01 8121e286
2 related merge requests: !328 Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components", !286 Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
Showing changed files with 408 additions and 139 deletions
@@ -42,7 +42,7 @@ export KFK_REDEPLOY=${KFK_REDEPLOY:-""}
mkdir -p ${TMP_MANIFESTS_FOLDER}
function kafka_deploy() {
# copy zookeeper and kafka manifest files to temporary manifest location
# copy zookeeper and kafka manifest files to temporary manifest location
cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"
@@ -57,11 +57,12 @@ function kafka_deploy() {
# Kafka zookeeper service should be deployed before the kafka service
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
#KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
#KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
# Kafka service should be deployed after the zookeeper service
sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
#sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
sed -i "s/<KAFKA_NAMESPACE>/${KFK_NAMESPACE}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# echo ">>> Deploying Apache Kafka Broker"
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
......
src/tests/ecoc24/
\ No newline at end of file
@@ -41,18 +41,6 @@ spec:
env:
- name: LOG_LEVEL
value: "DEBUG"
- name: WS_IP_HOST
value: "nbiservice.tfs-ip.svc.cluster.local"
- name: WS_IP_PORT
value: "8761"
- name: WS_E2E_HOST
value: "e2e-orchestratorservice.tfs-e2e.svc.cluster.local"
- name: WS_E2E_PORT
value: "8762"
- name: EXT_CONTROLLER1_ADD
value: "10.1.1.96"
- name: EXT_CONTROLLER1_PORT
value: "8003"
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10050"]
......
@@ -19,14 +19,13 @@ metadata:
labels:
app: zookeeper-service
name: zookeeper-service
namespace: kafka
spec:
type: NodePort
type: ClusterIP
ports:
- name: zookeeper-port
port: 2181
nodePort: 30181
targetPort: 2181
#nodePort: 30181
#targetPort: 2181
selector:
app: zookeeper
---
@@ -36,7 +35,6 @@ metadata:
labels:
app: zookeeper
name: zookeeper
namespace: kafka
spec:
replicas: 1
selector:
@@ -52,4 +50,4 @@ spec:
imagePullPolicy: IfNotPresent
name: zookeeper
ports:
- containerPort: 2181
\ No newline at end of file
- containerPort: 2181
@@ -19,7 +19,6 @@ metadata:
labels:
app: kafka-broker
name: kafka-service
namespace: kafka
spec:
ports:
- port: 9092
@@ -32,7 +31,6 @@ metadata:
labels:
app: kafka-broker
name: kafka-broker
namespace: kafka
spec:
replicas: 1
selector:
@@ -49,11 +47,12 @@ spec:
- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_ZOOKEEPER_CONNECT
value: <ZOOKEEPER_INTERNAL_IP>:2181
#value: <ZOOKEEPER_INTERNAL_IP>:2181
value: zookeeper-service.<KAFKA_NAMESPACE>.svc.cluster.local:2181
- name: KAFKA_LISTENERS
value: PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092
value: PLAINTEXT://kafka-service.<KAFKA_NAMESPACE>.svc.cluster.local:9092
image: wurstmeister/kafka
imagePullPolicy: IfNotPresent
name: kafka-broker
......
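For reference, a minimal sketch of how an in-cluster client would reach the broker through the namespace-qualified DNS names introduced above (it assumes the manifest was rendered with KFK_NAMESPACE=kafka and that confluent_kafka is installed; neither is fixed by this diff):

from confluent_kafka.admin import AdminClient

# <KAFKA_NAMESPACE> rendered as 'kafka'; adjust if deployed elsewhere
KFK_BOOTSTRAP = 'kafka-service.kafka.svc.cluster.local:9092'

admin = AdminClient({'bootstrap.servers': KFK_BOOTSTRAP})
metadata = admin.list_topics(timeout=5)  # raises KafkaException if the broker is unreachable
print('brokers:', metadata.brokers)
print('topics :', sorted(metadata.topics.keys()))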
@@ -39,25 +39,28 @@ spec:
#- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
value: "DEBUG"
- name: FLASK_ENV
value: "production" # change to "development" if developing
- name: IETF_NETWORK_RENDERER
value: "LIBYANG"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
initialDelaySeconds: 30 # NBI's gunicorn takes 30~40 seconds to bootstrap
periodSeconds: 10
failureThreshold: 3
failureThreshold: 6
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
initialDelaySeconds: 30 # NBI's gunicorn takes 30~40 seconds to bootstrap
periodSeconds: 10
failureThreshold: 3
failureThreshold: 6
resources:
requests:
cpu: 150m
......
@@ -17,12 +17,28 @@ kind: Ingress
metadata:
name: tfs-ingress
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /$2
nginx.ingress.kubernetes.io/limit-rps: "50"
nginx.ingress.kubernetes.io/limit-connections: "50"
nginx.ingress.kubernetes.io/proxy-connect-timeout: "50"
nginx.ingress.kubernetes.io/proxy-send-timeout: "50"
nginx.ingress.kubernetes.io/proxy-read-timeout: "50"
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
# Enable websocket services and configure sticky cookies (seems not to work)
#nginx.org/websocket-services: "nbiservice"
#nginx.org/sticky-cookie-services: "serviceName=nbiservice tfs-nbi-session expires=1h path=/socket.io"
# Enable sticky sessions (use same backend for all connections
# originated by a specific client, identified through its cookie)
nginx.ingress.kubernetes.io/affinity: "cookie"
nginx.ingress.kubernetes.io/affinity-mode: "persistent"
nginx.ingress.kubernetes.io/session-cookie-name: "tfs-nbi-session"
nginx.ingress.kubernetes.io/session-cookie-path: "/socket.io"
nginx.ingress.kubernetes.io/session-cookie-expires: "3600"
nginx.ingress.kubernetes.io/session-cookie-change-on-failure: "true"
nginx.ingress.kubernetes.io/limit-rps: "50" # max requests per second per source IP
nginx.ingress.kubernetes.io/limit-connections: "50" # max concurrent connections per source IP
nginx.ingress.kubernetes.io/proxy-connect-timeout: "60" # max timeout for connecting to server
# Enable long-lived connections, required for websocket/socket.io streams
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600" # max timeout between two successive write operations
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600" # max timeout between two successive read operations
spec:
rules:
- http:
@@ -48,6 +64,13 @@ spec:
name: nbiservice
port:
number: 8080
- path: /()(socket.io/.*)
pathType: Prefix
backend:
service:
name: nbiservice
port:
number: 8080
- path: /()(tfs-api/.*)
pathType: Prefix
backend:
......
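A hedged sketch of a Socket.IO client exercising the new /socket.io route and the cookie-based session affinity (python-socketio is assumed installed; the ingress address is a placeholder, not a value from this commit):

import socketio  # python-socketio client package

INGRESS_URL = 'http://127.0.0.1'  # placeholder ingress address

sio = socketio.Client()

@sio.event
def connect():
    # the tfs-nbi-session cookie returned here pins this client to one NBI replica
    print('connected to NBI through the ingress')

# the ingress rule "/()(socket.io/.*)" preserves the default socket.io path
sio.connect(INGRESS_URL, socketio_path='socket.io')
sio.wait()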
@@ -36,7 +36,7 @@ spec:
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
value: "DEBUG"
- name: ENABLE_FORECASTER
value: "NO"
readinessProbe:
......
@@ -39,11 +39,10 @@ spec:
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
- name: WS_IP_PORT
value: "8761"
- name: WS_E2E_PORT
value: "8762"
value: "DEBUG"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10080"]
......
ofc25 0 → 120000
src/tests/ofc25/
\ No newline at end of file
@@ -12,26 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3
syntax = "proto3";
package vnt_manager;
import "context.proto";
service VNTManagerService {
rpc VNTSubscript (VNTSubscriptionRequest) returns (VNTSubscriptionReply) {}
rpc ListVirtualLinkIds (context.Empty) returns (context.LinkIdList) {}
rpc ListVirtualLinks (context.Empty) returns (context.LinkList) {}
rpc GetVirtualLink (context.LinkId) returns (context.Link) {}
rpc SetVirtualLink (context.Link) returns (context.LinkId) {}
rpc RemoveVirtualLink (context.LinkId) returns (context.Empty) {}
}
message VNTSubscriptionRequest {
string host = 1;
string port = 2;
}
message VNTSubscriptionReply {
string subscription = 1;
rpc ListVirtualLinkIds(context.Empty ) returns (context.LinkIdList) {}
rpc ListVirtualLinks (context.Empty ) returns (context.LinkList ) {}
rpc GetVirtualLink (context.LinkId) returns (context.Link ) {}
rpc SetVirtualLink (context.Link ) returns (context.LinkId ) {}
rpc RemoveVirtualLink (context.LinkId) returns (context.Empty ) {}
}
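With VNTSubscript and its request/reply messages removed, clients drive virtual links purely through the context types; a usage sketch against the generated stubs (the module paths and service address are assumptions based on the usual TFS layout, not part of this diff):

import grpc
from common.proto.context_pb2 import Empty
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceStub

channel = grpc.insecure_channel('vnt-managerservice:10080')  # hypothetical address:port
stub = VNTManagerServiceStub(channel)

link_ids = stub.ListVirtualLinkIds(Empty())  # returns context.LinkIdList
for link_id in link_ids.link_ids:
    link = stub.GetVirtualLink(link_id)      # returns context.Link
    print(link.name)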
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, logging, requests
from requests.auth import HTTPBasicAuth
from typing import Any, Optional, Set
class RestRequestMethod(enum.Enum):
GET = 'get'
POST = 'post'
PUT = 'put'
PATCH = 'patch'
DELETE = 'delete'
EXPECTED_STATUS_CODES : Set[int] = {
requests.codes['OK' ], # 200 - OK
requests.codes['CREATED' ], # 201 - Created
requests.codes['ACCEPTED' ], # 202 - Accepted
requests.codes['NO_CONTENT'], # 204 - No Content
}
URL_TEMPLATE = '{:s}://{:s}:{:d}/{:s}'
def compose_basic_auth(
username : Optional[str] = None, password : Optional[str] = None
) -> Optional[HTTPBasicAuth]:
if username is None or password is None: return None
return HTTPBasicAuth(username, password)
class SchemeEnum(enum.Enum):
HTTP = 'http'
HTTPS = 'https'
def check_scheme(scheme : str) -> str:
str_scheme = str(scheme).lower()
enm_scheme = SchemeEnum(str_scheme) # idiomatic lookup; raises ValueError on unsupported schemes
return enm_scheme.value
class RestClient:
def __init__(
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None,
timeout : int = 30, verify_certs : bool = True, allow_redirects : bool = True,
logger : Optional[logging.Logger] = None
) -> None:
self._address = address
self._port = int(port)
self._scheme = check_scheme(scheme)
self._auth = compose_basic_auth(username=username, password=password)
self._timeout = int(timeout)
self._verify_certs = verify_certs
self._allow_redirects = allow_redirects
self._logger = logger
def _compose_url(self, endpoint : str) -> str:
endpoint = endpoint.lstrip('/')
return URL_TEMPLATE.format(self._scheme, self._address, self._port, endpoint)
def _log_msg_request(
self, method : RestRequestMethod, request_url : str, body : Optional[Any],
log_level : int = logging.INFO
) -> str:
msg = 'Request: {:s} {:s}'.format(str(method.value).upper(), str(request_url))
if body is not None: msg += ' body={:s}'.format(str(body))
if self._logger is not None: self._logger.log(log_level, msg)
return msg
def _log_msg_check_reply(
self, method : RestRequestMethod, request_url : str, body : Optional[Any],
reply : requests.Response, expected_status_codes : Set[int],
log_level : int = logging.INFO
) -> str:
msg = 'Reply: {:s}'.format(str(reply.text))
if self._logger is not None: self._logger.log(log_level, msg)
http_status_code = reply.status_code
if http_status_code in expected_status_codes: return msg
MSG = 'Request failed. method={:s} url={:s} body={:s} status_code={:s} reply={:s}'
msg = MSG.format(
str(method.value).upper(), str(request_url), str(body),
str(http_status_code), str(reply.text)
)
if self._logger is not None: self._logger.error(msg)
raise Exception(msg)
def _do_rest_request(
self, method : RestRequestMethod, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
request_url = self._compose_url(endpoint)
self._log_msg_request(method, request_url, body)
try:
headers = {'accept': 'application/json'}
reply = requests.request(
method.value, request_url, headers=headers, json=body,
auth=self._auth, verify=self._verify_certs, timeout=self._timeout,
allow_redirects=self._allow_redirects
)
except Exception as e:
MSG = 'Request failed. method={:s} url={:s} body={:s}'
msg = MSG.format(str(method.value).upper(), request_url, str(body))
if self._logger is not None: self._logger.exception(msg)
raise Exception(msg) from e
self._log_msg_check_reply(method, request_url, body, reply, expected_status_codes)
if reply.content and len(reply.content) > 0: return reply.json()
return None
def get(
self, endpoint : str,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.GET, endpoint,
expected_status_codes=expected_status_codes
)
def post(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.POST, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def put(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.PUT, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def patch(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.PATCH, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def delete(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.DELETE, endpoint, body=body,
expected_status_codes=expected_status_codes
)
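A usage sketch for the new RestClient; the address/port reuse the EXT_CONTROLLER1_ADD/EXT_CONTROLLER1_PORT values removed from the E2E orchestrator manifest above, while the credentials and RESTCONF endpoint are placeholders:

import logging
logging.basicConfig(level=logging.INFO)

client = RestClient(
    '10.1.1.96', 8003, scheme='http',
    username='admin', password='admin',   # placeholder credentials
    timeout=30, logger=logging.getLogger('ext-controller'),
)
networks = client.get('restconf/data/ietf-network:networks')  # placeholder endpoint
client.delete('restconf/data/ietf-network:networks', expected_status_codes={204})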
@@ -45,12 +45,13 @@ from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from vnt_manager.client.VNTManagerClient import VNTManagerClient
from .Tools import (
format_device_custom_config_rules, format_service_custom_config_rules,
format_slice_custom_config_rules, get_descriptors_add_contexts,
get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_controllers_and_network_devices,
split_devices_by_rules
split_devices_by_rules, split_links_by_type
)
LOGGER = logging.getLogger(__name__)
@@ -112,7 +113,8 @@ class DescriptorLoader:
self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None,
num_workers : int = 1,
context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None,
vntm_client : Optional[VNTManagerClient] = None
) -> None:
if (descriptors is None) == (descriptors_file is None):
# pylint: disable=broad-exception-raised
@@ -190,10 +192,11 @@ class DescriptorLoader:
self.__services_add = None
self.__slices_add = None
self.__ctx_cli = ContextClient() if context_client is None else context_client
self.__dev_cli = DeviceClient() if device_client is None else device_client
self.__svc_cli = ServiceClient() if service_client is None else service_client
self.__slc_cli = SliceClient() if slice_client is None else slice_client
self.__ctx_cli = ContextClient() if context_client is None else context_client
self.__dev_cli = DeviceClient() if device_client is None else device_client
self.__svc_cli = ServiceClient() if service_client is None else service_client
self.__slc_cli = SliceClient() if slice_client is None else slice_client
self.__vnt_cli = VNTManagerClient() if vntm_client is None else vntm_client
self.__results : TypeResults = list()
@@ -351,22 +354,38 @@ class DescriptorLoader:
controllers_add, network_devices_add = split_controllers_and_network_devices(self.__devices_add)
typed_links = split_links_by_type(self.__links)
typed_normal_links = typed_links.get('normal', list())
typed_optical_links = typed_links.get('optical', list())
typed_optical_links.extend(self.__optical_links)
typed_virtual_links = typed_links.get('virtual', list())
self.__ctx_cli.connect()
self.__dev_cli.connect()
self.__svc_cli.connect()
self.__slc_cli.connect()
if len(self.__services ) > 0: self.__svc_cli.connect()
if len(self.__slices ) > 0: self.__slc_cli.connect()
if len(typed_virtual_links) > 0: self.__vnt_cli.connect()
self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add )
self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
self._process_descr('controller', 'add', self.__dev_cli.AddDevice, Device, controllers_add )
self._process_descr('device', 'add', self.__dev_cli.AddDevice, Device, network_devices_add )
self._process_descr('device', 'config', self.__dev_cli.ConfigureDevice, Device, self.__devices_config)
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links )
self._process_descr('link', 'add', self.__ctx_cli.SetOpticalLink, OpticalLink, self.__optical_links )
self._process_descr('service', 'add', self.__svc_cli.CreateService, Service, self.__services_add )
self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services )
self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add )
self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices )
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, typed_normal_links )
if len(typed_optical_links) > 0:
self._process_descr('link', 'add', self.__ctx_cli.SetOpticalLink, OpticalLink, typed_optical_links )
if len(typed_virtual_links) > 0:
self._process_descr('link', 'add', self.__vnt_cli.SetVirtualLink, Link, typed_virtual_links )
if len(self.__services) > 0:
self._process_descr('service','add', self.__svc_cli.CreateService, Service, self.__services_add )
self._process_descr('service','update', self.__svc_cli.UpdateService, Service, self.__services )
if len(self.__slices) > 0:
self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add )
self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices )
# By default the Context component automatically assigns devices and links to topologies based on their
# endpoints, and assigns topologies, services, and slices to contexts based on their identifiers.
@@ -467,10 +486,17 @@ class DescriptorLoader:
def _unload_normal_mode(self) -> None:
# Normal mode: follows the automated workflows in the different components
typed_links = split_links_by_type(self.links)
typed_normal_links = typed_links.get('normal', list())
typed_optical_links = typed_links.get('optical', list())
typed_optical_links.extend(self.optical_links)
typed_virtual_links = typed_links.get('virtual', list())
self.__ctx_cli.connect()
self.__dev_cli.connect()
self.__svc_cli.connect()
self.__slc_cli.connect()
if len(self.services ) > 0: self.__svc_cli.connect()
if len(self.slices ) > 0: self.__slc_cli.connect()
if len(typed_virtual_links) > 0: self.__vnt_cli.connect()
for _, slice_list in self.slices.items():
for slice_ in slice_list:
@@ -480,10 +506,13 @@ class DescriptorLoader:
for service in service_list:
self.__svc_cli.DeleteService(ServiceId(**service['service_id']))
for optical_link in self.optical_links:
for virtual_link in typed_virtual_links:
self.__vnt_cli.RemoveVirtualLink(LinkId(**virtual_link['link_id']))
for optical_link in typed_optical_links:
self.__ctx_cli.DeleteOpticalLink(LinkId(**optical_link['link_id']))
for link in self.links:
for link in typed_normal_links:
self.__ctx_cli.RemoveLink(LinkId(**link['link_id']))
for device in self.devices:
......
@@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import copy, json
from typing import Dict, List, Optional, Tuple, Union
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import DeviceDriverEnum
from common.proto.context_pb2 import DeviceDriverEnum, LinkTypeEnum
def get_descriptors_add_contexts(contexts : List[Dict]) -> List[Dict]:
contexts_add = copy.deepcopy(contexts)
@@ -131,3 +132,30 @@ def split_controllers_and_network_devices(devices : List[Dict]) -> Tuple[List[Di
else:
network_devices.append(device)
return controllers, network_devices
def link_type_to_str(link_type : Union[int, str]) -> Optional[str]:
    try:
        if isinstance(link_type, int): return LinkTypeEnum.Name(link_type)
        if isinstance(link_type, str): return LinkTypeEnum.Name(LinkTypeEnum.Value(link_type))
    except ValueError:
        return None # unknown enum name/number; caller reports the unsupported link type
    return None
def split_links_by_type(links : List[Dict]) -> Dict[str, List[Dict]]:
typed_links = collections.defaultdict(list)
for link in links:
link_type = link.get('link_type', LinkTypeEnum.LINKTYPE_UNKNOWN)
str_link_type = link_type_to_str(link_type)
if str_link_type is None:
MSG = 'Unsupported LinkType in Link({:s})'
raise Exception(MSG.format(str(link)))
link_type = LinkTypeEnum.Value(str_link_type)
if link_type in {LinkTypeEnum.LINKTYPE_UNKNOWN, LinkTypeEnum.LINKTYPE_COPPER, LinkTypeEnum.LINKTYPE_RADIO}:
typed_links['normal'].append(link)
elif link_type in {LinkTypeEnum.LINKTYPE_FIBER}:
typed_links['optical'].append(link)
elif link_type in {LinkTypeEnum.LINKTYPE_VIRTUAL}:
typed_links['virtual'].append(link)
else:
MSG = 'Unsupported LinkType({:s}) in Link({:s})'
raise Exception(MSG.format(str_link_type, str(link)))
return typed_links
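A worked example of the classification helper above; the descriptors carry only the fields split_links_by_type() inspects:

links = [
    {'link_id': {'link_uuid': {'uuid': 'r1/eth1==r2/eth1'}}},  # no link_type -> LINKTYPE_UNKNOWN -> 'normal'
    {'link_id': {'link_uuid': {'uuid': 'o1/p1==o2/p1'}}, 'link_type': 'LINKTYPE_FIBER'},
    {'link_id': {'link_uuid': {'uuid': 'v1/v==v2/v'}}, 'link_type': 'LINKTYPE_VIRTUAL'},
]
typed_links = split_links_by_type(links)
assert [len(typed_links[k]) for k in ('normal', 'optical', 'virtual')] == [1, 1, 1]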
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import logging, time
from enum import Enum
from confluent_kafka.admin import AdminClient, NewTopic
from common.Settings import get_setting
@@ -21,6 +21,12 @@ from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'
KAFKA_TOPIC_NUM_PARTITIONS = 1
KAFKA_TOPIC_REPLICATION_FACTOR = 1
KAFKA_TOPIC_LIST_TIMEOUT = 5
TOPIC_CREATE_WAIT_ITERATIONS = 10
TOPIC_CREATE_WAIT_TIME = 1
class KafkaConfig(Enum):
@staticmethod
@@ -35,59 +41,87 @@ class KafkaConfig(Enum):
@staticmethod
def get_admin_client():
SERVER_ADDRESS = KafkaConfig.get_kafka_address()
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS })
ADMIN_CLIENT = AdminClient({'bootstrap.servers': SERVER_ADDRESS})
return ADMIN_CLIENT
class KafkaTopic(Enum):
# TODO: Later to be populated from ENV variable.
TELEMETRY_REQUEST = 'topic_telemetry_request'
TELEMETRY_RESPONSE = 'topic_telemetry_response'
RAW = 'topic_raw'
LABELED = 'topic_labeled'
VALUE = 'topic_value'
ALARMS = 'topic_alarms'
ANALYTICS_REQUEST = 'topic_analytics_request'
ANALYTICS_RESPONSE = 'topic_analytics_response'
TELEMETRY_REQUEST = 'topic_telemetry_request'
TELEMETRY_RESPONSE = 'topic_telemetry_response'
RAW = 'topic_raw'
LABELED = 'topic_labeled'
VALUE = 'topic_value'
ALARMS = 'topic_alarms'
ANALYTICS_REQUEST = 'topic_analytics_request'
ANALYTICS_RESPONSE = 'topic_analytics_response'
VNTMANAGER_REQUEST = 'topic_vntmanager_request'
VNTMANAGER_RESPONSE = 'topic_vntmanager_response'
NBI_SOCKETIO_WORKERS = 'tfs_nbi_socketio'
@staticmethod
def create_all_topics() -> bool:
"""
'''
Method to create Kafka topics defined as class members
"""
all_topics = [member.value for member in KafkaTopic]
LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
LOGGER.debug("All topics are created sucsessfully or Already Exists")
'''
LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
kafka_admin_client = KafkaConfig.get_admin_client()
topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
existing_topics = set(topic_metadata.topics.keys())
LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))
missing_topics = [
NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
for topic in KafkaTopic
if topic.value not in existing_topics
]
LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))
if len(missing_topics) == 0:
LOGGER.debug('All topics already existed.')
return True
else:
LOGGER.debug("Error creating all topics")
return False
@staticmethod
def create_new_topic_if_not_exists(new_topics: list) -> bool:
"""
Method to create Kafka topic if it does not exist.
Args:
list of topic: containing the topic name(s) to be created on Kafka
"""
LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
for topic in new_topics:
create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
failed_topic_creations = set()
for topic, future in create_topic_future_map.items():
try:
topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5)
# LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
if topic not in topic_metadata.topics:
# If the topic does not exist, create a new topic
print("Topic {:} does not exist. Creating...".format(topic))
LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
KafkaConfig.get_admin_client().create_topics([new_topic])
else:
print("Topic name already exists: {:}".format(topic))
LOGGER.debug("Topic name already exists: {:}".format(topic))
except Exception as e:
LOGGER.debug("Failed to create topic: {:}".format(e))
return False
return True
LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
future.result() # Blocks until topic is created or raises an exception
LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
except: # pylint: disable=bare-except
LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
failed_topic_creations.add(topic)
if len(failed_topic_creations) > 0: return False
LOGGER.debug('All topics created.')
# Wait until topics appear in metadata
desired_topics = {topic.value for topic in KafkaTopic}
missing_topics = set()
for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
existing_topics = set(topic_metadata.topics.keys())
missing_topics = desired_topics.difference(existing_topics)
if len(missing_topics) == 0: break
MSG = 'Waiting for Topics({:s}) to appear in metadata...'
LOGGER.debug(MSG.format(str(missing_topics)))
time.sleep(TOPIC_CREATE_WAIT_TIME)
if len(missing_topics) > 0:
MSG = 'Something went wrong... Topics({:s}) do not appear in metadata'
LOGGER.error(MSG.format(str(missing_topics)))
return False
else:
LOGGER.debug('All topics created and available.')
return True
# TODO: create all topics after the deployments (Telemetry and Analytics)
if __name__ == '__main__':
import os
if 'KFK_SERVER_ADDRESS' not in os.environ:
os.environ['KFK_SERVER_ADDRESS'] = 'kafka-service.kafka.svc.cluster.local:9092'
KafkaTopic.create_all_topics()
@@ -15,6 +15,8 @@
import copy
from typing import Dict, List, Optional, Tuple
from common.proto.context_pb2 import LinkTypeEnum
def get_link_uuid(a_endpoint_id : Dict, z_endpoint_id : Dict) -> str:
return '{:s}/{:s}=={:s}/{:s}'.format(
a_endpoint_id['device_id']['device_uuid']['uuid'], a_endpoint_id['endpoint_uuid']['uuid'],
@@ -25,9 +27,13 @@ def json_link_id(link_uuid : str) -> Dict:
def json_link(
link_uuid : str, endpoint_ids : List[Dict], name : Optional[str] = None,
link_type : LinkTypeEnum = LinkTypeEnum.LINKTYPE_UNKNOWN,
total_capacity_gbps : Optional[float] = None, used_capacity_gbps : Optional[float] = None
) -> Dict:
result = {'link_id': json_link_id(link_uuid), 'link_endpoint_ids': copy.deepcopy(endpoint_ids)}
result = {
'link_id': json_link_id(link_uuid), 'link_type': link_type,
'link_endpoint_ids': copy.deepcopy(endpoint_ids),
}
if name is not None: result['name'] = name
if total_capacity_gbps is not None:
attributes : Dict = result.setdefault('attributes', dict())
......
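A sketch composing a virtual-link descriptor with the extended factory (the import path is assumed from the usual TFS layout):

from common.proto.context_pb2 import LinkTypeEnum
from common.tools.object_factory.Link import get_link_uuid, json_link

a_ep = {'device_id': {'device_uuid': {'uuid': 'r1'}}, 'endpoint_uuid': {'uuid': 'eth1'}}
z_ep = {'device_id': {'device_uuid': {'uuid': 'r2'}}, 'endpoint_uuid': {'uuid': 'eth1'}}

link = json_link(
    get_link_uuid(a_ep, z_ep), [a_ep, z_ep], name='r1-r2',
    link_type=LinkTypeEnum.LINKTYPE_VIRTUAL, total_capacity_gbps=100.0,
)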
@@ -30,10 +30,10 @@ def json_service_id(service_uuid : str, context_id : Optional[Dict] = None):
def json_service(
service_uuid : str, service_type : ServiceTypeEnum, context_id : Optional[Dict] = None,
status : ServiceStatusEnum = ServiceStatusEnum.SERVICESTATUS_PLANNED,
endpoint_ids : List[Dict] = [], constraints : List[Dict] = [], config_rules : List[Dict] = []):
return {
name : Optional[str] = None, status : ServiceStatusEnum = ServiceStatusEnum.SERVICESTATUS_PLANNED,
endpoint_ids : List[Dict] = [], constraints : List[Dict] = [], config_rules : List[Dict] = []
) -> Dict:
result = {
'service_id' : json_service_id(service_uuid, context_id=context_id),
'service_type' : service_type,
'service_status' : {'service_status': status},
@@ -41,6 +41,8 @@ def json_service(
'service_constraints' : copy.deepcopy(constraints),
'service_config' : {'config_rules': copy.deepcopy(config_rules)},
}
if name is not None: result['name'] = name
return result
def json_service_qkd_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
......
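A sketch of the extended json_service() with the new optional name parameter (the import path is assumed from the usual TFS layout; the service type chosen is illustrative):

from common.proto.context_pb2 import ServiceTypeEnum
from common.tools.object_factory.Service import json_service

service = json_service(
    'svc-example', ServiceTypeEnum.SERVICETYPE_L3NM,  # illustrative service type
    name='example-l3-service',
)
assert service['name'] == 'example-l3-service'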
@@ -18,7 +18,9 @@ from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, selectinload, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional, Set, Tuple
from common.proto.context_pb2 import Empty, EventTypeEnum, Link, LinkId, LinkIdList, LinkList, TopologyId
from common.proto.context_pb2 import (
Empty, EventTypeEnum, Link, LinkId, LinkIdList, LinkList, TopologyId
)
from common.message_broker.MessageBroker import MessageBroker
from common.method_wrappers.ServiceExceptions import NotFoundException
from common.tools.object_factory.Link import json_link_id
......
@@ -66,9 +66,10 @@ def optical_link_set(db_engine : Engine, messagebroker : MessageBroker, request
now = datetime.datetime.now(datetime.timezone.utc)
# By default, always add link to default Context/Topology
topology_uuids : Set[str] = set()
related_topologies : List[Dict] = list()
# By default, always add link to default Context/Topology
_,topology_uuid = topology_get_uuid(TopologyId(), allow_random=False, allow_default=True)
related_topologies.append({
'topology_uuid': topology_uuid,
@@ -77,15 +78,14 @@
topology_uuids.add(topology_uuid)
link_endpoints_data : List[Dict] = list()
for i,endpoint_id in enumerate(request.link_endpoint_ids):
endpoint_topology_uuid, endpoint_device_uuid, endpoint_uuid = endpoint_get_uuid(
endpoint_id, endpoint_name="", allow_random=True)
endpoint_topology_uuid, _, endpoint_uuid = endpoint_get_uuid(
endpoint_id, allow_random=False)
link_endpoints_data.append({
'link_uuid' : link_uuid,
'endpoint_uuid': endpoint_uuid,
'position' : i,
})
if endpoint_topology_uuid not in topology_uuids:
......
@@ -12,13 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String ,Boolean
import operator
from sqlalchemy import (
Boolean, CheckConstraint, Column, DateTime, ForeignKey, Integer, String
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from typing import Dict
from ._Base import _Base
from .Slot import C_Slot ,S_Slot , L_Slot
from .Slot import C_Slot, S_Slot, L_Slot
class OpticalLinkModel(_Base):
__tablename__ = 'opticallink'
@@ -59,7 +61,7 @@ },
},
'link_endpoint_ids' : [
optical_endpoint.endpoint.dump_id()
for optical_endpoint in self.opticallink_endpoints
for optical_endpoint in sorted(self.opticallink_endpoints, key=operator.attrgetter('position'))
],
}
return result
@@ -69,6 +71,11 @@ class OpticalLinkEndPointModel(_Base):
link_uuid = Column(ForeignKey('opticallink.opticallink_uuid', ondelete='CASCADE' ), primary_key=True)
endpoint_uuid = Column(ForeignKey('endpoint.endpoint_uuid', ondelete='RESTRICT'), primary_key=True, index=True)
position = Column(Integer, nullable=False)
optical_link = relationship('OpticalLinkModel', back_populates='opticallink_endpoints')
endpoint = relationship('EndPointModel', lazy='selectin')
__table_args__ = (
CheckConstraint(position >= 0, name='check_position_value'),
)