This section describes the data definition language (DDL) for using Kafka as a source or sink table, lists the WITH parameters, gives example code for creating the tables, and explains how to perform the related operations on the FlinkServer job management page.
If your Kafka cluster is in security mode, the following example SQL statements can be used.
CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
CREATE TABLE KafkaSink (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_sink',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
INSERT INTO KafkaSink SELECT * FROM KafkaSource;
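The placeholders in the statements above must be replaced with real values before the job is submitted. The following is a minimal sketch of filling them in for the source table from a shell script. The helper name render_source_ddl, the broker address 192.168.0.11:21007, and the domain hadoop.com are assumptions made for illustration and are not part of FlinkServer.

```shell
#!/bin/sh
# Hypothetical helper: prints the KafkaSource DDL with the placeholders filled in.
# $1 = broker list (host:port[,host:port...]), $2 = system domain name of the cluster.
render_source_ddl() {
  brokers=$1
  domain=$2
  cat <<EOF
CREATE TABLE KafkaSource (
  \`user_id\` VARCHAR,
  \`user_name\` VARCHAR,
  \`age\` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_source',
  'properties.bootstrap.servers' = '${brokers}',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.${domain}'
);
EOF
}

# Example values are assumptions; replace them with those of your cluster.
render_source_ddl "192.168.0.11:21007" "hadoop.com"
```

The printed statement can then be pasted into the SQL editor on the FlinkServer job management page.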
Kafka port number
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set its value to true, and click Save.
./kafka-topics.sh --list --zookeeper IP address of the ZooKeeper quorumpeer instance:ZooKeeper port number/kafka
sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties
For example, if the topic name is test_source, the command is sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic test_source --producer.config /opt/Bigdata/client/Kafka/kafka/config/producer.properties.
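To avoid retyping the long producer command, its parts can be kept in variables. The sketch below only prints the assembled command so that it can be reviewed before it is run. The broker IP address and port number are assumed values, and build_producer_cmd is a hypothetical helper name.

```shell
#!/bin/sh
# Assumed values; replace them with your broker address, port number, and client directory.
BROKER_IP="192.168.0.11"
KAFKA_PORT="21007"
TOPIC="test_source"
CLIENT_DIR="/opt/Bigdata/client"

# Prints the producer command line instead of running it.
# $1 = broker IP, $2 = Kafka port, $3 = topic name, $4 = client directory.
build_producer_cmd() {
  printf 'sh kafka-console-producer.sh --broker-list %s:%s --topic %s --producer.config %s/Kafka/kafka/config/producer.properties\n' \
    "$1" "$2" "$3" "$4"
}

build_producer_cmd "$BROKER_IP" "$KAFKA_PORT" "$TOPIC" "$CLIENT_DIR"
```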
1,clw,33
Press Enter to send the message.
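The message must match the three columns of KafkaSource (user_id, user_name, age) in csv format. As a sketch, a record can be formatted and checked before it is sent. The helper name make_record is hypothetical, and the check assumes that field values contain no commas.

```shell
#!/bin/sh
# Hypothetical helper: formats one record for the csv format of KafkaSource
# (user_id VARCHAR, user_name VARCHAR, age INT) and rejects a non-integer age.
make_record() {
  case $3 in
    ''|*[!0-9]*)
      echo "age must be a non-negative integer: '$3'" >&2
      return 1
      ;;
  esac
  printf '%s,%s,%s\n' "$1" "$2" "$3"
}

make_record 1 clw 33   # prints 1,clw,33
```

Because kafka-console-producer.sh reads messages from standard input, one per line, the output of make_record can also be piped to the producer command instead of being typed interactively.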
To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of the hosts where the quorumpeer instances are located.
To obtain the ZooKeeper port number, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort. The default value is 24002.
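Once the quorumpeer IP address and the clientPort value are known, they combine into the --zookeeper argument of the kafka-topics.sh command. The following sketch assembles that argument; zk_connect is a hypothetical helper and 192.168.0.21 is an assumed address.

```shell
#!/bin/sh
# Hypothetical helper: builds the --zookeeper argument from a quorumpeer IP address
# and the clientPort value (24002 if no port is given), followed by the /kafka path.
zk_connect() {
  printf '%s:%s/kafka\n' "$1" "${2:-24002}"
}

ZK_IP="192.168.0.21"   # assumed address; use an IP address from the Instance page

# Prints the topic listing command instead of running it.
echo "./kafka-topics.sh --list --zookeeper $(zk_connect "$ZK_IP")"
```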
sh kafka-console-consumer.sh --topic Topic name --bootstrap-server IP address of the Kafka broker instance:Kafka port number --consumer.config /opt/Bigdata/client/Kafka/kafka/config/consumer.properties
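When reading from the sink topic, it can help to confirm that each consumed line has the shape of a KafkaSink row. The filter below is a sketch under the assumption that field values contain no commas or quoted separators; check_rows is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical filter: passes through lines that look like KafkaSink rows
# (three comma-separated fields, the third an integer), reports any other
# line on stderr, and exits with a non-zero status if one was seen.
check_rows() {
  awk -F, '
    NF == 3 && $3 ~ /^-?[0-9]+$/ { print; next }
    { print "unexpected row: " $0 > "/dev/stderr"; bad = 1 }
    END { exit bad }
  '
}

printf '1,clw,33\n' | check_rows   # prints 1,clw,33
```

The output of the consumer command can be piped into check_rows in the same way.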
Parameter | Mandatory | Type | Description |
---|---|---|---|
connector | Yes | String | Connector to be used. Set this parameter to kafka for Kafka. |
topic | | String | Topic name. |
topic-pattern | No (Kafka functions as a source table.) | String | Topic pattern. This parameter is available when Kafka is used as a source table. The value must be a regular expression that matches topic names. NOTE: topic-pattern and topic cannot be set at the same time. |
properties.bootstrap.servers | Yes | String | List of Kafka brokers, separated by commas (,). |
properties.group.id | Yes (Kafka functions as a source table.) | String | Kafka consumer group ID. |
format | Yes | String | Format used to serialize and deserialize the value part of Kafka messages. |
properties.* | No | String | Authentication-related parameters that need to be added in security mode. |
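The value of properties.bootstrap.servers is a comma-separated list of broker addresses. As a sketch, the list can be assembled from individual broker IP addresses as follows; bootstrap_servers is a hypothetical helper, and the addresses and port number are assumed values.

```shell
#!/bin/sh
# Hypothetical helper: joins broker IP addresses into a properties.bootstrap.servers value.
# $1 = Kafka port number, remaining arguments = broker IP addresses.
bootstrap_servers() {
  port=$1
  shift
  list=""
  for ip in "$@"; do
    # Prepend a comma only when the list already has an entry.
    list="${list:+$list,}${ip}:${port}"
  done
  printf '%s\n' "$list"
}

bootstrap_servers 21007 192.168.0.11 192.168.0.12 192.168.0.13
# prints 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007
```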