-
Lluis Gifre Renom authored
- corrected method used to get record from DLT in test script
Lluis Gifre Renom authored- corrected method used to get record from DLT in test script
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
main_test.py 1.90 KiB
# pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1
# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.main_test
import logging, sys, time
from common.proto.dlt_gateway_pb2 import DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_UPDATE, DLTRECORDTYPE_DEVICE, DltRecord
from common.tools.object_factory.Device import json_device
from common.tools.grpc.Tools import grpc_message_to_json_string
from src.common.proto.context_pb2 import DEVICEOPERATIONALSTATUS_ENABLED, Device
from .client.DltGatewayClient import DltGatewayClient
from .client.DltEventsCollector import DltEventsCollector
# Configure the root logger at INFO level so the request/reply messages logged by main() are shown.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
def main() -> int:
    """Store a device record through the DLT gateway and read it back.

    Connects a DltGatewayClient to 127.0.0.1:50051, starts a
    DltEventsCollector on it, sends one ADD record for a sample device
    via RecordToDlt, then requests the same record id via GetFromDlt.
    Each request and reply is logged as a JSON string.

    Returns:
        0 when both calls complete. Any exception raised by the calls
        propagates to the caller after the collector has been stopped.
    """
    dltgateway_client = DltGatewayClient(host='127.0.0.1', port=50051)
    dltgateway_collector = DltEventsCollector(dltgateway_client, log_events_received=True)
    dltgateway_collector.start()

    # Once started, the collector must be stopped on every exit path;
    # otherwise a failing RPC would skip stop() and leave it running.
    try:
        # NOTE(review): presumably gives the collector time to subscribe
        # before the record is written -- confirm against DltEventsCollector.
        time.sleep(3)

        device = Device(**json_device('dev-1', 'packet-router', DEVICEOPERATIONALSTATUS_ENABLED))

        # Build the record to write: id (domain, type, uuid), operation, JSON payload.
        r2dlt_req = DltRecord()
        r2dlt_req.record_id.domain_uuid.uuid = 'tfs-a'
        r2dlt_req.record_id.type = DLTRECORDTYPE_DEVICE
        r2dlt_req.record_id.record_uuid.uuid = device.device_id.device_uuid.uuid
        r2dlt_req.operation = DLTRECORDOPERATION_ADD
        r2dlt_req.data_json = grpc_message_to_json_string(device)
        LOGGER.info('r2dlt_req = %s', grpc_message_to_json_string(r2dlt_req))

        r2dlt_rep = dltgateway_client.RecordToDlt(r2dlt_req)
        LOGGER.info('r2dlt_rep = %s', grpc_message_to_json_string(r2dlt_rep))

        # Read back using the record id of the request that was just sent.
        dlt2r_req = r2dlt_req.record_id
        LOGGER.info('dlt2r_req = %s', grpc_message_to_json_string(dlt2r_req))

        dlt2r_rep = dltgateway_client.GetFromDlt(dlt2r_req)
        LOGGER.info('dlt2r_rep = %s', grpc_message_to_json_string(dlt2r_rep))
    finally:
        dltgateway_collector.stop()

    return 0
if __name__ == '__main__':
    # Run the test flow and hand its return value to the interpreter as the exit status.
    exit_status = main()
    sys.exit(exit_status)