Skip to content
Snippets Groups Projects
Commit 84fcce79 authored by Carlos Manso's avatar Carlos Manso
Browse files

update

parent e85f3829
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!225Resolve "Integrate Support for IP-E2E-Optical SDN controllers to manage hierarchical virtual topologies"
...@@ -16,7 +16,7 @@ import copy ...@@ -16,7 +16,7 @@ import copy
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import ( from common.proto.context_pb2 import (
Empty, Connection, EndPointId, Link, TopologyDetails, Topology, Context, Service, ServiceTypeEnum, Empty, Connection, EndPointId, Link, LinkId, TopologyDetails, Topology, Context, Service, ServiceId, ServiceTypeEnum,
ServiceStatusEnum) ServiceStatusEnum)
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
...@@ -96,8 +96,10 @@ class SubscriptionServer(Thread): ...@@ -96,8 +96,10 @@ class SubscriptionServer(Thread):
def _event_received(self, connection): def _event_received(self, connection):
LOGGER.info("EVENT received!")
for message in connection: for message in connection:
message_json = json.loads(message) message_json = json.loads(message)
# LOGGER.info("message_json: {}".format(message_json))
if 'link_id' in message_json: if 'link_id' in message_json:
link = Link(**message_json) link = Link(**message_json)
...@@ -115,7 +117,6 @@ class SubscriptionServer(Thread): ...@@ -115,7 +117,6 @@ class SubscriptionServer(Thread):
z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id) z_device_uuid = device_get_uuid(link.link_endpoint_ids[1].device_id)
z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2] z_endpoint_uuid = endpoint_get_uuid(link.link_endpoint_ids[1])[2]
for _link in links: for _link in links:
for _endpoint_id in _link.link_endpoint_ids: for _endpoint_id in _link.link_endpoint_ids:
if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \ if _endpoint_id.device_id.device_uuid.uuid == a_device_uuid and \
...@@ -125,14 +126,29 @@ class SubscriptionServer(Thread): ...@@ -125,14 +126,29 @@ class SubscriptionServer(Thread):
_endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid: _endpoint_id.endpoint_uuid.uuid == z_endpoint_uuid:
z_ep_id = _endpoint_id z_ep_id = _endpoint_id
if (not 'a_ep_id' in locals()) or (not 'z_ep_id' in locals()):
error_msg = 'Could not get VNT link endpoints'
LOGGER.error(error_msg)
connection.send(error_msg)
return
service.service_endpoint_ids.append(copy.deepcopy(a_ep_id)) service.service_endpoint_ids.append(copy.deepcopy(a_ep_id))
service.service_endpoint_ids.append(copy.deepcopy(z_ep_id)) service.service_endpoint_ids.append(copy.deepcopy(z_ep_id))
service_client.UpdateService(service) # service_client.UpdateService(service)
connection.send(grpc_message_to_json_string(link)) connection.send(grpc_message_to_json_string(link))
elif 'link_uuid' in message_json:
LOGGER.info('REMOVING VIRTUAL LINK')
link_id = LinkId(**message_json)
service_id = ServiceId()
service_id.service_uuid.uuid = link_id.link_uuid.uuid
service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_NAME
# service_client.DeleteService(service_id)
connection.send(grpc_message_to_json_string(link_id))
else: else:
LOGGER.info('TOPOLOGY')
topology_details = TopologyDetails(**message_json) topology_details = TopologyDetails(**message_json)
context = Context() context = Context()
...@@ -140,6 +156,7 @@ class SubscriptionServer(Thread): ...@@ -140,6 +156,7 @@ class SubscriptionServer(Thread):
context_client.SetContext(context) context_client.SetContext(context)
topology = Topology() topology = Topology()
topology.topology_id.context_id.CopyFrom(context.context_id)
topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid topology.topology_id.topology_uuid.uuid = topology_details.topology_id.topology_uuid.uuid
context_client.SetTopology(topology) context_client.SetTopology(topology)
......
...@@ -79,6 +79,8 @@ RUN python3 -m pip install -r vnt_manager/requirements.txt ...@@ -79,6 +79,8 @@ RUN python3 -m pip install -r vnt_manager/requirements.txt
# Add component files into working directory # Add component files into working directory
COPY --chown=teraflow:teraflow ./src/context/. context COPY --chown=teraflow:teraflow ./src/context/. context
COPY --chown=teraflow:teraflow ./src/vnt_manager/. vnt_manager COPY --chown=teraflow:teraflow ./src/vnt_manager/. vnt_manager
COPY --chown=teraflow:teraflow ./src/device/. device
# Start the service # Start the service
ENTRYPOINT ["python", "-m", "vnt_manager.service"] ENTRYPOINT ["python", "-m", "vnt_manager.service"]
...@@ -41,6 +41,7 @@ from typing import Any, Dict, Set ...@@ -41,6 +41,7 @@ from typing import Any, Dict, Set
from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.grpc.Tools import grpc_message_to_json from common.tools.grpc.Tools import grpc_message_to_json
from .vntm_config_device import configure, deconfigure
import json import json
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -61,13 +62,6 @@ HOST = "10.1.1.83" ...@@ -61,13 +62,6 @@ HOST = "10.1.1.83"
PORT = str(8765) PORT = str(8765)
WEBSOCKET = None
def send_msg(msg):
try:
WEBSOCKET.send(msg)
except Exception as e:
LOGGER.info(e)
class VNTMEventDispatcher(threading.Thread): class VNTMEventDispatcher(threading.Thread):
...@@ -86,8 +80,18 @@ class VNTMEventDispatcher(threading.Thread): ...@@ -86,8 +80,18 @@ class VNTMEventDispatcher(threading.Thread):
def stop(self): def stop(self):
self._terminate.set() self._terminate.set()
def send_msg(self, msg):
try:
self.websocket.send(msg)
except Exception as e:
LOGGER.info(e)
def recv_msg(self):
message = self.websocket.recv()
return message
def run(self) -> None: def run(self) -> None:
global WEBSOCKET
time.sleep(5) time.sleep(5)
events_collector = EventsCollector( events_collector = EventsCollector(
...@@ -107,7 +111,7 @@ class VNTMEventDispatcher(threading.Thread): ...@@ -107,7 +111,7 @@ class VNTMEventDispatcher(threading.Thread):
try: try:
LOGGER.info("Connecting to events server...: {}".format(url)) LOGGER.info("Connecting to events server...: {}".format(url))
WEBSOCKET = connect(url) self.websocket = connect(url)
except Exception as ex: except Exception as ex:
LOGGER.error('Error connecting to {}'.format(url)) LOGGER.error('Error connecting to {}'.format(url))
else: else:
...@@ -120,17 +124,18 @@ class VNTMEventDispatcher(threading.Thread): ...@@ -120,17 +124,18 @@ class VNTMEventDispatcher(threading.Thread):
except Exception as ex: except Exception as ex:
LOGGER.warning('No topology found') LOGGER.warning('No topology found')
else: else:
send_msg(grpc_message_to_json_string(topology_details)) self.send_msg(grpc_message_to_json_string(topology_details))
while not self._terminate.is_set(): while not self._terminate.is_set():
event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
LOGGER.info('Event type: {}'.format(event))
if event is None: continue if event is None: continue
LOGGER.debug('Received event: {}'.format(event))
topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id)) topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
to_send = grpc_message_to_json_string(topology_details) to_send = grpc_message_to_json_string(topology_details)
send_msg(to_send) self.send_msg(to_send)
LOGGER.info('Exiting') LOGGER.info('Exiting')
events_collector.stop() events_collector.stop()
...@@ -149,11 +154,11 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -149,11 +154,11 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
reply = VNTSubscriptionReply() reply = VNTSubscriptionReply()
reply.subscription = "OK" reply.subscription = "OK"
event_dispatcher = VNTMEventDispatcher(request.host, int(request.port)) self.event_dispatcher = VNTMEventDispatcher(request.host, int(request.port))
self.host = request.host self.host = request.host
self.port = request.port self.port = request.port
event_dispatcher.start() self.event_dispatcher.start()
return reply return reply
...@@ -176,17 +181,11 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -176,17 +181,11 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
try: try:
send_msg(grpc_message_to_json_string(request)) LOGGER.info('SETTING virtual link')
message = WEBSOCKET.recv() self.event_dispatcher.send_msg(grpc_message_to_json_string(request))
# configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
response = self.event_dispatcher.recv_msg()
message_json = json.loads(response)
message_json = json.loads(message)
link = Link(**message_json) link = Link(**message_json)
context_client.SetLink(link) context_client.SetLink(link)
except Exception as e: except Exception as e:
...@@ -195,6 +194,22 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -195,6 +194,22 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
# TODO try:
self.event_dispatcher.send_msg(grpc_message_to_json_string(request))
# deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
response = self.event_dispatcher.recv_msg()
message_json = json.loads(response)
link_id = LinkId(**message_json)
context_client.RemoveLink(link_id)
LOGGER.info('Removed')
except Exception as e:
msg_error = 'Exception removing virtual link={}\n\t{}'.format(request.link_uuid.uuid, e)
LOGGER.error(msg_error)
return msg_error
else:
context_client.RemoveLink(request)
LOGGER.info('Removed')
return Empty() return Empty()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
from common.proto.context_pb2 import ConfigRule
from common.tools.context_queries.Device import get_device
from common.tools.object_factory.ConfigRule import json_config_rule_set, json_config_rule_delete
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
##### Config Rule Composers ####################################################
def compose_config_rule(resource_key, resource_value, delete) -> Dict:
json_config_rule = json_config_rule_delete if delete else json_config_rule_set
return ConfigRule(**json_config_rule(resource_key, resource_value))
def network_instance(ni_name, ni_type, ni_router_id=None, ni_route_distinguisher=None, delete=False) -> Dict:
path = '/network_instance[{:s}]'.format(ni_name)
data = {'name': ni_name, 'type': ni_type}
if ni_router_id is not None: data['router_id'] = ni_router_id
if ni_route_distinguisher is not None: data['route_distinguisher'] = ni_route_distinguisher
return compose_config_rule(path, data, delete)
def network_instance_add_protocol_bgp(ni_name, ni_type, ni_router_id, ni_bgp_as, neighbors=[], delete=False)-> Dict:
path = '/network_instance[{:s}]/protocols[BGP]'.format(ni_name)
data = {
'name': ni_name, 'type': ni_type, 'router_id': ni_router_id, 'identifier': 'BGP',
'protocol_name': ni_bgp_as, 'as': ni_bgp_as
}
if len(neighbors) > 0:
data['neighbors'] = [
{'ip_address': neighbor_ip_address, 'remote_as': neighbor_remote_as}
for neighbor_ip_address, neighbor_remote_as in neighbors
]
return compose_config_rule(path, data, delete)
def network_instance_add_protocol_direct(ni_name, ni_type, delete=False) -> Dict:
path = '/network_instance[{:s}]/protocols[DIRECTLY_CONNECTED]'.format(ni_name)
data = {
'name': ni_name, 'type': ni_type, 'identifier': 'DIRECTLY_CONNECTED',
'protocol_name': 'DIRECTLY_CONNECTED'
}
return compose_config_rule(path, data, delete)
def network_instance_add_protocol_static(ni_name, ni_type, delete=False) -> Dict:
path = '/network_instance[{:s}]/protocols[STATIC]'.format(ni_name)
data = {
'name': ni_name, 'type': ni_type, 'identifier': 'STATIC',
'protocol_name': 'STATIC'
}
return compose_config_rule(path, data, delete)
def network_instance_add_table_connection(
ni_name, src_protocol, dst_protocol, address_family, default_import_policy, bgp_as=None, delete=False
) -> Dict:
path = '/network_instance[{:s}]/table_connections[{:s}][{:s}][{:s}]'.format(
ni_name, src_protocol, dst_protocol, address_family
)
data = {
'name': ni_name, 'src_protocol': src_protocol, 'dst_protocol': dst_protocol,
'address_family': address_family, 'default_import_policy': default_import_policy,
}
if bgp_as is not None: data['as'] = bgp_as
return compose_config_rule(path, data, delete)
def interface(
name, index, description=None, if_type=None, vlan_id=None, mtu=None, ipv4_address_prefix=None,
enabled=None, delete=False
) -> Dict:
path = '/interface[{:s}]/subinterface[{:d}]'.format(name, index)
data = {'name': name, 'index': index}
if description is not None: data['description'] = description
if if_type is not None: data['type' ] = if_type
if vlan_id is not None: data['vlan_id' ] = vlan_id
if mtu is not None: data['mtu' ] = mtu
if enabled is not None: data['enabled' ] = enabled
if ipv4_address_prefix is not None:
ipv4_address, ipv4_prefix = ipv4_address_prefix
data['address_ip' ] = ipv4_address
data['address_prefix'] = ipv4_prefix
return compose_config_rule(path, data, delete)
def network_instance_interface(ni_name, ni_type, if_name, if_index, delete=False) -> Dict:
path = '/network_instance[{:s}]/interface[{:s}.{:d}]'.format(ni_name, if_name, if_index)
data = {'name': ni_name, 'type': ni_type, 'id': if_name, 'interface': if_name, 'subinterface': if_index}
return compose_config_rule(path, data, delete)
# configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
# deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
def configure(router_a, port_a, router_b, port_b, ni_name):
context_client = ContextClient()
device_client = DeviceClient()
client_if_name = 'ce1'
client_if_addr = {'CSGW1': ('192.168.10.1', 24), 'CSGW2': ('192.168.20.1', 24)}
bgp_router_addresses = {'CSGW1': '192.168.150.1', 'CSGW2': '192.168.150.2'}
locations = [
{'router': router_a, 'port': port_a, 'neighbor': router_b},
{'router': router_b, 'port': port_b, 'neighbor': router_a},
]
for location in locations:
router = location['router']
port = location['port']
neighbor = location['neighbor']
client_ipv4_address_prefix = client_if_addr[router]
bgp_router_address = bgp_router_addresses[router]
bgp_neighbor_address = bgp_router_addresses[neighbor]
config_rules = [
network_instance(ni_name, 'L3VRF', bgp_router_address, '65001:1'),
network_instance_add_protocol_direct(ni_name, 'L3VRF'),
network_instance_add_protocol_static(ni_name, 'L3VRF'),
network_instance_add_protocol_bgp(ni_name, 'L3VRF', bgp_router_address, '65001', neighbors=[
(bgp_neighbor_address, '65001')
]),
network_instance_add_table_connection(
ni_name, 'DIRECTLY_CONNECTED', 'BGP', 'IPV4', 'ACCEPT_ROUTE', bgp_as='65001'
),
network_instance_add_table_connection(
ni_name, 'STATIC', 'BGP', 'IPV4', 'ACCEPT_ROUTE', bgp_as='65001'
),
interface(client_if_name, 0, if_type='ethernetCsmacd', mtu=1500),
network_instance_interface(ni_name, 'L3VRF', client_if_name, 0),
interface(client_if_name, 0, if_type='ethernetCsmacd', mtu=1500,
ipv4_address_prefix=client_ipv4_address_prefix, enabled=True),
interface(port, 0, if_type='ethernetCsmacd', mtu=1500),
network_instance_interface(ni_name, 'L3VRF', port, 0),
interface(port, 0, if_type='ethernetCsmacd', mtu=1500,
ipv4_address_prefix=(bgp_router_address, 24), enabled=True),
]
device = get_device(
context_client, router, rw_copy=True, include_endpoints=False,
include_config_rules=False, include_components=False
)
device.device_config.config_rules.extend(config_rules)
device_client.ConfigureDevice(device)
def deconfigure(router_a, port_a, router_b, port_b, ni_name):
context_client = ContextClient()
device_client = DeviceClient()
client_if_name = 'ce1'
locations = [
{'router': router_a, 'port': port_a, 'neighbor': router_b},
{'router': router_b, 'port': port_b, 'neighbor': router_a},
]
for location in locations:
router = location['router']
port = location['port']
#neighbor = location['neighbor']
config_rules = [
network_instance_interface(ni_name, 'L3VRF', client_if_name, 0, delete=True),
network_instance_interface(ni_name, 'L3VRF', port, 0, delete=True),
#interface(client_if_name, 0, delete=True),
#interface(port, 0, delete=True),
network_instance(ni_name, 'L3VRF', delete=True),
]
device = get_device(
context_client, router, rw_copy=True, include_endpoints=False,
include_config_rules=False, include_components=False
)
device.device_config.config_rules.extend(config_rules)
device_client.ConfigureDevice(device)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment