Use the initialized client instance to upload your streaming data to DIS. The sample code is available in the ProducerDemo.java file in the dis-sdk-demo\src\com\bigdata\dis\sdk\demo directory.
The value of streamName must be the same as that of Stream Name configured in Step 1: Creating a DIS Stream.
The code for uploading streaming data is as follows:
//Initialize an asynchronous sending client. DISConfig disConfig = new DISConfig().setAK("xxxx").setSK("xxxx").setProjectId("xxxx").setRegion("xxxx").setEndpoint("xxxx"); DISProducer producer = new DISProducer(disConfig); //Configure the stream name. String streamName = "streamName"; //Configure the data to be uploaded. String message = "hello world."; ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(buffer); putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000))); log.info("========== BEGIN PUT ============"); int count = 10; CountDownLatch cd = new CountDownLatch(count); for (int i = 0; i < count; i++) { putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000))); try { producer.putRecordAsync(streamName, putRecordsRequestEntry, new AsyncHandler<PutRecordsResultEntry>() { @Override public void onSuccess(PutRecordsResultEntry result) { log.info(result.toString()); cd.countDown(); } @Override public void onError(Exception exception) { log.error(exception.getMessage(), exception); cd.countDown(); } }); } catch (Exception e) { log.error(e.getMessage(), e); cd.countDown(); } } cd.await(); log.info("========== PUT OVER ============"); producer.close();
Right-click the program and choose
from the shortcut menu. If the program runs successfully, the information similar to the following is displayed on the console:17:27:49.130 [main] INFO com.bigdata.dis.sdk.DISConfig - get from classLoader 17:27:49.142 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Starting Kafka producer I/O thread. 17:27:49.145 [main] INFO DISProducerDemo - ========== BEGIN PUT ============ 17:27:49.202 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - begin to send : 1 17:27:49.203 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batch size: 10, 120 17:27:50.197 [pool-2-thread-1] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader 17:27:50.197 [pool-2-thread-1] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2 17:27:51.531 [pool-2-thread-1] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batches success. dis-alAR-nb, 10 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null] 17:27:51.533 [main] INFO DISProducerDemo - ========== PUT OVER ============ 17:27:51.571 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.