Newer
Older
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId,
ServiceTypeEnum, ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
from common.tools.grpc.Tools import grpc_message_to_json_string
import logging
import networkx as nx
from threading import Thread
from websockets.sync.client import connect
from websockets.sync.server import serve
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
# Module-level logger for this file.
LOGGER = logging.getLogger(__name__)
# Explicitly enable propagation of records from the "requests.packages.urllib3" logger.
logging.getLogger("requests.packages.urllib3").propagate = True
# Metrics pool passed to the safe_and_metered_rpc_method decorator used on Compute().
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")
# Context service client, created once at import time and shared by all code in this module.
context_client: ContextClient = ContextClient()
# WebSocket addressing, read from settings and stored as strings.
# EXT_HOST/EXT_PORT form the URL SubscriptionServer.run() connects to.
# OWN_HOST/OWN_PORT are placed in the subscription request; OWN_PORT is also the port
# the local events server is started on.
# NOTE(review): get_setting is not imported in the visible part of this file - confirm.
EXT_HOST = str(get_setting('WS_IP_HOST'))
EXT_PORT = str(get_setting('WS_IP_PORT'))
OWN_HOST = str(get_setting('WS_E2E_HOST'))
OWN_PORT = str(get_setting('WS_E2E_PORT'))
# Thread that sends a subscription request to an external WebSocket endpoint and then
# runs a WebSocket server on this component's own port.
# NOTE(review): this view of the class has lost its indentation and appears to be missing
# lines (the `except` after recv() has no visible body, and the statements following
# serve_forever() have no visible enclosing `def`). The comments below describe only what
# the visible statements do.
class SubscriptionServer(Thread):
def __init__(self):
Thread.__init__(self)
# Thread entry point: subscribe at ws://EXT_HOST:EXT_PORT, then start the local server.
def run(self):
url = "ws://" + EXT_HOST + ":" + EXT_PORT
# Subscription payload carrying OWN_HOST / OWN_PORT.
# NOTE(review): VNTSubscriptionRequest is not imported in the visible part of the file.
request = VNTSubscriptionRequest()
request.host = OWN_HOST
request.port = OWN_PORT
try:
LOGGER.debug("Trying to connect to {}".format(url))
websocket = connect(url)
except Exception as ex:
# A connection failure is logged (without the exception details) and not re-raised.
LOGGER.error('Error connecting to {}'.format(url))
else:
with websocket:
LOGGER.debug("Connected to {}".format(url))
# The request is sent in its JSON string form.
send = grpc_message_to_json_string(request)
websocket.send(send)
LOGGER.debug("Sent: {}".format(send))
try:
message = websocket.recv()
LOGGER.debug("Received message from WebSocket: {}".format(message))
except Exception as ex:
# NOTE(review): the handler body for a recv() failure is not visible in this view.
try:
# Start the events server on OWN_PORT with self._event_received as the handler.
# NOTE(review): `all_hosts` is not defined in the visible code.
server = serve(self._event_received, all_hosts, int(OWN_PORT))
except Exception as ex:
LOGGER.error('Error starting server on {}:{}'.format(all_hosts, OWN_PORT))
LOGGER.error('Exception!: {}'.format(ex))
# NOTE(review): if serve() raised above, `server` was never bound, so this `with`
# would fail with UnboundLocalError - confirm against the full file.
with server:
LOGGER.info("Running events server...: {}:{}".format(all_hosts, OWN_PORT))
server.serve_forever()
# NOTE(review): the statements from here to the end of the class use `link`, `links`,
# `service_client`, `device_get_uuid`, `message_json`, `topology_details` and `topology`,
# none of which are defined in the visible code; the enclosing `def` line(s) are
# presumably missing from this view.
# Create a PLANNED optical-connectivity Service in the default context, reusing the
# link's UUID as the service UUID.
service = Service()
service.service_id.service_uuid.uuid = link.link_id.link_uuid.uuid
service.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
service.service_type = ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
service_client.CreateService(service)
# Device and endpoint UUIDs of the link's first (A) and second (Z) endpoints.
a_device_uuid = device_get_uuid(link.link_endpoint_ids[0].device_id)
a_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[0])[2]
z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]
# Scan `links` for endpoint ids whose (device UUID, endpoint UUID) pair matches A or Z.
for _link in links:
for _endpoint_id in _link.link_endpoint_ids:
if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == a_endpoint_uuid:
a_ep_id = _endpoint_id
elif _endpoint_id.device_id.device_uuid.uuid == z_device_uuid and \
_endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
z_ep_id = _endpoint_id
# NOTE(review): the message labels its second value "z_endpoint_uuid" but interpolates
# z_device_uuid; error_msg is also not used anywhere in the visible code.
error_msg = f'Could not get VNT link endpoints\
\n\ta_endpoint_uuid= {a_endpoint_uuid}\
\n\tz_endpoint_uuid= {z_device_uuid}'
# Attach copies of the matched endpoints to the service and push the update.
# NOTE(review): a_ep_id / z_ep_id are only assigned in the loop above when a match is found.
service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
service_client.UpdateService(service)
re_svc = context_client.GetService(service.service_id)
# Build a ServiceId in the default context whose UUID is that of a LinkId built from
# message_json.
link_id = LinkId(**message_json)
service_id = ServiceId()
service_id.service_uuid.uuid = link_id.link_uuid.uuid
service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
# Store the context, topology, devices and links described by topology_details through
# the context client.
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context)
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology)
for device in topology_details.devices:
context_client.SetDevice(device)
for link in topology_details.links:
context_client.SetLink(link)
# Servicer implementation for the E2E orchestrator gRPC service.
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
# Constructor: creates and starts a SubscriptionServer thread.
def __init__(self):
LOGGER.debug("Creating Servicer...")
sub_server = SubscriptionServer()
sub_server.start()
LOGGER.debug("Servicer Created")
# NOTE(review): the `try:` matching this `except` is not visible in this view.
# The exception is logged at INFO level and not re-raised.
except Exception as ex:
LOGGER.info("Exception!: {}".format(ex))
# Copy topologies from external controllers (settings EXT_CONTROLLER1_*, EXT_CONTROLLER2_*, ...)
# into the context service.
def retrieve_external_topologies(self):
i = 1
while True:
try:
ADD = str(get_setting(f'EXT_CONTROLLER{i}_ADD'))
PORT = str(get_setting(f'EXT_CONTROLLER{i}_PORT'))
except Exception as e:
# Any failure reading the i-th controller's settings ends the loop.
break
try:
url = f'http://{ADD}:{PORT}/tfs-api/context/{DEFAULT_CONTEXT_NAME}/topology_details/{DEFAULT_TOPOLOGY_NAME}'
except Exception as e:
LOGGER.info(f'Exception retrieven topology from external controler #{i}: {e}')
# NOTE(review): `topo` is not assigned in the visible code; the request that fetches
# `url` is presumably missing from this view.
topology_details = TopologyDetails(**topo)
# Store the received context, topology, devices and links through the context client.
context = Context()
context.context_id.context_uuid.uuid = topology_details.topology_id.context_id.context_uuid.uuid
context_client.SetContext(context)
topology = Topology()
topology.topology_id.context_id.CopyFrom(context.context_id)
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology)
for device in topology_details.devices:
context_client.SetDevice(device)
for link in topology_details.links:
context_client.SetLink(link)
# Move on to the next controller index.
i+=1
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# RPC handler: builds a graph of endpoint UUIDs from the devices and links known to the
# context service, finds a shortest path between the first two endpoints of
# request.service, and describes that path as Connection entries in an E2EOrchestratorReply.
# NOTE(review): no `return` is visible in this view although the signature is annotated to
# return E2EOrchestratorReply; the method may continue past the last visible line.
# NOTE(review): `grpc` is not imported in the visible part of the file.
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
# One entry per service endpoint: element [2] of endpoint_get_uuid()'s result.
endpoints_ids = []
for endpoint_id in request.service.service_endpoint_ids:
endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2])
# Undirected graph whose nodes are endpoint UUIDs.
graph = nx.Graph()
devices = context_client.ListDevices(Empty()).devices
for device in devices:
endpoints_uuids = [endpoint.endpoint_id.endpoint_uuid.uuid
for endpoint in device.device_endpoints]
# Each endpoint of the device becomes a node...
for ep in endpoints_uuids:
graph.add_node(ep)
# ...and every pair of distinct endpoints on the same device is joined by an edge.
for ep in endpoints_uuids:
for ep_i in endpoints_uuids:
if ep == ep_i:
continue
graph.add_edge(ep, ep_i)
links = context_client.ListLinks(Empty()).links
for link in links:
eps = []
for endpoint_id in link.link_endpoint_ids:
eps.append(endpoint_id.endpoint_uuid.uuid)
# Add an edge between the first two endpoint UUIDs collected for a link.
# NOTE(review): indexing eps[0] / eps[1] assumes at least two endpoints per link - confirm.
graph.add_edge(eps[0], eps[1])
# Path (sequence of endpoint UUIDs) between the first two service endpoints.
shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])
# The reply carries a copy of the requested service plus one Connection per consecutive
# node pair (0,1), (2,3), ... of the path; a trailing unpaired node is not used.
path = E2EOrchestratorReply()
path.services.append(copy.deepcopy(request.service))
for i in range(0, int(len(shortest)/2)):
conn = Connection()
ep_a_uuid = str(shortest[i*2])
ep_z_uuid = str(shortest[i*2+1])
# The connection UUID is the two endpoint UUIDs joined by '_->_'.
conn.connection_id.connection_uuid.uuid = str(ep_a_uuid) + '_->_' + str(ep_z_uuid)
ep_a_id = EndPointId()
ep_a_id.endpoint_uuid.uuid = ep_a_uuid
conn.path_hops_endpoint_ids.append(ep_a_id)
ep_z_id = EndPointId()
ep_z_id.endpoint_uuid.uuid = ep_z_uuid
conn.path_hops_endpoint_ids.append(ep_z_id)
path.connections.append(conn)