Flink interconnects with the ClickHouseBalancer instance of ClickHouse to read and write data. The balancer distributes requests across ClickHouse nodes, preventing traffic from concentrating on a single node.
Flink SQL Data Type | ClickHouse Data Type
---|---
BOOLEAN | UInt8
TINYINT | Int8
SMALLINT | Int16
INTEGER | Int32
BIGINT | Int64
FLOAT | Float32
DOUBLE | Float64
CHAR | String
VARCHAR | String
VARBINARY | FixedString
DATE | Date
TIMESTAMP | DateTime
DECIMAL | Decimal
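For example, a Flink column declared as BOOLEAN or TIMESTAMP maps to UInt8 or DateTime on the ClickHouse side. A minimal sketch of a matching pair of tables (the names demo_src and demo_ck are hypothetical, and the Flink table uses the built-in datagen connector purely for illustration):

-- Flink SQL: types from the left column of the mapping table
CREATE TABLE demo_src (
    `flag` BOOLEAN,
    `ts` TIMESTAMP(3)
) WITH (
    'connector' = 'datagen'
);

-- ClickHouse: matching types from the right column of the mapping table
CREATE TABLE demo_ck (
    `flag` UInt8,
    `ts` DateTime
) ENGINE = MergeTree ORDER BY ts;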
cd /opt/Bigdata/client
source bigdata_env
kinit Component service user
Example: kinit clickhouseuser
Normal mode:
clickhouse client --host IP address of the ClickHouse instance --user Username --password 'Password' --port ClickHouse port number
Security mode:
clickhouse client --host IP address of the ClickHouse instance --user Username --password 'Password' --port ClickHouse port number --secure --multiline
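For illustration only, a filled-in security-mode login might look like the following. Every value here (the IP address 192.168.42.90, the password, and the port 21427) is a placeholder, not a value from any real cluster; clickhouseuser is the example user from the kinit step above:
clickhouse client --host 192.168.42.90 --user clickhouseuser --password 'xxx' --port 21427 --secure --multiline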
CREATE TABLE default.test1 ON CLUSTER default_cluster
(
    `pid` Int8,
    `uid` UInt8,
    `Int_16` Int16,
    `Int_32` Int32,
    `Int_64` Int64,
    `String_x` String,
    `String_y` String,
    `float_32` Float32,
    `float_64` Float64,
    `Decimal_x` Decimal32(2),
    `Date_x` Date,
    `DateTime_x` DateTime
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/test1', '{replica}')
PARTITION BY pid
ORDER BY (pid, DateTime_x);
CREATE TABLE test1_all ON CLUSTER default_cluster
(
    `pid` Int8,
    `uid` UInt8,
    `Int_16` Int16,
    `Int_32` Int32,
    `Int_64` Int64,
    `String_x` String,
    `String_y` String,
    `float_32` Float32,
    `float_64` Float64,
    `Decimal_x` Decimal32(2),
    `Date_x` Date,
    `DateTime_x` DateTime
)
ENGINE = Distributed(default_cluster, default, test1, rand());
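As an optional sanity check before wiring up Flink, you can insert one hand-written row into the distributed table and read it back (the literal values below are made up to match the column types):

INSERT INTO test1_all VALUES (1, 1, 100, 1000, 10000, 'a', 'b', 0.1, 0.2, 3.14, '2021-05-21', '2021-05-21 10:05:10');
SELECT count() FROM test1_all;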
CREATE TABLE kafkasource (
    `pid` TINYINT,
    `uid` BOOLEAN,
    `Int_16` SMALLINT,
    `Int_32` INTEGER,
    `Int_64` BIGINT,
    `String_x` CHAR,
    `String_y` VARCHAR(10),
    `float_32` FLOAT,
    `float_64` DOUBLE,
    `Decimal_x` DECIMAL(9,2),
    `Date_x` DATE,
    `DateTime_x` TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'topic' = 'input',
    'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
    'properties.group.id' = 'group1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.kerberos.domain.name' = 'hadoop.System domain name'
);

CREATE TABLE cksink (
    `pid` TINYINT,
    `uid` BOOLEAN,
    `Int_16` SMALLINT,
    `Int_32` INTEGER,
    `Int_64` BIGINT,
    `String_x` CHAR,
    `String_y` VARCHAR(10),
    `float_32` FLOAT,
    `float_64` DOUBLE,
    `Decimal_x` DECIMAL(9,2),
    `Date_x` DATE,
    `DateTime_x` TIMESTAMP
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://ClickHouseBalancer instance IP address:21422/default?ssl=true&sslmode=none',
    'username' = 'ClickHouse user. For details, see the note below.',
    'password' = 'ClickHouse user password. For details, see the note below.',
    'table-name' = 'test1_all',
    'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',
    'sink.buffer-flush.max-rows' = '0',
    'sink.buffer-flush.interval' = '60s'
);

INSERT INTO cksink SELECT * FROM kafkasource;
CREATE TABLE kafkasource (
    `pid` TINYINT,
    `uid` BOOLEAN,
    `Int_16` SMALLINT,
    `Int_32` INTEGER,
    `Int_64` BIGINT,
    `String_x` CHAR,
    `String_y` VARCHAR(10),
    `float_32` FLOAT,
    `float_64` DOUBLE,
    `Decimal_x` DECIMAL(9,2),
    `Date_x` DATE,
    `DateTime_x` TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'topic' = 'kinput',
    'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
    'properties.group.id' = 'kafka_test',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE cksink (
    `pid` TINYINT,
    `uid` BOOLEAN,
    `Int_16` SMALLINT,
    `Int_32` INTEGER,
    `Int_64` BIGINT,
    `String_x` CHAR,
    `String_y` VARCHAR(10),
    `float_32` FLOAT,
    `float_64` DOUBLE,
    `Decimal_x` DECIMAL(9,2),
    `Date_x` DATE,
    `DateTime_x` TIMESTAMP
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://ClickHouseBalancer instance IP address:21425/default',
    'table-name' = 'test1_all',
    'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',
    'sink.buffer-flush.max-rows' = '0',
    'sink.buffer-flush.interval' = '60s'
);

INSERT INTO cksink SELECT * FROM kafkasource;
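Comparing the two jobs, the normal-mode version simply drops the security-related settings. The lines present only in the security-mode job are the Kerberos properties of the Kafka source:

'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.System domain name'

as well as the ?ssl=true&sslmode=none suffix on the jdbc:clickhouse URL and the username/password options of the sink.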
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set its value to true, and click Save.
sink.buffer-flush.max-rows: maximum number of rows buffered before they are written to ClickHouse. The default value is 100.
sink.buffer-flush.interval: interval at which buffered rows are flushed in a batch. The default value is 1s.
A sink flush is triggered as soon as either of the two conditions is met; that is, the buffered data is written to the database table. In the example jobs above, sink.buffer-flush.max-rows is set to 0, which disables the row-count trigger, so data is flushed to ClickHouse once every 60 seconds.
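If you would rather flush mainly by batch size, you could set, for example (illustrative values, not recommendations from this guide):

'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '5s'

With these settings, a batch is written whenever 1000 rows accumulate or 5 seconds elapse, whichever comes first.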
sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic Topic name --producer.config ../config/producer.properties
For example, if the topic name is kinput, the script is sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic kinput --producer.config ../config/producer.properties.
{"pid": "3","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.451236414","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}, {"pid": "4","uid":false,"Int_16": "6533","Int_32": "429496294","Int_64": "1844674407370955614","String_x": "abc1","String_y": "abc1defghi","float_32": "0.1234","float_64": "95.1","Decimal_x": "0.4512314","Date_x": "2021-05-29","DateTime_x": "2021-05-21 10:05:10"}
Press Enter after each line to send it as a message.
clickhouse client --host IP address of the ClickHouse instance --user Username --password 'Password' --port ClickHouse port number --secure --multiline
Run the following command to check whether data is written to a specified ClickHouse table, for example, test1_all.
select * from test1_all;
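As an optional extra check (not part of the original procedure), you can also query the local table test1 on a ClickHouseServer node to see how the rows are distributed across shards:

select count() from default.test1;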