This section describes how to interconnect FlinkServer with Hudi through Flink SQL jobs.
Table 1 lists the read and write operations supported by Flink on Hudi COW and MOR tables.
Flink SQL operation | COW table | MOR table
---|---|---
Batch write | Supported | Supported
Batch read | Supported | Supported
Stream write | Supported | Supported
Stream read | Supported | Supported
Currently, Flink SQL allows you to read data from Hudi tables only in snapshot mode and read optimized mode.
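The following Flink SQL is a minimal sketch of the two read modes. It assumes the MOR table written by the stream write example at hdfs://hacluster/tmp/hudi/stream_mor. The table names stream_mor_ro and stream_mor_read are hypothetical. The option 'hoodie.datasource.query.type' = 'read_optimized' selects read optimized mode (snapshot is the default), and 'read.streaming.enabled' = 'true' turns a snapshot query into a continuous stream read. Option names can differ between Hudi releases, so check them against the Hudi version shipped with your cluster.

```sql
-- Read optimized query: reads only the compacted base files of the MOR table.
CREATE TABLE stream_mor_ro(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.query.type' = 'read_optimized'
);
SELECT * FROM stream_mor_ro;

-- Stream read in snapshot mode: the source keeps polling the table for new commits.
-- read.streaming.check-interval is the polling interval in seconds.
CREATE TABLE stream_mor_read(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '60'
);
SELECT * FROM stream_mor_read;
```

A read optimized query on a MOR table sees only data that has already been compacted into base files, so recently streamed records may not appear until compaction runs.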
On the FlinkServer job development page, select Enable CheckPoint in Running Parameter and set Time Interval (ms) to 60000.
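If you run the statements from the Flink SQL client instead of the FlinkServer web UI, a roughly equivalent setting is the standard Flink option execution.checkpointing.interval. The sketch below assumes Flink 1.13 or later, where SET takes quoted keys and values. The Hudi Flink writer commits data when a checkpoint completes, so a stream write without checkpointing does not produce visible commits.

```sql
-- Checkpoint every 60 seconds, matching Time Interval (ms) = 60000.
SET 'execution.checkpointing.interval' = '60s';
```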
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ'
);
CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
INSERT INTO stream_mor SELECT * FROM kafka;
CREATE TABLE stream_write_cow(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_cow'
);
CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
INSERT INTO stream_write_cow SELECT * FROM kafka;
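Table 1 also lists batch write and batch read, which the stream examples do not cover. The following is a minimal sketch, assuming Flink 1.13 or later in the Flink SQL client and a hypothetical COW table batch_cow at the hypothetical path hdfs://hacluster/tmp/hudi/batch_cow. A separate table is used so that the batch job does not write concurrently with the running stream job. The schema relies on the Hudi defaults of uuid as the record key and ts as the precombine field, as the stream examples do.

```sql
-- Run the job in batch mode (standard Flink option).
SET 'execution.runtime-mode' = 'batch';

-- Hypothetical COW table; COPY_ON_WRITE is the default table.type.
CREATE TABLE batch_cow(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/batch_cow'
);

-- Batch write: a bounded set of sample rows (illustrative data).
INSERT INTO batch_cow VALUES
  ('id1', 'Danny', 23, 1, 'par1'),
  ('id2', 'Stephen', 33, 2, 'par1'),
  ('id3', 'Julian', 53, 3, 'par2');

-- Batch read: a snapshot query over the latest committed data.
SELECT * FROM batch_cow;
```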
CREATE TABLE hudi_read_spark_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/default/tb_hudimor',
  'table.type' = 'MERGE_ON_READ'
);
CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
INSERT INTO hudi_read_spark_mor SELECT * FROM kafka;
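In 'properties.bootstrap.servers', replace IP address of the Kafka broker instance and Kafka port number with the actual IP address and port of a Kafka broker in your cluster.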
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set its value to true, and click Save.
Ensure that no partition is added before the synchronization. After the synchronization, new partitions cannot be read.