Skip to content
Snippets Groups Projects
Commit afeea9f9 authored by Carlos Manso's avatar Carlos Manso
Browse files

update

parent 8b07c76a
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!225Resolve "Integrate Support for IP-E2E-Optical SDN controllers to manage hierarchical virtual topologies"
......@@ -15,7 +15,9 @@
import copy
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import Empty, Connection, ContextId, EndPointId, Link, LinkId, TopologyDetails, TopologyId, Device, Topology, Context, Service, ServiceStatus, DeviceId, ServiceTypeEnum, ServiceStatusEnum
from common.proto.context_pb2 import (
Empty, Connection, EndPointId, Link, TopologyDetails, Topology, Context, Service, ServiceTypeEnum,
ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient
......@@ -34,7 +36,7 @@ from common.Constants import DEFAULT_CONTEXT_NAME
LOGGER = logging.getLogger(__name__)
logging.getLogger("websockets").propagate = False
logging.getLogger("websockets").propagate = True
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")
......@@ -48,121 +50,115 @@ EXT_PORT = "8762"
OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local"
OWN_PORT = "8761"
ALL_HOSTS = "0.0.0.0"
def _event_received(websocket):
for message in websocket:
message_json = json.loads(message)
if 'link_id' in message_json:
link = Link(**message_json)
service = Service()
service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
service_client.CreateService(service)
links = context_client.ListLinks(Empty()).links
a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]
for _link in links:
for _endpoint_id in _link.link_endpoint_ids:
if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
a_ep_id = _endpoint_id
elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
z_ep_id = _endpoint_id
class SubscriptionServer(Thread):
def __init__(self):
Thread.__init__(self)
def run(self):
url = "ws://" + EXT_HOST + ":" + EXT_PORT
request = VNTSubscriptionRequest()
request.host = OWN_HOST
request.port = OWN_PORT
try:
LOGGER.debug("Trying to connect to {}".format(url))
websocket = connect(url)
except Exception as ex:
LOGGER.error('Error connecting to {}'.format(url))
else:
with websocket:
LOGGER.debug("Connected to {}".format(url))
send = grpc_message_to_json_string(request)
websocket.send(send)
LOGGER.debug("Sent: {}".format(send))
try:
message = websocket.recv()
LOGGER.debug("Received message from WebSocket: {}".format(message))
except Exception as ex:
LOGGER.info('Exception receiving from WebSocket: {}'.format(ex))
self._events_server()
service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
service_client.UpdateService(service)
websocket.send(grpc_message_to_json_string(link))
def _events_server(self):
all_hosts = "0.0.0.0"
try:
server = serve(self._event_received, all_hosts, int(OWN_PORT))
except Exception as ex:
LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT))
LOGGER.error('Exception!: {}'.format(ex))
else:
topology_details = TopologyDetails(**message_json)
with server:
LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT))
server.serve_forever()
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context)
topology = Topology()
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology)
def _event_received(self, connection):
for message in connection:
message_json = json.loads(message)
for device in topology_details.devices:
context_client.SetDevice(device)
if 'link_id' in message_json:
link = Link(**message_json)
for link in topology_details.links:
context_client.SetLink(link)
service = Service()
service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
service_client.CreateService(service)
links = context_client.ListLinks(Empty()).links
a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]
for _link in links:
for _endpoint_id in _link.link_endpoint_ids:
if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
a_ep_id = _endpoint_id
elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
z_ep_id = _endpoint_id
def requestSubscription():
url = "ws://" + EXT_HOST + ":" + EXT_PORT
request = VNTSubscriptionRequest()
request.host = OWN_HOST
request.port = OWN_PORT
LOGGER.debug("Connecting to {}".format(url))
try:
websocket = connect(url)
except Exception as ex:
LOGGER.error('Error connecting to {}'.format(url))
else:
with websocket:
LOGGER.debug("Connected to {}".format(url))
send = grpc_message_to_json_string(request)
websocket.send(send)
try:
message = websocket.recv()
LOGGER.debug("Received message from WebSocket: {}".format(message))
except Exception as ex:
LOGGER.error('Exception receiving from WebSocket: {}'.format(ex))
events_server()
LOGGER.info('Subscription requested')
service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
service_client.UpdateService(service)
connection.send(grpc_message_to_json_string(link))
def events_server():
all_hosts = "0.0.0.0"
else:
topology_details = TopologyDetails(**message_json)
try:
server = serve(_event_received, all_hosts, int(OWN_PORT))
except Exception as ex:
LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT))
LOGGER.error('Exception!: {}'.format(ex))
else:
with server:
LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT))
server.serve_forever()
LOGGER.info("Exiting events server...")
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context)
topology = Topology()
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology)
for device in topology_details.devices:
context_client.SetDevice(device)
for link in topology_details.links:
context_client.SetLink(link)
class SubscriptionServer():
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
def __init__(self):
LOGGER.debug("Creating Servicer...")
LOGGER.debug("Servicer Created")
try:
LOGGER.debug("Requesting subscription")
subscription_thread = Thread(target=requestSubscription)
subscription_thread.start()
# import_optical()
sub_server = SubscriptionServer()
sub_server.start()
LOGGER.debug("Servicer Created")
except Exception as ex:
LOGGER.info("Exception!: {}".format(ex))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment