This section describes how to create a Kafka table to automatically synchronize Kafka data to the ClickHouse cluster.
Currently, ClickHouse cannot interconnect with Kafka clusters that have security mode enabled.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:port1,host2:port2',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'
    [,kafka_row_delimiter = 'delimiter_symbol']
    [,kafka_schema = '']
    [,kafka_num_consumers = N];
| Parameter | Mandatory | Description |
|---|---|---|
| kafka_broker_list | Yes | A comma-separated (,) list of Kafka broker instances, for example, IP address 1 of the Kafka broker instance:9092,IP address 2 of the Kafka broker instance:9092,IP address 3 of the Kafka broker instance:9092. To obtain the IP addresses of the Kafka broker instances, perform the following steps: |
| kafka_topic_list | Yes | A list of Kafka topics. |
| kafka_group_name | Yes | A Kafka consumer group, which can be customized. |
| kafka_format | Yes | Kafka message format, for example, JSONEachRow, CSV, or XML. |
| kafka_row_delimiter | No | Delimiter character that ends a message. |
| kafka_schema | No | Parameter that must be used if the format requires a schema definition. |
| kafka_num_consumers | No | Number of consumers per table. The default value is 1. Specify more consumers if the throughput of a single consumer is insufficient. The total number of consumers cannot exceed the number of partitions in the topic, because only one consumer can be assigned to each partition. |
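For reference, a table definition that also uses one of the optional settings might look like the following sketch. The table name, topic, consumer group, and broker addresses are placeholders, and kafka_num_consumers = 2 assumes the topic has at least two partitions.

-- Hypothetical example showing an optional setting (kafka_num_consumers)
create table kafka_example_tbl on cluster default_cluster
(
    id UInt32,
    msg String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'IP address 1 of the Kafka broker instance:9092,IP address 2 of the Kafka broker instance:9092',
    kafka_topic_list = 'example_topic',
    kafka_group_name = 'example_group',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2;  -- must not exceed the number of topic partitions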
kafka-topics.sh --topic kafkacktest2 --create --zookeeper IP address of the Zookeeper role instance:2181/kafka --partitions 2 --replication-factor 1
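To confirm that the topic was created with the expected number of partitions, you can describe it. The following is a sketch that reuses the same ZooKeeper address placeholder as the command above:

kafka-topics.sh --describe --topic kafkacktest2 --zookeeper IP address of the Zookeeper role instance:2181/kafka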
kinit Component service user
Example: kinit clickhouseuser
clickhouse client --host IP address of the ClickHouse instance --user Login username --password --port ClickHouse port number --database Database name --multiline
Enter the user password.
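A concrete invocation might look like the following sketch. The host is a placeholder, and the example assumes the default ClickHouse native TCP port 9000 and the default database; substitute the values used by your cluster.

clickhouse client --host 192.168.0.1 --user clickhouseuser --password --port 9000 --database default --multiline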
create table kafka_src_tbl3 on cluster default_cluster
(
    id UInt32,
    age UInt32,
    msg String
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'IP address 1 of the Kafka broker instance:9092,IP address 2 of the Kafka broker instance:9092,IP address 3 of the Kafka broker instance:9092',
    kafka_topic_list = 'kafkacktest2',
    kafka_group_name = 'cg12',
    kafka_format = 'JSONEachRow';
create table kafka_dest_tbl3 on cluster default_cluster
(
    id UInt32,
    age UInt32,
    msg String
)
engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/kafka_dest_tbl3', '{replica}')
partition by age
order by id;
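At this point the destination table is empty. An optional sanity check confirms that it exists and has no rows yet:

select count() from kafka_dest_tbl3;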
create materialized view consumer3 on cluster default_cluster to kafka_dest_tbl3 as select * from kafka_src_tbl3;
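The JSON records shown below are entered at a Kafka console producer prompt. A minimal producer invocation might look like the following sketch; the broker address is a placeholder, so use the broker IP addresses obtained earlier:

kafka-console-producer.sh --broker-list IP address of the Kafka broker instance:9092 --topic kafkacktest2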
>{"id":31, "age":30, "msg":"31 years old"} >{"id":32, "age":30, "msg":"31 years old"} >{"id":33, "age":30, "msg":"31 years old"} >{"id":35, "age":30, "msg":"31 years old"}
select * from kafka_dest_tbl3;
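To pause synchronization temporarily (for example, to change the conversion logic), the materialized view can be detached and later re-attached. The following is a sketch using the view created above:

-- Stop consuming from the Kafka topic
detach table consumer3;
-- Resume consumption
attach table consumer3;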