Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (216)
Showing
with 322 additions and 152 deletions
......@@ -34,7 +34,7 @@ include:
- local: '/src/opticalcontroller/.gitlab-ci.yml'
- local: '/src/ztp/.gitlab-ci.yml'
- local: '/src/policy/.gitlab-ci.yml'
- local: '/src/automation/.gitlab-ci.yml'
#- local: '/src/automation/.gitlab-ci.yml'
- local: '/src/forecaster/.gitlab-ci.yml'
#- local: '/src/webui/.gitlab-ci.yml'
#- local: '/src/l3_distributedattackdetector/.gitlab-ci.yml'
......
......@@ -42,7 +42,7 @@ export KFK_REDEPLOY=${KFK_REDEPLOY:-""}
mkdir -p ${TMP_MANIFESTS_FOLDER}
function kafka_deploy() {
# copy zookeeper and kafka manifest files to temporary manifest location
# copy zookeeper and kafka manifest files to temporary manifest location
cp "${KFK_MANIFESTS_PATH}/${KFK_ZOOKEEPER_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
cp "${KFK_MANIFESTS_PATH}/${KFK_MANIFEST}" "${TMP_MANIFESTS_FOLDER}/${KFK_MANIFEST}"
......@@ -57,11 +57,12 @@ function kafka_deploy() {
# Kafka zookeeper service should be deployed before the kafka service
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/${KFK_ZOOKEEPER_MANIFEST}"
KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
#KFK_ZOOKEEPER_SERVICE="zookeeper-service" # this command may be replaced with command to extract service name automatically
#KFK_ZOOKEEPER_IP=$(kubectl --namespace ${KFK_NAMESPACE} get service ${KFK_ZOOKEEPER_SERVICE} -o 'jsonpath={.spec.clusterIP}')
# Kafka service should be deployed after the zookeeper service
sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
#sed -i "s/<ZOOKEEPER_INTERNAL_IP>/${KFK_ZOOKEEPER_IP}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
sed -i "s/<KAFKA_NAMESPACE>/${KFK_NAMESPACE}/" "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
# echo ">>> Deploying Apache Kafka Broker"
kubectl --namespace ${KFK_NAMESPACE} apply -f "${TMP_MANIFESTS_FOLDER}/$KFK_MANIFEST"
......
src/tests/ecoc24/
\ No newline at end of file
......@@ -37,18 +37,10 @@ spec:
ports:
- containerPort: 10050
- containerPort: 9192
- containerPort: 8761
- containerPort: 8762
env:
- name: LOG_LEVEL
value: "INFO"
- name: WS_IP_HOST
value: "nbiservice.tfs-ip.svc.cluster.local"
- name: WS_IP_PORT
value: "8761"
- name: WS_E2E_HOST
value: "e2e-orchestratorservice.tfs-e2e.svc.cluster.local"
- name: WS_E2E_PORT
value: "8762"
value: "DEBUG"
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10050"]
......@@ -81,5 +73,5 @@ spec:
port: 9192
targetPort: 9192
- name: ws
port: 8761
targetPort: 8761
port: 8762
targetPort: 8762
......@@ -19,14 +19,13 @@ metadata:
labels:
app: zookeeper-service
name: zookeeper-service
namespace: kafka
spec:
type: NodePort
type: ClusterIP
ports:
- name: zookeeper-port
port: 2181
nodePort: 30181
targetPort: 2181
#nodePort: 30181
#targetPort: 2181
selector:
app: zookeeper
---
......@@ -36,7 +35,6 @@ metadata:
labels:
app: zookeeper
name: zookeeper
namespace: kafka
spec:
replicas: 1
selector:
......@@ -52,4 +50,4 @@ spec:
imagePullPolicy: IfNotPresent
name: zookeeper
ports:
- containerPort: 2181
\ No newline at end of file
- containerPort: 2181
......@@ -19,7 +19,6 @@ metadata:
labels:
app: kafka-broker
name: kafka-service
namespace: kafka
spec:
ports:
- port: 9092
......@@ -32,7 +31,6 @@ metadata:
labels:
app: kafka-broker
name: kafka-broker
namespace: kafka
spec:
replicas: 1
selector:
......@@ -49,11 +47,12 @@ spec:
- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_ZOOKEEPER_CONNECT
value: <ZOOKEEPER_INTERNAL_IP>:2181
#value: <ZOOKEEPER_INTERNAL_IP>:2181
value: zookeeper-service.<KAFKA_NAMESPACE>.svc.cluster.local:2181
- name: KAFKA_LISTENERS
value: PLAINTEXT://:9092
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092
value: PLAINTEXT://kafka-service.<KAFKA_NAMESPACE>.svc.cluster.local:9092
image: wurstmeister/kafka
imagePullPolicy: IfNotPresent
name: kafka-broker
......
......@@ -31,8 +31,10 @@ container:
config:
cluster:
enabled: true
replicas: 3
#enabled: true
enabled: false
#replicas: 3
replicas: 1
jetstream:
enabled: true
fileStore:
......
......@@ -23,9 +23,6 @@ spec:
replicas: 1
template:
metadata:
annotations:
config.linkerd.io/skip-inbound-ports: "8762"
config.linkerd.io/skip-outbound-ports: "8762"
labels:
app: nbiservice
spec:
......@@ -36,29 +33,41 @@ spec:
imagePullPolicy: Always
ports:
- containerPort: 8080
- containerPort: 9090
- containerPort: 9192
- containerPort: 8762
# Metrics disabled for now. No NBI endpoint uses it and
# causes "address already in use" when deploying multiple
# gunicorn workers.
#- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
value: "DEBUG"
- name: FLASK_ENV
value: "production" # change to "development" if developing
- name: IETF_NETWORK_RENDERER
value: "LIBYANG"
- name: WS_E2E_PORT
value: "8762"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:9090"]
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 30 # NBI's gunicorn takes 30~40 seconds to bootstrap
periodSeconds: 10
failureThreshold: 6
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:9090"]
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 30 # NBI's gunicorn takes 30~40 seconds to bootstrap
periodSeconds: 10
failureThreshold: 6
resources:
requests:
cpu: 50m
memory: 64Mi
limits:
cpu: 500m
cpu: 150m
memory: 512Mi
limits:
cpu: 1000m
memory: 2048Mi
---
apiVersion: v1
kind: Service
......@@ -75,15 +84,10 @@ spec:
protocol: TCP
port: 8080
targetPort: 8080
- name: grpc
protocol: TCP
port: 9090
targetPort: 9090
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
- name: ws
protocol: TCP
port: 8762
targetPort: 8762
# Metrics disabled for now. No NBI endpoint uses it and
# causes "address already in use" when deploying multiple
# gunicorn workers.
#- name: metrics
# protocol: TCP
# port: 9192
# targetPort: 9192
......@@ -17,12 +17,28 @@ kind: Ingress
metadata:
name: tfs-ingress
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /$2
nginx.ingress.kubernetes.io/limit-rps: "50"
nginx.ingress.kubernetes.io/limit-connections: "50"
nginx.ingress.kubernetes.io/proxy-connect-timeout: "50"
nginx.ingress.kubernetes.io/proxy-send-timeout: "50"
nginx.ingress.kubernetes.io/proxy-read-timeout: "50"
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
# Enable websocket services and configure sticky cookies (seems not to work)
#nginx.org/websocket-services: "nbiservice"
#nginx.org/sticky-cookie-services: "serviceName=nbiservice tfs-nbi-session expires=1h path=/socket.io"
# Enable sticky sessions (use same backend for all connections
# originated by a specific client, identified through its cookie)
nginx.ingress.kubernetes.io/affinity: "cookie"
nginx.ingress.kubernetes.io/affinity-mode: "persistent"
nginx.ingress.kubernetes.io/session-cookie-name: "tfs-nbi-session"
nginx.ingress.kubernetes.io/session-cookie-path: "/socket.io"
nginx.ingress.kubernetes.io/session-cookie-expires: "3600"
nginx.ingress.kubernetes.io/session-cookie-change-on-failure: "true"
nginx.ingress.kubernetes.io/limit-rps: "50" # max requests per second per source IP
nginx.ingress.kubernetes.io/limit-connections: "50" # max concurrent connections per source IP
nginx.ingress.kubernetes.io/proxy-connect-timeout: "60" # max timeout for connecting to server
# Enable long-lived connections, required for websocket/socket.io streams
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600" # max timeout between two successive read operations
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600" # max timeout between two successive write operations
spec:
rules:
- http:
......@@ -48,6 +64,13 @@ spec:
name: nbiservice
port:
number: 8080
- path: /()(socket.io/.*)
pathType: Prefix
backend:
service:
name: nbiservice
port:
number: 8080
- path: /()(tfs-api/.*)
pathType: Prefix
backend:
......
......@@ -36,7 +36,7 @@ spec:
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
value: "DEBUG"
#readinessProbe:
# exec:
# command: ["/bin/grpc_health_probe", "-addr=:10060"]
......
......@@ -36,7 +36,7 @@ spec:
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
value: "DEBUG"
- name: ENABLE_FORECASTER
value: "NO"
readinessProbe:
......
......@@ -39,11 +39,10 @@ spec:
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
- name: WS_IP_PORT
value: "8761"
- name: WS_E2E_PORT
value: "8762"
value: "DEBUG"
envFrom:
- secretRef:
name: kfk-kpi-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10080"]
......
src/tests/ofc25/
\ No newline at end of file
// Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package nbi;
import "context.proto";
service NbiService {
rpc CheckCredentials (context.TeraFlowController) returns (context.AuthenticationResult) {}
rpc GetConnectivityServiceStatus (context.ServiceId ) returns (context.ServiceStatus ) {}
rpc CreateConnectivityService (context.Service ) returns (context.ServiceId ) {}
rpc EditConnectivityService (context.Service ) returns (context.ServiceId ) {}
rpc DeleteConnectivityService (context.Service ) returns (context.Empty ) {}
rpc GetAllActiveConnectivityServices (context.Empty ) returns (context.ServiceIdList ) {}
rpc ClearAllConnectivityServices (context.Empty ) returns (context.Empty ) {}
}
......@@ -12,26 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3
syntax = "proto3";
package vnt_manager;
import "context.proto";
service VNTManagerService {
rpc VNTSubscript (VNTSubscriptionRequest) returns (VNTSubscriptionReply) {}
rpc ListVirtualLinkIds (context.Empty) returns (context.LinkIdList) {}
rpc ListVirtualLinks (context.Empty) returns (context.LinkList) {}
rpc GetVirtualLink (context.LinkId) returns (context.Link) {}
rpc SetVirtualLink (context.Link) returns (context.LinkId) {}
rpc RemoveVirtualLink (context.LinkId) returns (context.Empty) {}
}
message VNTSubscriptionRequest {
string host = 1;
string port = 2;
}
message VNTSubscriptionReply {
string subscription = 1;
rpc ListVirtualLinkIds(context.Empty ) returns (context.LinkIdList) {}
rpc ListVirtualLinks (context.Empty ) returns (context.LinkList ) {}
rpc GetVirtualLink (context.LinkId) returns (context.Link ) {}
rpc SetVirtualLink (context.Link ) returns (context.LinkId ) {}
rpc RemoveVirtualLink (context.LinkId) returns (context.Empty ) {}
}
#!/bin/bash
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -12,12 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http, get_service_port_http
from common.tools.service.GenericRestServer import GenericRestServer
class RestServer(GenericRestServer):
def __init__(self, cls_name: str = __name__) -> None:
bind_port = get_service_port_http(ServiceNameEnum.NBI)
base_url = get_service_baseurl_http(ServiceNameEnum.NBI)
super().__init__(bind_port, base_url, cls_name=cls_name)
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
# helpful pytest flags: --log-level=INFO -o log_cli=true --verbose --maxfail=1 --durations=0
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
nbi/tests/test_core.py
......@@ -90,7 +90,6 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.POLICY .value : 6060,
ServiceNameEnum.MONITORING .value : 7070,
ServiceNameEnum.DLT .value : 8080,
ServiceNameEnum.NBI .value : 9090,
ServiceNameEnum.L3_CAD .value : 10001,
ServiceNameEnum.L3_AM .value : 10002,
ServiceNameEnum.DBSCANSERVING .value : 10008,
......
......@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
import grpc #, logging
from typing import Any, Dict, List, Set
LOGGER = logging.getLogger(__name__)
#LOGGER = logging.getLogger(__name__)
class InMemoryObjectDatabase:
def __init__(self) -> None:
......@@ -29,12 +29,12 @@ class InMemoryObjectDatabase:
return [container[entry_uuid] for entry_uuid in sorted(container.keys())]
def has_entry(self, container_name : str, entry_uuid : str) -> Any:
LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(self._database)))
#LOGGER.debug('[has_entry] BEFORE database={:s}'.format(str(self._database)))
container = self._get_container(container_name)
return entry_uuid in container
def get_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> Any:
LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(self._database)))
#LOGGER.debug('[get_entry] BEFORE database={:s}'.format(str(self._database)))
container = self._get_container(container_name)
if entry_uuid not in container:
MSG = '{:s}({:s}) not found; available({:s})'
......@@ -44,18 +44,18 @@ class InMemoryObjectDatabase:
def set_entry(self, container_name : str, entry_uuid : str, entry : Any) -> Any:
container = self._get_container(container_name)
LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(self._database)))
#LOGGER.debug('[set_entry] BEFORE database={:s}'.format(str(self._database)))
container[entry_uuid] = entry
LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(self._database)))
#LOGGER.debug('[set_entry] AFTER database={:s}'.format(str(self._database)))
return entry
def del_entry(self, container_name : str, entry_uuid : str, context : grpc.ServicerContext) -> None:
container = self._get_container(container_name)
LOGGER.debug('[del_entry] BEFORE database={:s}'.format(str(self._database)))
#LOGGER.debug('[del_entry] BEFORE database={:s}'.format(str(self._database)))
if entry_uuid not in container:
context.abort(grpc.StatusCode.NOT_FOUND, str('{:s}({:s}) not found'.format(container_name, entry_uuid)))
del container[entry_uuid]
LOGGER.debug('[del_entry] AFTER database={:s}'.format(str(self._database)))
#LOGGER.debug('[del_entry] AFTER database={:s}'.format(str(self._database)))
def select_entries(self, container_name : str, entry_uuids : Set[str]) -> List[Any]:
if len(entry_uuids) == 0: return self.get_entries(container_name)
......
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, logging, requests
from requests.auth import HTTPBasicAuth
from typing import Any, Optional, Set
class RestRequestMethod(enum.Enum):
GET = 'get'
POST = 'post'
PUT = 'put'
PATCH = 'patch'
DELETE = 'delete'
EXPECTED_STATUS_CODES : Set[int] = {
requests.codes['OK' ], # 200 - OK
requests.codes['CREATED' ], # 201 - Created
requests.codes['ACCEPTED' ], # 202 - Accepted
requests.codes['NO_CONTENT'], # 204 - No Content
}
URL_TEMPLATE = '{:s}://{:s}:{:d}/{:s}'
def compose_basic_auth(
username : Optional[str] = None, password : Optional[str] = None
) -> Optional[HTTPBasicAuth]:
if username is None or password is None: return None
return HTTPBasicAuth(username, password)
class SchemeEnum(enum.Enum):
HTTP = 'http'
HTTPS = 'https'
def check_scheme(scheme : str) -> str:
str_scheme = str(scheme).lower()
enm_scheme = SchemeEnum._value2member_map_[str_scheme]
return enm_scheme.value
class RestClient:
def __init__(
self, address : str, port : int, scheme : str = 'http',
username : Optional[str] = None, password : Optional[str] = None,
timeout : int = 30, verify_certs : bool = True, allow_redirects : bool = True,
logger : Optional[logging.Logger] = None
) -> None:
self._address = address
self._port = int(port)
self._scheme = check_scheme(scheme)
self._auth = compose_basic_auth(username=username, password=password)
self._timeout = int(timeout)
self._verify_certs = verify_certs
self._allow_redirects = allow_redirects
self._logger = logger
def _compose_url(self, endpoint : str) -> str:
endpoint = endpoint.lstrip('/')
return URL_TEMPLATE.format(self._scheme, self._address, self._port, endpoint)
def _log_msg_request(
self, method : RestRequestMethod, request_url : str, body : Optional[Any],
log_level : int = logging.INFO
) -> str:
msg = 'Request: {:s} {:s}'.format(str(method.value).upper(), str(request_url))
if body is not None: msg += ' body={:s}'.format(str(body))
if self._logger is not None: self._logger.log(log_level, msg)
return msg
def _log_msg_check_reply(
self, method : RestRequestMethod, request_url : str, body : Optional[Any],
reply : requests.Response, expected_status_codes : Set[int],
log_level : int = logging.INFO
) -> str:
msg = 'Reply: {:s}'.format(str(reply.text))
if self._logger is not None: self._logger.log(log_level, msg)
http_status_code = reply.status_code
if http_status_code in expected_status_codes: return msg
MSG = 'Request failed. method={:s} url={:s} body={:s} status_code={:s} reply={:s}'
msg = MSG.format(
str(method.value).upper(), str(request_url), str(body),
str(http_status_code), str(reply.text)
)
self._logger.error(msg)
raise Exception(msg)
def _do_rest_request(
self, method : RestRequestMethod, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
request_url = self._compose_url(endpoint)
self._log_msg_request(method, request_url, body)
try:
headers = {'accept': 'application/json'}
reply = requests.request(
method.value, request_url, headers=headers, json=body,
auth=self._auth, verify=self._verify_certs, timeout=self._timeout,
allow_redirects=self._allow_redirects
)
except Exception as e:
MSG = 'Request failed. method={:s} url={:s} body={:s}'
msg = MSG.format(str(method.value).upper(), request_url, str(body))
self._logger.exception(msg)
raise Exception(msg) from e
self._log_msg_check_reply(method, request_url, body, reply, expected_status_codes)
if reply.content and len(reply.content) > 0: return reply.json()
return None
def get(
self, endpoint : str,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.GET, endpoint,
expected_status_codes=expected_status_codes
)
def post(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.POST, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def put(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.PUT, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def patch(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.PATCH, endpoint, body=body,
expected_status_codes=expected_status_codes
)
def delete(
self, endpoint : str, body : Optional[Any] = None,
expected_status_codes : Set[int] = EXPECTED_STATUS_CODES
) -> Optional[Any]:
return self._do_rest_request(
RestRequestMethod.DELETE, endpoint, body=body,
expected_status_codes=expected_status_codes
)
......@@ -45,12 +45,13 @@ from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from vnt_manager.client.VNTManagerClient import VNTManagerClient
from .Tools import (
format_device_custom_config_rules, format_service_custom_config_rules,
format_slice_custom_config_rules, get_descriptors_add_contexts,
get_descriptors_add_services, get_descriptors_add_slices,
get_descriptors_add_topologies, split_controllers_and_network_devices,
split_devices_by_rules
split_devices_by_rules, split_links_by_type
)
LOGGER = logging.getLogger(__name__)
......@@ -112,7 +113,8 @@ class DescriptorLoader:
self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None,
num_workers : int = 1,
context_client : Optional[ContextClient] = None, device_client : Optional[DeviceClient] = None,
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None
service_client : Optional[ServiceClient] = None, slice_client : Optional[SliceClient] = None,
vntm_client : Optional[VNTManagerClient] = None
) -> None:
if (descriptors is None) == (descriptors_file is None):
# pylint: disable=broad-exception-raised
......@@ -190,10 +192,11 @@ class DescriptorLoader:
self.__services_add = None
self.__slices_add = None
self.__ctx_cli = ContextClient() if context_client is None else context_client
self.__dev_cli = DeviceClient() if device_client is None else device_client
self.__svc_cli = ServiceClient() if service_client is None else service_client
self.__slc_cli = SliceClient() if slice_client is None else slice_client
self.__ctx_cli = ContextClient() if context_client is None else context_client
self.__dev_cli = DeviceClient() if device_client is None else device_client
self.__svc_cli = ServiceClient() if service_client is None else service_client
self.__slc_cli = SliceClient() if slice_client is None else slice_client
self.__vnt_cli = VNTManagerClient() if vntm_client is None else vntm_client
self.__results : TypeResults = list()
......@@ -352,22 +355,38 @@ class DescriptorLoader:
controllers_add, network_devices_add = split_controllers_and_network_devices(self.__devices_add)
typed_links = split_links_by_type(self.__links)
typed_normal_links = typed_links.get('normal', list())
typed_optical_links = typed_links.get('optical', list())
typed_optical_links.extend(self.__optical_links)
typed_virtual_links = typed_links.get('virtual', list())
self.__ctx_cli.connect()
self.__dev_cli.connect()
self.__svc_cli.connect()
self.__slc_cli.connect()
if len(self.__services ) > 0: self.__svc_cli.connect()
if len(self.__slices ) > 0: self.__slc_cli.connect()
if len(typed_virtual_links) > 0: self.__vnt_cli.connect()
self._process_descr('context', 'add', self.__ctx_cli.SetContext, Context, self.__contexts_add )
self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
self._process_descr('controller', 'add', self.__dev_cli.AddDevice, Device, controllers_add )
self._process_descr('device', 'add', self.__dev_cli.AddDevice, Device, network_devices_add )
self._process_descr('device', 'config', self.__dev_cli.ConfigureDevice, Device, self.__devices_config)
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, self.__links )
self._process_descr('link', 'add', self.__ctx_cli.SetOpticalLink, OpticalLink, self.__optical_links )
self._process_descr('service', 'add', self.__svc_cli.CreateService, Service, self.__services_add )
self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services )
self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add )
self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices )
self._process_descr('link', 'add', self.__ctx_cli.SetLink, Link, typed_normal_links )
if len(typed_optical_links) > 0:
self._process_descr('link', 'add', self.__ctx_cli.SetOpticalLink, OpticalLink, typed_optical_links )
if len(typed_virtual_links) > 0:
self._process_descr('link', 'add', self.__vnt_cli.SetVirtualLink, Link, typed_virtual_links )
if len(self.__services) > 0:
self._process_descr('service','add', self.__svc_cli.CreateService, Service, self.__services_add )
self._process_descr('service','update', self.__svc_cli.UpdateService, Service, self.__services )
if len(self.__slices) > 0:
self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add )
self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices )
# By default the Context component automatically assigns devices and links to topologies based on their
# endpoints, and assigns topologies, services, and slices to contexts based on their identifiers.
......@@ -468,10 +487,17 @@ class DescriptorLoader:
def _unload_normal_mode(self) -> None:
# Normal mode: follows the automated workflows in the different components
typed_links = split_links_by_type(self.links)
typed_normal_links = typed_links.get('normal', list())
typed_optical_links = typed_links.get('optical', list())
typed_optical_links.extend(self.optical_links)
typed_virtual_links = typed_links.get('virtual', list())
self.__ctx_cli.connect()
self.__dev_cli.connect()
self.__svc_cli.connect()
self.__slc_cli.connect()
if len(self.services ) > 0: self.__svc_cli.connect()
if len(self.slices ) > 0: self.__slc_cli.connect()
if len(typed_virtual_links) > 0: self.__vnt_cli.connect()
for _, slice_list in self.slices.items():
for slice_ in slice_list:
......@@ -481,10 +507,13 @@ class DescriptorLoader:
for service in service_list:
self.__svc_cli.DeleteService(ServiceId(**service['service_id']))
for optical_link in self.optical_links:
for virtual_link in typed_virtual_links:
self.__vnt_cli.RemoveVirtualLink(LinkId(**virtual_link['link_id']))
for optical_link in typed_optical_links:
self.__ctx_cli.DeleteOpticalLink(LinkId(**optical_link['link_id']))
for link in self.links:
for link in typed_normal_links:
self.__ctx_cli.RemoveLink(LinkId(**link['link_id']))
for device in self.devices:
......