:original_name: mrs_01_24033.html

.. _mrs_01_24033:

Getting Started
===============

Scenario
--------

This section describes the basic capabilities of Hudi using spark-shell. Based on code snippets that use the Spark data source, it shows how to insert data into and update a Hudi dataset stored as Copy-on-Write (COW) tables, the default storage type. After each write operation, it also shows how to read the snapshot data and the incremental data.

Prerequisites
-------------

- You have created a user on Manager and added the user to the **hadoop** (primary group) and **hive** user groups.

Procedure
---------

#. Download and install the Hudi client. For details, see :ref:`Installing a Client (Version 3.x or Later) <mrs_01_2127>`.

   .. note::

      Currently, Hudi is integrated into Spark2x, so you only need to download the Spark2x client on Manager. In this example, the client installation directory is **/opt/client**.

#. .. _mrs_01_24033__li6424125918379:

   Log in to the node where the client is installed as user **root** and run the following command:

   **cd /opt/client**

#. Run the following commands to load environment variables:

   **source bigdata_env**

   **source Hudi/component_env**

   **kinit** *Created user*

   .. note::

      - You need to change the password of the created user, and then run the **kinit** command to log in to the system again.
      - In normal mode (Kerberos authentication disabled), you do not need to run the **kinit** command.
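
   For example, assuming the created user is named **developuser** (a placeholder; replace it with the user you actually created), the commands in this step would look as follows:

   .. code-block::

      source bigdata_env
      source Hudi/component_env
      kinit developuser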

#. .. _mrs_01_24033__li654313073616:

   Run **spark-shell --master yarn-client**, import the Hudi packages, and generate test data:

   .. code-block::

      // Import required packages.
      import org.apache.hudi.QuickstartUtils._
      import scala.collection.JavaConversions._
      import org.apache.spark.sql.SaveMode._
      import org.apache.hudi.DataSourceReadOptions._
      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.config.HoodieWriteConfig._

      // Define the table name and storage path, and generate test data.
      val tableName = "hudi_cow_table"
      val basePath = "hdfs://hacluster/tmp/hudi_cow_table"
      val dataGen = new DataGenerator
      val inserts = convertToStringList(dataGen.generateInserts(10))
      val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
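
   The generated records contain the **uuid**, **ts**, **fare**, and **partitionpath** fields used later in this section. If you want to confirm what was generated before writing, you can optionally inspect the DataFrame (this check is not part of the original procedure):

   .. code-block::

      // Optional: inspect the schema and a few of the generated rows.
      df.printSchema()
      df.select("uuid", "partitionpath", "ts", "fare").show(false)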

#. Write data to the Hudi table in overwrite mode.

   .. code-block::

      df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(basePath)
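
   After the write completes, you can optionally check from another terminal on the client node that Hudi created its metadata directory (**.hoodie**) and partition directories under the base path (this check is not part of the original procedure):

   **hdfs dfs -ls -R /tmp/hudi_cow_table**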

#. Query the Hudi table.

   Register a temporary table and query the table.

   .. code-block::

      val roViewDF = spark.
      read.
      format("org.apache.hudi").
      load(basePath + "/*/*/*/*")
      roViewDF.createOrReplaceTempView("hudi_ro_table")
      spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
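
   Hudi also adds generated metadata columns to every record. If you want to see them, you can optionally query them from the same temporary view (this query is an addition to the original example):

   .. code-block::

      spark.sql("select `_hoodie_commit_time`, `_hoodie_record_key`, `_hoodie_partition_path`, fare, ts from hudi_ro_table").show()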

#. Generate new data and update the Hudi table in append mode.

   .. code-block::

      val updates = convertToStringList(dataGen.generateUpdates(10))
      val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
      df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)
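
   Because the updates reuse existing record keys (**uuid**), the append-mode write upserts existing records instead of adding new rows. As an optional sanity check (an assumption-based addition, not part of the original example), you can reload the table and confirm that the record count is unchanged:

   .. code-block::

      // Optional: the count should still match the 10 originally inserted records.
      val afterUpdateDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
      afterUpdateDF.count()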

#. Query incremental data in the Hudi table.

   - Reload data.

     .. code-block::

        spark.
        read.
        format("org.apache.hudi").
        load(basePath + "/*/*/*/*").
        createOrReplaceTempView("hudi_ro_table")

   - Perform the incremental query.

     .. code-block::

        val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
        val beginTime = commits(commits.length - 2)
        val incViewDF = spark.
        read.
        format("org.apache.hudi").
        option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
        option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
        load(basePath);
        incViewDF.registerTempTable("hudi_incr_table")
        spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

#. Perform the point-in-time query.

   .. code-block::

      val beginTime = "000"
      val endTime = commits(commits.length - 2)
      val incViewDF = spark.read.format("org.apache.hudi").
      option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      option(END_INSTANTTIME_OPT_KEY, endTime).
      load(basePath);
      incViewDF.registerTempTable("hudi_incr_table")
      spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()

#. Delete data.

   - Prepare the data to be deleted.

     .. code-block::

        val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")
        val deletes = dataGen.generateDeletes(df.collectAsList())

   - Execute the deletion.

     .. code-block::

        val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
        df.write.format("org.apache.hudi").
        options(getQuickstartWriteConfigs).
        option(OPERATION_OPT_KEY,"delete").
        option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        option(TABLE_NAME, tableName).
        mode(Append).
        save(basePath);

   - Query data again.

     .. code-block::

        val roViewDFAfterDelete = spark.
        read.
        format("org.apache.hudi").
        load(basePath + "/*/*/*/*")
        roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
        spark.sql("select uuid, partitionPath from hudi_ro_table").show()
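
     Two records were deleted, so this query should return two fewer rows than before the deletion. For an explicit check (an optional addition to the original example), you can count the remaining records:

     .. code-block::

        spark.sql("select count(*) from hudi_ro_table").show()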