Skip to content
Snippets Groups Projects
basic.py 3.38 KiB
Newer Older
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# pip install grpcio==1.47.0 grpcio-tools==1.47.0 protobuf==3.20.1
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# PYTHONPATH=./src python
# PYTHONPATH=/home/cttc/teraflow/src python -m dlt.connector.tests.basic
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed

import logging, sys, time
from common.proto.context_pb2 import DEVICEOPERATIONALSTATUS_ENABLED, Device
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.dlt_gateway_pb2 import (
    DLTRECORDOPERATION_ADD, DLTRECORDOPERATION_UNDEFINED, DLTRECORDOPERATION_UPDATE, DLTRECORDTYPE_DEVICE,
    DLTRECORDTYPE_UNDEFINED, DltRecord, DltRecordId)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.object_factory.Device import json_device
from common.tools.grpc.Tools import grpc_message_to_json_string
from ..client.DltGatewayClient import DltGatewayClient
from ..client.DltEventsCollector import DltEventsCollector
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
DLT_GATEWAY_HOST = '127.0.0.1'
DLT_GATEWAY_PORT = 30551 #50051

def record_found(record : DltRecord) -> bool:
    found = True
    found = found and (len(record.record_id.domain_uuid.uuid) > 0)
    found = found and (record.record_id.type != DLTRECORDTYPE_UNDEFINED)
    found = found and (len(record.record_id.record_uuid.uuid) > 0)
    #found = found and (record.operation != DLTRECORDOPERATION_UNDEFINED)
    found = found and (len(record.data_json) > 0)
    return found

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    dltgateway_client = DltGatewayClient(host=DLT_GATEWAY_HOST, port=DLT_GATEWAY_PORT)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    dltgateway_collector = DltEventsCollector(dltgateway_client, log_events_received=True)
    dltgateway_collector.start()

    time.sleep(3)

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    # Check record exists
    dri = DltRecordId()
    dri.domain_uuid.uuid = 'non-existing-domain'
    dri.record_uuid.uuid = 'non-existing-record'
    dri.type = DLTRECORDTYPE_DEVICE
    reply = dltgateway_client.GetFromDlt(dri)
    assert not record_found(reply), 'Record should not exist'

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    device = Device(**json_device('dev-1', 'packet-router', DEVICEOPERATIONALSTATUS_ENABLED))

    r2dlt_req = DltRecord()
    r2dlt_req.record_id.domain_uuid.uuid = 'tfs-a'
    r2dlt_req.record_id.type             = DLTRECORDTYPE_DEVICE
    r2dlt_req.record_id.record_uuid.uuid = device.device_id.device_uuid.uuid
    r2dlt_req.operation                  = DLTRECORDOPERATION_ADD
    r2dlt_req.data_json                  = grpc_message_to_json_string(device)
    LOGGER.info('r2dlt_req = {:s}'.format(grpc_message_to_json_string(r2dlt_req)))
    r2dlt_rep = dltgateway_client.RecordToDlt(r2dlt_req)
    LOGGER.info('r2dlt_rep = {:s}'.format(grpc_message_to_json_string(r2dlt_rep)))

    dlt2r_req = r2dlt_req.record_id
    LOGGER.info('dlt2r_req = {:s}'.format(grpc_message_to_json_string(dlt2r_req)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    dlt2r_rep = dltgateway_client.GetFromDlt(dlt2r_req)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    LOGGER.info('dlt2r_rep = {:s}'.format(grpc_message_to_json_string(dlt2r_rep)))
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    dltgateway_collector.stop()
    return 0

if __name__ == '__main__':
    sys.exit(main())