Create a source stream to obtain data from Kafka as input data for jobs.
Apache Kafka is a fast, scalable, and fault-tolerant distributed message publishing and subscription system. It delivers high throughput, partitions data natively, and replicates data for fault tolerance. It is suitable for scenarios that handle massive volumes of messages.
create table kafkaSource(
  attr_name attr_type
  (',' attr_name attr_type)*
  (',' PRIMARY KEY (attr_name, ...) NOT ENFORCED)
  (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression)
)
with (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = '',
  'format' = ''
);
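As an illustration of the syntax above, the following sketch fills in the template with concrete columns and a watermark. The column names, the topic name orders, the broker addresses, the group ID, and the five-second delay are placeholders chosen for this example, not values prescribed by the connector. The optional PRIMARY KEY clause is left out of this sketch.

```sql
-- Hypothetical source table: all names, addresses, and values are placeholders.
CREATE TABLE kafkaSource (
  order_id   STRING,
  pay_amount DOUBLE,
  order_time TIMESTAMP(3),
  -- Use order_time as the event-time attribute and allow records
  -- to arrive up to 5 seconds out of order (assumed bound).
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
```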
Parameter | Mandatory | Default Value | Data Type | Description |
---|---|---|---|---|
connector | Yes | None | String | Connector to be used. Set this parameter to kafka. |
topic | Yes | None | String | Topic name of the Kafka record. Note: Only one of topic and topic-pattern can be specified. |
topic-pattern | No | None | String | Regular expression for a pattern of topic names to read from. Only one of topic and topic-pattern can be specified. For example: 'topic.*', '(topic-c\|topic-d)', '(topic-a\|topic-b\|topic-\\d*)', '(topic-a\|topic-b\|topic-[0-9]*)' |
properties.bootstrap.servers | Yes | None | String | Comma-separated list of Kafka brokers. |
properties.group.id | Yes | None | String | ID of the consumer group for the Kafka source. |
properties.* | No | None | String | Sets and passes arbitrary Kafka configurations. Note: The suffix must match a configuration key defined in the Apache Kafka documentation. Flink removes the properties. prefix and passes the remaining key and its value to the Kafka client. |
format | Yes | None | String | Format used to deserialize and serialize the value part of Kafka messages. Note: Either this parameter or the value.format parameter is required. Refer to Format for more details and format parameters. |
key.format | No | None | String | Format used to deserialize and serialize the key part of Kafka messages. Note: If a key format is defined, key.fields must be configured as well. |
key.fields | No | [] | List<String> | Columns of the table that make up the message key. This parameter must be configured together with key.format. It is empty by default, so no key is defined. Separate multiple fields with semicolons, for example, field1;field2. |
key.fields-prefix | No | None | String | Custom prefix for all fields of the key format, used to avoid name clashes with fields of the value format. |
value.format | Yes | None | String | Format used to deserialize and serialize the value part of Kafka messages. Note: Either this parameter or the format parameter is required. |
value.fields-include | No | ALL | Enum (ALL, EXCEPT_KEY) | Whether to include the key fields when parsing the message body. Possible values are: ALL (default), all defined fields are included in the value of Kafka messages; EXCEPT_KEY, all fields except those defined by key.fields are included. |
scan.startup.mode | No | group-offsets | String | Start position for Kafka to read data. Possible values are: earliest-offset (start from the earliest offset); latest-offset (start from the latest offset); group-offsets (default, start from the offsets committed by the consumer group); timestamp (start from the timestamp specified by scan.startup.timestamp-millis); specific-offsets (start from the offsets specified by scan.startup.specific-offsets). |
scan.startup.specific-offsets | No | None | String | Takes effect only when scan.startup.mode is set to specific-offsets. Specifies the start offset for each partition, for example, partition:0,offset:42;partition:1,offset:300. |
scan.startup.timestamp-millis | No | None | Long | Startup timestamp in epoch milliseconds. Takes effect only when scan.startup.mode is set to timestamp. |
scan.topic-partition-discovery.interval | No | None | Duration | Interval at which the consumer periodically discovers dynamically created Kafka topics and partitions. |
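The parameters above can be combined as in the following sketches. The first decodes the message key and value with separate formats and starts reading from a timestamp; the second subscribes to topics by pattern and enables periodic discovery. Table names, topic names, column names, the k_ prefix, the timestamp, the 5 min interval, and the earliest-offset start mode are assumptions made for illustration only.

```sql
-- Sketch 1: read key and value with separate formats (hypothetical names).
CREATE TABLE ordersWithKey (
  k_order_id    STRING,  -- from the message key; the prefix k_ is removed
                         -- before the field is handed to the key format
  order_channel STRING,  -- from the message value
  pay_amount    DOUBLE   -- from the message value
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'key.format' = 'json',
  'key.fields' = 'k_order_id',
  'key.fields-prefix' = 'k_',
  'value.format' = 'json',
  -- A key prefix only makes sense when key columns are kept out of the value.
  'value.fields-include' = 'EXCEPT_KEY',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1616580000000'  -- epoch milliseconds (example value)
);

-- Sketch 2: subscribe to every topic matching a pattern and pick up
-- topics and partitions created after the job has started.
CREATE TABLE allOrderTopics (
  order_id   STRING,
  pay_amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic-pattern' = 'orders-.*',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset',
  'scan.topic-partition-discovery.interval' = '5 min',
  'format' = 'json'
);
```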
You can define metadata columns in the source table to obtain the metadata of Kafka messages. For example, if multiple topics are defined in the WITH parameter and the metadata column is defined in the Kafka source table, the data read by Flink is labeled with the topic from which the data is read.
Key | Data Type | R/W | Description |
---|---|---|---|
topic | STRING NOT NULL | R | Topic name of the Kafka record. |
partition | INT NOT NULL | R | Partition ID of the Kafka record. |
headers | MAP<STRING, BYTES> NOT NULL | R/W | Headers of Kafka messages. |
leader-epoch | INT NULL | R | Leader epoch of the Kafka record. |
offset | BIGINT NOT NULL | R | Offset of the Kafka record. |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | R/W | Timestamp of the Kafka record. |
timestamp-type | STRING NOT NULL | R | Timestamp type of the Kafka record. The options are as follows: NoTimestampType, CreateTime (also set when writing metadata), and LogAppendTime. |
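To illustrate the multi-topic case described above, the sketch below reads two topics and exposes the topic and offset of each record as read-only metadata columns. The table name, topic names, and column names are assumptions for this example, as is the semicolon-separated topic list. VIRTUAL marks a metadata column as read-only, so it is excluded if the table is ever written to.

```sql
-- Hypothetical table reading two topics; names are placeholders.
CREATE TABLE multiTopicOrders (
  `source_topic`  STRING METADATA FROM 'topic' VIRTUAL,
  `record_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
  order_id   STRING,
  pay_amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders-web;orders-app',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Each row carries the topic it was read from, so records can be
-- counted per originating topic.
SELECT source_topic, COUNT(*) AS record_cnt
FROM multiTopicOrders
GROUP BY source_topic;
```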
CREATE TABLE orders (
  `topic` String metadata,
  `partition` int metadata,
  `headers` MAP<STRING, BYTES> metadata,
  `leaderEpoch` INT metadata from 'leader-epoch',
  `offset` bigint metadata,
  `timestamp` TIMESTAMP(3) metadata,
  `timestampType` string metadata from 'timestamp-type',
  `message` string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  "format" = "csv",
  "csv.field-delimiter" = "\u0001",
  "csv.quote-character" = "''"
);

CREATE TABLE printSink (
  `topic` String,
  `partition` int,
  `headers` MAP<STRING, BYTES>,
  `leaderEpoch` INT,
  `offset` bigint,
  `timestamp` TIMESTAMP(3),
  `timestampType` string,
  `message` string -- Indicates that data written by users is read from Kafka.
) WITH (
  'connector' = 'print'
);

insert into printSink select * from orders;
If you need to read the value of each field instead of the entire message, use the following statements:
CREATE TABLE orders (
  `topic` String metadata,
  `partition` int metadata,
  `headers` MAP<STRING, BYTES> metadata,
  `leaderEpoch` INT metadata from 'leader-epoch',
  `offset` bigint metadata,
  `timestamp` TIMESTAMP(3) metadata,
  `timestampType` string metadata from 'timestamp-type',
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  "format" = "json"
);

CREATE TABLE printSink (
  `topic` String,
  `partition` int,
  `headers` MAP<STRING, BYTES>,
  `leaderEpoch` INT,
  `offset` bigint,
  `timestamp` TIMESTAMP(3),
  `timestampType` string,
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);

insert into printSink select * from orders;
Sample messages in the Kafka topic:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
The data result is as follows. This is the output of the job that reads the entire message into the message column:
+I(fz-source-json,0,{},0,243,2021-12-27T09:23:32.253,CreateTime,{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"})
+I(fz-source-json,0,{},0,244,2021-12-27T09:23:39.655,CreateTime,{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"})
+I(fz-source-json,0,{},0,245,2021-12-27T09:23:48.405,CreateTime,{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"})
CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  "format" = "json"
);

CREATE TABLE printSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);

insert into printSink select * from orders;
Sample messages in the Kafka topic:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
The data result is as follows:
+I(202103241000000001,webShop,2021-03-24T10:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)
+I(202103241606060001,appShop,2021-03-24T16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106)
+I(202103251202020001,miniAppShop,2021-03-25T12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)
Create a Kafka cluster for DMS, enable SASL_SSL, download the SSL certificate, and upload the downloaded certificate client.jks to an OBS bucket.
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/xx.jks', -- Location where the user uploads the certificate to
  'properties.sasl.mechanism' = 'PLAIN', -- Value format: SASL_PLAINTEXT
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";', -- Account and password set when the Kafka cluster is created
  "format" = "json"
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/xx.jks',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";',
  "format" = "json"
);

insert into ordersSink select * from ordersSource;
Obtain the truststore.jks file using the authentication credential and store the credential and truststore.jks file in OBS.
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21009,xx:21009',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx', -- Username
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
  'properties.ssl.truststore.password' = 'xx', -- Password set for generating truststore.jks
  'properties.sasl.mechanism' = 'GSSAPI',
  "format" = "json"
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21009,xx:21009',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
  'properties.ssl.truststore.password' = 'xx',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'GSSAPI',
  "format" = "json"
);

insert into ordersSink select * from ordersSource;
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21007,xx:21007',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  "format" = "json"
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21007,xx:21007',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  "format" = "json"
);

insert into ordersSink select * from ordersSource;
Obtain the truststore.jks file using the authentication credential and store the credential and truststore.jks file in OBS.
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
  'properties.ssl.truststore.password' = 'xx', -- Password set for generating truststore.jks
  'properties.security.protocol' = 'SSL',
  "format" = "json"
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);

insert into ordersSink select * from ordersSource;
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
A: The datasource connection is not bound, the binding fails, or the security group of the Kafka cluster is not configured to allow access from the network segment of the DLI queue. Configure the datasource connection or configure the security group of the Kafka cluster to allow access from the DLI queue.
Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.
A: The sink table declares metadata columns, but the Print connector does not support writing metadata. Delete the metadata declarations from the sink table.
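One way to avoid this error, sketched below, is to declare metadata only on the source table and use ordinary columns in the Print sink. The table names follow the earlier examples; the reduced column set and the placeholder connection values are assumptions made to keep the sketch short.

```sql
-- Source table: metadata columns are declared here, marked VIRTUAL (read-only).
CREATE TABLE orders (
  `topic`  STRING METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  order_id STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Sink table: plain columns only, without the METADATA keyword,
-- so the Print connector is never asked to persist metadata.
CREATE TABLE printSink (
  `topic`  STRING,
  `offset` BIGINT,
  order_id STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO printSink
SELECT `topic`, `offset`, order_id FROM orders;
```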