FlinkServer can be interconnected with HBase. The details are as follows:
Change the owner of the configuration file directory and its upper-layer directory on the FlinkServer node to omm.
If the FlinkServer instance runs on the node where the HBase client is installed, enter the HBase configuration directory /opt/Bigdata/client/HBase/hbase/conf/ in Value for the HBASE_CONF_DIR parameter.
Select Enable CheckPoint in Running Parameter and set Time Interval (ms) to 60000.
CREATE TABLE ksource1 (
  user_id STRING,
  item_id STRING,
  proctime as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'ksource1',
  'properties.group.id' = 'group1',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance 1:Kafka port number,IP address of the Kafka broker instance 2:Kafka port number',
  'format' = 'json',
  'properties.sasl.kerberos.service.name' = 'kafka'
);
CREATE TABLE hsink1 (
  rowkey STRING,
  f1 ROW < item_id STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'dim_province',
  'zookeeper.quorum' = 'IP address of the ZooKeeper quorumpeer instance 1:ZooKeeper port number,IP address of the ZooKeeper quorumpeer instance 2:ZooKeeper port number'
);
INSERT INTO hsink1
SELECT user_id as rowkey, ROW(item_id) as f1
FROM ksource1;
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set its value to true, and click Save.
To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of the hosts where the quorumpeer instances are located.
To obtain the ZooKeeper port number, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort. The default value is 24002.
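The quorumpeer IP addresses and the clientPort value are combined into one comma-separated host:port list for 'zookeeper.quorum'. The following is a minimal sketch of that assembly, not part of the product tooling: build_zk_quorum is a hypothetical helper name, and the 192.0.2.x addresses are placeholders from the documentation address range.

```shell
# Hypothetical helper: join ZooKeeper host IPs with one client port into the
# comma-separated host:port list used as the 'zookeeper.quorum' value.
# The function body runs in a subshell so its variables stay private.
build_zk_quorum() (
  port="$1"
  shift
  out=""
  for host in "$@"; do
    # Prefix a comma only when the list already holds an entry.
    out="${out:+$out,}${host}:${port}"
  done
  printf '%s\n' "$out"
)

# Placeholder addresses; replace them with the quorumpeer IPs shown in Manager.
build_zk_quorum 24002 192.0.2.11 192.0.2.12 192.0.2.13
```

The printed string can be pasted between the single quotes of the 'zookeeper.quorum' option in the sink table definition.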
sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic Topic name
For example, if the topic name is ksource1, the command is sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic ksource1.
{"user_id": "3","item_id":"333333"}
{"user_id": "4","item_id":"44444444"}
Press Enter to send the message.
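The console producer reads standard input and treats each line as one message, so test records can also be generated by a script instead of typed. The sketch below assumes a hypothetical helper named make_message; it only prints JSON lines in the shape the ksource1 table expects, and the piped producer call is left as a comment because the broker address and port are site-specific.

```shell
# Hypothetical helper: print one JSON message per line, with user_id and
# item_id as strings, matching the columns of the ksource1 table.
make_message() {
  printf '{"user_id": "%s","item_id":"%s"}\n' "$1" "$2"
}

# Print the two sample records, one message per line.
make_message 3 333333
make_message 4 44444444

# To send them, the same lines could be piped to the producer. The broker
# address and port below are placeholders to fill in:
# { make_message 3 333333; make_message 4 44444444; } | \
#   sh kafka-console-producer.sh --broker-list <broker-ip>:<port> --topic ksource1
```

The values are inserted verbatim, so this helper is only suitable for simple test data without quotes or backslashes.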
hbase shell
scan 'dim_province'
Parameter | Description |
---|---|
'properties.hbase.rpc.protection' = 'authentication' | This parameter must be consistent with that on the HBase server. |
'properties.hbase.security.authorization' = 'true' | Authentication is enabled. |
'properties.hbase.security.authentication' = 'kerberos' | Kerberos authentication is enabled. |
CREATE TABLE hsink1 (
  rowkey STRING,
  f1 ROW < q1 STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'cc',
  'zookeeper.quorum' = 'x.x.x.x:24002',
  'properties.hbase.rpc.protection' = 'authentication',
  'properties.zookeeper.znode.parent' = '/hbase',
  'properties.hbase.security.authorization' = 'true',
  'properties.hbase.security.authentication' = 'kerberos'
);