Reviewed-by: gtema <artem.goncharov@gmail.com> Co-authored-by: Xiong, Chen Xiao <chenxiaoxiong@huawei.com> Co-committed-by: Xiong, Chen Xiao <chenxiaoxiong@huawei.com>
Downloading Streaming Data
Sample Code
Use the initialized client instance to download data from the DIS stream. The sample code is available in the ConsumerDemo.java file under the dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo directory.
The value of streamName must be the same as that of Stream Name configured in Step 1: Creating a DIS Stream.
// Configure the stream name.
String streamName = "streamName";
// Configure the ID of the partition for data download.
String partitionId = "0";
// Configure the sequence number for data download.
String startingSequenceNumber = "0";
// Configure the data download mode.
String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();

try
{
    // Obtain data cursors.
    GetPartitionCursorRequest request = new GetPartitionCursorRequest();
    request.setStreamName(streamName);
    request.setPartitionId(partitionId);
    request.setStartingSequenceNumber(startingSequenceNumber);
    request.setCursorType(cursorType);
    GetPartitionCursorResult response = dic.getPartitionCursor(request);
    String cursor = response.getPartitionCursor();

    log.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

    GetRecordsRequest recordsRequest = new GetRecordsRequest();
    GetRecordsResult recordResponse = null;
    while (true)
    {
        // Download at most two records per request, starting at the current cursor.
        recordsRequest.setPartitionCursor(cursor);
        recordsRequest.setLimit(2);
        recordResponse = dic.getRecords(recordsRequest);
        // Obtain the next-batch data cursors.
        cursor = recordResponse.getNextPartitionCursor();

        for (Record record : recordResponse.getRecords())
        {
            log.info("Get Record [{}], partitionKey [{}], sequenceNumber [{}].",
                new String(record.getData().array()),
                record.getPartitionKey(),
                record.getSequenceNumber());
        }
        // Wait one second before requesting the next batch.
        Thread.sleep(1000);
    }
}
catch (DISClientException e)
{
    log.error("Failed to get a normal response, please check params and retry. Error message [{}]",
        e.getMessage(),
        e);
}
catch (ResourceAccessException e)
{
    log.error("Failed to access endpoint. Error message [{}]", e.getMessage(), e);
}
catch (Exception e)
{
    log.error(e.getMessage(), e);
}
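The sample converts each record with new String(record.getData().array()), which uses the platform default charset and reads the whole backing array. The following is a minimal sketch of a more defensive conversion, assuming getData() returns a java.nio.ByteBuffer (as the .array() call suggests) and that the payload is UTF-8 text. The class name RecordDecoding and the helper decodeUtf8 are hypothetical and are not part of the DIS SDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordDecoding {

    /**
     * Decodes the readable bytes of a buffer (position to limit) as UTF-8.
     * Hypothetical helper, not part of the DIS SDK.
     */
    static String decodeUtf8(ByteBuffer data) {
        // duplicate() shares the content but has its own position and limit,
        // so decoding does not disturb the caller's buffer.
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] backing = "xxhello world.yy".getBytes(StandardCharsets.UTF_8);
        // A buffer that exposes only the middle 12 bytes of its backing array.
        ByteBuffer view = ByteBuffer.wrap(backing, 2, 12);

        // Respects position and limit.
        System.out.println(decodeUtf8(view));
        // Reads the entire backing array, including bytes outside the view.
        System.out.println(new String(view.array(), StandardCharsets.UTF_8));
    }
}
```

When the buffer covers its whole backing array, both conversions give the same text. They differ only when the buffer is a view over part of a larger array or when the default charset is not UTF-8.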
Running the Program
Right-click the program and choose the run option from the shortcut menu. If the program runs successfully, information similar to the following is displayed on the console:
14:55:42.954 [main] INFO com.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
14:55:44.105 [main] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
14:55:45.235 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].
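In the output, the records with sequence numbers 0 and 1 share a timestamp and record 2 follows about one second later. This matches the sample code, which requests at most two records per call (setLimit(2)) and sleeps for one second between calls. The sketch below imitates that batching with an in-memory list so it can run without a DIS endpoint. It is only an illustration: the class CursorLoopSketch, the Batch type, and the integer cursor are hypothetical stand-ins (a real DIS cursor is an opaque string), and unlike the demo, this loop stops once the list is drained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CursorLoopSketch {

    /** Hypothetical stand-in for one response: a batch plus the cursor for the next call. */
    static final class Batch {
        final List<String> records;
        final int nextCursor;

        Batch(List<String> records, int nextCursor) {
            this.records = records;
            this.nextCursor = nextCursor;
        }
    }

    /** Returns at most {@code limit} records starting at {@code cursor}. */
    static Batch getRecords(List<String> partition, int cursor, int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be at least 1");
        }
        int end = Math.min(cursor + limit, partition.size());
        return new Batch(new ArrayList<>(partition.subList(cursor, end)), end);
    }

    /** Polls until the partition is drained and returns the size of each batch. */
    static List<Integer> batchSizes(List<String> partition, int limit) {
        List<Integer> sizes = new ArrayList<>();
        int cursor = 0;
        while (cursor < partition.size()) {
            Batch batch = getRecords(partition, cursor, limit);
            sizes.add(batch.records.size());
            // Continue from where the previous batch ended.
            cursor = batch.nextCursor;
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("hello world.", "hello world.", "hello world.");
        // Three records with a limit of two arrive as a batch of two, then a batch of one.
        System.out.println(batchSizes(partition, 2)); // prints [2, 1]
    }
}
```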