DLI writes the output data of a Flink job to GaussDB(DWS). The GaussDB(DWS) database kernel is compliant with PostgreSQL. PostgreSQL can store data of complex types and provides spatial information services, multi-version concurrency control (MVCC), and high concurrency. It is suited to location-based applications, financial insurance, and e-commerce.
GaussDB(DWS) is an online data processing database built on cloud infrastructure and platform services. It helps you mine and analyze massive data sets.
```sql
create table dwsSink(
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_speed INT
) with (
  'connector' = 'gaussdb',
  'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDatabase',
  'table-name' = 'car_info',
  'username' = 'DwsUserName',
  'password' = 'DwsPassword',
  'write.mode' = 'upsert'
);
```
```sql
create table dwsSink (
  attr_name attr_type
  (',' attr_name attr_type)*
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'gaussdb',
  'url' = '',
  'table-name' = '',
  'driver' = '',
  'username' = '',
  'password' = ''
);
```
Parameter | Mandatory | Default Value | Data Type | Description |
---|---|---|---|---|
connector | Yes | None | String | Connector to be used. Set this parameter to gaussdb. |
url | Yes | None | String | JDBC connection address. If you use the gsjdbc4 driver, set the value in jdbc:postgresql://${ip}:${port}/${dbName} format. If you use the gsjdbc200 driver, set the value in jdbc:gaussdb://${ip}:${port}/${dbName} format. |
table-name | Yes | None | String | Name of the table to be operated. If the GaussDB(DWS) table is in a schema, the format is schema\".\"Table name. For details, see FAQ. |
driver | No | org.postgresql.Driver | String | JDBC connection driver. The default value is org.postgresql.Driver. |
username | No | None | String | Username for GaussDB(DWS) database authentication. This parameter must be configured together with password. |
password | No | None | String | Password for GaussDB(DWS) database authentication. This parameter must be configured together with username. |
write.mode | No | None | String | Data write mode. The value can be copy, insert, or upsert. The default value is upsert. Set this parameter according to whether a primary key is defined. Note: GaussDB(DWS) does not support the update of distribution columns. The primary keys of columns to be updated must cover all distribution columns defined in the GaussDB(DWS) table. |
sink.buffer-flush.max-rows | No | 100 | Integer | Maximum number of rows to buffer before a flush. When the number of buffered rows exceeds this value, a flush is triggered. The default value is 100. If this parameter is set to 0, buffering is disabled and data is flushed in real time. |
sink.buffer-flush.interval | No | 1s | Duration | Data flush interval. A flush is triggered periodically at this interval. The format is {length value}{time unit label}, for example, 123ms, 321s. The supported time units include d, h, min, s, and ms (default unit). |
sink.max-retries | No | 3 | Integer | Maximum number of write retries. |
write.escape-string-value | No | false | Boolean | Whether to escape values of the string type. This parameter is used only when write.mode is set to copy. |
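
As an illustration of how the write-mode and flush options combine, the following sketch declares an upsert sink with a primary key and explicit buffering settings. The columns reuse the car_info example; the choice of car_id as the primary key and the specific values (500 rows, 2s, 5 retries) are assumptions made for illustration, not recommendations.

```sql
-- Sketch only: the placeholders (DwsAddress, DwsPort, ...) must be replaced with real values.
create table dwsSink (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_speed INT,
  -- Assumed key; it must cover all distribution columns of the GaussDB(DWS) table.
  PRIMARY KEY (car_id) NOT ENFORCED
) with (
  'connector' = 'gaussdb',
  'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDatabase',
  'table-name' = 'car_info',
  'username' = 'DwsUserName',
  'password' = 'DwsPassword',
  'write.mode' = 'upsert',
  'sink.buffer-flush.max-rows' = '500',  -- flush once more than 500 rows are buffered
  'sink.buffer-flush.interval' = '2s',   -- also flush every 2 seconds
  'sink.max-retries' = '5'               -- retry a failed write up to 5 times
);
```

With both flush options set, a flush is triggered by whichever condition is met first, per the parameter descriptions above.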
In this example, data is read from the Kafka data source and written to the GaussDB(DWS) result table in insert mode. The procedure is as follows:
Create the dws_order table in GaussDB(DWS):

```sql
create table public.dws_order(
  order_id VARCHAR,
  order_channel VARCHAR,
  order_time VARCHAR,
  pay_amount FLOAT8,
  real_pay FLOAT8,
  pay_time VARCHAR,
  user_id VARCHAR,
  user_name VARCHAR,
  area_id VARCHAR
);
```
Create a Flink job with the following script, which defines the Kafka source table and the GaussDB(DWS) result table:

```sql
CREATE TABLE kafkaSource (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE dwsSink (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'gaussdb',
  'url' = 'jdbc:postgresql://DWSAddress:DWSPort/DWSdbName',
  'table-name' = 'dws_order',
  'driver' = 'org.postgresql.Driver',
  'username' = 'DWSUserName',
  'password' = 'DWSPassword',
  'write.mode' = 'insert'
);

insert into dwsSink select * from kafkaSource;
```
Send the following test data to the Kafka topic:

```json
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
```
Query the result table in GaussDB(DWS):

```sql
select * from dws_order
```

The query returns the following row:

```
202103241000000001 webShop 2021-03-24 10:00:00 100.0 100.0 2021-03-24 10:02:03 0001 Alice 330106
```
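
The same result table could also be written in copy mode. The sketch below is a variant of the dwsSink definition from this example in which only write.mode and write.escape-string-value differ. The table name dwsSinkCopy is hypothetical, and enabling string escaping is an assumption for illustration; whether copy mode suits a given workload is not covered here.

```sql
-- Variant of dwsSink from the example, switched to copy mode (illustrative).
CREATE TABLE dwsSinkCopy (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'gaussdb',
  'url' = 'jdbc:postgresql://DWSAddress:DWSPort/DWSdbName',
  'table-name' = 'dws_order',
  'driver' = 'org.postgresql.Driver',
  'username' = 'DWSUserName',
  'password' = 'DWSPassword',
  'write.mode' = 'copy',
  'write.escape-string-value' = 'true'  -- used only when write.mode is copy
);

insert into dwsSinkCopy select * from kafkaSource;
```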
```
java.io.IOException: unable to open JDBC writer
...
Caused by: org.postgresql.util.PSQLException: The connection attempt failed.
...
Caused by: java.net.SocketTimeoutException: connect timed out
```
If the GaussDB(DWS) table is in a schema, specify table-name as in the following example, where the table test is in the schema ads_game_sdk_base:

```sql
CREATE TABLE ads_rpt_game_sdk_realtime_ada_reg_user_pay_mm (
  ddate DATE,
  dmin TIMESTAMP(3),
  game_appkey VARCHAR,
  channel_id VARCHAR,
  pay_user_num_1m bigint,
  pay_amt_1m bigint,
  PRIMARY KEY (ddate, dmin, game_appkey, channel_id) NOT ENFORCED
) WITH (
  'connector' = 'gaussdb',
  'url' = 'jdbc:postgresql://<yourDwsAddress>:<yourDwsPort>/dws_bigdata_db',
  'table-name' = 'ads_game_sdk_base\".\"test',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'write.mode' = 'upsert'
);
```
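
For reference, a GaussDB(DWS) table matching this sink might be created as sketched below. The schema and table names come from the table-name value in the example. The column types, the hash distribution on game_appkey, and the primary key constraint are assumptions, chosen so that the primary key covers the distribution column as the write.mode note requires.

```sql
-- Run in GaussDB(DWS), not in the Flink job. Illustrative only.
CREATE SCHEMA ads_game_sdk_base;

CREATE TABLE ads_game_sdk_base.test (
  ddate DATE,
  dmin TIMESTAMP(3),
  game_appkey VARCHAR,
  channel_id VARCHAR,
  pay_user_num_1m BIGINT,
  pay_amt_1m BIGINT,
  PRIMARY KEY (ddate, dmin, game_appkey, channel_id)
)
DISTRIBUTE BY HASH (game_appkey);  -- assumed distribution column, covered by the primary key
```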