Yang, Tong 3f5759eed2 MRS comp-lts 2.0.38.SP20 version
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com>
Co-authored-by: Yang, Tong <yangtong2@huawei.com>
Co-committed-by: Yang, Tong <yangtong2@huawei.com>
2023-01-19 17:08:45 +00:00


<a name="mrs_01_24180"></a><a name="mrs_01_24180"></a>
<h1 class="topictitle1">Interconnecting FlinkServer with Hudi</h1>
<div id="body32001227"><div class="section" id="mrs_01_24180__en-us_topic_0000001219149723_section29191115174417"><h4 class="sectiontitle">Scenario</h4><p id="mrs_01_24180__en-us_topic_0000001219149723_p127621119184416">This section describes how to interconnect FlinkServer with Hudi through Flink SQL jobs.</p>
</div>
<div class="section" id="mrs_01_24180__en-us_topic_0000001219149723_section17259141064413"><h4 class="sectiontitle">Prerequisites</h4><ul id="mrs_01_24180__en-us_topic_0000001219149723_ul184973415531"><li id="mrs_01_24180__en-us_topic_0000001219149723_li349713412536">The HDFS, Yarn, Flink, and Hudi services have been installed in a cluster.</li><li id="mrs_01_24180__en-us_topic_0000001219149723_li449716411539">The client that contains the Hudi service has been installed, for example, in the <strong id="mrs_01_24180__en-us_topic_0000001219149723_b2741824154919">/opt/Bigdata/client</strong> directory.</li><li id="mrs_01_24180__en-us_topic_0000001219149723_li147108173911">Flink 1.12.2 or later and Hudi 0.9.0 or later are required.</li><li id="mrs_01_24180__en-us_topic_0000001219149723_li516410590478">You have created a user with <strong id="mrs_01_24180__en-us_topic_0000001219149723_b1747910384111">FlinkServer Admin Privilege</strong>, for example, <strong id="mrs_01_24180__en-us_topic_0000001219149723_b1148511381712">flink_admin</strong>, to access the Flink web UI. For details, see <a href="mrs_01_24049.html">Authentication Based on Users and Roles</a>.</li></ul>
</div>
<div class="section" id="mrs_01_24180__en-us_topic_0000001219149723_section4663231114614"><h4 class="sectiontitle">Flink Support for Read and Write Operations on Hudi Tables</h4><p id="mrs_01_24180__en-us_topic_0000001219149723_p1466473124613"><a href="#mrs_01_24180__en-us_topic_0000001219149723_table1766417313461">Table 1</a> lists the read and write operations supported by Flink on Hudi COW and MOR tables.</p>
<div class="tablenoborder"><a name="mrs_01_24180__en-us_topic_0000001219149723_table1766417313461"></a><a name="en-us_topic_0000001219149723_table1766417313461"></a><table cellpadding="4" cellspacing="0" summary="" id="mrs_01_24180__en-us_topic_0000001219149723_table1766417313461" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Flink support for read and write operations on Hudi tables</caption><thead align="left"><tr id="mrs_01_24180__en-us_topic_0000001219149723_row7664731184610"><th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.3.3.2.4.1.1"><p id="mrs_01_24180__en-us_topic_0000001219149723_p866443114466">Flink SQL</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.3.3.2.4.1.2"><p id="mrs_01_24180__en-us_topic_0000001219149723_p1766453117466">COW table</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.3.3.2.4.1.3"><p id="mrs_01_24180__en-us_topic_0000001219149723_p18664163120466">MOR table</p>
</th>
</tr>
</thead>
<tbody><tr id="mrs_01_24180__en-us_topic_0000001219149723_row1666423119464"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.1 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p15664133184612">Batch write</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.2 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p0664831154619">Supported</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.3 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p166583110466">Supported</p>
</td>
</tr>
<tr id="mrs_01_24180__en-us_topic_0000001219149723_row156651331204614"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.1 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p16665103120466">Batch read</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.2 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p085646112218">Supported</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.3 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p8857461223">Supported</p>
</td>
</tr>
<tr id="mrs_01_24180__en-us_topic_0000001219149723_row26654316469"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.1 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p1866517318467">Stream write</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.2 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p106945484222">Supported</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.3 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p46941486229">Supported</p>
</td>
</tr>
<tr id="mrs_01_24180__en-us_topic_0000001219149723_row166651331154611"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.1 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p96652313466">Stream read</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.2 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p14834105052211">Supported</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.3.3.2.4.1.3 "><p id="mrs_01_24180__en-us_topic_0000001219149723_p16834650152216">Supported</p>
</td>
</tr>
</tbody>
</table>
</div>
<div class="note" id="mrs_01_24180__en-us_topic_0000001219149723_note346182133617"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="mrs_01_24180__en-us_topic_0000001219149723_p17149142693713">Currently, Flink SQL allows you to read data from Hudi tables only in snapshot mode and read optimized mode.</p>
</div></div>
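<p>As a minimal sketch of selecting a read mode, the following batch job reads an MOR table in read optimized mode and prints the rows. The table names <strong>hudi_read_mor</strong> and <strong>print_sink</strong> and the path are hypothetical. The <strong>hoodie.datasource.query.type</strong> option is assumed to be available in the Hudi version in use; omit it (or set it to <strong>snapshot</strong>) to read in snapshot mode.</p>
<pre class="screen">-- Hypothetical source table over an existing Hudi MOR table.
-- 'hoodie.datasource.query.type' selects the read mode (assumed option name).
CREATE TABLE hudi_read_mor(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.query.type' = 'read_optimized'
);
-- Hypothetical sink that prints each row to the TaskManager output.
CREATE TABLE print_sink(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) WITH (
'connector' = 'print'
);
insert into
print_sink
select
*
from
hudi_read_mor;</pre>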
</div>
<div class="section" id="mrs_01_24180__en-us_topic_0000001219149723_section34521934173619"><h4 class="sectiontitle">Procedure</h4><ol id="mrs_01_24180__en-us_topic_0000001219149723_ol11471534103911"><li id="mrs_01_24180__en-us_topic_0000001219149723_li12826131220399"><span>Log in to Manager as user <strong id="mrs_01_24180__en-us_topic_0000001219149723_b14473225105812">flink_admin</strong> and choose <strong id="mrs_01_24180__en-us_topic_0000001219149723_b14799254588">Cluster</strong> &gt; <strong id="mrs_01_24180__en-us_topic_0000001219149723_b3479162585813">Services</strong> &gt; <strong id="mrs_01_24180__en-us_topic_0000001219149723_b04809257581">Flink</strong>. In the <strong id="mrs_01_24180__en-us_topic_0000001219149723_b154817252580">Basic Information</strong> area, click the link on the right of <strong id="mrs_01_24180__en-us_topic_0000001219149723_b1348152585810">Flink WebUI</strong> to access the Flink web UI.</span></li><li id="mrs_01_24180__en-us_topic_0000001219149723_li2023444020"><span>Create a Flink SQL job by referring to <a href="mrs_01_24024.html#mrs_01_24024__en-us_topic_0000001173470782_section1746418521537">Creating a Job</a>. On the job development page, configure the job parameters as follows and start the job.</span><p><p id="mrs_01_24180__en-us_topic_0000001219149723_p159544617569">Select <strong id="mrs_01_24180__en-us_topic_0000001219149723_b1938820118595">Enable CheckPoint</strong> in <strong id="mrs_01_24180__en-us_topic_0000001219149723_b1839416111594">Running Parameter</strong> and set <strong id="mrs_01_24180__en-us_topic_0000001219149723_b18394617599">Time Interval (ms)</strong> to <strong id="mrs_01_24180__en-us_topic_0000001219149723_b239551165912">60000</strong>.</p>
<div class="note" id="mrs_01_24180__en-us_topic_0000001219149723_note1583913511576"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><ul id="mrs_01_24180__en-us_topic_0000001219149723_ul17604226162617"><li id="mrs_01_24180__en-us_topic_0000001219149723_li116041426132612">CheckPoint must be enabled on the Flink web UI because a Flink SQL job writes data to a Hudi table only when a CheckPoint is triggered. Adjust the CheckPoint interval based on service requirements. A relatively long interval is recommended.</li><li id="mrs_01_24180__en-us_topic_0000001219149723_li17764403210">If the CheckPoint interval is too short, data may not be updated in time and job exceptions may occur. Set the CheckPoint interval at the minute level, for example, 60000 ms.</li><li id="mrs_01_24180__en-us_topic_0000001219149723_li2060482612261">Asynchronous compaction is required when a Flink SQL job writes data to an MOR table. For details about the parameters that control the compaction interval, see the Hudi official website at <a href="https://hudi.apache.org/docs/configurations.html" target="_blank" rel="noopener noreferrer">https://hudi.apache.org/docs/configurations.html</a>.</li></ul>
</div></div>
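<p>The compaction settings mentioned in the note can be sketched as table options. The option names below (<strong>compaction.async.enabled</strong>, <strong>compaction.trigger.strategy</strong>, and <strong>compaction.delta_commits</strong>) are taken from the Hudi Flink configuration reference and are assumed to be available in the Hudi version in use. The value <strong>5</strong> is only an illustration, not a recommendation.</p>
<pre class="screen">-- Sketch only: an MOR sink table with explicit asynchronous compaction settings.
-- compaction.async.enabled: run compaction inside the write job.
-- compaction.trigger.strategy / compaction.delta_commits: schedule a compaction
-- after the given number of delta commits (illustrative value).
CREATE TABLE stream_mor(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
'table.type' = 'MERGE_ON_READ',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'num_commits',
'compaction.delta_commits' = '5'
);</pre>
<p>Assuming that each CheckPoint produces one delta commit, a CheckPoint interval of 60000 ms combined with the settings above would schedule a compaction roughly every five minutes.</p>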
<ul id="mrs_01_24180__en-us_topic_0000001219149723_ul124841549102518"><li id="mrs_01_24180__en-us_topic_0000001219149723_li164841449132517">The following Flink SQL job writes data from Kafka to an MOR table in stream mode. Only JSON-formatted Kafka data is supported.<pre class="screen" id="mrs_01_24180__en-us_topic_0000001219149723_screen1744713024915">CREATE TABLE stream_mor(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
'table.type' = 'MERGE_ON_READ'
);
CREATE TABLE kafka(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) WITH (
'connector' = 'kafka',
'topic' = 'writehudi',
'properties.bootstrap.servers' = '<em id="mrs_01_24180__en-us_topic_0000001219149723_i18982928135915">IP address of the Kafka broker instance</em>:<em id="mrs_01_24180__en-us_topic_0000001219149723_i1398882813594">Kafka port number</em>',
'properties.group.id' = 'testGroup1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
insert into
stream_mor
select
*
from
kafka;</pre>
</li><li id="mrs_01_24180__en-us_topic_0000001219149723_li1348424913254">The following shows a Flink SQL job writing data to a COW table in stream mode:<pre class="screen" id="mrs_01_24180__en-us_topic_0000001219149723_screen20902424125617">CREATE TABLE stream_write_cow(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/hudi/stream_cow'
);
CREATE TABLE kafka(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) WITH (
'connector' = 'kafka',
'topic' = 'writehudi',
'properties.bootstrap.servers' = '<em id="mrs_01_24180__en-us_topic_0000001219149723_i36724015018">IP address of the Kafka broker instance</em>:<em id="mrs_01_24180__en-us_topic_0000001219149723_i136781019014">Kafka port number</em>',
'properties.group.id' = 'testGroup1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
insert into
stream_write_cow
select
*
from
kafka;</pre>
</li><li id="mrs_01_24180__en-us_topic_0000001219149723_li1243812146449">The following shows a Flink SQL job reading an MOR table.<pre class="screen" id="mrs_01_24180__en-us_topic_0000001219149723_screen184172588315">CREATE TABLE hudi_read_spark_mor(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/default/tb_hudimor',
'table.type' = 'MERGE_ON_READ'
);
CREATE TABLE kafka(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) WITH (
'connector' = 'kafka',
'topic' = 'writehudi',
'properties.bootstrap.servers' = '<em id="mrs_01_24180__en-us_topic_0000001219149723_i67011110201">IP address of the Kafka broker instance</em>:<em id="mrs_01_24180__en-us_topic_0000001219149723_i1570718101106">Kafka port number</em>',
'properties.group.id' = 'testGroup1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
insert into
hudi_read_spark_mor
select
*
from
kafka;</pre>
</li></ul>
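<p>A stream read from an MOR table might be sketched as follows. The table names <strong>hudi_stream_read</strong> and <strong>print_sink</strong> are hypothetical. The <strong>read.streaming.enabled</strong> and <strong>read.streaming.check-interval</strong> options come from the Hudi Flink configuration reference and are assumed to be available in the Hudi version in use.</p>
<pre class="screen">-- Sketch only: continuously read new commits from an existing Hudi MOR table.
-- read.streaming.check-interval is the polling interval in seconds.
CREATE TABLE hudi_stream_read(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '60'
);
-- Hypothetical sink that prints each row to the TaskManager output.
CREATE TABLE print_sink(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) WITH (
'connector' = 'print'
);
insert into
print_sink
select
*
from
hudi_stream_read;</pre>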
<div class="note" id="mrs_01_24180__en-us_topic_0000001219149723_note167015421341"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_p3718569187">Kafka port number</p>
<ul id="mrs_01_24180__en-us_topic_0000001252705552_ul330585217498"><li id="mrs_01_24180__en-us_topic_0000001252705552_li15431748164911">In security mode, the port number is the value of <strong id="mrs_01_24180__en-us_topic_0000001252705552_b673611250427">sasl.port</strong> (<strong id="mrs_01_24180__en-us_topic_0000001252705552_b17736325174216">21007</strong> by default).</li><li id="mrs_01_24180__en-us_topic_0000001252705552_li175451448174913">In non-security mode, the port is the value of <strong id="mrs_01_24180__en-us_topic_0000001252705552_b13650127124215">port</strong> (<strong id="mrs_01_24180__en-us_topic_0000001252705552_b196511527174212">9092</strong> by default). If the port number is set to <strong id="mrs_01_24180__en-us_topic_0000001252705552_b9801172914211">9092</strong>, set <strong id="mrs_01_24180__en-us_topic_0000001252705552_b188021729164219">allow.everyone.if.no.acl.found</strong> to <strong id="mrs_01_24180__en-us_topic_0000001252705552_b880212910426">true</strong>. The procedure is as follows:<p id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_p8835244171713">Log in to FusionInsight Manager and choose <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b1882175163611">Cluster</strong> &gt; <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b88835515361">Services</strong> &gt; <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b288415510368">Kafka</strong>. 
On the displayed page, click <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b18842583618">Configurations</strong> and then <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b18885145113618">All Configurations</strong>, search for <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b38856533618">allow.everyone.if.no.acl.found</strong>, set its value to <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b168865593615">true</strong>, and click <strong id="mrs_01_24180__en-us_topic_0000001252705552_en-us_topic_0000001219029313_b78871057363">Save</strong>.</p>
</li></ul>
</div></div>
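<p>For a cluster in security mode, the Kafka source table might look like the following sketch, which uses the default <strong>sasl.port</strong> value <strong>21007</strong>. The two <strong>properties.*</strong> options at the end are standard Kafka client settings passed through by the Flink Kafka connector. Their values are assumptions, and a given cluster may require additional Kerberos-related settings.</p>
<pre class="screen">-- Sketch only: Kafka source for a security-mode cluster.
-- The security.protocol and sasl.kerberos.service.name values are assumptions;
-- check the Kafka service configuration of the cluster before use.
CREATE TABLE kafka(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts INT,
`p` VARCHAR(20)
) WITH (
'connector' = 'kafka',
'topic' = 'writehudi',
'properties.bootstrap.servers' = '<em>IP address of the Kafka broker instance</em>:21007',
'properties.group.id' = 'testGroup1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.kerberos.service.name' = 'kafka'
);</pre>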
</p></li><li id="mrs_01_24180__en-us_topic_0000001219149723_li64391512114616"><span>If data written to the Hudi table by a Flink SQL job needs to be read by Spark or Hive, use <strong id="mrs_01_24180__en-us_topic_0000001219149723_b2771104853913">run_hive_sync_tool.sh</strong> to synchronize the data in the Hudi table to Hive. For details about the synchronization method, see <a href="mrs_01_24064.html">Synchronizing Hudi Table Data to Hive</a>.</span><p><div class="notice" id="mrs_01_24180__en-us_topic_0000001219149723_note191725112474"><span class="noticetitle"><img src="public_sys-resources/notice_3.0-en-us.png"> </span><div class="noticebody"><p id="mrs_01_24180__en-us_topic_0000001219149723_p217175164712">Ensure that no new partitions are added before the synchronization. Partitions added after the synchronization cannot be read.</p>
</div></div>
</p></li></ol>
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="mrs_01_24226.html">Interconnecting FlinkServer with External Components</a></div>
</div>
</div>