Skip to content
Snippets Groups Projects
Commit 8a9e716b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

VNT Manager component:

- Updated code to use Kafka topics for recommendations to NBI
parent 3f9c2b70
No related branches found
No related tags found
4 merge requests!346Draft: support for restconf protocol,!345Draft: support ipinfusion devices via netconf,!328Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components",!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
...@@ -12,149 +12,83 @@ ...@@ -12,149 +12,83 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import grpc import grpc, json, logging, uuid
import json from confluent_kafka import Consumer as KafkaConsumer
import logging from confluent_kafka import Producer as KafkaProducer
import threading from confluent_kafka import KafkaError
import time
from websockets.sync.client import connect
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import ContextId, Empty, Link, LinkId, LinkList, TopologyId from common.proto.context_pb2 import Empty, Link, LinkId, LinkList
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id from common.tools.kafka.Variables import KafkaConfig, KafkaTopic
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from .vntm_config_device import configure, deconfigure from .vntm_config_device import configure, deconfigure
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool("VNTManager", "RPC") METRICS_POOL = MetricsPool("VNTManager", "RPC")
context_client: ContextClient = ContextClient()
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
GET_EVENT_TIMEOUT = 0.5
class VNTMEventDispatcher(threading.Thread):
def __init__(self, host, port) -> None:
LOGGER.debug('Creating VNTM connector...')
self.host = host
self.port = port
super().__init__(name='VNTMEventDispatcher', daemon=True)
self._terminate = threading.Event()
LOGGER.debug('VNTM connector created')
def start(self) -> None:
self._terminate.clear()
return super().start()
def stop(self):
self._terminate.set()
def send_msg(self, msg):
try:
self.websocket.send(msg)
except Exception as e:
LOGGER.exception('Unable to send message')
def recv_msg(self):
message = self.websocket.recv()
return message
def run(self) -> None:
events_collector = EventsCollector(
context_client,
log_events_received = True,
activate_context_collector = True,
activate_topology_collector = True,
activate_device_collector = True,
activate_link_collector = True,
activate_service_collector = False,
activate_slice_collector = False,
activate_connection_collector = False,
)
events_collector.start()
try:
url = "ws://" + str(self.host) + ":" + str(self.port)
LOGGER.info("Connecting to events server...: {:s}".format(url))
self.websocket = connect(url)
except Exception as ex:
MSG = 'Error connecting to {:s}'
LOGGER.exception(MSG.format(str(url)))
else:
LOGGER.info('Connected to {:s}'.format(url))
context_id = json_context_id(DEFAULT_CONTEXT_NAME)
topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id)
try:
topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
except Exception as ex:
LOGGER.warning('No topology found')
else:
self.send_msg(grpc_message_to_json_string(topology_details))
while not self._terminate.is_set():
event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
if event is None: continue
LOGGER.debug('Event type: {}'.format(event))
topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
to_send = grpc_message_to_json_string(topology_details)
self.send_msg(to_send)
LOGGER.info('Exiting')
events_collector.stop()
class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
def __init__(self): def __init__(self):
LOGGER.debug("Creating Servicer...") LOGGER.debug("Creating Servicer...")
LOGGER.debug("Servicer Created") self.context_client = ContextClient()
self.links = [] self.links = []
LOGGER.debug("Servicer Created")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply:
LOGGER.info("Subscript request: {:s}".format(str(grpc_message_to_json(request))))
reply = VNTSubscriptionReply()
reply.subscription = "OK"
self.event_dispatcher = VNTMEventDispatcher(request.host, int(request.port))
self.host = request.host
self.port = request.port
LOGGER.info('sleeping 5...')
time.sleep(5)
self.event_dispatcher.start()
return reply
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList: def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
return [link for link in context_client.ListLinks(Empty()).links if link.virtual] links = self.context_client.ListLinks(Empty()).links
return [link for link in links if link.virtual]
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
link = context_client.GetLink(request) link = self.context_client.GetLink(request)
return link if link.virtual else Empty() return link if link.virtual else Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
try: try:
LOGGER.info('SETTING virtual link') LOGGER.info('[SetVirtualLink] request={:s}'.format(grpc_message_to_json_string(request)))
self.event_dispatcher.send_msg(grpc_message_to_json_string(request)) request_key = str(uuid.uuid4())
kafka_producer = KafkaProducer({
'bootstrap.servers' : KafkaConfig.get_kafka_address()
})
vntm_request = json.dumps({
'event': 'vlink_create', 'data': grpc_message_to_json_string(request)
}).encode('utf-8')
LOGGER.info('[SetVirtualLink] vntm_request={:s}'.format(str(vntm_request)))
kafka_producer.produce(
KafkaTopic.VNTMANAGER_REQUEST.value, key=request_key, value=vntm_request
)
kafka_producer.flush()
kafka_consumer = KafkaConsumer({
'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : str(uuid.uuid4()),
'auto.offset.reset' : 'latest'
})
kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value])
while True:
receive_msg = kafka_consumer.poll(2.0)
if receive_msg is None: continue
LOGGER.info('[SetVirtualLink] receive_msg={:s}'.format(str(receive_msg)))
if receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue
LOGGER.error('Consumer error: {:s}'.format(str(receive_msg.error())))
break
reply_key = receive_msg.key().decode('utf-8')
if reply_key == request_key: break
link = Link(**json.loads(receive_msg.value().decode('utf-8')))
# at this point, we know the request was accepted and an optical connection was created
# configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1') # configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
response = self.event_dispatcher.recv_msg() self.context_client.SetLink(link)
message_json = json.loads(response) except: # pylint: disable=bare-except
link = Link(**message_json) MSG = 'Exception setting virtual link={:s}'
context_client.SetLink(link)
except Exception as e:
MSG = 'Exception setting virtual link={:s}')
LOGGER.exception(MSG.format(str(request.link_id.link_uuid.uuid))) LOGGER.exception(MSG.format(str(request.link_id.link_uuid.uuid)))
return request.link_id return request.link_id
...@@ -162,20 +96,45 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): ...@@ -162,20 +96,45 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
try: try:
LOGGER.debug('Removing virtual link') LOGGER.debug('Removing virtual link')
self.event_dispatcher.send_msg(grpc_message_to_json_string(request)) request_key = str(uuid.uuid4())
# deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
response = self.event_dispatcher.recv_msg() kafka_producer = KafkaProducer({
message_json = json.loads(response) 'bootstrap.servers' : KafkaConfig.get_kafka_address()
link_id = LinkId(**message_json) })
context_client.RemoveLink(link_id)
vntm_request = json.dumps({
'event': 'vlink_remove', 'data': grpc_message_to_json_string(request)
}).encode('utf-8')
LOGGER.info('[RemoveVirtualLink] vntm_request={:s}'.format(str(vntm_request)))
kafka_producer.produce(
KafkaTopic.VNTMANAGER_REQUEST.value, key=request_key, value=vntm_request
)
kafka_producer.flush()
kafka_consumer = KafkaConsumer({
'bootstrap.servers' : KafkaConfig.get_kafka_address(),
'group.id' : str(uuid.uuid4()),
'auto.offset.reset' : 'latest'
})
kafka_consumer.subscribe([KafkaTopic.VNTMANAGER_RESPONSE.value])
while True:
receive_msg = kafka_consumer.poll(2.0)
if receive_msg is None: continue
if receive_msg.error():
if receive_msg.error().code() == KafkaError._PARTITION_EOF: continue
LOGGER.error('Consumer error: {:s}'.format(str(receive_msg.error())))
break
reply_key = receive_msg.key().decode('utf-8')
if reply_key == request_key: break
link_id = LinkId(**json.loads(receive_msg.value().decode('utf-8')))
# at this point, we know the request was accepted and an optical connection was deleted
# deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
self.context_client.RemoveLink(link_id)
LOGGER.info('Removed') LOGGER.info('Removed')
except Exception as e: except: # pylint: disable=bare-except
MSG = 'Exception removing virtual link={:s}' MSG = 'Exception removing virtual link={:s}'
LOGGER.exception(MSG.format(str(request.link_uuid.uuid))) LOGGER.exception(MSG.format(str(request.link_uuid.uuid)))
return msg_error
else:
context_client.RemoveLink(request)
LOGGER.info('Removed')
return Empty() return Empty()
# Copyright 2022-2024 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
import json
import logging
import threading
import time
from websockets.sync.client import connect
from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import ContextId, Empty, Link, LinkId, LinkList, TopologyId
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id
from context.client.ContextClient import ContextClient
from context.client.EventsCollector import EventsCollector
from .vntm_config_device import configure, deconfigure
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool("VNTManager", "RPC")
context_client: ContextClient = ContextClient()
JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME)
ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID)
ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID))
GET_EVENT_TIMEOUT = 0.5
class VNTMEventDispatcher(threading.Thread):
def __init__(self, host, port) -> None:
LOGGER.debug('Creating VNTM connector...')
self.host = host
self.port = port
super().__init__(name='VNTMEventDispatcher', daemon=True)
self._terminate = threading.Event()
LOGGER.debug('VNTM connector created')
def start(self) -> None:
self._terminate.clear()
return super().start()
def stop(self):
self._terminate.set()
def send_msg(self, msg):
try:
self.websocket.send(msg)
except Exception as e:
LOGGER.exception('Unable to send message')
def recv_msg(self):
message = self.websocket.recv()
return message
def run(self) -> None:
events_collector = EventsCollector(
context_client,
log_events_received = True,
activate_context_collector = True,
activate_topology_collector = True,
activate_device_collector = True,
activate_link_collector = True,
activate_service_collector = False,
activate_slice_collector = False,
activate_connection_collector = False,
)
events_collector.start()
try:
url = "ws://" + str(self.host) + ":" + str(self.port)
LOGGER.info("Connecting to events server...: {:s}".format(url))
self.websocket = connect(url)
except Exception as ex:
MSG = 'Error connecting to {:s}'
LOGGER.exception(MSG.format(str(url)))
else:
LOGGER.info('Connected to {:s}'.format(url))
context_id = json_context_id(DEFAULT_CONTEXT_NAME)
topology_id = json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id)
try:
topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
except Exception as ex:
LOGGER.warning('No topology found')
else:
self.send_msg(grpc_message_to_json_string(topology_details))
while not self._terminate.is_set():
event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT)
if event is None: continue
LOGGER.debug('Event type: {}'.format(event))
topology_details = context_client.GetTopologyDetails(TopologyId(**topology_id))
to_send = grpc_message_to_json_string(topology_details)
self.send_msg(to_send)
LOGGER.info('Exiting')
events_collector.stop()
class VNTManagerServiceServicerImpl(VNTManagerServiceServicer):
def __init__(self):
LOGGER.debug("Creating Servicer...")
LOGGER.debug("Servicer Created")
self.links = []
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList:
links = context_client.ListLinks(Empty()).links
return [link for link in links if link.virtual]
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link:
link = context_client.GetLink(request)
return link if link.virtual else Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId:
try:
LOGGER.info('SETTING virtual link')
self.event_dispatcher.send_msg(grpc_message_to_json_string(request))
# configure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
response = self.event_dispatcher.recv_msg()
message_json = json.loads(response)
link = Link(**message_json)
context_client.SetLink(link)
except Exception as e:
MSG = 'Exception setting virtual link={:s}')
LOGGER.exception(MSG.format(str(request.link_id.link_uuid.uuid)))
return request.link_id
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty:
try:
LOGGER.debug('Removing virtual link')
self.event_dispatcher.send_msg(grpc_message_to_json_string(request))
# deconfigure('CSGW1', 'xe5', 'CSGW2', 'xe5', 'ecoc2024-1')
response = self.event_dispatcher.recv_msg()
message_json = json.loads(response)
link_id = LinkId(**message_json)
context_client.RemoveLink(link_id)
LOGGER.info('Removed')
except Exception as e:
MSG = 'Exception removing virtual link={:s}'
LOGGER.exception(MSG.format(str(request.link_uuid.uuid)))
return msg_error
else:
context_client.RemoveLink(request)
LOGGER.info('Removed')
return Empty()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment