Getting Started
Scenario
This section describes the basic capabilities of Hudi using spark-shell. Based on code snippets that use the Spark data source, it walks through inserting data into and updating a Hudi dataset stored as Copy-on-Write (COW) tables, the default storage type. After each write operation, it also shows how to read snapshot and incremental data.
Prerequisites
- You have created a user and added the user to user groups hadoop (primary group) and hive on Manager.
Procedure
- Download and install the Hudi client. For details, see Installing a Client (Version 3.x or Later).
- Log in as user root to the node where the client is installed and run the following command:
cd /opt/client
- Run the following commands to load environment variables:
source bigdata_env
source Hudi/component_env
kinit <Created user>
- Run spark-shell --master yarn-client, import the Hudi packages, and generate test data:
// Import required packages.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

// Define the table name and storage path, and generate test data.
val tableName = "hudi_cow_table"
val basePath = "hdfs://hacluster/tmp/hudi_cow_table"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
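As an optional check (a minimal sketch, not part of the original procedure), you can inspect the generated records in the same spark-shell session before writing them:

// Optional: inspect the schema and a few generated trip records.
df.printSchema()
df.select("uuid", "partitionpath", "fare", "begin_lat", "begin_lon", "ts").show(5, false)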
- Write data to the Hudi table in overwrite mode.
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
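Optionally, you can verify that Hudi created the table layout (including the .hoodie metadata folder) under basePath. The following is a minimal sketch that assumes the same spark-shell session; it is not part of the original procedure:

// Optional: list the directories Hudi wrote under basePath.
import org.apache.hadoop.fs.Path
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(basePath)).foreach(s => println(s.getPath))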
- Query the Hudi table.
Register a temporary table and query the table.
val roViewDF = spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
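Hudi also stores metadata columns (such as _hoodie_commit_time, _hoodie_record_key, and _hoodie_partition_path) with every record. As an optional illustration against the same temporary view, you can project them alongside the data fields:

// Optional: show the Hudi metadata columns stored with each record.
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare, ts from hudi_ro_table").show(false)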
- Generate new data and update the Hudi table in append mode.
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
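To confirm that the append upserted existing records rather than writing a separate dataset, you can reload the snapshot view and list the distinct commit times; two commits should now be visible. This optional check is a sketch that assumes the same session and is not part of the original procedure:

// Optional: reload the snapshot and confirm that a second commit now exists.
spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView("hudi_ro_table")
spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").show()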
- Query incremental data in the Hudi table.
- Reload data.
spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_ro_table")
- Perform the incremental query.
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
val incViewDF = spark.
  read.
  format("org.apache.hudi").
  option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
incViewDF.registerTempTable("hudi_incr_table")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
- Perform the point-in-time query.
val beginTime = "000" val endTime = commits(commits.length - 2) val incViewDF = spark.read.format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath); incViewDF.registerTempTable("hudi_incr_table") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
- Delete data.
- Prepare the data to be deleted.
val df = spark.sql("select uuid, partitionpath from hudi_ro_table limit 2")
val deletes = dataGen.generateDeletes(df.collectAsList())
- Execute the deletion.
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
- Query data again.
val roViewDFAfterDelete = spark.
  read.
  format("org.apache.hudi").
  load(basePath + "/*/*/*/*")
roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
spark.sql("select uuid, partitionpath from hudi_ro_table").show()
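As an optional sanity check in the same session (a sketch, not part of the original procedure), you can count the remaining records; the two record keys prepared for deletion should no longer appear:

// Optional: count the records left in the snapshot after the delete commit.
spark.sql("select count(*) as remaining from hudi_ro_table").show()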