Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Showing
with 1155 additions and 54 deletions
def retrieve_external_topologies(self):
i = 1
while True:
try:
ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD'))
PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT'))
except: # pylint: disable=bare-except
break
try:
LOGGER.info('Retrieving external controller #{:d}'.format(i))
url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format(
ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
)
LOGGER.info('url={:s}'.format(str(url)))
topo = requests.get(url).json()
LOGGER.info('Retrieved external controller #{:d}'.format(i))
except: # pylint: disable=bare-except
LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i))
topology_details = TopologyDetails(**topo)
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context)
topology = Topology()
topology.topology_id.context_id.CopyFrom(context.context_id)
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology)
for device in topology_details.devices:
context_client.SetDevice(device)
for link in topology_details.links:
context_client.SetLink(link)
i+=1
...@@ -12,62 +12,68 @@ ...@@ -12,62 +12,68 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging, signal, sys, threading
import signal
import sys
import threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, from common.Settings import (
ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
get_log_level, get_metrics_port, get_log_level, get_metrics_port, wait_for_environment_variables
wait_for_environment_variables) )
from .subscriptions.ControllerDiscoverer import ControllerDiscoverer
from .subscriptions.Subscriptions import Subscriptions
from .subscriptions.dispatchers.Dispatchers import Dispatchers
from .subscriptions.dispatchers.recommendation.Dispatcher import RecommendationDispatcher
from .E2EOrchestratorService import E2EOrchestratorService from .E2EOrchestratorService import E2EOrchestratorService
terminate = threading.Event() TERMINATE = threading.Event()
LOGGER = None
LOG_LEVEL = get_log_level()
logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning("Terminate signal received")
terminate.set()
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
TERMINATE.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level() def main():
logging.basicConfig(level=log_level) wait_for_environment_variables([
LOGGER = logging.getLogger(__name__) get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info("Starting...") LOGGER.info('Starting...')
# Start metrics server # Start metrics server
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
# Starting CentralizedCybersecurity service # Starting service
grpc_service = E2EOrchestratorService() grpc_service = E2EOrchestratorService()
grpc_service.start() grpc_service.start()
LOGGER.info("Started...")
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1):
pass
dispatchers = Dispatchers(TERMINATE)
dispatchers.add_dispatcher(RecommendationDispatcher)
subscriptions = Subscriptions(dispatchers, TERMINATE)
discoverer = ControllerDiscoverer(subscriptions, TERMINATE)
discoverer.start()
LOGGER.info('Running...')
# Wait for Ctrl+C or termination signal
while not TERMINATE.wait(timeout=1.0): pass
LOGGER.info("Terminating...") LOGGER.info('Terminating...')
discoverer.stop()
grpc_service.stop() grpc_service.stop()
LOGGER.info("Bye") LOGGER.info('Bye')
return 0 return 0
if __name__ == "__main__": if __name__ == '__main__':
sys.exit(main()) sys.exit(main())
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, grpc, json, logging, networkx, requests, threading
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import (
Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId,
ServiceTypeEnum, ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from context.service.database.uuids.Device import device_get_uuid
from service.client.ServiceClient import ServiceClient
from websockets.sync.client import connect
from websockets.sync.server import serve
LOGGER = logging.getLogger(__name__)
logging.getLogger("websockets").propagate = True
logging.getLogger("requests.packages.urllib3").propagate = True
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")
context_client: ContextClient = ContextClient()
service_client: ServiceClient = ServiceClient()
EXT_HOST = str(get_setting('WS_IP_HOST'))
EXT_PORT = int(get_setting('WS_IP_PORT'))
EXT_URL = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT)
OWN_HOST = str(get_setting('WS_E2E_HOST'))
OWN_PORT = int(get_setting('WS_E2E_PORT'))
ALL_HOSTS = '0.0.0.0'
class SubscriptionServer(threading.Thread):
def run(self):
request = VNTSubscriptionRequest()
request.host = OWN_HOST
request.port = OWN_PORT
try:
LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL))
websocket = connect(EXT_URL)
except: # pylint: disable=bare-except
LOGGER.exception('Error connecting to {:s}'.format(EXT_URL))
else:
with websocket:
LOGGER.debug('Connected to {:s}'.format(EXT_URL))
send = grpc_message_to_json_string(request)
websocket.send(send)
LOGGER.debug('Sent: {:s}'.format(send))
try:
message = websocket.recv()
LOGGER.debug('Received message from WebSocket: {:s}'.format(message))
except Exception as ex:
LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex))
self._events_server()
def _events_server(self):
try:
server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT))
except: # pylint: disable=bare-except
LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
else:
with server:
LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
server.serve_forever()
def _event_received(self, connection):
LOGGER.debug('Event received')
for message in connection:
message_json = json.loads(message)
# Link creation
if 'link_id' in message_json:
LOGGER.debug('Link creation')
link = Link(**message_json)
service = Service()
service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
service_client.CreateService(service)
a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]
links = context_client.ListLinks(Empty()).links
for _link in links:
for _endpoint_id in _link.link_endpoint_ids:
if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
a_ep_id = _endpoint_id
elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
z_ep_id = _endpoint_id
if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()):
error_msg = f'Could not get VNT link endpoints\
\n\ta_endpoint_uuid= {a_endpoint_uuid}\
\n\tz_endpoint_uuid= {z_device_uuid}'
LOGGER.error(error_msg)
connection.send(error_msg)
return
service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
service_client.UpdateService(service)
re_svc = context_client.GetService(service.service_id)
connection.send(grpc_message_to_json_string(link))
context_client.SetLink(link)
elif 'link_uuid' in message_json:
LOGGER.debug('Link removal')
link_id = LinkId(**message_json)
service_id = ServiceId()
service_id.service_uuid.uuid = link_id.link_uuid.uuid
service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
service_client.DeleteService(service_id)
connection.send(grpc_message_to_json_string(link_id))
context_client.RemoveLink(link_id)
else:
LOGGER.debug('Topology received')
topology_details = TopologyDetails(**message_json)
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context)
topology = Topology()
topology.topology_id.context_id.CopyFrom(context.context_id)
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology)
for device in topology_details.devices:
context_client.SetDevice(device)
for link in topology_details.links:
context_client.SetLink(link)
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
def __init__(self):
LOGGER.debug('Creating Servicer...')
try:
LOGGER.debug('Requesting subscription')
sub_server = SubscriptionServer()
sub_server.start()
LOGGER.debug('Servicer Created')
self.retrieve_external_topologies()
except:
LOGGER.exception('Unhandled Exception')
def retrieve_external_topologies(self):
i = 1
while True:
try:
ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD'))
PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT'))
except: # pylint: disable=bare-except
break
try:
LOGGER.info('Retrieving external controller #{:d}'.format(i))
url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format(
ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
)
LOGGER.info('url={:s}'.format(str(url)))
topo = requests.get(url).json()
LOGGER.info('Retrieved external controller #{:d}'.format(i))
except: # pylint: disable=bare-except
LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i))
topology_details = TopologyDetails(**topo)
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context)
topology = Topology()
topology.topology_id.context_id.CopyFrom(context.context_id)
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology)
for device in topology_details.devices:
context_client.SetDevice(device)
for link in topology_details.links:
context_client.SetLink(link)
i+=1
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
endpoints_ids = []
for endpoint_id in request.service.service_endpoint_ids:
endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2])
graph = networkx.Graph()
devices = context_client.ListDevices(Empty()).devices
for device in devices:
endpoints_uuids = [endpoint.endpoint_id.endpoint_uuid.uuid
for endpoint in device.device_endpoints]
for ep in endpoints_uuids:
graph.add_node(ep)
for ep in endpoints_uuids:
for ep_i in endpoints_uuids:
if ep == ep_i:
continue
graph.add_edge(ep, ep_i)
links = context_client.ListLinks(Empty()).links
for link in links:
eps = []
for endpoint_id in link.link_endpoint_ids:
eps.append(endpoint_id.endpoint_uuid.uuid)
graph.add_edge(eps[0], eps[1])
shortest = networkx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])
path = E2EOrchestratorReply()
path.services.append(copy.deepcopy(request.service))
for i in range(0, int(len(shortest)/2)):
conn = Connection()
ep_a_uuid = str(shortest[i*2])
ep_z_uuid = str(shortest[i*2+1])
conn.connection_id.connection_uuid.uuid = str(ep_a_uuid) + '_->_' + str(ep_z_uuid)
ep_a_id = EndPointId()
ep_a_id.endpoint_uuid.uuid = ep_a_uuid
conn.path_hops_endpoint_ids.append(ep_a_id)
ep_z_id = EndPointId()
ep_z_id.endpoint_uuid.uuid = ep_z_uuid
conn.path_hops_endpoint_ids.append(ep_z_id)
path.connections.append(conn)
return path
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, queue, threading
from typing import Any, Optional
from common.proto.context_pb2 import DeviceEvent, Empty
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from .Subscriptions import Subscriptions
from .TFSControllerSettings import get_tfs_controller_settings
LOGGER = logging.getLogger(__name__)
class EventDispatcher(BaseEventDispatcher):
def __init__(
self, events_queue : queue.PriorityQueue,
context_client : ContextClient,
subscriptions : Subscriptions,
terminate : Optional[threading.Event] = None
) -> None:
super().__init__(events_queue, terminate)
self._context_client = context_client
self._subscriptions = subscriptions
def dispatch_device_create(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
tfs_ctrl_settings = get_tfs_controller_settings(
self._context_client, device_event
)
if tfs_ctrl_settings is None: return
self._subscriptions.add_subscription(tfs_ctrl_settings)
def dispatch_device_update(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
tfs_ctrl_settings = get_tfs_controller_settings(
self._context_client, device_event
)
if tfs_ctrl_settings is None: return
self._subscriptions.add_subscription(tfs_ctrl_settings)
def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
device_uuid = device_event.device_id.device_uuid.uuid
self._subscriptions.remove_subscription(device_uuid)
def dispatch(self, event : Any) -> None:
MSG = 'Unexpected Event: {:s}'
LOGGER.warning(MSG.format(grpc_message_to_json_string(event)))
class ControllerDiscoverer:
def __init__(
self, subscriptions : Subscriptions, terminate : threading.Event
) -> None:
self._context_client = ContextClient()
self._event_collector = BaseEventCollector(terminate=terminate)
self._event_collector.install_collector(
self._context_client.GetDeviceEvents, Empty(), log_events_received=True
)
self._event_dispatcher = EventDispatcher(
self._event_collector.get_events_queue(), self._context_client, subscriptions,
terminate=terminate
)
def start(self) -> None:
self._context_client.connect()
self._event_dispatcher.start()
self._event_collector.start()
def stop(self):
self._event_collector.stop()
self._event_dispatcher.stop()
self._context_client.close()
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socketio, threading
from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http
from .dispatchers.Dispatchers import Dispatchers
from .TFSControllerSettings import TFSControllerSettings
NBI_SERVICE_PREFIX_URL = get_service_baseurl_http(ServiceNameEnum.NBI) or ''
CHILD_SOCKETIO_URL = 'http://{:s}:{:s}@{:s}:{:d}' + NBI_SERVICE_PREFIX_URL
class Subscription(threading.Thread):
def __init__(
self, tfs_ctrl_settings : TFSControllerSettings, dispatchers : Dispatchers,
terminate : threading.Event
) -> None:
super().__init__(daemon=True)
self._settings = tfs_ctrl_settings
self._dispatchers = dispatchers
self._terminate = terminate
self._is_running = threading.Event()
@property
def is_running(self): return self._is_running.is_set()
def run(self) -> None:
child_socketio_url = CHILD_SOCKETIO_URL.format(
self._settings.nbi_username,
self._settings.nbi_password,
self._settings.nbi_address,
self._settings.nbi_port,
)
sio = socketio.Client(logger=True, engineio_logger=True)
self._dispatchers.register(sio)
sio.connect(child_socketio_url)
while not self._terminate.is_set():
sio.sleep(seconds=0.5)
sio.shutdown()
def stop(self):
self._terminate.set()
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, threading
from typing import Dict
from .dispatchers.Dispatchers import Dispatchers
from .Subscription import Subscription
from .TFSControllerSettings import TFSControllerSettings
LOGGER = logging.getLogger(__name__)
class Subscriptions:
def __init__(self, dispatchers : Dispatchers, terminate : threading.Event) -> None:
self._dispatchers = dispatchers
self._terminate = terminate
self._lock = threading.Lock()
self._subscriptions : Dict[str, Subscription] = dict()
def add_subscription(self, tfs_ctrl_settings : TFSControllerSettings) -> None:
device_uuid = tfs_ctrl_settings.device_uuid
with self._lock:
subscription = self._subscriptions.get(device_uuid)
if subscription is not None: return
subscription = Subscription(tfs_ctrl_settings, self._dispatchers, self._terminate)
self._subscriptions[device_uuid] = subscription
subscription.start()
def remove_subscription(self, device_uuid : str) -> None:
with self._lock:
subscription = self._subscriptions.get(device_uuid)
if subscription is None: return
if subscription.is_running: subscription.stop()
self._subscriptions.pop(device_uuid, None)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from dataclasses import dataclass
from typing import Optional
from common.DeviceTypes import DeviceTypeEnum
from common.proto.context_pb2 import ConfigActionEnum, DeviceEvent
from common.tools.context_queries.Device import get_device
from context.client.ContextClient import ContextClient
@dataclass
class TFSControllerSettings:
device_uuid : str
device_type : DeviceTypeEnum
nbi_address : str
nbi_port : int
nbi_username : str
nbi_password : str
SELECTED_DEVICE_TYPES = {
DeviceTypeEnum.TERAFLOWSDN_CONTROLLER.value
}
def get_tfs_controller_settings(
context_client : ContextClient, device_event : DeviceEvent
) -> Optional[TFSControllerSettings]:
device_uuid = device_event.device_id.device_uuid.uuid
device = get_device(
context_client, device_uuid, rw_copy=False,
include_endpoints=False, include_config_rules=True,
include_components=False
)
device_type = device.device_type
if device_type not in SELECTED_DEVICE_TYPES: return None
connect_rules = dict()
for config_rule in device.device_config.config_rules:
if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
if config_rule.WhichOneof('config_rule') != 'custom': continue
if not config_rule.custom.resource_key.startswith('_connect/'): continue
connect_attribute = config_rule.custom.resource_key.replace('_connect/', '')
if connect_attribute == 'settings':
settings = json.loads(config_rule.custom.resource_value)
for field in ['username', 'password']:
connect_rules[field] = settings[field]
else:
connect_rules[connect_attribute] = config_rule.custom.resource_value
return TFSControllerSettings(
device_uuid = device_uuid,
device_type = device_type,
nbi_address = str(connect_rules['address' ]),
nbi_port = int(connect_rules['port' ]),
nbi_username = str(connect_rules['username']),
nbi_password = str(connect_rules['password']),
)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, socketio, threading
from typing import List, Type
from ._Dispatcher import _Dispatcher
LOGGER = logging.getLogger(__name__)
class Dispatchers:
def __init__(self, terminate : threading.Event) -> None:
self._terminate = terminate
self._dispatchers : List[_Dispatcher] = list()
def add_dispatcher(self, dispatcher_class : Type[_Dispatcher]) -> None:
dispatcher = dispatcher_class(self._terminate)
self._dispatchers.append(dispatcher)
dispatcher.start()
def register(self, sio_client : socketio.Client) -> None:
for dispatcher in self._dispatchers:
dispatcher.register(sio_client)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import queue, socketio, threading
from concurrent.futures import Future
from typing import Any, Tuple
class _Dispatcher(threading.Thread):
def __init__(self, terminate : threading.Event):
super().__init__(daemon=True)
self._dispatcher_queue = queue.Queue[Tuple[Any, Future]]()
self._terminate = terminate
@property
def dispatcher_queue(self): return self._dispatcher_queue
def register(self, sio_client : socketio.Client) -> None:
raise NotImplementedError('To be implemented in subclass')
def run(self):
while not self._terminate.is_set():
try:
request,future = self._dispatcher_queue.get(block=True, timeout=1.0)
except queue.Empty:
continue
try:
result = self.process_request(request)
except Exception as e:
future.set_exception(e)
else:
future.set_result(result)
def process_request(self, request : Any) -> Any:
raise NotImplementedError('To be implemented in subclass')
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, queue, socketio
from concurrent.futures import Future
from .Constants import SIO_NAMESPACE
from .Recommendation import Recommendation, RecommendationAction
LOGGER = logging.getLogger(__name__)
class ClientNamespace(socketio.ClientNamespace):
def __init__(self, dispatcher_queue : queue.Queue[Recommendation]):
self._dispatcher_queue = dispatcher_queue
super().__init__(namespace=SIO_NAMESPACE)
def on_connect(self):
LOGGER.info('[on_connect] Connected')
def on_disconnect(self, reason):
MSG = '[on_disconnect] Disconnected!, reason: {:s}'
LOGGER.info(MSG.format(str(reason)))
def on_vlink_create(self, data):
MSG = '[on_vlink_create] begin data={:s}'
LOGGER.info(MSG.format(str(data)))
json_data = json.loads(data)
request_key = json_data.pop('_request_key')
recommendation = Recommendation(
action = RecommendationAction.VLINK_CREATE,
data = json_data,
)
result = Future()
MSG = '[on_vlink_create] Recommendation ({:s}): {:s}'
LOGGER.info(MSG.format(str(request_key), str(recommendation)))
LOGGER.debug('[on_vlink_create] Queuing recommendation...')
self._dispatcher_queue.put_nowait((recommendation, result))
reply = dict()
reply['_request_key'] = request_key
try:
reply['result'] = result.result()
event = reply['result'].pop('event')
except Exception as e:
reply['error'] = str(e)
#reply['stacktrace'] = str(e)
event = 'error'
LOGGER.debug('[on_vlink_create] Replying...')
self.emit(event, json.dumps(reply))
LOGGER.debug('[on_vlink_create] end')
def on_vlink_remove(self, data):
MSG = '[on_vlink_remove] begin data={:s}'
LOGGER.info(MSG.format(str(data)))
json_data = json.loads(data)
request_key = json_data.pop('_request_key')
recommendation = Recommendation(
action = RecommendationAction.VLINK_REMOVE,
data = json_data,
)
result = Future()
MSG = '[on_vlink_remove] Recommendation ({:s}): {:s}'
LOGGER.info(MSG.format(str(request_key), str(recommendation)))
LOGGER.debug('[on_vlink_remove] Queuing recommendation...')
self._dispatcher_queue.put_nowait((recommendation, result))
reply = dict()
reply['_request_key'] = request_key
try:
reply['result'] = result.result()
event = reply['result'].pop('event')
except Exception as e:
reply['error'] = str(e)
#reply['stacktrace'] = str(e)
event = 'error'
LOGGER.debug('[on_vlink_remove] Replying...')
self.emit(event, json.dumps(reply))
LOGGER.debug('[on_vlink_remove] end')
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
SIO_NAMESPACE = '/recommendations'
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, logging, socketio
from typing import Dict
from common.Constants import DEFAULT_CONTEXT_NAME
from common.proto.context_pb2 import Service, ServiceId
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Service import json_service_id
from service.client.ServiceClient import ServiceClient
from .._Dispatcher import _Dispatcher
from .ClientNamespace import ClientNamespace
from .Recommendation import Recommendation, RecommendationAction
from .Tools import compose_optical_service
LOGGER = logging.getLogger(__name__)
class RecommendationDispatcher(_Dispatcher):
def register(self, sio_client : socketio.Client) -> None:
sio_client.register_namespace(ClientNamespace(self.dispatcher_queue))
def process_request(self, request : Recommendation) -> Dict:
LOGGER.info('[process_request] request={:s}'.format(str(request)))
if request.action == RecommendationAction.VLINK_CREATE:
vlink_optical_service = compose_optical_service(request.data)
vlink_optical_service_add = copy.deepcopy(vlink_optical_service)
vlink_optical_service_add.pop('service_endpoint_ids', None)
vlink_optical_service_add.pop('service_constraints', None)
vlink_optical_service_add.pop('service_config', None)
service_client = ServiceClient()
service_id = service_client.CreateService(Service(**vlink_optical_service_add))
service_uuid = service_id.service_uuid.uuid
vlink_optical_service['service_id']['service_uuid']['uuid'] = service_uuid
service_id = service_client.UpdateService(Service(**vlink_optical_service))
result = {'event': 'vlink_created', 'vlink_uuid': service_uuid}
elif request.action == RecommendationAction.VLINK_REMOVE:
vlink_service_uuid = request.data['link_uuid']['uuid']
context_id = json_context_id(DEFAULT_CONTEXT_NAME)
vlink_optical_service_id = json_service_id(vlink_service_uuid, context_id=context_id)
service_client = ServiceClient()
service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id))
if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1':
service_id = service_client.DeleteService(ServiceId(**vlink_optical_service_id))
result = {'event': 'vlink_removed'}
else:
MSG = 'RecommendationAction not supported in Recommendation({:s})'
raise NotImplementedError(MSG.format(str(request)))
return result
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict
class RecommendationAction(Enum):
VLINK_CREATE = 'vlink-create'
VLINK_REMOVE = 'vlink-remove'
@dataclass
class Recommendation:
action : RecommendationAction
data : Dict = field(default_factory=dict)
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, networkx
from dataclasses import dataclass, field
from typing import Dict, List, Set
from common.proto.context_pb2 import ServiceTypeEnum
from common.tools.context_queries.Topology import get_topology_details
from common.tools.object_factory.Constraint import json_constraint_custom
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Device import json_device_id
from common.tools.object_factory.EndPoint import json_endpoint_id
from common.tools.object_factory.Service import json_service
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.DeviceTypes import DeviceTypeEnum
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
@dataclass
class GraphAndMapping:
graph : networkx.Graph = field(default_factory=networkx.Graph)
device_to_type : Dict[str, str] = field(default_factory=dict)
device_name_to_uuid : Dict[str, str] = field(default_factory=dict)
endpoint_name_to_uuid : Dict[Dict[str, str], str] = field(default_factory=dict)
endpoint_to_device_uuid : Dict[str, str] = field(default_factory=dict)
EXCLUDED_DEVICE_TYPES : Set[str] = {
DeviceTypeEnum.EMULATED_IP_SDN_CONTROLLER.value,
DeviceTypeEnum.EMULATED_MICROWAVE_RADIO_SYSTEM.value,
DeviceTypeEnum.EMULATED_OPEN_LINE_SYSTEM.value,
DeviceTypeEnum.EMULATED_XR_CONSTELLATION.value,
DeviceTypeEnum.IETF_SLICE.value,
DeviceTypeEnum.IP_SDN_CONTROLLER.value,
DeviceTypeEnum.MICROWAVE_RADIO_SYSTEM.value,
DeviceTypeEnum.NCE.value,
DeviceTypeEnum.OPEN_LINE_SYSTEM.value,
DeviceTypeEnum.TERAFLOWSDN_CONTROLLER.value,
DeviceTypeEnum.XR_CONSTELLATION.value,
}
def compose_graph_from_topology() -> GraphAndMapping:
context_client = ContextClient()
topology_details = get_topology_details(
context_client, DEFAULT_TOPOLOGY_NAME,
context_uuid=DEFAULT_CONTEXT_NAME, rw_copy=False
)
graph_and_mapping = GraphAndMapping()
excluded_device_uuids : Set[str] = set()
for device in topology_details.devices:
device_uuid = device.device_id.device_uuid.uuid
graph_and_mapping.device_name_to_uuid.setdefault(device.name, device_uuid)
graph_and_mapping.device_name_to_uuid.setdefault(device_uuid, device_uuid)
graph_and_mapping.device_to_type.setdefault(device_uuid, device.device_type)
if device.device_type in EXCLUDED_DEVICE_TYPES:
excluded_device_uuids.add(device_uuid)
continue
endpoint_uuids = list()
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_uuids.append(endpoint_uuid)
graph_and_mapping.graph.add_node(endpoint_uuid)
graph_and_mapping.endpoint_name_to_uuid.setdefault((device_uuid, endpoint.name), endpoint_uuid)
graph_and_mapping.endpoint_name_to_uuid.setdefault((device_uuid, endpoint_uuid), endpoint_uuid)
graph_and_mapping.endpoint_to_device_uuid.setdefault(endpoint_uuid, device_uuid)
for endpoint_uuid_i in endpoint_uuids:
for endpoint_uuid_j in endpoint_uuids:
if endpoint_uuid_i == endpoint_uuid_j: continue
graph_and_mapping.graph.add_edge(endpoint_uuid_i, endpoint_uuid_j)
for link in topology_details.links:
endpoint_id_a = link.link_endpoint_ids[ 0]
endpoint_id_z = link.link_endpoint_ids[-1]
device_uuid_a = endpoint_id_a.device_id.device_uuid.uuid
if device_uuid_a in excluded_device_uuids: continue
device_uuid_z = endpoint_id_z.device_id.device_uuid.uuid
if device_uuid_z in excluded_device_uuids: continue
graph_and_mapping.graph.add_edge(
endpoint_id_a.endpoint_uuid.uuid,
endpoint_id_z.endpoint_uuid.uuid,
)
return graph_and_mapping
def compose_optical_service(vlink_request : Dict) -> Dict:
graph_and_mapping = compose_graph_from_topology()
vlink_endpoint_id_a = vlink_request['link_endpoint_ids'][ 0]
vlink_endpoint_id_b = vlink_request['link_endpoint_ids'][-1]
device_uuid_or_name_a = vlink_endpoint_id_a['device_id']['device_uuid']['uuid']
device_uuid_or_name_b = vlink_endpoint_id_b['device_id']['device_uuid']['uuid']
endpoint_uuid_or_name_a = vlink_endpoint_id_a['endpoint_uuid']['uuid']
endpoint_uuid_or_name_b = vlink_endpoint_id_b['endpoint_uuid']['uuid']
device_uuid_a = graph_and_mapping.device_name_to_uuid[device_uuid_or_name_a]
device_uuid_b = graph_and_mapping.device_name_to_uuid[device_uuid_or_name_b]
endpoint_uuid_a = graph_and_mapping.endpoint_name_to_uuid[(device_uuid_a, endpoint_uuid_or_name_a)]
endpoint_uuid_b = graph_and_mapping.endpoint_name_to_uuid[(device_uuid_b, endpoint_uuid_or_name_b)]
path_hops = networkx.shortest_path(
graph_and_mapping.graph, endpoint_uuid_a, endpoint_uuid_b
)
LOGGER.info('[compose_optical_service] path_hops={:s}'.format(str(path_hops)))
optical_border_endpoint_ids : List[str] = list()
for endpoint_uuid in path_hops:
LOGGER.info('[compose_optical_service] endpoint_uuid={:s}'.format(str(endpoint_uuid)))
device_uuid = graph_and_mapping.endpoint_to_device_uuid[endpoint_uuid]
LOGGER.info('[compose_optical_service] device_uuid={:s}'.format(str(device_uuid)))
device_type = graph_and_mapping.device_to_type[device_uuid]
LOGGER.info('[compose_optical_service] device_type={:s}'.format(str(device_type)))
if device_type != DeviceTypeEnum.EMULATED_OPTICAL_TRANSPONDER.value: continue
device_id = json_device_id(device_uuid)
endpoint_id = json_endpoint_id(device_id, endpoint_uuid)
LOGGER.info('[compose_optical_service] endpoint_id={:s}'.format(str(endpoint_id)))
optical_border_endpoint_ids.append(endpoint_id)
LOGGER.info('[compose_optical_service] optical_border_endpoint_ids={:s}'.format(str(optical_border_endpoint_ids)))
constraints = [
json_constraint_custom('bandwidth[gbps]', str(vlink_request['attributes']['total_capacity_gbps'])),
json_constraint_custom('bidirectionality', '1'),
]
vlink_service_uuid = vlink_request['link_id']['link_uuid']['uuid']
if vlink_service_uuid == 'IP1/PORT-xe1==IP2/PORT-xe1':
constraints.append(json_constraint_custom('optical-band-width[GHz]', '300'))
vlink_optical_service = json_service(
vlink_service_uuid,
ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY,
context_id=json_context_id(DEFAULT_CONTEXT_NAME),
endpoint_ids=[
optical_border_endpoint_ids[0], optical_border_endpoint_ids[-1]
],
constraints=constraints,
)
return vlink_optical_service
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
...@@ -16,27 +16,28 @@ import logging, signal, sys, threading ...@@ -16,27 +16,28 @@ import logging, signal, sys, threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import ( from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
wait_for_environment_variables) get_log_level, get_metrics_port, wait_for_environment_variables
)
from .ForecasterService import ForecasterService from .ForecasterService import ForecasterService
terminate = threading.Event()
LOGGER : logging.Logger = None TERMINATE = threading.Event()
LOG_LEVEL = get_log_level()
logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING)
logging.getLogger('monitoring-client').setLevel(logging.WARNING)
LOGGER = logging.getLogger(__name__)
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received') LOGGER.warning('Terminate signal received')
terminate.set() TERMINATE.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
logging.getLogger('apscheduler.scheduler').setLevel(logging.WARNING)
logging.getLogger('monitoring-client').setLevel(logging.WARNING)
LOGGER = logging.getLogger(__name__)
def main():
wait_for_environment_variables([ wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
...@@ -53,12 +54,13 @@ def main(): ...@@ -53,12 +54,13 @@ def main():
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
# Starting Forecaster service # Starting service
grpc_service = ForecasterService() grpc_service = ForecasterService()
grpc_service.start() grpc_service.start()
LOGGER.info('Running...')
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass while not TERMINATE.wait(timeout=1.0): pass
LOGGER.info('Terminating...') LOGGER.info('Terminating...')
grpc_service.stop() grpc_service.stop()
...@@ -66,5 +68,6 @@ def main(): ...@@ -66,5 +68,6 @@ def main():
LOGGER.info('Bye') LOGGER.info('Bye')
return 0 return 0
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) sys.exit(main())
...@@ -46,6 +46,7 @@ unit_test nbi: ...@@ -46,6 +46,7 @@ unit_test nbi:
stage: unit_test stage: unit_test
needs: needs:
- build nbi - build nbi
- build mock_tfs_nbi_dependencies
before_script: before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- > - >
...@@ -62,19 +63,43 @@ unit_test nbi: ...@@ -62,19 +63,43 @@ unit_test nbi:
fi fi
script: script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 9090:9090 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - docker pull "$CI_REGISTRY_IMAGE/mock_tfs_nbi_dependencies:test"
- >
docker run --name mock_tfs_nbi_dependencies -d -p 10000:10000
--network=teraflowbridge
--env BIND_ADDRESS=0.0.0.0
--env BIND_PORT=10000
--env LOG_LEVEL=INFO
$CI_REGISTRY_IMAGE/mock_tfs_nbi_dependencies:test
- >
docker run --name $IMAGE_NAME -d -v "$PWD/src/$IMAGE_NAME/tests:/opt/results"
--network=teraflowbridge
--env LOG_LEVEL=DEBUG
--env FLASK_ENV=development
--env IETF_NETWORK_RENDERER=LIBYANG
$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5 - sleep 5
- docker ps -a - docker ps -a
- docker logs mock_tfs_nbi_dependencies
- docker logs $IMAGE_NAME - docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_tfs_api.py --junitxml=/opt/results/${IMAGE_NAME}_report_tfs_api.xml" - date
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_l2vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l2vpn.xml" - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_core.py --junitxml=/opt/results/${IMAGE_NAME}_report_core.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_network.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_network.xml" - date
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_ietf_l3vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l3vpn.xml" - docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_tfs_api.py --junitxml=/opt/results/${IMAGE_NAME}_report_tfs_api.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_etsi_bwm.py --junitxml=/opt/results/${IMAGE_NAME}_report_etsi_bwm.xml" - date
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_ietf_l2vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l2vpn.xml"
- date
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_ietf_network.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_network.xml"
- date
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_ietf_l3vpn.py --junitxml=/opt/results/${IMAGE_NAME}_report_ietf_l3vpn.xml"
- date
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_etsi_bwm.py --junitxml=/opt/results/${IMAGE_NAME}_report_etsi_bwm.xml"
- date
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script: after_script:
- docker rm -f $IMAGE_NAME - docker logs mock_tfs_nbi_dependencies
- docker rm -f mock_tfs_nbi_dependencies $IMAGE_NAME
- docker network rm teraflowbridge - docker network rm teraflowbridge
rules: rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
......
...@@ -18,3 +18,6 @@ from werkzeug.security import generate_password_hash ...@@ -18,3 +18,6 @@ from werkzeug.security import generate_password_hash
RESTAPI_USERS = { # TODO: implement a database of credentials and permissions RESTAPI_USERS = { # TODO: implement a database of credentials and permissions
'admin': generate_password_hash('admin'), 'admin': generate_password_hash('admin'),
} }
# Rebuild using: "python -c 'import secrets; print(secrets.token_hex())'"
SECRET_KEY = '2b8ab76763d81f7bced786de8ba40bd67eea6ff79217a711eb5f8d1f19c145c1'