Skip to content
Snippets Groups Projects
Commit 7c6bffe5 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

E2E Orchetsrator component:

- Code cleanup
parent 846ddd21
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
......@@ -12,29 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import requests
import copy, grpc, json, logging, networkx, requests, threading
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import (
Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId,
ServiceTypeEnum, ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.Settings import get_setting
from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from context.service.database.uuids.Device import device_get_uuid
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string
import grpc
import json
import logging
import networkx as nx
from threading import Thread
from service.client.ServiceClient import ServiceClient
from websockets.sync.client import connect
from websockets.sync.server import serve
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
LOGGER = logging.getLogger(__name__)
......@@ -48,53 +42,46 @@ context_client: ContextClient = ContextClient()
service_client: ServiceClient = ServiceClient()
EXT_HOST = str(get_setting('WS_IP_HOST'))
EXT_PORT = str(get_setting('WS_IP_PORT'))
EXT_PORT = int(get_setting('WS_IP_PORT'))
EXT_URL = 'ws://{:s}:{:d}'.format(EXT_HOST, EXT_PORT)
OWN_HOST = str(get_setting('WS_E2E_HOST'))
OWN_PORT = str(get_setting('WS_E2E_PORT'))
OWN_PORT = int(get_setting('WS_E2E_PORT'))
ALL_HOSTS = "0.0.0.0"
ALL_HOSTS = '0.0.0.0'
class SubscriptionServer(Thread):
def __init__(self):
Thread.__init__(self)
class SubscriptionServer(threading.Thread):
def run(self):
url = "ws://" + EXT_HOST + ":" + EXT_PORT
request = VNTSubscriptionRequest()
request.host = OWN_HOST
request.port = OWN_PORT
try:
LOGGER.debug("Trying to connect to {}".format(url))
websocket = connect(url)
except Exception as ex:
LOGGER.error('Error connecting to {}'.format(url))
LOGGER.debug('Trying to connect to {:s}'.format(EXT_URL))
websocket = connect(EXT_URL)
except: # pylint: disable=bare-except
LOGGER.exception('Error connecting to {:s}'.format(EXT_URL))
else:
with websocket:
LOGGER.debug("Connected to {}".format(url))
LOGGER.debug('Connected to {:s}'.format(EXT_URL))
send = grpc_message_to_json_string(request)
websocket.send(send)
LOGGER.debug("Sent: {}".format(send))
LOGGER.debug('Sent: {:s}'.format(send))
try:
message = websocket.recv()
LOGGER.debug("Received message from WebSocket: {}".format(message))
LOGGER.debug('Received message from WebSocket: {:s}'.format(message))
except Exception as ex:
LOGGER.error('Exception receiving from WebSocket: {}'.format(ex))
LOGGER.error('Exception receiving from WebSocket: {:s}'.format(ex))
self._events_server()
def _events_server(self):
all_hosts = "0.0.0.0"
try:
server = serve(self._event_received, all_hosts, int(OWN_PORT))
except Exception as ex:
LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT))
LOGGER.error('Exception!: {}'.format(ex))
server = serve(self._event_received, ALL_HOSTS, int(OWN_PORT))
except: # pylint: disable=bare-except
LOGGER.exception('Error starting server on {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
else:
with server:
LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT))
LOGGER.info('Running events server...: {:s}:{:d}'.format(ALL_HOSTS, OWN_PORT))
server.serve_forever()
......@@ -178,32 +165,36 @@ class SubscriptionServer(Thread):
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
def __init__(self):
LOGGER.debug("Creating Servicer...")
LOGGER.debug('Creating Servicer...')
try:
LOGGER.debug("Requesting subscription")
LOGGER.debug('Requesting subscription')
sub_server = SubscriptionServer()
sub_server.start()
LOGGER.debug("Servicer Created")
LOGGER.debug('Servicer Created')
self.retrieve_external_topologies()
except Exception as ex:
LOGGER.info("Exception!: {}".format(ex))
except:
LOGGER.exception('Unhandled Exception')
def retrieve_external_topologies(self):
i = 1
while True:
try:
ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD'))
PORT = str(get_setting(f'EXT_CONTROLLER{i}_PORT'))
except Exception as e:
ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD'))
PORT = int(get_setting(f'EXT_CONTROLLER{i}_PORT'))
except: # pylint: disable=bare-except
break
try:
LOGGER.info(f'Retrieving external controller #{i}')
url = f'http://{ADD}:{PORT}/tfs-api/context/{DEFAULT_CONTEXT_NAME}/topology_details/{DEFAULT_TOPOLOGY_NAME}'
LOGGER.info(f'url= {url}')
LOGGER.info('Retrieving external controller #{:d}'.format(i))
url = 'http://{:s}:{:d}/tfs-api/context/{:s}/topology_details/{:s}'.format(
ADD, PORT, DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
)
LOGGER.info('url={:s}'.format(str(url)))
topo = requests.get(url).json()
LOGGER.info(f'Retrieved external controller #{i}')
except Exception as e:
LOGGER.info(f'Exception retrieven topology from external controler #{i}: {e}')
LOGGER.info('Retrieved external controller #{:d}'.format(i))
except: # pylint: disable=bare-except
LOGGER.exception('Exception retrieven topology from external controler #{:d}'.format(i))
topology_details = TopologyDetails(**topo)
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
......@@ -229,7 +220,7 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
for endpoint_id in request.service.service_endpoint_ids:
endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2])
graph = nx.Graph()
graph = networkx.Graph()
devices = context_client.ListDevices(Empty()).devices
......@@ -253,7 +244,7 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
graph.add_edge(eps[0], eps[1])
shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])
shortest = networkx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])
path = E2EOrchestratorReply()
path.services.append(copy.deepcopy(request.service))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment