Commit 386dca04 authored by Lluis Gifre Renom

Merge branch 'feat/216-cttc-implement-integration-test-between-e2e-ip-optical-sdn-controllers' of ssh://gifrerenom_labs.etsi.org/tfs/controller into feat/216-cttc-implement-integration-test-between-e2e-ip-optical-sdn-controllers

Parents: 4988a93a 36ce58db
Merge requests: !359 "Release TeraFlowSDN 5.0", !286 "Resolve '(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers'"

Showing changes with 408 additions and 139 deletions
@@ -57,11 +57,12 @@ function kafka_deploy() {
     # Kafka zookeeper service should be deployed before the kafka service
     kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
-    KFK_ZOOKEEPER_SERVICE="zookeeper-service"    # this command may be replaced with command to extract service name automatically
-    KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
+    #KFK_ZOOKEEPER_SERVICE="zookeeper-service"    # this command may be replaced with command to extract service name automatically
+    #KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')

     # Kafka service should be deployed after the zookeeper service
-    sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
+    #sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
+    sed -i "s/<KAFKA_NAMESPACE>/${KFK_NAMESPACE}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"

     # echo ">>> Deploying Apache Kafka Broker"
     kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
+src/tests/ecoc24/
\ No newline at end of file
@@ -41,18 +41,6 @@ spec:
         env:
         - name: LOG_LEVEL
           value: "DEBUG"
-        - name: WS_IP_HOST
-          value: "nbiservice.tfs-ip.svc.cluster.local"
-        - name: WS_IP_PORT
-          value: "8761"
-        - name: WS_E2E_HOST
-          value: "e2e-orchestratorservice.tfs-e2e.svc.cluster.local"
-        - name: WS_E2E_PORT
-          value: "8762"
-        - name: EXT_CONTROLLER1_ADD
-          value: "10.1.1.96"
-        - name: EXT_CONTROLLER1_PORT
-          value: "8003"
         readinessProbe:
           exec:
             command: ["/bin/grpc_health_probe", "-addr=:10050"]
@@ -19,14 +19,13 @@ metadata:
   labels:
     app: zookeeper-service
   name: zookeeper-service
-  namespace: kafka
 spec:
-  type: NodePort
+  type: ClusterIP
   ports:
     - name: zookeeper-port
       port: 2181
-      nodePort: 30181
-      targetPort: 2181
+      #nodePort: 30181
+      #targetPort: 2181
   selector:
     app: zookeeper
---
@@ -36,7 +35,6 @@ metadata:
   labels:
     app: zookeeper
   name: zookeeper
-  namespace: kafka
 spec:
   replicas: 1
   selector:
@@ -19,7 +19,6 @@ metadata:
   labels:
     app: kafka-broker
   name: kafka-service
-  namespace: kafka
 spec:
   ports:
   - port: 9092
@@ -32,7 +31,6 @@ metadata:
   labels:
     app: kafka-broker
   name: kafka-broker
-  namespace: kafka
 spec:
   replicas: 1
   selector:
@@ -49,11 +47,12 @@ spec:
         - name: KAFKA_BROKER_ID
           value: "1"
         - name: KAFKA_ZOOKEEPER_CONNECT
-          value: <ZOOKEEPER_INTERNAL_IP>:2181
+          #value: <ZOOKEEPER_INTERNAL_IP>:2181
+          value: zookeeper-service.<KAFKA_NAMESPACE>.svc.cluster.local:2181
         - name: KAFKA_LISTENERS
           value: PLAINTEXT://:9092
         - name: KAFKA_ADVERTISED_LISTENERS
-          value: PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092
+          value: PLAINTEXT://kafka-service.<KAFKA_NAMESPACE>.svc.cluster.local:9092
         image: wurstmeister/kafka
         imagePullPolicy: IfNotPresent
         name: kafka-broker
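Since the broker and Zookeeper addresses are now derived from cluster DNS and a namespace placeholder rather than a hard-coded ClusterIP, a quick post-deployment sanity check is a metadata query against the templated address. A minimal sketch, assuming the confluent_kafka package and in-cluster DNS resolution; the namespace value is an assumption standing in for whatever the deploy script substitutes:

# Broker reachability check; 'kafka' is a placeholder for the value
# substituted for <KAFKA_NAMESPACE> by the deploy script.
from confluent_kafka.admin import AdminClient

KFK_NAMESPACE = 'kafka'
bootstrap = 'kafka-service.{:s}.svc.cluster.local:9092'.format(KFK_NAMESPACE)
admin = AdminClient({'bootstrap.servers': bootstrap})
metadata = admin.list_topics(timeout=5)   # raises KafkaException if the broker is unreachable
print('Broker reachable; topics:', sorted(metadata.topics.keys()))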
@@ -39,25 +39,28 @@ spec:
         #- containerPort: 9192
         env:
         - name: LOG_LEVEL
-          value: "INFO"
+          value: "DEBUG"
         - name: FLASK_ENV
           value: "production"  # change to "development" if developing
         - name: IETF_NETWORK_RENDERER
           value: "LIBYANG"
+        envFrom:
+        - secretRef:
+            name: kfk-kpi-data
         readinessProbe:
           httpGet:
             path: /healthz
             port: 8080
-          initialDelaySeconds: 5
+          initialDelaySeconds: 30   # NBI's gunicorn takes 30~40 seconds to bootstrap
           periodSeconds: 10
-          failureThreshold: 3
+          failureThreshold: 6
         livenessProbe:
           httpGet:
             path: /healthz
             port: 8080
-          initialDelaySeconds: 5
+          initialDelaySeconds: 30   # NBI's gunicorn takes 30~40 seconds to bootstrap
           periodSeconds: 10
-          failureThreshold: 3
+          failureThreshold: 6
         resources:
           requests:
             cpu: 150m
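The raised initialDelaySeconds and failureThreshold give the slow gunicorn bootstrap room to finish. For local testing, a rough sketch that mirrors the probe budget against a port-forwarded NBI pod; address and port are placeholders:

# Poll /healthz the way the kubelet does: 30s initial delay plus up to
# 6 failures at 10s intervals before giving up.
import time
import requests

time.sleep(30)                               # initialDelaySeconds
for _ in range(6):                           # failureThreshold
    try:
        reply = requests.get('http://127.0.0.1:8080/healthz', timeout=2)
        if reply.status_code == 200:
            print('NBI is ready')
            break
    except requests.RequestException:
        pass                                 # not up yet
    time.sleep(10)                           # periodSeconds
else:
    print('NBI did not become ready in time')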
@@ -17,12 +17,28 @@ kind: Ingress
 metadata:
   name: tfs-ingress
   annotations:
-    nginx.ingress.kubernetes.io/rewrite-target: /$2
-    nginx.ingress.kubernetes.io/limit-rps: "50"
-    nginx.ingress.kubernetes.io/limit-connections: "50"
-    nginx.ingress.kubernetes.io/proxy-connect-timeout: "50"
-    nginx.ingress.kubernetes.io/proxy-send-timeout: "50"
-    nginx.ingress.kubernetes.io/proxy-read-timeout: "50"
+    nginx.ingress.kubernetes.io/rewrite-target: "/$2"
+
+    # Enable websocket services and configure sticky cookies (seems not to work)
+    #nginx.org/websocket-services: "nbiservice"
+    #nginx.org/sticky-cookie-services: "serviceName=nbiservice tfs-nbi-session expires=1h path=/socket.io"
+
+    # Enable sticky sessions (use same backend for all connections
+    # originated by a specific client, identified through its cookie)
+    nginx.ingress.kubernetes.io/affinity: "cookie"
+    nginx.ingress.kubernetes.io/affinity-mode: "persistent"
+    nginx.ingress.kubernetes.io/session-cookie-name: "tfs-nbi-session"
+    nginx.ingress.kubernetes.io/session-cookie-path: "/socket.io"
+    nginx.ingress.kubernetes.io/session-cookie-expires: "3600"
+    nginx.ingress.kubernetes.io/session-cookie-change-on-failure: "true"
+
+    nginx.ingress.kubernetes.io/limit-rps: "50"              # max requests per second per source IP
+    nginx.ingress.kubernetes.io/limit-connections: "50"      # max concurrent connections per source IP
+    nginx.ingress.kubernetes.io/proxy-connect-timeout: "60"  # max timeout for connecting to server
+
+    # Enable long-lived connections, required for websocket/socket.io streams
+    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"   # max timeout between two successive write operations
+    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"   # max timeout between two successive read operations
 spec:
   rules:
   - http:
@@ -48,6 +64,13 @@ spec:
             name: nbiservice
             port:
               number: 8080
+      - path: /()(socket.io/.*)
+        pathType: Prefix
+        backend:
+          service:
+            name: nbiservice
+            port:
+              number: 8080
       - path: /()(tfs-api/.*)
         pathType: Prefix
         backend:
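With the new /socket.io route and cookie-based affinity, a Socket.IO client entering through the ingress should stay pinned to one NBI replica. A sketch using the python-socketio client package; the host name is a placeholder:

# The tfs-nbi-session cookie issued on the first request keeps subsequent
# polling/websocket traffic on the same backend pod.
import socketio

sio = socketio.Client()

@sio.event
def connect():
    print('connected through tfs-ingress')

sio.connect('http://tfs.example.local', socketio_path='/socket.io')  # placeholder host
sio.wait()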
@@ -36,7 +36,7 @@ spec:
         - containerPort: 9192
         env:
         - name: LOG_LEVEL
-          value: "INFO"
+          value: "DEBUG"
         - name: ENABLE_FORECASTER
           value: "NO"
         readinessProbe:
@@ -39,11 +39,10 @@ spec:
         - containerPort: 9192
         env:
         - name: LOG_LEVEL
-          value: "INFO"
-        - name: WS_IP_PORT
-          value: "8761"
-        - name: WS_E2E_PORT
-          value: "8762"
+          value: "DEBUG"
+        envFrom:
+        - secretRef:
+            name: kfk-kpi-data
         readinessProbe:
           exec:
             command: ["/bin/grpc_health_probe", "-addr=:10080"]
ofc25 0 → 120000 (new file, symlink)
+src/tests/ofc25/
\ No newline at end of file
@@ -12,26 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3
 syntax = "proto3";
 package vnt_manager;

 import "context.proto";

 service VNTManagerService {
-  rpc VNTSubscript      (VNTSubscriptionRequest) returns (VNTSubscriptionReply) {}
   rpc ListVirtualLinkIds(context.Empty ) returns (context.LinkIdList) {}
   rpc ListVirtualLinks  (context.Empty ) returns (context.LinkList  ) {}
   rpc GetVirtualLink    (context.LinkId) returns (context.Link      ) {}
   rpc SetVirtualLink    (context.Link  ) returns (context.LinkId    ) {}
   rpc RemoveVirtualLink (context.LinkId) returns (context.Empty     ) {}
 }
-
-message VNTSubscriptionRequest {
-  string host = 1;
-  string port = 2;
-}
-
-message VNTSubscriptionReply {
-  string subscription = 1;
-}
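With the subscription RPC and its messages removed, the service is reduced to plain CRUD over virtual links. A hypothetical call sequence with the Python stubs protoc would generate from this file; the module names and target address are assumptions, not part of this commit:

# Assumes protoc-generated context_pb2 / vnt_manager_pb2_grpc modules and a
# reachable VNT Manager endpoint; both are illustrative only.
import grpc
import context_pb2
import vnt_manager_pb2_grpc

channel = grpc.insecure_channel('vnt-managerservice:10080')   # placeholder address
stub = vnt_manager_pb2_grpc.VNTManagerServiceStub(channel)

link_ids = stub.ListVirtualLinkIds(context_pb2.Empty())
for link_id in link_ids.link_ids:
    print(stub.GetVirtualLink(link_id))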
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, logging, requests
from requests.auth import HTTPBasicAuth
from typing import Any, Optional, Set
class RestRequestMethod(enum.Enum):
GET = 'get'
POST = 'post'
PUT = 'put'
PATCH = 'patch'
DELETE = 'delete'
EXPECTED_STATUS_CODES : Set[int] = {
requests.codes['OK' ], # 200 - OK
requests.codes['CREATED' ], # 201 - Created
requests.codes['ACCEPTED' ], # 202 - Accepted
requests.codes['NO_CONTENT'], # 204 - No Content
}
URL_TEMPLATE = '{:s}://{:s}:{:d}/{:s}'
def compose_basic_auth(
username : Optional[str] = None, password : Optional[str] = None
) -> Optional[HTTPBasicAuth]:
if username is None or password is None: return None
return HTTPBasicAuth(username, password)
class SchemeEnum(enum.Enum):
HTTP = 'http'
HTTPS = 'https'
def check_scheme(scheme : str) -> str:
str_scheme = str(scheme).lower()
enm_scheme = SchemeEnum._value2member_map_[str_scheme]
return enm_scheme.value
class RestClient:
def __init__(
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None,
timeout : int = 30, verify_certs : bool = True, allow_redirects : bool = True,
logger : Optional[logging.Logger] = None
) -> None:
self._address = address
self._port = int(port)
self._scheme = check_scheme(scheme)
self._auth = compose_basic_auth(username=username, password=password)
self._timeout = int(timeout)
self._verify_certs = verify_certs
self._allow_redirects = allow_redirects
self._logger = logger
def _compose_url(self, endpoint : str) -> str:
endpoint = endpoint.lstrip('/')
return URL_TEMPLATE.format(self._scheme, self._address, self._port, endpoint)
def _log_msg_request(
self, method : RestRequestMethod, request_url : str, body : Optional[Any],
log_level : int = logging.INFO
) -> str:
msg = 'Request: {:s} {:s}'.format(str(method.value).upper(), str(request_url))
if body is not None: msg += ' body={:s}'.format(str(body))
if self._logger is not None: self._logger.log(log_level, msg)
return msg
def _log_msg_check_reply(
self, method : RestRequestMethod, request_url : str, body : Optional[Any],
reply : requests.Response, expected_status_codes : Set[int],
log_level : int = logging.INFO
) -> str:
msg = 'Reply: {:s}'.format(str(reply.text))
if self._logger is not None: self._logger.log(log_level, msg)
http_status_code = reply.status_code
if http_status_code in expected_status_codes: return msg
MSG = 'Request failed. method={:s} url={:s} body={:s} status_code={:s} reply={:s}'
msg = MSG.format(
str(method.value).upper(), str(request_url), str(body),
str(http_status_code), str(reply.text)
)
        if self._logger is not None: self._logger.error(msg)
raise Exception(msg)
def _do_rest_request(
self, method : RestRequestMethod, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
request_url = self._compose_url(endpoint)
self._log_msg_request(method, request_url, body)
try:
headers = {'accept': 'application/json'}
reply = requests.request(
method.value, request_url, headers=headers, json=body,
auth=self._auth, verify=self._verify_certs, timeout=self._timeout,
allow_redirects=self._allow_redirects
)
except Exception as e:
MSG = 'Request failed. method={:s} url={:s} body={:s}'
msg = MSG.format(str(method.value).upper(), request_url, str(body))
            if self._logger is not None: self._logger.exception(msg)
raise Exception(msg) from e
self._log_msg_check_reply(method, request_url, body, reply, expected_status_codes)
if reply.content and len(reply.content) > 0: return reply.json()
return None
def get(
self, endpoint : str,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.GET, endpoint,
expected_status_codes=expected_status_codes
)
def post(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.POST, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def put(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.PUT, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def patch(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.PATCH, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def delete(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.DELETE, endpoint, body=body,
expected_status_codes=expected_status_codes
)
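A short usage sketch of the new RestClient helper; host, port, credentials, and endpoints below are made-up placeholders:

import logging
logging.basicConfig(level=logging.INFO)

client = RestClient(
    '127.0.0.1', 8080, scheme='http',        # placeholder address/port
    username='admin', password='admin',      # placeholder credentials
    timeout=10, logger=logging.getLogger(__name__),
)
devices = client.get('/tfs-api/devices')     # parsed JSON reply, or None if body is empty
client.delete('/tfs-api/device/dev-1')       # raises on unexpected status codes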
@@ -45,12 +45,13 @@ from context.client.ContextClient import ContextClient
 from device.client.DeviceClient import DeviceClient
 from service.client.ServiceClient import ServiceClient
 from slice.client.SliceClient import SliceClient
+from vnt_manager.client.VNTManagerClient import VNTManagerClient
 from .Tools import (
     format_device_custom_config_rules, format_service_custom_config_rules,
     format_slice_custom_config_rules, get_descriptors_add_contexts,
     get_descriptors_add_services, get_descriptors_add_slices,
     get_descriptors_add_topologies, split_controllers_and_network_devices,
-    split_devices_by_rules
+    split_devices_by_rules, split_links_by_type
 )

 LOGGER = logging.getLogger(__name__)
@@ -112,7 +113,8 @@ class DescriptorLoader:
         self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None,
         num_workers : int = 1,
         context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
-        service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
+        service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None,
+        vntm_client : Optional[VNTManagerClient] = None
     ) -> None:
         if (descriptors is None) == (descriptors_file is None):
             # pylint: disable=broad-exception-raised
@@ -194,6 +196,7 @@ class DescriptorLoader:
         self.__dev_cli = DeviceClient() if device_client is None else device_client
         self.__svc_cli = ServiceClient() if service_client is None else service_client
         self.__slc_cli = SliceClient() if slice_client is None else slice_client
+        self.__vnt_cli = VNTManagerClient() if vntm_client is None else vntm_client

         self.__results : TypeResults = list()
@@ -351,20 +354,36 @@ class DescriptorLoader:
         controllers_add, network_devices_add = split_controllers_and_network_devices(self.__devices_add)

+        typed_links = split_links_by_type(self.__links)
+        typed_normal_links  = typed_links.get('normal',  list())
+        typed_optical_links = typed_links.get('optical', list())
+        typed_optical_links.extend(self.__optical_links)
+        typed_virtual_links = typed_links.get('virtual', list())
+
         self.__ctx_cli.connect()
         self.__dev_cli.connect()
-        self.__svc_cli.connect()
-        self.__slc_cli.connect()
+        if len(self.__services) > 0: self.__svc_cli.connect()
+        if len(self.__slices  ) > 0: self.__slc_cli.connect()
+        if len(typed_virtual_links) > 0: self.__vnt_cli.connect()

         self._process_descr('context',    'add',    self.__ctx_cli.SetContext,      Context,  self.__contexts_add  )
         self._process_descr('topology',   'add',    self.__ctx_cli.SetTopology,     Topology, self.__topologies_add)
         self._process_descr('controller', 'add',    self.__dev_cli.AddDevice,       Device,   controllers_add      )
         self._process_descr('device',     'add',    self.__dev_cli.AddDevice,       Device,   network_devices_add  )
         self._process_descr('device',     'config', self.__dev_cli.ConfigureDevice, Device,   self.__devices_config)
-        self._process_descr('link', 'add', self.__ctx_cli.SetLink,        Link,        self.__links        )
-        self._process_descr('link', 'add', self.__ctx_cli.SetOpticalLink, OpticalLink, self.__optical_links)
-        self._process_descr('service', 'add',    self.__svc_cli.CreateService, Service, self.__services_add)
-        self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services    )
-        self._process_descr('slice',   'add',    self.__slc_cli.CreateSlice,   Slice,   self.__slices_add  )
-        self._process_descr('slice',   'update', self.__slc_cli.UpdateSlice,   Slice,   self.__slices      )
+        self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, typed_normal_links)
+        if len(typed_optical_links) > 0:
+            self._process_descr('link', 'add', self.__ctx_cli.SetOpticalLink, OpticalLink, typed_optical_links)
+        if len(typed_virtual_links) > 0:
+            self._process_descr('link', 'add', self.__vnt_cli.SetVirtualLink, Link, typed_virtual_links)
+        if len(self.__services) > 0:
+            self._process_descr('service', 'add',    self.__svc_cli.CreateService, Service, self.__services_add)
+            self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services    )
+        if len(self.__slices) > 0:
+            self._process_descr('slice', 'add',    self.__slc_cli.CreateSlice, Slice, self.__slices_add)
+            self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices    )
@@ -467,10 +486,17 @@ class DescriptorLoader:
     def _unload_normal_mode(self) -> None:
         # Normal mode: follows the automated workflows in the different components

+        typed_links = split_links_by_type(self.links)
+        typed_normal_links  = typed_links.get('normal',  list())
+        typed_optical_links = typed_links.get('optical', list())
+        typed_optical_links.extend(self.optical_links)
+        typed_virtual_links = typed_links.get('virtual', list())
+
         self.__ctx_cli.connect()
         self.__dev_cli.connect()
-        self.__svc_cli.connect()
-        self.__slc_cli.connect()
+        if len(self.services) > 0: self.__svc_cli.connect()
+        if len(self.slices  ) > 0: self.__slc_cli.connect()
+        if len(typed_virtual_links) > 0: self.__vnt_cli.connect()

         for _, slice_list in self.slices.items():
             for slice_ in slice_list:
@@ -480,10 +506,13 @@ class DescriptorLoader:
             for service in service_list:
                 self.__svc_cli.DeleteService(ServiceId(**service['service_id']))

-        for optical_link in self.optical_links:
+        for virtual_link in typed_virtual_links:
+            self.__vnt_cli.RemoveVirtualLink(LinkId(**virtual_link['link_id']))
+
+        for optical_link in typed_optical_links:
             self.__ctx_cli.DeleteOpticalLink(LinkId(**optical_link['link_id']))

-        for link in self.links:
+        for link in typed_normal_links:
             self.__ctx_cli.RemoveLink(LinkId(**link['link_id']))

         for device in self.devices:
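In short, the loader now fans links out by type in both directions: copper/radio/unknown links go through ContextClient.SetLink, fiber links through SetOpticalLink, and virtual links through the VNT Manager. A hedged usage sketch; the descriptor path is a placeholder and the process/unload entry points are assumed from the class's existing workflow:

# Loads a descriptor file mixing copper, fiber, and virtual links; clients
# default to in-cluster instances when not passed explicitly.
from common.tools.descriptor.Loader import DescriptorLoader

loader = DescriptorLoader(descriptors_file='descriptors/topology.json')
results = loader.process()   # virtual links go via VNTManagerClient.SetVirtualLink
loader.unload()              # virtual links are torn down first, via RemoveVirtualLink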
@@ -12,10 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import collections
 import copy, json
 from typing import Dict, List, Optional, Tuple, Union
 from common.DeviceTypes import DeviceTypeEnum
-from common.proto.context_pb2 import DeviceDriverEnum
+from common.proto.context_pb2 import DeviceDriverEnum, LinkTypeEnum

 def get_descriptors_add_contexts(contexts : List[Dict]) -> List[Dict]:
     contexts_add = copy.deepcopy(contexts)
@@ -131,3 +132,30 @@ def split_controllers_and_network_devices(devices : List[Dict]) -> Tuple[List[Di
         else:
             network_devices.append(device)
     return controllers, network_devices
+
+def link_type_to_str(link_type : Union[int, str]) -> Optional[str]:
+    if isinstance(link_type, int): return LinkTypeEnum.Name(link_type)
+    if isinstance(link_type, str): return LinkTypeEnum.Name(LinkTypeEnum.Value(link_type))
+    return None
+
+def split_links_by_type(links : List[Dict]) -> Dict[str, List[Dict]]:
+    typed_links = collections.defaultdict(list)
+    for link in links:
+        link_type = link.get('link_type', LinkTypeEnum.LINKTYPE_UNKNOWN)
+        str_link_type = link_type_to_str(link_type)
+        if str_link_type is None:
+            MSG = 'Unsupported LinkType in Link({:s})'
+            raise Exception(MSG.format(str(link)))
+
+        link_type = LinkTypeEnum.Value(str_link_type)
+        if link_type in {LinkTypeEnum.LINKTYPE_UNKNOWN, LinkTypeEnum.LINKTYPE_COPPER, LinkTypeEnum.LINKTYPE_RADIO}:
+            typed_links['normal'].append(link)
+        elif link_type in {LinkTypeEnum.LINKTYPE_FIBER}:
+            typed_links['optical'].append(link)
+        elif link_type in {LinkTypeEnum.LINKTYPE_VIRTUAL}:
+            typed_links['virtual'].append(link)
+        else:
+            MSG = 'Unsupported LinkType({:s}) in Link({:s})'
+            raise Exception(MSG.format(str_link_type, str(link)))
+    return typed_links
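For reference, a small sketch of the splitter's behavior on abbreviated descriptor entries; 'link_type' may be given as an enum name or a numeric value, and both map through link_type_to_str() before classification:

links = [
    {'link_id': {'link_uuid': {'uuid': 'R1==R2'}}, 'link_type': 'LINKTYPE_COPPER' },
    {'link_id': {'link_uuid': {'uuid': 'T1==T2'}}, 'link_type': 'LINKTYPE_FIBER'  },
    {'link_id': {'link_uuid': {'uuid': 'V1==V2'}}, 'link_type': 'LINKTYPE_VIRTUAL'},
]
typed_links = split_links_by_type(links)
assert len(typed_links['normal' ]) == 1   # copper, radio, unknown
assert len(typed_links['optical']) == 1   # fiber
assert len(typed_links['virtual']) == 1   # virtual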
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import logging
+import logging, time
 from enum import Enum
 from confluent_kafka.admin import AdminClient, NewTopic
 from common.Settings import get_setting
@@ -21,6 +21,12 @@ from common.Settings import get_setting
 LOGGER = logging.getLogger(__name__)
 KFK_SERVER_ADDRESS_TEMPLATE = 'kafka-service.{:s}.svc.cluster.local:{:s}'

+KAFKA_TOPIC_NUM_PARTITIONS     = 1
+KAFKA_TOPIC_REPLICATION_FACTOR = 1
+KAFKA_TOPIC_LIST_TIMEOUT       = 5
+TOPIC_CREATE_WAIT_ITERATIONS   = 10
+TOPIC_CREATE_WAIT_TIME         = 1
+
 class KafkaConfig(Enum):
     @staticmethod
@@ -49,45 +55,73 @@ class KafkaTopic(Enum):
     ALARMS             = 'topic_alarms'
     ANALYTICS_REQUEST  = 'topic_analytics_request'
     ANALYTICS_RESPONSE = 'topic_analytics_response'
+    VNTMANAGER_REQUEST   = 'topic_vntmanager_request'
+    VNTMANAGER_RESPONSE  = 'topic_vntmanager_response'
+    NBI_SOCKETIO_WORKERS = 'tfs_nbi_socketio'

     @staticmethod
     def create_all_topics() -> bool:
-        """
-        Method to create Kafka topics defined as class members
-        """
-        all_topics = [member.value for member in KafkaTopic]
-        LOGGER.debug("Kafka server address is: {:} ".format(KafkaConfig.get_kafka_address()))
-        if( KafkaTopic.create_new_topic_if_not_exists( all_topics )):
-            LOGGER.debug("All topics are created sucsessfully or Already Exists")
-            return True
-        else:
-            LOGGER.debug("Error creating all topics")
-            return False
-
-    @staticmethod
-    def create_new_topic_if_not_exists(new_topics: list) -> bool:
-        """
-        Method to create Kafka topic if it does not exist.
-        Args:
-            list of topic: containing the topic name(s) to be created on Kafka
-        """
-        LOGGER.debug("Topics names to be verified and created: {:}".format(new_topics))
-        for topic in new_topics:
-            try:
-                topic_metadata = KafkaConfig.get_admin_client().list_topics(timeout=5)
-                # LOGGER.debug("Existing topic list: {:}".format(topic_metadata.topics))
-                if topic not in topic_metadata.topics:
-                    # If the topic does not exist, create a new topic
-                    print("Topic {:} does not exist. Creating...".format(topic))
-                    LOGGER.debug("Topic {:} does not exist. Creating...".format(topic))
-                    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
-                    KafkaConfig.get_admin_client().create_topics([new_topic])
-                else:
-                    print("Topic name already exists: {:}".format(topic))
-                    LOGGER.debug("Topic name already exists: {:}".format(topic))
-            except Exception as e:
-                LOGGER.debug("Failed to create topic: {:}".format(e))
-                return False
-        return True
+        '''
+        Method to create Kafka topics defined as class members
+        '''
+        LOGGER.debug('Kafka server address: {:s}'.format(str(KafkaConfig.get_kafka_address())))
+        kafka_admin_client = KafkaConfig.get_admin_client()
+
+        topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
+        existing_topics = set(topic_metadata.topics.keys())
+        LOGGER.debug('Existing Kafka topics: {:s}'.format(str(existing_topics)))
+        missing_topics = [
+            NewTopic(topic.value, KAFKA_TOPIC_NUM_PARTITIONS, KAFKA_TOPIC_REPLICATION_FACTOR)
+            for topic in KafkaTopic
+            if topic.value not in existing_topics
+        ]
+        LOGGER.debug('Missing Kafka topics: {:s}'.format(str(missing_topics)))
+        if len(missing_topics) == 0:
+            LOGGER.debug('All topics already existed.')
+            return True
+
+        create_topic_future_map = kafka_admin_client.create_topics(missing_topics)
+        LOGGER.debug('create_topic_future_map: {:s}'.format(str(create_topic_future_map)))
+        failed_topic_creations = set()
+        for topic, future in create_topic_future_map.items():
+            try:
+                LOGGER.info('Waiting for Topic({:s})...'.format(str(topic)))
+                future.result()     # Blocks until topic is created or raises an exception
+                LOGGER.info('Topic({:s}) successfully created.'.format(str(topic)))
+            except: # pylint: disable=bare-except
+                LOGGER.exception('Failed to create Topic({:s})'.format(str(topic)))
+                failed_topic_creations.add(topic)
+
+        if len(failed_topic_creations) > 0: return False
+
+        LOGGER.debug('All topics created.')
+
+        # Wait until topics appear in metadata
+        desired_topics = {topic.value for topic in KafkaTopic}
+        missing_topics = set()
+        for _ in range(TOPIC_CREATE_WAIT_ITERATIONS):
+            topic_metadata = kafka_admin_client.list_topics(timeout=KAFKA_TOPIC_LIST_TIMEOUT)
+            existing_topics = set(topic_metadata.topics.keys())
+            missing_topics = desired_topics.difference(existing_topics)
+            if len(missing_topics) == 0: break
+            MSG = 'Waiting for Topics({:s}) to appear in metadata...'
+            LOGGER.debug(MSG.format(str(missing_topics)))
+            time.sleep(TOPIC_CREATE_WAIT_TIME)
+
+        if len(missing_topics) > 0:
+            MSG = 'Something went wrong... Topics({:s}) do not appear in metadata'
+            LOGGER.error(MSG.format(str(missing_topics)))
+            return False
+        else:
+            LOGGER.debug('All topics created and available.')
+            return True

 # TODO: create all topics after the deployments (Telemetry and Analytics)
+
+if __name__ == '__main__':
+    import os
+    if 'KFK_SERVER_ADDRESS' not in os.environ:
+        os.environ['KFK_SERVER_ADDRESS'] = 'kafka-service.kafka.svc.cluster.local:9092'
+    KafkaTopic.create_all_topics()
@@ -15,6 +15,8 @@
 import copy
 from typing import Dict, List, Optional, Tuple

+from common.proto.context_pb2 import LinkTypeEnum
+
 def get_link_uuid(a_endpoint_id : Dict, z_endpoint_id : Dict) -> str:
     return '{:s}/{:s}=={:s}/{:s}'.format(
         a_endpoint_id['device_id']['device_uuid']['uuid'], a_endpoint_id['endpoint_uuid']['uuid'],
@@ -25,9 +27,13 @@ def json_link_id(link_uuid : str) -> Dict:

 def json_link(
     link_uuid : str, endpoint_ids : List[Dict], name : Optional[str] = None,
+    link_type : LinkTypeEnum = LinkTypeEnum.LINKTYPE_UNKNOWN,
     total_capacity_gbps : Optional[float] = None, used_capacity_gbps : Optional[float] = None
 ) -> Dict:
-    result = {'link_id': json_link_id(link_uuid), 'link_endpoint_ids': copy.deepcopy(endpoint_ids)}
+    result = {
+        'link_id': json_link_id(link_uuid), 'link_type': link_type,
+        'link_endpoint_ids': copy.deepcopy(endpoint_ids),
+    }
     if name is not None: result['name'] = name
     if total_capacity_gbps is not None:
         attributes : Dict = result.setdefault('attributes', dict())
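A quick sketch of the extended factory producing a virtual-link descriptor; the endpoint dictionaries are abbreviated placeholders:

from common.proto.context_pb2 import LinkTypeEnum
from common.tools.object_factory.Link import json_link

endpoint_ids = [
    {'device_id': {'device_uuid': {'uuid': 'R1'}}, 'endpoint_uuid': {'uuid': 'eth1'}},
    {'device_id': {'device_uuid': {'uuid': 'R2'}}, 'endpoint_uuid': {'uuid': 'eth1'}},
]
link = json_link(
    'R1/eth1==R2/eth1', endpoint_ids, name='virt-link-R1-R2',
    link_type=LinkTypeEnum.LINKTYPE_VIRTUAL,
)
# link['link_type'] is what split_links_by_type() later keys on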
@@ -30,10 +30,10 @@ def json_service_id(service_uuid : str, context_id : Optional[Dict] = None):

 def json_service(
     service_uuid : str, service_type : ServiceTypeEnum, context_id : Optional[Dict] = None,
-    status : ServiceStatusEnum = ServiceStatusEnum.SERVICESTATUS_PLANNED,
-    endpoint_ids : List[Dict] = [], constraints : List[Dict] = [], config_rules : List[Dict] = []):
-    return {
+    status : ServiceStatusEnum = ServiceStatusEnum.SERVICESTATUS_PLANNED, name : Optional[str] = None,
+    endpoint_ids : List[Dict] = [], constraints : List[Dict] = [], config_rules : List[Dict] = []
+) -> Dict:
+    result = {
         'service_id'          : json_service_id(service_uuid, context_id=context_id),
         'service_type'        : service_type,
         'service_status'      : {'service_status': status},
@@ -41,6 +41,8 @@ def json_service(
         'service_constraints' : copy.deepcopy(constraints),
         'service_config'      : {'config_rules': copy.deepcopy(config_rules)},
     }
+    if name is not None: result['name'] = name
+    return result

 def json_service_qkd_planned(
     service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
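Similarly, the service factory gains an optional name that is set only when provided; a minimal sketch:

from common.proto.context_pb2 import ServiceTypeEnum
from common.tools.object_factory.Service import json_service

service = json_service('svc-01', ServiceTypeEnum.SERVICETYPE_L3NM, name='my-l3-service')
assert service['name'] == 'my-l3-service'   # key is omitted entirely when name is None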
@@ -18,7 +18,9 @@ from sqlalchemy.engine import Engine
 from sqlalchemy.orm import Session, selectinload, sessionmaker
 from sqlalchemy_cockroachdb import run_transaction
 from typing import Dict, List, Optional, Set, Tuple
-from common.proto.context_pb2 import Empty, EventTypeEnum, Link, LinkId, LinkIdList, LinkList, TopologyId
+from common.proto.context_pb2 import (
+    Empty, EventTypeEnum, Link, LinkId, LinkIdList, LinkList, TopologyId
+)
 from common.message_broker.MessageBroker import MessageBroker
 from common.method_wrappers.ServiceExceptions import NotFoundException
 from common.tools.object_factory.Link import json_link_id
@@ -66,9 +66,10 @@ def optical_link_set(db_engine : Engine, messagebroker : MessageBroker, request
     now = datetime.datetime.now(datetime.timezone.utc)

-    # By default, always add link to default Context/Topology
     topology_uuids : Set[str] = set()
     related_topologies : List[Dict] = list()
+
+    # By default, always add link to default Context/Topology
     _,topology_uuid = topology_get_uuid(TopologyId(), allow_random=False, allow_default=True)
     related_topologies.append({
         'topology_uuid': topology_uuid,
@@ -77,15 +78,14 @@ def optical_link_set(db_engine : Engine, messagebroker : MessageBroker, request
     topology_uuids.add(topology_uuid)

     link_endpoints_data : List[Dict] = list()
     for i,endpoint_id in enumerate(request.link_endpoint_ids):
-        endpoint_topology_uuid, endpoint_device_uuid, endpoint_uuid = endpoint_get_uuid(
-            endpoint_id, endpoint_name="", allow_random=True)
+        endpoint_topology_uuid, _, endpoint_uuid = endpoint_get_uuid(
+            endpoint_id, allow_random=False)

         link_endpoints_data.append({
             'link_uuid'    : link_uuid,
             'endpoint_uuid': endpoint_uuid,
+            'position'     : i,
         })

         if endpoint_topology_uuid not in topology_uuids:
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from sqlalchemy import Column, DateTime, ForeignKey, Integer, String ,Boolean
+import operator
+from sqlalchemy import (
+    Boolean, CheckConstraint, Column, DateTime, ForeignKey, Integer, String
+)
 from sqlalchemy.dialects.postgresql import UUID
 from sqlalchemy.orm import relationship
 from typing import Dict
@@ -59,7 +61,7 @@ class OpticalLinkModel(_Base):
             },
             'link_endpoint_ids' : [
                 optical_endpoint.endpoint.dump_id()
-                for optical_endpoint in self.opticallink_endpoints
+                for optical_endpoint in sorted(self.opticallink_endpoints, key=operator.attrgetter('position'))
             ],
         }
         return result
@@ -69,6 +71,11 @@ class OpticalLinkEndPointModel(_Base):
     link_uuid     = Column(ForeignKey('opticallink.opticallink_uuid', ondelete='CASCADE' ), primary_key=True)
     endpoint_uuid = Column(ForeignKey('endpoint.endpoint_uuid',       ondelete='RESTRICT'), primary_key=True, index=True)
+    position      = Column(Integer, nullable=False)

     optical_link = relationship('OpticalLinkModel', back_populates='opticallink_endpoints')
     endpoint     = relationship('EndPointModel',    lazy='selectin')
+
+    __table_args__ = (
+        CheckConstraint(position >= 0, name='check_position_value'),
+    )