Currently, FlinkServer interconnects with the Hive MetaStore. Therefore, the MetaStore function must be enabled for Hive. Hive can be used as a source, sink, or dimension table.
If your Kafka cluster is in security mode, the following example SQL statements can be used.
The following uses the process of interconnecting a Kafka mapping table with Hive as an example.
| Parameter | Description | Example Value |
|---|---|---|
| Cluster Connection Name | Name of the cluster connection, which can contain a maximum of 100 characters. Only letters, digits, and underscores (_) are allowed. | flink_hive |
| Description | Description of the cluster connection. | - |
| Version | Cluster version to select. | MRS 3 |
| Secure Version | Whether the cluster is in security mode. | Yes |
| Username | Username for accessing the cluster. The user must have the minimum permissions for accessing services in the cluster. The name can contain a maximum of 100 characters. Only letters, digits, and underscores (_) are allowed. This parameter is available only when Secure Version is set to Yes. | flink_admin |
| Client Profile | Client profile of the cluster, in TAR format. | - |
| User Credential | User authentication credential in FusionInsight Manager, in TAR format. This parameter is available only when Secure Version is set to Yes. Files can be uploaded only after the username is entered. | User credential of flink_admin |
| Parameter | Description | Example Value |
|---|---|---|
| Type | Job type, which can be Flink SQL or Flink Jar. | Flink SQL |
| Name | Job name, which can contain a maximum of 64 characters. Only letters, digits, and underscores (_) are allowed. | flinktest1 |
| Task Type | Type of the job data source, which can be a stream job or a batch job. | Stream job |
| Description | Job description, which can contain a maximum of 100 characters. | - |
```sql
CREATE TABLE test_kafka (
  user_id varchar,
  item_id varchar,
  cat_id varchar,
  zw_test timestamp
) WITH (
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'format' = 'json',
  'topic' = 'zw_tset_kafka',
  'connector' = 'kafka',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);

CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-version' = '3.1.0',
  'default-database' = 'default',
  'cluster.name' = 'flink_hive'
);

USE CATALOG myhive;

SET table.sql-dialect = hive;

CREATE TABLE user_behavior_hive_tbl_no_partition (
  user_id STRING,
  item_id STRING,
  cat_id STRING,
  ts timestamp
) PARTITIONED BY (dy STRING, ho STRING, mi STRING) STORED AS textfile TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0S',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

INSERT INTO user_behavior_hive_tbl_no_partition
SELECT
  user_id,
  item_id,
  cat_id,
  zw_test,
  DATE_FORMAT(zw_test, 'yyyy-MM-dd'),
  DATE_FORMAT(zw_test, 'HH'),
  DATE_FORMAT(zw_test, 'mm')
FROM default_catalog.default_database.test_kafka;
```
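The INSERT statement derives the dy, ho, and mi partition columns from the event timestamp, and the sink reassembles them into a partition commit time using the pattern '$dy $ho:$mi:00'. The following Python sketch mirrors that mapping (the helper name is hypothetical, used only for illustration):

```python
from datetime import datetime

def derive_partitions(zw_test: str):
    """Mimic DATE_FORMAT(zw_test, 'yyyy-MM-dd' / 'HH' / 'mm') from the INSERT statement."""
    ts = datetime.strptime(zw_test, "%Y-%m-%d %H:%M:%S")
    dy = ts.strftime("%Y-%m-%d")  # day partition
    ho = ts.strftime("%H")        # hour partition
    mi = ts.strftime("%M")        # minute partition
    # partition.time-extractor.timestamp-pattern = '$dy $ho:$mi:00'
    commit_ts = f"{dy} {ho}:{mi}:00"
    return dy, ho, mi, commit_ts

print(derive_partitions("2021-09-08 09:08:01"))
# → ('2021-09-08', '09', '08', '2021-09-08 09:08:00')
```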
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set its value to true, and click Save.
```shell
./kafka-topics.sh --list --zookeeper <IP address of the ZooKeeper quorumpeer instance>:<ZooKeeper port number>/kafka
```
```shell
sh kafka-console-producer.sh --broker-list <IP address of the node where the Kafka instance resides>:<Kafka port number> --topic <Topic name> --producer.config <Client directory>/Kafka/kafka/config/producer.properties
```

For example, if the topic name is zw_tset_kafka, the command is:

```shell
sh kafka-console-producer.sh --broker-list <IP address of the node where the Kafka instance resides>:<Kafka port number> --topic zw_tset_kafka --producer.config /opt/Bigdata/client/Kafka/kafka/config/producer.properties
```
```json
{"user_id": "3","item_id":"333333","cat_id":"cat333","zw_test":"2021-09-08 09:08:01"}
{"user_id": "4","item_id":"444444","cat_id":"cat444","zw_test":"2021-09-08 09:08:01"}
```
Press Enter to send each message.
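Because the Kafka mapping table is declared with 'format' = 'json', each message must be a JSON object whose keys match the test_kafka columns. A minimal Python sketch that builds such a record (the helper name is illustrative, not part of any product API):

```python
import json

def make_record(user_id: str, item_id: str, cat_id: str, zw_test: str) -> str:
    # Keys must match the columns of the test_kafka table:
    # user_id, item_id, cat_id, zw_test.
    return json.dumps({"user_id": user_id, "item_id": item_id,
                       "cat_id": cat_id, "zw_test": zw_test})

print(make_record("3", "333333", "cat333", "2021-09-08 09:08:01"))
```

The resulting string can be pasted into the kafka-console-producer session shown above.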
To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of all the hosts where the quorumpeer instances are located.
Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort. The default value is 24002.
```shell
beeline
```

```sql
select * from user_behavior_hive_tbl_no_partition;
```