Scala Example Code
Development Description
Mongo can be connected only through an enhanced datasource connection.
Prerequisite: an enhanced datasource connection has been created on the DLI management console and bound to a queue.

Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
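For example, a minimal sketch that reads the password from an environment variable instead of hard-coding it (MONGO_PASSWORD is an assumed variable name, not a DLI convention):
// Read the Mongo password from an environment variable at runtime.
// MONGO_PASSWORD is an assumed name; adapt it to your deployment.
val password = sys.env.getOrElse("MONGO_PASSWORD",
  sys.error("MONGO_PASSWORD is not set"))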
- Constructing dependency information and creating a Spark session
- Import dependencies. Maven dependency involved:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
Import dependency packages (Row and SaveMode are needed by the DataFrame steps below):
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
Create a session:
val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
- Connecting to data sources through SQL APIs
- Create a table to connect to a Mongo data source.
sparkSession.sql(
  """create table test_dds(id string, name string, age int) using mongo options(
    |  'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
    |  'uri' = 'mongodb://username:pwd@host:8635/db',
    |  'database' = 'test',
    |  'collection' = 'test',
    |  'user' = 'rwuser',
    |  'password' = '######')""".stripMargin)
- Insert data.
sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
- Query data.
sparkSession.sql("select * from test_dds").show()
- Connecting to data sources through DataFrame APIs
- Set connection parameters.
val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
val uri = "mongodb://username:pwd@host:8635/db"
val user = "rwuser"
val database = "test"
val collection = "test"
val password = "######"
- Construct a schema.
val schema = StructType(List(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField("age", IntegerType)))
- Construct a DataFrame.
val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
val dataFrame = spark.createDataFrame(rdd, schema)
- Import data to Mongo.
dataFrame.write.format("mongo")
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .mode(SaveMode.Overwrite)
  .save()
- Read data from Mongo.
val jdbcDF = spark.read.format("mongo").schema(schema)
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .load()
View the operation result:
jdbcDF.show()
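Assuming the two rows written above are the only documents in the collection (SaveMode.Overwrite replaces existing data), the output should resemble this sketch:
+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 23|
|  2| Bob| 32|
+---+----+---+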
- Submitting a Spark job
- Generate a JAR package based on the code and upload the package to DLI.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job.
- If the Spark version is 2.3.2 (soon to be taken offline) or 2.4.5, set Module to sys.datasource.mongo when you submit the job.
- If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following Spark parameters (--conf):
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
Complete Example Code
- Maven dependency
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
- Connecting to data sources through SQL APIs
import org.apache.spark.sql.SparkSession

object TestMongoSql {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().getOrCreate()
    sparkSession.sql(
      """create table test_dds(id string, name string, age int) using mongo options(
        |  'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
        |  'uri' = 'mongodb://username:pwd@host:8635/db',
        |  'database' = 'test',
        |  'collection' = 'test',
        |  'user' = 'rwuser',
        |  'password' = '######')""".stripMargin)
    sparkSession.sql("insert into test_dds values('3', 'Ann', 23)")
    sparkSession.sql("select * from test_dds").show()
    sparkSession.close()
  }
}
- Connecting to data sources through DataFrame APIs
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test_Mongo_SparkSql {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()

    // Set the connection configuration parameters.
    val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    val uri = "mongodb://username:pwd@host:8635/db"
    val user = "rwuser"
    val database = "test"
    val collection = "test"
    val password = "######"

    // Set up the schema.
    val schema = StructType(List(
      StructField("id", StringType),
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // Set up the DataFrame.
    val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
    val dataFrame = spark.createDataFrame(rdd, schema)

    // Write data to Mongo.
    dataFrame.write.format("mongo")
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .mode(SaveMode.Overwrite)
      .save()

    // Read data from Mongo.
    val jdbcDF = spark.read.format("mongo").schema(schema)
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .load()
    jdbcDF.show()

    spark.close()
  }
}
Parent topic: Connecting to Mongo