Interconnecting FlinkServer with HDFS

Scenario

This section describes the data definition language (DDL) statement for creating an HDFS sink table, the WITH parameters it takes, and example code. It also explains how to run the job on the FlinkServer job management page.

The following example SQL statements assume that the Kafka cluster, which serves as the data source, is in security mode.

Prerequisites

Procedure

  1. Log in to Manager as user flink_admin and choose Cluster > Services > Flink. In the Basic Information area, click the link on the right of Flink WebUI to access the Flink web UI.
  2. Create a Flink SQL job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.

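    In Running Parameter, select Enable CheckPoint and set Time Interval (ms) to 60000. Checkpointing must be enabled because, in streaming mode, the file system sink finalizes files only when a checkpoint completes. Then enter the following SQL statements: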
    CREATE TABLE kafka_table (
      user_id STRING,
      order_amount DOUBLE,
      log_ts TIMESTAMP(3),
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_source',
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
      -- Ignore CSV records that fail to be parsed. If the data is in JSON format, set 'json.ignore-parse-errors' to 'true' instead.
      'csv.ignore-parse-errors' = 'true',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.System domain name'
    
    );
    
    CREATE TABLE fs_table (
      user_id STRING,
      order_amount DOUBLE,
      dt STRING,
      `hour` STRING
    ) PARTITIONED BY (dt, `hour`) WITH ( -- Partition files by date and hour.
      'connector'='filesystem',
      'path'='hdfs:///sql/parquet',
      'format'='parquet',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.policy.kind'='success-file'
    );
    -- Streaming SQL: insert data into the file system table.
    INSERT INTO fs_table
    SELECT
      user_id,
      order_amount,
      DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
      DATE_FORMAT(log_ts, 'HH')
    FROM kafka_table;

    Kafka port number

    • In security mode, the port number is the value of sasl.port (21007 by default).
    • In non-security mode, the port number is the value of port (9092 by default). If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

      Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set its value to true, and click Save.
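
When the broker address is assembled in a script, the port rule above can be expressed as a small helper. This is only a sketch: the function name kafka_port is hypothetical, and the two values are the defaults listed above, so replace them if sasl.port or port has been changed on your cluster.

```shell
# Hypothetical helper: print the Kafka port that matches the cluster mode.
# 21007 and 9092 are the default values of sasl.port and port quoted above;
# they are assumptions about your cluster, not values read from it.
kafka_port() {
  case "$1" in
    security)     echo 21007 ;;
    non-security) echo 9092 ;;
    *)
      echo "usage: kafka_port security|non-security" >&2
      return 1
      ;;
  esac
}
```

For example, "$(kafka_port security)" can be appended to the broker IP address when filling in 'properties.bootstrap.servers'.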

  3. On the job management page, check whether the job status is Running.
  4. Execute the following commands to view the topic and write data to Kafka. For details, see Managing Messages in Kafka Topics.

    ./kafka-topics.sh --list --zookeeper IP address of the ZooKeeper quorumpeer instance:ZooKeeper port number/kafka

    sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties

    For example, if the topic name is user_source and the client directory is /opt/Bigdata/client, run the following command:

    sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance is located:Kafka port number --topic user_source --producer.config /opt/Bigdata/client/Kafka/kafka/config/producer.properties

    Enter the message content.
    3,3333,"2021-09-10 14:00"
    4,4444,"2021-09-10 14:01"

    Press Enter to send the message.

    • IP address of the ZooKeeper quorumpeer instance

      To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of all the hosts where the quorumpeer instances are located.

    • ZooKeeper port number

      Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort. The default value is 24002.
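
If the cluster has several quorumpeer instances, the --zookeeper argument can list all of them as comma-separated IP:port pairs followed by the /kafka path. The following is a minimal sketch of building that string. The function name zk_connect and the IP addresses in the usage comment are hypothetical placeholders, and the port is whatever clientPort is set to on your cluster.

```shell
# Hypothetical helper: build a ZooKeeper connection string with the /kafka path.
# Usage: zk_connect PORT IP [IP...]
zk_connect() {
  port="$1"
  shift
  out=""
  for ip in "$@"; do
    # Append "ip:port", adding a comma only between entries.
    out="${out:+$out,}$ip:$port"
  done
  echo "$out/kafka"
}

# Example with placeholder addresses and the default clientPort:
# ./kafka-topics.sh --list --zookeeper "$(zk_connect 24002 192.0.2.11 192.0.2.12 192.0.2.13)"
```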

  5. Run the following command to check whether the sink table has written data to the HDFS directory:

    hdfs dfs -ls -R /sql/parquet
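
Because the example sets 'sink.partition-commit.policy.kind' to 'success-file', a committed partition directory contains a marker file, which Flink names _SUCCESS by default. The sketch below filters the listing for such directories. The function name committed_partitions is hypothetical, and the filter assumes that paths contain no spaces. With the 1 h commit delay used in the example, the marker may appear well after the data files do.

```shell
# Hypothetical helper: read `hdfs dfs -ls -R` output on stdin and print each
# directory that contains a _SUCCESS marker file.
committed_partitions() {
  # The path is the last whitespace-separated field of each listing line.
  awk '$NF ~ /\/_SUCCESS$/ { sub(/\/_SUCCESS$/, "", $NF); print $NF }'
}

# Usage against the sink path from the example:
# hdfs dfs -ls -R /sql/parquet | committed_partitions
```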

Interconnecting Flink with HDFS Partitions