Skip to content
Snippets Groups Projects
Commit 8b07c76a authored by Carlos Manso's avatar Carlos Manso
Browse files

update

parent c98b136b
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!225Resolve "Integrate Support for IP-E2E-Optical SDN controllers to manage hierarchical virtual topologies"
...@@ -15,11 +15,12 @@ ...@@ -15,11 +15,12 @@
import copy import copy
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, TopologyId, Device, Topology, Context, Service from common.proto.context_pb2 import Empty, Connection, ContextId, EndPointId, Link, LinkId, TopologyDetails, TopologyId, Device, Topology, Context, Service, ServiceStatus, DeviceId, ServiceTypeEnum, ServiceStatusEnum
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient from service.client.ServiceClient import ServiceClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid from context.service.database.uuids.EndPoint import endpoint_get_uuid
from context.service.database.uuids.Device import device_get_uuid
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
import grpc import grpc
...@@ -27,9 +28,10 @@ import json ...@@ -27,9 +28,10 @@ import json
import logging import logging
import networkx as nx import networkx as nx
from threading import Thread from threading import Thread
import time
from websockets.sync.client import connect from websockets.sync.client import connect
from websockets.sync.server import serve from websockets.sync.server import serve
from common.Constants import DEFAULT_CONTEXT_NAME
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
logging.getLogger("websockets").propagate = False logging.getLogger("websockets").propagate = False
...@@ -55,50 +57,61 @@ def _event_received(websocket): ...@@ -55,50 +57,61 @@ def _event_received(websocket):
link = Link(**message_json) link = Link(**message_json)
service = Service() service = Service()
service.service_id = link.link_id.link_uuid service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
service.serivice_type = 2 # Optical service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
service.service_status = 1 service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
# service_client.CreateService(service) service_client.CreateService(service)
links = context_client.ListLinks(Empty()).links
a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]
for _link in links:
for _endpoint_id in _link.link_endpoint_ids:
if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
a_ep_id = _endpoint_id
elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
z_ep_id = _endpoint_id
websocket.send(message)
service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
service_client.UpdateService(service)
websocket.send(grpc_message_to_json_string(link))
else: else:
topology_details = TopologyDetails(**message_json) topology_details = TopologyDetails(**message_json)
context_id = topology_details.topology_id.context_id
context = Context() context = Context()
context.context_id.CopyFrom(context_id) context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context) context_client.SetContext(context)
topology_id = topology_details.topology_id
topology = Topology() topology = Topology()
topology.topology_id.CopyFrom(topology_id) topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology) context_client.SetTopology(topology)
for device in topology_details.devices: for device in topology_details.devices:
LOGGER.info('Setting Device: {}'.format(device))
context_client.SetDevice(device) context_client.SetDevice(device)
for link in topology_details.links: for link in topology_details.links:
LOGGER.info('Setting Link: {}'.format(link))
context_client.SetLink(link) context_client.SetLink(link)
def _check_policies(link):
return True
def requestSubscription(): def requestSubscription():
url = "ws://" + EXT_HOST + ":" + EXT_PORT url = "ws://" + EXT_HOST + ":" + EXT_PORT
request = VNTSubscriptionRequest() request = VNTSubscriptionRequest()
request.host = OWN_HOST request.host = OWN_HOST
request.port = OWN_PORT request.port = OWN_PORT
LOGGER.debug("Trying to connect to {}".format(url)) LOGGER.debug("Connecting to {}".format(url))
try: try:
websocket = connect(url) websocket = connect(url)
except Exception as ex: except Exception as ex:
...@@ -108,12 +121,11 @@ def requestSubscription(): ...@@ -108,12 +121,11 @@ def requestSubscription():
LOGGER.debug("Connected to {}".format(url)) LOGGER.debug("Connected to {}".format(url))
send = grpc_message_to_json_string(request) send = grpc_message_to_json_string(request)
websocket.send(send) websocket.send(send)
LOGGER.debug("Sent: {}".format(send))
try: try:
message = websocket.recv() message = websocket.recv()
LOGGER.debug("Received message from WebSocket: {}".format(message)) LOGGER.debug("Received message from WebSocket: {}".format(message))
except Exception as ex: except Exception as ex:
LOGGER.info('Exception receiving from WebSocket: {}'.format(ex)) LOGGER.error('Exception receiving from WebSocket: {}'.format(ex))
events_server() events_server()
LOGGER.info('Subscription requested') LOGGER.info('Subscription requested')
...@@ -127,22 +139,31 @@ def events_server(): ...@@ -127,22 +139,31 @@ def events_server():
except Exception as ex: except Exception as ex:
LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT)) LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT))
LOGGER.error('Exception!: {}'.format(ex)) LOGGER.error('Exception!: {}'.format(ex))
with server: else:
LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT)) with server:
server.serve_forever() LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT))
LOGGER.info("Exiting events server...") server.serve_forever()
LOGGER.info("Exiting events server...")
class SubscriptionServer():
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
def __init__(self): def __init__(self):
LOGGER.debug("Creating Servicer...") LOGGER.debug("Creating Servicer...")
LOGGER.debug("Servicer Created") LOGGER.debug("Servicer Created")
try: try:
LOGGER.info("Requesting subscription") LOGGER.debug("Requesting subscription")
subscription_thread = Thread(target=requestSubscription) subscription_thread = Thread(target=requestSubscription)
subscription_thread.start() subscription_thread.start()
# import_optical()
except Exception as ex: except Exception as ex:
LOGGER.info("Exception!: {}".format(ex)) LOGGER.info("Exception!: {}".format(ex))
......
...@@ -15,10 +15,9 @@ ...@@ -15,10 +15,9 @@
import logging import logging
from websockets.sync.server import serve from websockets.sync.server import serve
from common.proto.vnt_manager_pb2 import VNTSubscriptionReply, VNTSubscriptionRequest from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest
from common.proto.context_pb2 import Empty from common.proto.context_pb2 import Empty
# from vnt_manager.client.VNTManagerClient import VNTManagerClient
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.tools.object_factory.Topology import json_topology_id from common.tools.object_factory.Topology import json_topology_id
......
...@@ -207,18 +207,9 @@ class VirtualLink(_Resource): ...@@ -207,18 +207,9 @@ class VirtualLink(_Resource):
def get(self, virtual_link_uuid : str): def get(self, virtual_link_uuid : str):
return format_grpc_to_json(self.vntmanager_client.GetVirtualLink(grpc_link_id(virtual_link_uuid))) return format_grpc_to_json(self.vntmanager_client.GetVirtualLink(grpc_link_id(virtual_link_uuid)))
def post(self, virtual_link_uuid : str): # pylint: disable=unused-argument def post(self, virtual_link_uuid : str): # pylint: disable=unused-argument
link = request.get_json() link_json = request.get_json()
LOGGER.info('---------------------------LINK received------------------------------') link = grpc_link(link_json)
LOGGER.info(link) return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(link))
LOGGER.info('----------------------------------------------------------------------')
LOGGER.info(link['link_id'])
LOGGER.info('type: {}'.format(type(link['link_id'])))
LOGGER.info('---------------------------LINK received------------------------------')
link = grpc_link(link)
return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(grpc_link(link)))
def put(self, virtual_link_uuid : str): # pylint: disable=unused-argument def put(self, virtual_link_uuid : str): # pylint: disable=unused-argument
link = request.get_json() link = request.get_json()
return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(grpc_link(link))) return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(grpc_link(link)))
......
...@@ -41,10 +41,8 @@ from typing import Any, Dict, Set ...@@ -41,10 +41,8 @@ from typing import Any, Dict, Set
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.grpc.Tools import grpc_message_to_json from common.tools.grpc.Tools import grpc_message_to_json
import time
import json import json
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool("VNTManager", "RPC") METRICS_POOL = MetricsPool("VNTManager", "RPC")
...@@ -66,11 +64,10 @@ PORT = str(8765) ...@@ -66,11 +64,10 @@ PORT = str(8765)
WEBSOCKET = None WEBSOCKET = None
def send_msg(msg): def send_msg(msg):
LOGGER.info('-------------------------SENDING------------------------------') try:
LOGGER.info(msg) WEBSOCKET.send(msg)
LOGGER.info('-------------------------------------------------------') except Exception as e:
WEBSOCKET.send(msg) LOGGER.info(e)
class VNTMEventDispatcher(threading.Thread): class VNTMEventDispatcher(threading.Thread):
...@@ -109,6 +106,7 @@ class VNTMEventDispatcher(threading.Thread): ...@@ -109,6 +106,7 @@ class VNTMEventDispatcher(threading.Thread):
LOGGER.debug('Connecting to {}'.format(url)) LOGGER.debug('Connecting to {}'.format(url))
try: try:
LOGGER.info("Connecting to events server...: {}".format(url))
WEBSOCKET = connect(url) WEBSOCKET = connect(url)
except Exception as ex: except Exception as ex:
LOGGER.error('Error connecting to {}'.format(url)) LOGGER.error('Error connecting to {}'.format(url))
...@@ -128,9 +126,7 @@ class VNTMEventDispatcher(threading.Thread): ...@@ -128,9 +126,7 @@ class VNTMEventDispatcher(threading.Thread):
event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
if event is None: continue if event is None: continue
LOGGER.info('event!!!!!!!!!!!!!!!!!!!!!!: {}'.format(event))
topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
LOGGER.info('topodetails..................................... ')
to_send = grpc_message_to_json_string(topology_details) to_send = grpc_message_to_json_string(topology_details)
...@@ -180,15 +176,22 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -180,15 +176,22 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
try: try:
send_msg(request) send_msg(grpc_message_to_json_string(request))
message = WEBSOCKET.recv() message = WEBSOCKET.recv()
message_json = json.loads(message) message_json = json.loads(message)
link = Link(**message_json) link = Link(**message_json)
context_client.SetLink(link) context_client.SetLink(link)
except Exception as e: except Exception as e:
LOGGER.error('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) LOGGER.error('Exception setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e))
return request.linkd_id return request.link_id
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment