diff --git a/proto/vnt_manager.proto b/proto/vnt_manager.proto
index 6442e7b90e021404a8ff76ef67f4f68c377f49d3..14126528c52c1a0508f3608778532ce1a6c49526 100644
--- a/proto/vnt_manager.proto
+++ b/proto/vnt_manager.proto
@@ -12,26 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3
 syntax = "proto3";
 package vnt_manager;
 import "context.proto";
 
-
 service VNTManagerService {
-  rpc VNTSubscript          (VNTSubscriptionRequest)    returns (VNTSubscriptionReply)  {}
-  rpc ListVirtualLinkIds    (context.Empty)             returns (context.LinkIdList)    {}
-  rpc ListVirtualLinks      (context.Empty)             returns (context.LinkList)      {}
-  rpc GetVirtualLink        (context.LinkId)            returns (context.Link)          {}
-  rpc SetVirtualLink        (context.Link)              returns (context.LinkId)        {}
-  rpc RemoveVirtualLink     (context.LinkId)            returns (context.Empty)         {}
-}
-
-message VNTSubscriptionRequest {
-  string host = 1;
-  string port = 2;
-}
-
-message VNTSubscriptionReply {
-  string subscription = 1;
+  rpc ListVirtualLinkIds(context.Empty ) returns (context.LinkIdList) {}
+  rpc ListVirtualLinks  (context.Empty ) returns (context.LinkList  ) {}
+  rpc GetVirtualLink    (context.LinkId) returns (context.Link      ) {}
+  rpc SetVirtualLink    (context.Link  ) returns (context.LinkId    ) {}
+  rpc RemoveVirtualLink (context.LinkId) returns (context.Empty     ) {}
 }
diff --git a/src/vnt_manager/client/VNTManagerClient.py b/src/vnt_manager/client/VNTManagerClient.py
index b313a590f3c5d8db64a9ae1b7b9ac89a94595f2a..4ea5d3db1b40e12e214945c7d84caf302c00cbf8 100644
--- a/src/vnt_manager/client/VNTManagerClient.py
+++ b/src/vnt_manager/client/VNTManagerClient.py
@@ -12,22 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
-import grpc
 
+import grpc, logging
 from common.Constants import ServiceNameEnum
 from common.proto.context_pb2 import Empty
-from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply
 from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceStub
 from common.Settings import get_service_host, get_service_port_grpc
+from common.proto.context_pb2 import Link, LinkId, LinkIdList, LinkList
 from common.tools.client.RetryDecorator import delay_exponential, retry
-from common.tools.grpc.Tools import grpc_message_to_json
-from common.proto.context_pb2 import (
-    Link, LinkId, LinkIdList, LinkList,
-)
 from common.tools.grpc.Tools import grpc_message_to_json_string
 
+
 LOGGER = logging.getLogger(__name__)
 MAX_RETRIES = 15
 DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
@@ -40,10 +35,8 @@ RETRY_DECORATOR = retry(
 
 class VNTManagerClient:
     def __init__(self, host=None, port=None):
-        if not host:
-            host = get_service_host(ServiceNameEnum.VNTMANAGER)
-        if not port:
-            port = get_service_port_grpc(ServiceNameEnum.VNTMANAGER)
+        if not host: host = get_service_host(ServiceNameEnum.VNTMANAGER)
+        if not port: port = get_service_port_grpc(ServiceNameEnum.VNTMANAGER)
         self.endpoint = "{:s}:{:s}".format(str(host), str(port))
         LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint)))
         self.channel = None
@@ -61,13 +54,6 @@ class VNTManagerClient:
         self.channel = None
         self.stub = None
 
-    @RETRY_DECORATOR
-    def VNTSubscript(self, request: VNTSubscriptionRequest) -> VNTSubscriptionReply:
-        LOGGER.debug("Subscript request: {:s}".format(str(grpc_message_to_json(request))))
-        response = self.stub.VNTSubscript(request)
-        LOGGER.debug("Subscript result: {:s}".format(str(grpc_message_to_json(response))))
-        return response
-
     @RETRY_DECORATOR
     def ListVirtualLinkIds(self, request: Empty) -> LinkIdList:
         LOGGER.debug('ListVirtualLinkIds request: {:s}'.format(grpc_message_to_json_string(request)))
diff --git a/src/vnt_manager/requirements.in b/src/vnt_manager/requirements.in
index 38764add745987ea115b9c8f2a9a169e6d0e3c39..d8f9537b471d5645604ff39ab5142c3ae0e7c9ba 100644
--- a/src/vnt_manager/requirements.in
+++ b/src/vnt_manager/requirements.in
@@ -12,4 +12,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-websockets==12.0
+confluent-kafka==2.3.*
+#websockets==12.0
diff --git a/src/vnt_manager/service/__main__.py b/src/vnt_manager/service/__main__.py
index c36a0ae1fb7bfa568f79bae26e53cd5d734a4f2e..089a330e11497f024937673c927587fa9a5966ce 100644
--- a/src/vnt_manager/service/__main__.py
+++ b/src/vnt_manager/service/__main__.py
@@ -14,6 +14,7 @@
 
 import logging, signal, sys, threading
 from prometheus_client import start_http_server
+from common.tools.kafka.Variables import KafkaTopic
 from common.Constants import ServiceNameEnum
 from common.Settings import (
     ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
@@ -49,6 +50,8 @@ def main():
     metrics_port = get_metrics_port()
     start_http_server(metrics_port)
 
+    KafkaTopic.create_all_topics()
+
     # Starting VNTManager service
     grpc_service = VNTManagerService()
     grpc_service.start()