Skip to content
Snippets Groups Projects
Commit 40718a4c authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

VNT Manager component:

- Added creation of Kafka topics
- Updated requirements.in
- Updated proto file
- Updated client script
parent d3a529f0
No related branches found
No related tags found
4 merge requests!346Draft: support for restconf protocol,!345Draft: support ipinfusion devices via netconf,!328Resolve "(CTTC) Update recommendations to use SocketIO on NBI and E2E Orch components",!286Resolve "(CTTC) Implement integration test between E2E-IP-Optical SDN Controllers"
...@@ -12,26 +12,14 @@ ...@@ -12,26 +12,14 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3
syntax = "proto3"; syntax = "proto3";
package vnt_manager; package vnt_manager;
import "context.proto"; import "context.proto";
service VNTManagerService { service VNTManagerService {
rpc VNTSubscript (VNTSubscriptionRequest) returns (VNTSubscriptionReply) {} rpc ListVirtualLinkIds(context.Empty ) returns (context.LinkIdList) {}
rpc ListVirtualLinkIds (context.Empty) returns (context.LinkIdList) {} rpc ListVirtualLinks (context.Empty ) returns (context.LinkList ) {}
rpc ListVirtualLinks (context.Empty) returns (context.LinkList) {} rpc GetVirtualLink (context.LinkId) returns (context.Link ) {}
rpc GetVirtualLink (context.LinkId) returns (context.Link) {} rpc SetVirtualLink (context.Link ) returns (context.LinkId ) {}
rpc SetVirtualLink (context.Link) returns (context.LinkId) {} rpc RemoveVirtualLink (context.LinkId) returns (context.Empty ) {}
rpc RemoveVirtualLink (context.LinkId) returns (context.Empty) {}
}
message VNTSubscriptionRequest {
string host = 1;
string port = 2;
}
message VNTSubscriptionReply {
string subscription = 1;
} }
...@@ -12,22 +12,17 @@ ...@@ -12,22 +12,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
import grpc
import grpc, logging
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import Empty from common.proto.context_pb2 import Empty
from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceStub from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceStub
from common.Settings import get_service_host, get_service_port_grpc from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Link, LinkId, LinkIdList, LinkList
from common.tools.client.RetryDecorator import delay_exponential, retry from common.tools.client.RetryDecorator import delay_exponential, retry
from common.tools.grpc.Tools import grpc_message_to_json
from common.proto.context_pb2 import (
Link, LinkId, LinkIdList, LinkList,
)
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15 MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
...@@ -40,10 +35,8 @@ RETRY_DECORATOR = retry( ...@@ -40,10 +35,8 @@ RETRY_DECORATOR = retry(
class VNTManagerClient: class VNTManagerClient:
def __init__(self, host=None, port=None): def __init__(self, host=None, port=None):
if not host: if not host: host = get_service_host(ServiceNameEnum.VNTMANAGER)
host = get_service_host(ServiceNameEnum.VNTMANAGER) if not port: port = get_service_port_grpc(ServiceNameEnum.VNTMANAGER)
if not port:
port = get_service_port_grpc(ServiceNameEnum.VNTMANAGER)
self.endpoint = "{:s}:{:s}".format(str(host), str(port)) self.endpoint = "{:s}:{:s}".format(str(host), str(port))
LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint)))
self.channel = None self.channel = None
...@@ -61,13 +54,6 @@ class VNTManagerClient: ...@@ -61,13 +54,6 @@ class VNTManagerClient:
self.channel = None self.channel = None
self.stub = None self.stub = None
@RETRY_DECORATOR
def VNTSubscript(self, request: VNTSubscriptionRequest) -> VNTSubscriptionReply:
LOGGER.debug("Subscript request: {:s}".format(str(grpc_message_to_json(request))))
response = self.stub.VNTSubscript(request)
LOGGER.debug("Subscript result: {:s}".format(str(grpc_message_to_json(response))))
return response
@RETRY_DECORATOR @RETRY_DECORATOR
def ListVirtualLinkIds(self, request: Empty) -> LinkIdList: def ListVirtualLinkIds(self, request: Empty) -> LinkIdList:
LOGGER.debug('ListVirtualLinkIds request: {:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('ListVirtualLinkIds request: {:s}'.format(grpc_message_to_json_string(request)))
......
...@@ -12,4 +12,5 @@ ...@@ -12,4 +12,5 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
websockets==12.0 confluent-kafka==2.3.*
#websockets==12.0
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
import logging, signal, sys, threading import logging, signal, sys, threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.tools.kafka.Variables import KafkaTopic
from common.Constants import ServiceNameEnum from common.Constants import ServiceNameEnum
from common.Settings import ( from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
...@@ -49,6 +50,8 @@ def main(): ...@@ -49,6 +50,8 @@ def main():
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
KafkaTopic.create_all_topics()
# Starting VNTManager service # Starting VNTManager service
grpc_service = VNTManagerService() grpc_service = VNTManagerService()
grpc_service.start() grpc_service.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment