Use the initialized client instance to obtain data through the DIS stream. The sample code is available in the ConsumerDemo.java file under the dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo directory.
The value of streamName must be the same as that of Stream Name configured in Step 1: Creating a DIS Stream.
//Configure the stream name. String streamName = "streamName"; // Configure the ID of the partition for data download. String partitionId = "0"; //Configure the sequence number for data download. String startingSequenceNumber = "0"; //Configure the data download mode. String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name(); try { //Obtain data cursors. GetPartitionCursorRequest request = new GetPartitionCursorRequest(); request.setStreamName(streamName); request.setPartitionId(partitionId); request.setStartingSequenceNumber(startingSequenceNumber); request.setCursorType(cursorType); GetPartitionCursorResult response = dic.getPartitionCursor(request); String cursor = response.getPartitionCursor(); log.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor); GetRecordsRequest recordsRequest = newGetRecordsRequest(); GetRecordsResult recordResponse = null; while (true) { recordsRequest.setPartitionCursor(cursor); recordsRequest.setLimit(2); recordResponse = dic.getRecords(recordsRequest); //Obtain the next-batch data cursors. cursor = recordResponse.getNextPartitionCursor(); for (Record record : recordResponse.getRecords()) { log.info("Get Record [{}], partitionKey [{}], sequenceNumber [{}].", new String(record.getData().array()), record.getPartitionKey(), record.getSequenceNumber()); } Thread.sleep(1000); } } catch (DISClientException e) { log.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (ResourceAccessException e) { log.error("Failed to access endpoint. Error message [{}]", e.getMessage(), e); } catch (Exception e) { log.error(e.getMessage(), e); }
Right-click the program and choose
from the shortcut menu. If the program runs successfully, the information similar to the following is displayed on the console:14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader 14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader 14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2 14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ 14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0]. 14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1]. 14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].