Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import json
import logging
import time

import grpc
import networkx as nx
from websockets.sync.client import connect
from websockets.sync.server import serve

from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from context.service.database.uuids.EndPoint import endpoint_get_uuid
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Metrics pool passed to the RPC-method decorator that wraps Compute.
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")
# Context service client shared by the whole module; it is instantiated as a
# side effect of importing this file and is queried by Compute for the
# device and link inventory.
context_client: ContextClient = ContextClient()
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
def __init__(self):
LOGGER.debug("Creating Servicer...")
LOGGER.debug("Servicer Created")
try:
LOGGER.info("Requesting subscription")
self.RequestSubscription()
except Exception as E:
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
    """Compute a path between the first two endpoints of the requested service.

    An undirected graph is built from the inventory held by the Context
    service: every device endpoint is a node, all endpoints of one device
    are connected pairwise, and each link connects its first two endpoints.
    The shortest path between the first two service endpoints is then cut
    into consecutive endpoint pairs, and each pair becomes one Connection.

    Args:
        request: Carries the service whose ``service_endpoint_ids`` are to
            be connected. Only the first two endpoints are considered.
        context: gRPC servicer context; not used by this method.

    Returns:
        A reply containing a copy of the requested service plus one
        Connection per pair of consecutive hops on the shortest path.

    Raises:
        IndexError: If the service lists fewer than two endpoints.
        Exceptions raised by ``networkx.shortest_path`` (unknown node, no
        path) are not handled here and propagate to the decorator.
    """
    # Third element of the helper's result is used as the graph node key
    # (presumably the endpoint UUID -- confirm against endpoint_get_uuid).
    endpoints_ids = [
        endpoint_get_uuid(endpoint_id)[2]
        for endpoint_id in request.service.service_endpoint_ids
    ]

    graph = nx.Graph()

    for device in context_client.ListDevices(Empty()).devices:
        device_endpoints = [
            endpoint.endpoint_id.endpoint_uuid.uuid
            for endpoint in device.device_endpoints
        ]
        graph.add_nodes_from(device_endpoints)
        # Any endpoint of a device can reach any other endpoint of it.
        graph.add_edges_from(
            (ep_a, ep_b)
            for ep_a in device_endpoints
            for ep_b in device_endpoints
            if ep_a != ep_b
        )

    for link in context_client.ListLinks(Empty()).links:
        link_endpoints = [
            endpoint_id.endpoint_uuid.uuid
            for endpoint_id in link.link_endpoint_ids
        ]
        if len(link_endpoints) < 2:
            # A link needs two ends to form an edge; one malformed link must
            # not abort the whole computation with an IndexError.
            LOGGER.warning(
                "Skipping link %s: fewer than 2 endpoints",
                link.link_id.link_uuid.uuid,
            )
            continue
        graph.add_edge(link_endpoints[0], link_endpoints[1])

    shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])

    path = E2EOrchestratorReply()
    path.services.append(copy.deepcopy(request.service))

    # Pair hops (0, 1), (2, 3), ...; a trailing unpaired hop is dropped,
    # exactly as the index arithmetic over len(shortest) / 2 did.
    for hop_a, hop_z in zip(shortest[0::2], shortest[1::2]):
        ep_a_uuid = str(hop_a)
        ep_z_uuid = str(hop_z)

        conn = Connection()
        conn.connection_id.connection_uuid.uuid = ep_a_uuid + '_->_' + ep_z_uuid
        for hop_uuid in (ep_a_uuid, ep_z_uuid):
            hop_id = EndPointId()
            hop_id.endpoint_uuid.uuid = hop_uuid
            conn.path_hops_endpoint_ids.append(hop_id)
        path.connections.append(conn)

    # The reply must be handed back explicitly; without this the RPC
    # would yield None instead of the computed path.
    return path
# Cluster DNS names and ports of the two services involved in the websocket
# exchange: EXT_* refers to the NBI service, OWN_* to this E2E orchestrator.
# NOTE(review): within the visible code only OWN_PORT is consumed (as the
# listen port of the subscription server); confirm where EXT_HOST, EXT_PORT
# and OWN_HOST are used.
# NOTE(review): ports are stored as strings -- confirm every consumer accepts
# a str rather than an int.
EXT_HOST = "nbiservice.tfs-ip.svc.cluster.local"
EXT_PORT = "8762"
OWN_HOST = "e2e-orchestratorservice.tfs-e2e.svc.cluster.local"
OWN_PORT = "8761"
LOGGER.info("Trying to connect... to {}".format(url))
with connect(url) as websocket:
LOGGER.info("CONNECTED!!! {}")
send = grpc_message_to_json_string(request)
LOGGER.info("Sending {}".format(send))
websocket.send(send)
try:
message = websocket.recv()
except Exception as e:
LOGGER.info('Exception1!: {}'.format(e))
try:
LOGGER.info("Received ws: {}".format(message))
except Exception as e:
LOGGER.info('Exception2!: {}'.format(e))
with serve(self._event_received, "0.0.0.0", OWN_PORT, logger=LOGGER) as server:
LOGGER.info("Running subscription server...: {}:{}".format("0.0.0.0", OWN_PORT))
server.serve_forever()
LOGGER.info("Exiting subscription server...")
def _event_received(self, websocket):
    """Process every message received on a subscription websocket.

    Each message is logged, decoded from JSON, classified by the keys it
    carries, and finally echoed back unchanged to the sender.

    Args:
        websocket: Connection handed over by the websocket server. It is
            iterated to read incoming messages and used to send the echo.
    """
    for message in websocket:
        LOGGER.info("Message received!!!: %s", message)
        message_json = json.loads(message)

        if 'event_type' in message_json:
            # Event notification: no handling implemented yet.
            pass
        elif 'link_id' in message_json:
            # Full link description: build the Link and consult the policies.
            link = Link(**message_json)
            if self._check_policies(link):
                # Accepted link: no follow-up action implemented yet.
                pass
        elif 'link_uuid' in message_json:
            # Bare link identifier: constructed but not used further yet.
            link_id = LinkId(**message_json)

        websocket.send(message)
def _check_policies(self, link):
return True