forked from docs/doc-exports
Reviewed-by: gtema <artem.goncharov@gmail.com> Co-authored-by: Xiong, Chen Xiao <chenxiaoxiong@huawei.com> Co-committed-by: Xiong, Chen Xiao <chenxiaoxiong@huawei.com>
6.2 KiB
6.2 KiB
Uploading Streaming Data
Sample Code
Use the initialized client instance to upload your streaming data to DIS. The sample code is available in the ProducerDemo.java file in the dis-sdk-demo\src\com\bigdata\dis\sdk\demo directory.
The value of streamName must be the same as that of Stream Name configured in Step 1: Creating a DIS Stream.
The code for uploading streaming data is as follows:
//Initialize an asynchronous sending client. DISConfig disConfig = new DISConfig().setAK("xxxx").setSK("xxxx").setProjectId("xxxx").setRegion("xxxx").setEndpoint("xxxx"); DISProducer producer = new DISProducer(disConfig); //Configure the stream name. String streamName = "streamName"; //Configure the data to be uploaded. String message = "hello world."; ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(buffer); putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000))); log.info("========== BEGIN PUT ============"); int count = 10; CountDownLatch cd = new CountDownLatch(count); for (int i = 0; i < count; i++) { putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000))); try { producer.putRecordAsync(streamName, putRecordsRequestEntry, new AsyncHandler<PutRecordsResultEntry>() { @Override public void onSuccess(PutRecordsResultEntry result) { log.info(result.toString()); cd.countDown(); } @Override public void onError(Exception exception) { log.error(exception.getMessage(), exception); cd.countDown(); } }); } catch (Exception e) { log.error(e.getMessage(), e); cd.countDown(); } } cd.await(); log.info("========== PUT OVER ============"); producer.close();
Running the Program
Right-click the program and choose
from the shortcut menu. If the program runs successfully, the information similar to the following is displayed on the console:17:27:49.130 [main] INFO com.bigdata.dis.sdk.DISConfig - get from classLoader 17:27:49.142 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Starting Kafka producer I/O thread. 17:27:49.145 [main] INFO DISProducerDemo - ========== BEGIN PUT ============ 17:27:49.202 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - begin to send : 1 17:27:49.203 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batch size: 10, 120 17:27:50.197 [pool-2-thread-1] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader 17:27:50.197 [pool-2-thread-1] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2 17:27:51.531 [pool-2-thread-1] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batches success. dis-alAR-nb, 10 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.533 [main] INFO DISProducerDemo - ========== PUT OVER ============ 17:27:51.571 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
Parent topic: DIS SDK Usage Guide