Showing 839 additions and 195 deletions
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# tests/unit/test_qkd_performance.py

import pytest, time

from device.service.drivers.qkd.QKDDriver2 import QKDDriver

MOCK_QKD_ADDRRESS = '127.0.0.1'
MOCK_PORT = 11111

def test_performance_under_load():
    driver = QKDDriver(address=MOCK_QKD_ADDRRESS, port=MOCK_PORT, username='user', password='pass')
    driver.Connect()

    start_time = time.time()
    for _ in range(1000):
        driver.GetConfig(['/qkd_interfaces/qkd_interface'])
    end_time = time.time()

    # 1000 sequential GetConfig calls should complete within 60 seconds
    assert (end_time - start_time) < 60
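The 60-second budget above is coarse; a variant sketch (assuming the same mock endpoint and `QKDDriver` API) that also reports the average per-request latency makes threshold failures easier to diagnose. The test name is hypothetical:

```python
# Hypothetical variant of the load test above; assumes the same mock QKD
# endpoint and QKDDriver API. Reports the average latency per GetConfig call.
import time

from device.service.drivers.qkd.QKDDriver2 import QKDDriver

def test_average_request_latency():
    driver = QKDDriver(address='127.0.0.1', port=11111, username='user', password='pass')
    driver.Connect()

    num_requests = 1000
    start_time = time.time()
    for _ in range(num_requests):
        driver.GetConfig(['/qkd_interfaces/qkd_interface'])
    elapsed = time.time() - start_time

    average = elapsed / num_requests
    print(f'average GetConfig latency: {average * 1000:.2f} ms over {num_requests} requests')
    # Same overall budget as above: 60 s / 1000 requests = 60 ms per request
    assert average < 0.06, f'average latency {average:.3f}s exceeds the 60 ms per-request budget'
```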
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import pytest
import requests
from requests.exceptions import HTTPError

from device.service.drivers.qkd.QKDDriver2 import QKDDriver
from device.service.drivers.qkd.Tools2 import RESOURCE_CAPABILITES

# Helper function to print data in a formatted JSON style for debugging
def print_data(label, data):
    print(f"{label}: {json.dumps(data, indent=2)}")

# Environment variables for sensitive information
QKD1_ADDRESS = os.getenv("QKD1_ADDRESS")
MOCK_QKD_ADDRRESS = '127.0.0.1'
MOCK_PORT = 11111
PORT = os.getenv("QKD_PORT")
USERNAME = os.getenv("QKD_USERNAME")
PASSWORD = os.getenv("QKD_PASSWORD")

# Utility function to retrieve a JWT token
def get_jwt_token(address, port, username, password):
    url = f"http://{address}:{port}/login"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    payload = f"username={username}&password={password}"
    try:
        response = requests.post(url, data=payload, headers=headers)
        response.raise_for_status()
        return response.json().get('access_token')
    except requests.exceptions.RequestException as e:
        print(f"Failed to retrieve JWT token: {e}")
        return None

# Real QKD driver (requires a JWT token)
@pytest.fixture
def real_qkd_driver():
    token = get_jwt_token(QKD1_ADDRESS, PORT, USERNAME, PASSWORD)
    if not token:
        pytest.fail("Failed to retrieve JWT token.")
    headers = {'Authorization': f'Bearer {token}'}
    return QKDDriver(address=QKD1_ADDRESS, port=PORT, headers=headers)

# Mock QKD driver (no actual connection, mock capabilities)
@pytest.fixture
def mock_qkd_driver():
    # Initialize the mock QKD driver with mock settings
    token = "mock_token"
    headers = {"Authorization": f"Bearer {token}"}
    return QKDDriver(address=MOCK_QKD_ADDRRESS, port=MOCK_PORT, headers=headers)

# General function to retrieve and test capabilities
def retrieve_capabilities(qkd_driver, driver_name):
    try:
        qkd_driver.Connect()
        capabilities = qkd_driver.GetConfig([RESOURCE_CAPABILITES])
        assert isinstance(capabilities, list), "Expected a list of capabilities"
        assert len(capabilities) > 0, f"No capabilities found for {driver_name}"
        print_data(f"{driver_name} Capabilities", capabilities)
    except HTTPError as e:
        pytest.fail(f"HTTPError while fetching capabilities for {driver_name}: {e}")
    except AssertionError as e:
        pytest.fail(f"AssertionError: {e}")
    except Exception as e:
        pytest.fail(f"An unexpected error occurred: {e}")

# Test for real QKD capabilities
def test_real_qkd_capabilities(real_qkd_driver):
    retrieve_capabilities(real_qkd_driver, "Real QKD")

# Test for mock QKD capabilities
def test_mock_qkd_capabilities(mock_qkd_driver):
    retrieve_capabilities(mock_qkd_driver, "Mock QKD")
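Since the real-driver test depends on `QKD1_ADDRESS`, `QKD_PORT`, `QKD_USERNAME` and `QKD_PASSWORD`, a guard that skips instead of failing when the environment is not configured can be useful. A minimal sketch; the marker and test names are hypothetical:

```python
# Hypothetical guard: skips real-endpoint tests when the environment
# variables used above are not configured, rather than failing the run.
import os

import pytest

requires_real_qkd = pytest.mark.skipif(
    not all(os.getenv(name) for name in ('QKD1_ADDRESS', 'QKD_PORT', 'QKD_USERNAME', 'QKD_PASSWORD')),
    reason='real QKD endpoint not configured in the environment',
)

@requires_real_qkd
def test_real_qkd_capabilities_guarded(real_qkd_driver):
    retrieve_capabilities(real_qkd_driver, 'Real QKD')
```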
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from typing import List, Tuple

from device.service.drivers.qkd.QKDDriver2 import QKDDriver

MOCK_QKD_ADDRRESS = '127.0.0.1'
MOCK_PORT = 11111

@pytest.fixture
def qkd_driver():
    # Initialize the QKD driver
    return QKDDriver(address=MOCK_QKD_ADDRRESS, port=MOCK_PORT, username='user', password='pass')

def test_state_subscription(qkd_driver):
    """
    Test Case ID: SBI_Test_06 - Subscribe to state changes and validate the subscription process.
    """
    qkd_driver.Connect()
    try:
        # Step 1: Define the subscription
        subscriptions = [
            ('00000001-0000-0000-0000-000000000000', 60, 10)  # (node_id, frequency, timeout)
        ]

        # Step 2: Subscribe to state changes using the driver method
        subscription_results = qkd_driver.SubscribeState(subscriptions)

        # Step 3: Validate that the subscription was successful
        assert all(result is True for result in subscription_results), "Subscription to state changes failed."

        print("State subscription successful:", subscription_results)
    except Exception as e:
        pytest.fail(f"An unexpected error occurred during state subscription: {e}")
    finally:
        qkd_driver.Disconnect()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest, requests, uuid
from requests.exceptions import HTTPError

from device.service.drivers.qkd.QKDDriver2 import QKDDriver
from device.service.drivers.qkd.Tools2 import RESOURCE_APPS

MOCK_QKD1_ADDRRESS = '127.0.0.1'
MOCK_PORT1 = 11111
MOCK_QKD3_ADDRRESS = '127.0.0.1'
MOCK_PORT3 = 33333

@pytest.fixture
def qkd_driver1():
    # Initialize the QKD driver for QKD1
    return QKDDriver(address=MOCK_QKD1_ADDRRESS, port=MOCK_PORT1, username='user', password='pass')

@pytest.fixture
def qkd_driver3():
    # Initialize the QKD driver for QKD3
    return QKDDriver(address=MOCK_QKD3_ADDRRESS, port=MOCK_PORT3, username='user', password='pass')

def create_qkd_app(driver, qkdn_id, backing_qkdl_id, client_app_id=None):
    """
    Helper function to create QKD applications on the given driver.
    """
    server_app_id = str(uuid.uuid4())  # Generate a unique server_app_id

    app_payload = {
        'app': {
            'server_app_id': server_app_id,
            'client_app_id': client_app_id if client_app_id else [],  # Add client_app_id if provided
            'app_status': 'ON',
            'local_qkdn_id': qkdn_id,
            'backing_qkdl_id': backing_qkdl_id
        }
    }

    try:
        # Log the payload being sent
        print(f"Sending payload to {driver.address}: {app_payload}")

        # Send a POST request to create the application
        response = requests.post(f'http://{driver.address}/app/create_qkd_app', json=app_payload)

        # Check if the request was successful (HTTP 2xx)
        response.raise_for_status()

        # Validate the response
        assert response.status_code == 200, f"Failed to create QKD app for {driver.address}: {response.text}"
        response_data = response.json()
        assert response_data.get('status') == 'success', "Application creation failed."

        # Log the response from the server
        print(f"Server {driver.address} response: {response_data}")

        return server_app_id  # Return the created server_app_id
    except HTTPError as e:
        pytest.fail(f"HTTP error occurred while creating the QKD application on {driver.address}: {e}")
    except Exception as e:
        pytest.fail(f"An unexpected error occurred: {e}")

def test_create_qkd_application_bidirectional(qkd_driver1, qkd_driver3):
    """
    Create QKD applications on both qkd1 and qkd3, and validate the complete creation in both directions.
    """
    qkd_driver1.Connect()
    qkd_driver3.Connect()

    try:
        # Step 1: Create a QKD application for qkd1, referencing qkd3 as the backing QKDL
        server_app_id_qkd1 = create_qkd_app(
            qkd_driver1,
            qkdn_id='00000001-0000-0000-0000-000000000000',
            backing_qkdl_id=['00000003-0002-0000-0000-000000000000']  # qkd3's QKDL
        )

        # Step 2: Create a QKD application for qkd3, referencing qkd1 as the backing QKDL,
        # and setting client_app_id to qkd1's app
        create_qkd_app(
            qkd_driver3,
            qkdn_id='00000003-0000-0000-0000-000000000000',
            backing_qkdl_id=['00000003-0002-0000-0000-000000000000'],  # qkd3's QKDL
            client_app_id=[server_app_id_qkd1]  # Set qkd1 as the client
        )

        # Step 3: Fetch applications from both qkd1 and qkd3 to validate that the applications exist
        apps_qkd1 = qkd_driver1.GetConfig([RESOURCE_APPS])
        apps_qkd3 = qkd_driver3.GetConfig([RESOURCE_APPS])

        print(f"QKD1 applications config: {apps_qkd1}")
        print(f"QKD3 applications config: {apps_qkd3}")

        # Debugging: print the full structure of the apps to understand what is returned
        for app in apps_qkd1:
            print(f"QKD1 App: {app}")
        for app in apps_qkd3:
            print(f"QKD3 App: {app}")

        # Step 4: Validate that the applications were created, using app_id instead of server_app_id
        assert any(app[1].get('app_id') == '00000001-0001-0000-0000-000000000000' for app in apps_qkd1), "QKD app not created on qkd1."
        assert any(app[1].get('app_id') == '00000003-0001-0000-0000-000000000000' for app in apps_qkd3), "QKD app not created on qkd3."

        print("QKD applications created successfully in both directions.")
    except Exception as e:
        pytest.fail(f"An unexpected error occurred: {e}")
    finally:
        qkd_driver1.Disconnect()
        qkd_driver3.Disconnect()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, asyncio
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, TopologyId
from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class DltConnectorClientAsync:
    def __init__(self, host=None, port=None):
        if not host: host = get_service_host(ServiceNameEnum.DLT)
        if not port: port = get_service_port_grpc(ServiceNameEnum.DLT)
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        #self.connect()
        #LOGGER.debug('Channel created')

    async def connect(self):
        self.channel = grpc.aio.insecure_channel(self.endpoint)
        self.stub = DltConnectorServiceStub(self.channel)
        LOGGER.debug('Channel created')

    async def close(self):
        if self.channel is not None:
            await self.channel.close()
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    async def RecordAll(self, request: TopologyId) -> Empty:
        LOGGER.debug('RecordAll request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordAll(request)
        LOGGER.debug('RecordAll result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordAllDevices(self, request: TopologyId) -> Empty:
        LOGGER.debug('RecordAllDevices request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordAllDevices(request)
        LOGGER.debug('RecordAllDevices result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordDevice(self, request: DltDeviceId) -> Empty:
        LOGGER.debug('RECORD_DEVICE request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordDevice(request)
        LOGGER.debug('RecordDevice result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordAllLinks(self, request: TopologyId) -> Empty:
        LOGGER.debug('RecordAllLinks request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordAllLinks(request)
        LOGGER.debug('RecordAllLinks result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordLink(self, request: DltLinkId) -> Empty:
        LOGGER.debug('RecordLink request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordLink(request)
        LOGGER.debug('RecordLink result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordAllServices(self, request: TopologyId) -> Empty:
        LOGGER.debug('RecordAllServices request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordAllServices(request)
        LOGGER.debug('RecordAllServices result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordService(self, request: DltServiceId) -> Empty:
        LOGGER.debug('RecordService request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordService(request)
        LOGGER.debug('RecordService result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordAllSlices(self, request: TopologyId) -> Empty:
        LOGGER.debug('RecordAllSlices request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordAllSlices(request)
        LOGGER.debug('RecordAllSlices result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def RecordSlice(self, request: DltSliceId) -> Empty:
        LOGGER.debug('RecordSlice request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordSlice(request)
        LOGGER.debug('RecordSlice result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
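For reference, a minimal usage sketch of the async client above; the host, port and UUIDs are placeholders, and a reachable DLT connector service is assumed:

```python
# Minimal usage sketch for DltConnectorClientAsync; host, port and UUIDs are
# placeholders, and a reachable DLT connector service is assumed.
import asyncio

from common.proto.dlt_connector_pb2 import DltDeviceId

async def record_one_device():
    # By default the client resolves host/port from the TFS service settings.
    client = DltConnectorClientAsync(host='127.0.0.1', port=8080)
    await client.connect()
    try:
        request = DltDeviceId()
        request.topology_id.topology_uuid.uuid = 'admin'      # placeholder topology UUID
        request.device_id.device_uuid.uuid = 'device-uuid-1'  # placeholder device UUID
        request.delete = False
        await client.RecordDevice(request)
    finally:
        await client.close()

if __name__ == '__main__':
    asyncio.run(record_one_device())
```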
@@ -19,7 +19,6 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
 from dlt.connector.client.DltGatewayClient import DltGatewayClient

 LOGGER = logging.getLogger(__name__)
-LOGGER.setLevel(logging.DEBUG)

 # This class accepts an event_handler method as attribute that can be used to pre-process and
 # filter events before they reach the events_queue. Depending on the handler, the supported
...
@@ -12,11 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Iterator
 import grpc, logging
+from typing import Iterator
 from common.proto.context_pb2 import Empty, TeraFlowController
 from common.proto.dlt_gateway_pb2 import (
-    DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription)
+    DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription
+)
 from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub
 from common.tools.client.RetryDecorator import retry, delay_exponential
 from common.tools.grpc.Tools import grpc_message_to_json_string
@@ -43,7 +45,8 @@ class DltGatewayClient:
         self.stub = DltGatewayServiceStub(self.channel)

     def close(self):
-        if self.channel is not None: self.channel.close()
+        if self.channel is not None:
+            self.channel.close()
         self.channel = None
         self.stub = None
...
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio, grpc, logging
from typing import Iterator, List
from common.proto.context_pb2 import Empty, TeraFlowController
from common.proto.dlt_gateway_pb2 import (
    DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription
)
from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.Config import DLT_GATEWAY_HOST, DLT_GATEWAY_PORT

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class DltGatewayClientAsync:
    def __init__(self, host=None, port=None):
        if not host: host = DLT_GATEWAY_HOST
        if not port: port = DLT_GATEWAY_PORT
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        self.message_queue: List[DltRecord] = []

    async def connect(self):
        self.channel = grpc.aio.insecure_channel(self.endpoint)
        self.stub = DltGatewayServiceStub(self.channel)
        LOGGER.debug('Channel created')

    async def close(self):
        if self.channel is not None:
            await self.channel.close()
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    async def RecordToDlt(self, request : DltRecord) -> DltRecordStatus:
        LOGGER.debug('RecordToDlt request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.RecordToDlt(request)
        LOGGER.debug('RecordToDlt result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def GetFromDlt(self, request : DltRecordId) -> DltRecord:
        LOGGER.debug('GetFromDlt request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.GetFromDlt(request)
        LOGGER.debug('GetFromDlt result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def SubscribeToDlt(self, request : DltRecordSubscription) -> Iterator[DltRecordEvent]:
        LOGGER.debug('SubscribeToDlt request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.SubscribeToDlt(request)
        LOGGER.debug('SubscribeToDlt result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def GetDltStatus(self, request : TeraFlowController) -> DltPeerStatus:
        LOGGER.debug('GetDltStatus request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.GetDltStatus(request)
        LOGGER.debug('GetDltStatus result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    async def GetDltPeers(self, request : Empty) -> DltPeerStatusList:
        LOGGER.debug('GetDltPeers request: {:s}'.format(grpc_message_to_json_string(request)))
        response = await self.stub.GetDltPeers(request)
        LOGGER.debug('GetDltPeers result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
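A complementary sketch for the gateway client: fetching a record by its `DltRecordId`. The UUIDs are placeholders; the field names follow the connector servicer code later in this diff:

```python
# Minimal usage sketch for DltGatewayClientAsync; UUIDs are placeholders and
# a reachable DLT gateway is assumed.
import asyncio

from common.proto.dlt_gateway_pb2 import DltRecordId, DltRecordTypeEnum

async def fetch_record():
    client = DltGatewayClientAsync()  # defaults to DLT_GATEWAY_HOST:DLT_GATEWAY_PORT
    await client.connect()
    try:
        record_id = DltRecordId()
        record_id.domain_uuid.uuid = 'topology-uuid'  # placeholder domain (topology) UUID
        record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE
        record_id.record_uuid.uuid = 'device-uuid-1'  # placeholder record UUID
        record = await client.GetFromDlt(record_id)
        print(record.data_json)
    finally:
        await client.close()

if __name__ == '__main__':
    asyncio.run(fetch_record())
```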
@@ -14,15 +14,16 @@
 from common.Constants import ServiceNameEnum
 from common.Settings import get_service_port_grpc
-from common.tools.service.GenericGrpcService import GenericGrpcService
+from common.tools.service.GenericGrpcServiceAsync import GenericGrpcServiceAsync
 from common.proto.dlt_connector_pb2_grpc import add_DltConnectorServiceServicer_to_server
 from .DltConnectorServiceServicerImpl import DltConnectorServiceServicerImpl

-class DltConnectorService(GenericGrpcService):
+class DltConnectorService(GenericGrpcServiceAsync):
     def __init__(self, cls_name: str = __name__) -> None:
         port = get_service_port_grpc(ServiceNameEnum.DLT)
         super().__init__(port, cls_name=cls_name)
         self.dltconnector_servicer = DltConnectorServiceServicerImpl()

-    def install_servicers(self):
-        add_DltConnectorServiceServicer_to_server(self.dltconnector_servicer, self.server)
+    async def install_servicers(self):
+        await self.dltconnector_servicer.initialize()
+        add_DltConnectorServiceServicer_to_server(self.dltconnector_servicer, self.server)
\ No newline at end of file
@@ -12,16 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import grpc, logging
+import logging
+from grpc.aio import ServicerContext
 from typing import Optional
-from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
+from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method_async
 from common.proto.context_pb2 import Empty, TopologyId
 from common.proto.dlt_connector_pb2 import DltDeviceId, DltLinkId, DltServiceId, DltSliceId
 from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer
 from common.proto.dlt_gateway_pb2 import DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordTypeEnum
 from common.tools.grpc.Tools import grpc_message_to_json_string
 from context.client.ContextClient import ContextClient
-from dlt.connector.client.DltGatewayClient import DltGatewayClient
+from dlt.connector.client.DltGatewayClientAsync import DltGatewayClientAsync
 from .tools.Checkers import record_exists

 LOGGER = logging.getLogger(__name__)
@@ -31,86 +32,91 @@ METRICS_POOL = MetricsPool('DltConnector', 'RPC')
 class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
     def __init__(self):
         LOGGER.debug('Creating Servicer...')
+        self.dltgateway_client = DltGatewayClientAsync()
         LOGGER.debug('Servicer Created')

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordAll(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
+    async def initialize(self):
+        await self.dltgateway_client.connect()
+
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordAll(self, request : TopologyId, context : ServicerContext) -> Empty:
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordAllDevices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordAllDevices(self, request : TopologyId, context : ServicerContext) -> Empty:
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordDevice(self, request : DltDeviceId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordDevice(self, request : DltDeviceId, context : ServicerContext) -> Empty:
         data_json = None
-        if not request.delete:
+        LOGGER.debug('RECORD_DEVICE = {:s}'.format(grpc_message_to_json_string(request)))
+        if not request.delete:
             context_client = ContextClient()
             device = context_client.GetDevice(request.device_id)
             data_json = grpc_message_to_json_string(device)

-        self._record_entity(
+        await self._record_entity(
             request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_DEVICE,
             request.device_id.device_uuid.uuid, request.delete, data_json)
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordAllLinks(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordAllLinks(self, request : TopologyId, context : ServicerContext) -> Empty:
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordLink(self, request : DltLinkId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordLink(self, request : DltLinkId, context : ServicerContext) -> Empty:
         data_json = None
         LOGGER.debug('RECORD_LINK = {:s}'.format(grpc_message_to_json_string(request)))
         if not request.delete:
             context_client = ContextClient()
             link = context_client.GetLink(request.link_id)
             data_json = grpc_message_to_json_string(link)

-        self._record_entity(
+        await self._record_entity(
             request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_LINK,
             request.link_id.link_uuid.uuid, request.delete, data_json)
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordAllServices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordAllServices(self, request : TopologyId, context : ServicerContext) -> Empty:
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordService(self, request : DltServiceId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordService(self, request : DltServiceId, context : ServicerContext) -> Empty:
         data_json = None
         if not request.delete:
             context_client = ContextClient()
             service = context_client.GetService(request.service_id)
             data_json = grpc_message_to_json_string(service)

-        self._record_entity(
+        await self._record_entity(
             request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SERVICE,
             request.service_id.service_uuid.uuid, request.delete, data_json)
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordAllSlices(self, request : TopologyId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordAllSlices(self, request : TopologyId, context : ServicerContext) -> Empty:
         return Empty()

-    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
-    def RecordSlice(self, request : DltSliceId, context : grpc.ServicerContext) -> Empty:
+    @safe_and_metered_rpc_method_async(METRICS_POOL, LOGGER)
+    async def RecordSlice(self, request : DltSliceId, context : ServicerContext) -> Empty:
         data_json = None
         if not request.delete:
             context_client = ContextClient()
             slice_ = context_client.GetSlice(request.slice_id)
             data_json = grpc_message_to_json_string(slice_)

-        self._record_entity(
+        await self._record_entity(
             request.topology_id.topology_uuid.uuid, DltRecordTypeEnum.DLTRECORDTYPE_SLICE,
             request.slice_id.slice_uuid.uuid, request.delete, data_json)
         return Empty()

-    def _record_entity(
+    async def _record_entity(
         self, dlt_domain_uuid : str, dlt_record_type : DltRecordTypeEnum, dlt_record_uuid : str, delete : bool,
         data_json : Optional[str] = None
     ) -> None:
-        dltgateway_client = DltGatewayClient()
         dlt_record_id = DltRecordId()
         dlt_record_id.domain_uuid.uuid = dlt_domain_uuid # pylint: disable=no-member
         dlt_record_id.type = dlt_record_type
@@ -118,7 +124,7 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
         str_dlt_record_id = grpc_message_to_json_string(dlt_record_id)
         LOGGER.debug('[_record_entity] sent dlt_record_id = {:s}'.format(str_dlt_record_id))
-        dlt_record = dltgateway_client.GetFromDlt(dlt_record_id)
+        dlt_record = await self.dltgateway_client.GetFromDlt(dlt_record_id)
         str_dlt_record = grpc_message_to_json_string(dlt_record)
         LOGGER.debug('[_record_entity] recv dlt_record = {:s}'.format(str_dlt_record))
@@ -131,17 +137,19 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
             dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_DELETE
         elif not delete and exists:
             dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE
-            if data_json is None: raise Exception('data_json must be provided when updating')
+            if data_json is None:
+                raise Exception('data_json must be provided when updating')
             dlt_record.data_json = data_json
         elif not delete and not exists:
             dlt_record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_ADD
-            if data_json is None: raise Exception('data_json must be provided when adding')
+            if data_json is None:
+                raise Exception('data_json must be provided when adding')
             dlt_record.data_json = data_json
         else:
             return

         str_dlt_record = grpc_message_to_json_string(dlt_record)
         LOGGER.debug('[_record_entity] sent dlt_record = {:s}'.format(str_dlt_record))
-        dlt_record_status = dltgateway_client.RecordToDlt(dlt_record)
+        dlt_record_status = await self.dltgateway_client.RecordToDlt(dlt_record)
         str_dlt_record_status = grpc_message_to_json_string(dlt_record_status)
         LOGGER.debug('[_record_entity] recv dlt_record_status = {:s}'.format(str_dlt_record_status))
@@ -12,7 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging, signal, sys, threading
+import logging, signal, threading, asyncio
+from typing import Optional
 from prometheus_client import start_http_server
 from common.Constants import ServiceNameEnum
 from common.Settings import (
@@ -22,13 +24,13 @@ from .event_dispatcher.DltEventDispatcher import DltEventDispatcher
 from .DltConnectorService import DltConnectorService

 terminate = threading.Event()
-LOGGER : logging.Logger = None
+LOGGER : Optional[logging.Logger] = None

 def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
     LOGGER.warning('Terminate signal received')
     terminate.set()

-def main():
+async def main():
     global LOGGER # pylint: disable=global-statement

     log_level = get_log_level()
@@ -55,13 +57,14 @@ def main():
     # Starting DLT connector service
     grpc_service = DltConnectorService()
-    grpc_service.start()
+    await grpc_service.start()

     # Wait for Ctrl+C or termination signal
-    while not terminate.wait(timeout=1.0): pass
+    while not terminate.is_set():
+        await asyncio.sleep(1.0)

     LOGGER.info('Terminating...')
-    grpc_service.stop()
+    await grpc_service.stop()
     event_dispatcher.stop()
     event_dispatcher.join()
@@ -69,4 +72,4 @@ def main():
     return 0

 if __name__ == '__main__':
-    sys.exit(main())
+    asyncio.run(main())
@@ -20,5 +20,5 @@ def record_exists(record : DltRecord) -> bool:
     exists = exists and (record.record_id.type != DLTRECORDTYPE_UNDEFINED)
     exists = exists and (len(record.record_id.record_uuid.uuid) > 0)
     #exists = exists and (record.operation != DLTRECORDOPERATION_UNDEFINED)
-    exists = exists and (len(record.data_json) > 0)
+    #exists = exists and (len(record.data_json) > 0) # This conflicts, as some records do not have a data_json.
     return exists
@@ -17,12 +17,12 @@
 # PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.tests.basic
 import logging, sys, time
+from common.proto.context_pb2 import DEVICEOPERATIONALSTATUS_ENABLED, Device
 from common.proto.dlt_gateway_pb2 import (
     DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_UPDATE, DLTRECORDTYPE_DEVICE,
     DLTRECORDTYPE_UNDEFINED, DltRecord, DltRecordId)
 from common.tools.object_factory.Device import json_device
 from common.tools.grpc.Tools import grpc_message_to_json_string
-from src.common.proto.context_pb2 import DEVICEOPERATIONALSTATUS_ENABLED, Device
 from ..client.DltGatewayClient import DltGatewayClient
 from ..client.DltEventsCollector import DltEventsCollector
...
@@ -12,30 +12,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-FROM zenika/kotlin:1.4-jdk12
+# Use an official Node.js runtime as a parent image
+FROM node:20

-# Make working directory, move to it and copy DLT Gateway code
-RUN mkdir -p /var/teraflow/dlt/gateway
-WORKDIR /var/teraflow/dlt/gateway
-COPY src/dlt/gateway/. ./
+# Set the working directory in the container
+WORKDIR /usr/dltApp

-# Make directory for proto files and copy them
-RUN mkdir proto
-COPY proto/*.proto ./proto/
+# Create the proto directory before copying the .proto files
+RUN mkdir -p ./proto

-# Build DLT Gateway
-RUN ./gradlew build
+# Copy package.json and package-lock.json
+COPY src/dlt/gateway/dltApp/package*.json ./
+
+# Copy tsconfig.json
+COPY src/dlt/gateway/dltApp/tsconfig*.json ./
+
+# Copy the proto folder contents
+COPY proto/acl.proto ./proto/acl.proto
+COPY proto/kpi_sample_types.proto ./proto/kpi_sample_types.proto
+COPY proto/context.proto ./proto/context.proto
+COPY proto/dlt_gateway.proto ./proto/dlt_gateway.proto
+
+# Copy the src folder
+COPY src/dlt/gateway/dltApp/src/ ./src
+
+# Install dependencies
+RUN npm install

+# Expose the port that the gRPC service runs on
+EXPOSE 50051

-# Create entrypoint.sh script
-RUN echo "#!/bin/sh" > /entrypoint.sh
-RUN echo "echo 195.37.154.24 peer0.org1.example.com >> /etc/hosts" >> /entrypoint.sh
-RUN echo "echo 195.37.154.24 peer0.org2.example.com >> /etc/hosts" >> /entrypoint.sh
-RUN echo "echo 195.37.154.24 orderer0.example.com >> /etc/hosts" >> /entrypoint.sh
-RUN echo "cd /var/teraflow/dlt/gateway" >> /entrypoint.sh
-RUN echo "./gradlew runServer" >> /entrypoint.sh
-RUN chmod +x /entrypoint.sh
-
-# Gateway entry point
-ENTRYPOINT ["sh", "/entrypoint.sh"]
+# Command to run the service
+CMD ["node", "src/dltGateway.js"]
-```
-NEC Laboratories Europe GmbH
-PROPRIETARY INFORMATION
-The software and its source code contain valuable trade secrets and
-shall be maintained in confidence and treated as confidential
-information. The software may only be used for evaluation and/or
-testing purposes, unless otherwise explicitly stated in a written
-agreement with NEC Laboratories Europe GmbH.
-Any unauthorized publication, transfer to third parties or
-duplication of the object or source code - either totally or in
-part - is strictly prohibited.
-Copyright (c) 2022 NEC Laboratories Europe GmbH
-All Rights Reserved.
-Authors: Konstantin Munichev <konstantin.munichev@neclab.eu>
-NEC Laboratories Europe GmbH DISCLAIMS ALL WARRANTIES, EITHER
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO IMPLIED WARRANTIES
-OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE AND THE
-WARRANTY AGAINST LATENT DEFECTS, WITH RESPECT TO THE PROGRAM AND
-THE ACCOMPANYING DOCUMENTATION.
-NO LIABILITIES FOR CONSEQUENTIAL DAMAGES: IN NO EVENT SHALL NEC
-Laboratories Europe GmbH or ANY OF ITS SUBSIDIARIES BE LIABLE FOR
-ANY DAMAGES WHATSOEVER (INCLUDING, WITHOUT LIMITATION, DAMAGES FOR
-LOSS OF BUSINESS PROFITS, BUSINESS INTERRUPTION, LOSS OF
-INFORMATION, OR OTHER PECUNIARY LOSS AND INDIRECT, CONSEQUENTIAL,
-INCIDENTAL, ECONOMIC OR PUNITIVE DAMAGES) ARISING OUT OF THE USE OF
-OR INABILITY TO USE THIS PROGRAM, EVEN IF NEC Laboratories Europe
-GmbH HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
-THIS HEADER MAY NOT BE EXTRACTED OR MODIFIED IN ANY WAY.
-```
-# DLT module guide
+# DLT Gateway

-## General information
+## Description

-The DLT module is used to provide access to the underlying Fabric deployment. It allows clients
-to add, retrieve, modify and delete blockchain-backed data, essentially working as a key-value
-database. External clients should use the gRPC API to communicate with this service; a detailed
-description is available below.

+The DLT Gateway consists of a **fabricConnect.ts** TypeScript file, which contains the logic for identity
+management (certificates required for the MSP) and connection management to the blockchain, and finally exposes a
+contract object with all the required information for interacting with the chaincode. **fabricConnect.ts** is
+coded following the Fabric Gateway API recommendations from Hyperledger Fabric 2.4+. The compiled **fabricConnect.ts**
+logic is imported into a **dltGateway.js** file, which contains the gRPC logic for interaction with the TFS controller.
+Testing code for various performance tests is included inside the [/tests](./tests/) folder.

-## Code structure
-
-The whole DLT module consists of several packages:
-- fabric package
-- http package
-- proto package
-- client example

+The chaincode is written in Go, providing a reference for the operations that are recorded in the blockchain. This
+chaincode must already be deployed in a working Hyperledger Fabric blockchain.

-### Fabric package
-
-The most important class in this package is `FabricConnector`. First, it establishes a connection
-with the underlying Fabric network using the Java Gateway SDK. After that, it can be used as a
-CRUD interface.
-
-Other files contain auxiliary code for `FabricConnector` which allows it to register/enroll
-users and to obtain smart contract instances.

-### Grpc package
-
-Contains the server-side gRPC handler. It accepts requests from the outside and performs the
-requested operation. For a more detailed description, see the Proto package description right below.

+## Requisites
+
+* NodeJS
+* Docker
+* Kubernetes (K8s)
+
+Sign and TLS certificates, and the private key of the MSP user from the Hyperledger Fabric deployment, must be copied to the
+[/keys](./keys/) directory inside this repository.

-### Proto package
-
-The proto package contains the `dlt.proto` file, which defines the gRPC service `DltService` API and the messages
-it uses. There are 3 main functions: `RecordToDlt`, which allows clients to create/modify/delete data;
-`GetFromDlt`, which returns already written data; and `SubscribeToDlt`, which allows clients to subscribe
-to future create/modify/delete events with the provided filters.
-
-Other proto files don't play any significant role and can be safely ignored by end users.
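For illustration, a hedged sketch of a client exercising these three RPCs through the Python stubs used elsewhere in this merge request (`common.proto.dlt_gateway_pb2_grpc.DltGatewayServiceStub`); the endpoint and UUIDs are placeholders:

```python
# Illustrative only: exercises the three DltService RPCs described above via
# the Python stubs used elsewhere in this MR. Endpoint and UUIDs are placeholders.
import grpc

from common.proto.dlt_gateway_pb2 import (
    DltRecord, DltRecordOperationEnum, DltRecordSubscription, DltRecordTypeEnum)
from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub

def demo():
    # 50051 is the port exposed by the gateway Dockerfile above; the address is a placeholder.
    with grpc.insecure_channel('127.0.0.1:50051') as channel:
        stub = DltGatewayServiceStub(channel)

        # RecordToDlt: create (or modify/delete) a blockchain-backed record
        record = DltRecord()
        record.record_id.domain_uuid.uuid = 'domain-uuid'    # placeholder
        record.record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE
        record.record_id.record_uuid.uuid = 'record-uuid'    # placeholder
        record.operation = DltRecordOperationEnum.DLTRECORDOPERATION_ADD
        record.data_json = '{"example": true}'
        print(stub.RecordToDlt(record))

        # GetFromDlt: read the record back by its identifier
        print(stub.GetFromDlt(record.record_id))

        # SubscribeToDlt: server-streamed create/modify/delete events
        for event in stub.SubscribeToDlt(DltRecordSubscription()):
            print(event)
            break

if __name__ == '__main__':
    demo()
```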
-### Client example
-
-This code is not necessary for the service, but it can be used to test the service. It contains
-a sample gRPC client which connects to the service and performs all the CRUD operations.

+Example:
+
+```bash
+cp ~/fabric-samples/test-network/organizations/peerOrganizations/org1.example.com/users/User1@org1.example.com/tls/ca.crt src/dlt/gateway/keys/
+cp ~/fabric-samples/test-network/organizations/peerOrganizations/org1.example.com/users/User1@org1.example.com/msp/signcerts/User1@org1.example.com-cert.pem src/dlt/gateway/keys/cert.pem
+cp ~/fabric-samples/test-network/organizations/peerOrganizations/org1.example.com/users/User1@org1.example.com/msp/keystore/priv_sk src/dlt/gateway/keys/
+```
+
+These files are essential for establishing the identity and secure connection to the blockchain. Make sure you replace
+the paths with your actual file locations from your Hyperledger Fabric deployment.

-# Fabric deployment notes
-
-## General notes
-
-The current Fabric deployment uses the Fabric test network with some additional helper scripts on top of it.
-To start the network, just run `raft.sh` from the `blockchain/scripts` directory. Use `stop.sh`
-when you need to stop the network.

-## Server start preparations
-
-To run the server, it is necessary to copy the certificate file
-`fabric-samples/test-network/organizations/peerOrganizations/org1.example.com/ca/ca.org1.example.com-cert.pem`
-to the config folder (replacing the existing one). It is also necessary to copy the `scripts/connection-org1.json`
-file (again, replacing the old one). After copying, it must be edited. First, all `localhost` occurrences
-should be replaced with `teraflow.nlehd.de`. Second, the `channel` section at the end of the file should be removed.
-This should be done after every restart of the Fabric network.

-## Fabric configuration
-
-Even though a test network is easy to deploy and use, it is better to perform a custom configuration
-for a production deployment. In practice, every participating organization will likely prefer to have
-its own Peer/Orderer/CA instances to prevent possible dependency on any other participants. This leads
-not only to better privacy/availability/security in general, but also to a more complicated
-deployment process as a side effect. Here we provide a very brief description of the most important points.

-### Organizations
-
-An organization represents a network participant, which can be an individual, a large corporation or any other
-entity. Each organization has its own CAs, orderers and peers. The recommendation here is to create an
-organization entity for every independent participant, and then decide how many CAs/peers/orderers
-each organization needs and which channels it should have access to, based on the exact project's goals.

-### Channels
-
-Each channel represents an independent ledger with its own genesis block. Each transaction is executed
-on a specific channel, and it is possible to define which organizations have access to a given channel.
-As a result, channels are a rather powerful privacy mechanism that allows limiting access to private
-data between organizations.

-### Certificate authorities, peers and orderers
-
-Certificate authorities (CAs) are used to generate crypto materials for each organization. Two types of CA
-exist: one is used to generate the certificates of the admin, the MSP and the certificates of non-admin users;
-the other type of CA is used to generate TLS certificates. As a result, it is preferable to have at least two
-CAs for every organization.
-
-Peers are entities which host ledgers and smart contracts. They communicate with applications and orderers,
-receiving chaincode invocations (proposals), invoking chaincode, updating the ledger when necessary and
-returning the result of execution. Peers can handle one or many ledgers, depending on the configuration. How
-many peers are necessary for a given deployment is very use-case specific.
-
-Orderers are used to execute a consensus in a distributed network, making sure that every channel participant
-has the same blocks with the same data. The default consensus algorithm is Raft, which provides only crash
-fault tolerance.

-### Conclusion
-
-As you can see, the configuration procedure for Fabric is fairly involved and includes quite a lot of entities.
-In the real world it will very likely involve participants from multiple organizations, each of them performing
-its own part of the configuration.
-
-As further reading, it is recommended to start with the
-[official deployment guide](https://hyperledger-fabric.readthedocs.io/en/release-2.2/deployment_guide_overview.html).
-It contains a high-level overview of the deployment process, as well as links to detailed
-CA/Peer/Orderer configuration descriptions.
\ No newline at end of file