:original_name: mrs_01_0397.html
.. _mrs_01_0397:
Using Flume from Scratch
========================
Scenario
--------
You can use Flume to import collected log information to Kafka.
Prerequisites
-------------
- A streaming cluster that contains components such as Flume and Kafka and has Kerberos authentication enabled has been created.
- The streaming cluster can properly communicate with the node where logs are generated.
Using the Flume Client (Versions Earlier Than MRS 3.x)
------------------------------------------------------
.. note::
You do not need to perform :ref:`2 <mrs_01_0397__l78730912572649fd8edfda3920dc20cf>` to :ref:`6 <mrs_01_0397__lfde322e0f3de4ccb88b4e195e65f9993>` for a normal cluster.
#. Install the Flume client.
Install the Flume client in a directory, for example, **/opt/Flumeclient**, on the node where logs are generated by referring to :ref:`Installing the Flume Client on Clusters of Versions Earlier Than MRS 3.x `. The Flume client installation directories in the following steps are only examples. Change them to the actual installation directories.
#. .. _mrs_01_0397__l78730912572649fd8edfda3920dc20cf:
Copy the configuration file of the authentication server from the Master1 node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the node where the Flume client is installed.
For versions earlier than MRS 1.9.2, the full file path is **${BIGDATA_HOME}/FusionInsight/etc/1\_**\ *X*\ **\_KerberosClient/kdc.conf**.
For MRS 1.9.2 to versions earlier than MRS 3.\ *x*, the full file path is **${BIGDATA_HOME}/MRS_Current/1\_**\ *X*\ **\_KerberosClient/etc/kdc.conf**.
In the preceding paths, **X** indicates a random number. Change it based on the site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
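For example, for MRS 1.9.2 to versions earlier than MRS 3.\ *x*, the file can be copied with **scp**. A minimal sketch, run on the Master1 node; the random number **8**, the IP address **192.168.0.100**, and the installation directory **/opt/FlumeClient** with Flume 1.9.0 are example values only:

.. code-block::

   # Run on the Master1 node. The "8", the IP address, and the directories
   # are example values; replace them with the actual ones.
   scp ${BIGDATA_HOME}/MRS_Current/1_8_KerberosClient/etc/kdc.conf root@192.168.0.100:/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/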
#. Check the service IP address of any node where the Flume role is deployed.
- For versions earlier than MRS 1.9.2, log in to MRS Manager. Choose **Cluster** > **Services** > **Flume** > **Instance**. Query **Service IP Address** of any node on which the Flume role is deployed.
- For MRS 1.9.2 to versions earlier than 3.x, click the cluster name on the MRS console and choose *Name of the desired cluster* > **Components** > **Flume** > **Instances** to view **Business IP Address** of any node where the Flume role is deployed.
#. .. _mrs_01_0397__l762ab29694a642ac8ae1a0609cb97c9b:
Copy the user authentication file from this node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the Flume client node.
For versions earlier than MRS 1.9.2, the full file path is **${BIGDATA_HOME}/FusionInsight/FusionInsight-Flume-**\ *Flume component version number*\ **/flume/conf/flume.keytab**.
For MRS 1.9.2 to versions earlier than MRS 3.\ *x*, the full file path is **${BIGDATA_HOME}/MRS\_**\ *XXX*\ **/install/FusionInsight-Flume-**\ *Flume component version number*\ **/flume/conf/flume.keytab**.
In the preceding paths, **XXX** indicates the product version number. Change it based on the site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
#. Copy the **jaas.conf** file from this node to the **conf** directory on the Flume client node.
For versions earlier than MRS 1.9.2, the full file path is **${BIGDATA_HOME}/FusionInsight/etc/1\_**\ *X*\ **\_Flume/jaas.conf**.
For MRS 1.9.2 to versions earlier than MRS 3.\ *x*, the full file path is **${BIGDATA_HOME}/MRS_Current/1\_**\ *X*\ **\_Flume/etc/jaas.conf**.
In the preceding path, **X** indicates a random number. Change it based on the site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
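The copy operations in :ref:`4 <mrs_01_0397__l762ab29694a642ac8ae1a0609cb97c9b>` and this step can likewise be done with **scp** from the Flume role node. A minimal sketch for MRS 1.9.2 to versions earlier than MRS 3.\ *x*, using the same example values as above; *XXX*, the random number, the IP address, and the directories must be replaced with the actual values:

.. code-block::

   # Run on the Flume role node. All values below are examples.
   scp ${BIGDATA_HOME}/MRS_XXX/install/FusionInsight-Flume-1.9.0/flume/conf/flume.keytab root@192.168.0.100:/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/
   scp ${BIGDATA_HOME}/MRS_Current/1_8_Flume/etc/jaas.conf root@192.168.0.100:/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/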
#. .. _mrs_01_0397__lfde322e0f3de4ccb88b4e195e65f9993:
Log in to the Flume client node and go to the client installation directory. Run the following command to modify the file:
**vi conf/jaas.conf**
Set the full path of the user authentication file defined by **keyTab** to the **flume.keytab** file saved in :ref:`4 <mrs_01_0397__l762ab29694a642ac8ae1a0609cb97c9b>`, that is, *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/flume.keytab**. Then save the modification and exit.
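After the change, the **Client** entry in **jaas.conf** might look like the following. This is only a sketch: the principal and realm are example values, and this step changes only the **keyTab** path:

.. code-block::

   // The keyTab path, principal, and realm below are example values.
   Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/flume.keytab"
   principal="flume/hadoop.xxx.com@EXAMPLE.COM"
   useTicketCache=false
   storeKey=true
   debug=false;
   };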
#. Run the following command to modify the **flume-env.sh** configuration file of the Flume client:
**vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/flume-env.sh**
Add the following information after **-XX:+UseCMSCompactAtFullCollection**:
.. code-block::
-Djava.security.krb5.conf=Flume client installation directory/fusioninsight-flume-1.9.0/conf/kdc.conf -Djava.security.auth.login.config=Flume client installation directory/fusioninsight-flume-1.9.0/conf/jaas.conf -Dzookeeper.request.timeout=120000
Example: **"-XX:+UseCMSCompactAtFullCollection -Djava.security.krb5.conf=/opt/FlumeClient**/**fusioninsight-flume-**\ *Flume component version number*\ **/conf/kdc.conf -Djava.security.auth.login.config=/opt/FlumeClient**/**fusioninsight-flume-**\ *Flume component version number*\ **/conf/jaas.conf -Dzookeeper.request.timeout=120000"**
Change *Flume client installation directory* to the actual installation directory. Then save and exit.
#. Run the following command to restart the Flume client:
**cd** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/bin**
**./flume-manage.sh restart**
Example:
**cd /opt/FlumeClient/fusioninsight-flume-**\ *Flume component version number*\ **/bin**
**./flume-manage.sh restart**
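To confirm that the restart succeeded, you can check that the Flume process is running, for example:

.. code-block::

   # Lists the running Flume client process, if any.
   ps -ef | grep flume | grep -v grep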
#. Run the following command to open the Flume client configuration file **properties.properties**. Configure and save jobs in the file based on service requirements:
**vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/properties.properties**
The following uses SpoolDir Source+File Channel+Kafka Sink as an example:
.. code-block::
#########################################################################################
client.sources = static_log_source
client.channels = static_log_channel
client.sinks = kafka_sink
#########################################################################################
#LOG_TO_HDFS_ONLINE_1
client.sources.static_log_source.type = spooldir
client.sources.static_log_source.spoolDir = Monitoring directory
client.sources.static_log_source.fileSuffix = .COMPLETED
client.sources.static_log_source.ignorePattern = ^$
client.sources.static_log_source.trackerDir = Metadata storage path during transmission
client.sources.static_log_source.maxBlobLength = 16384
client.sources.static_log_source.batchSize = 51200
client.sources.static_log_source.inputCharset = UTF-8
client.sources.static_log_source.deserializer = LINE
client.sources.static_log_source.selector.type = replicating
client.sources.static_log_source.fileHeaderKey = file
client.sources.static_log_source.fileHeader = false
client.sources.static_log_source.basenameHeader = true
client.sources.static_log_source.basenameHeaderKey = basename
client.sources.static_log_source.deletePolicy = never
client.channels.static_log_channel.type = file
client.channels.static_log_channel.dataDirs = Data cache path. Multiple paths, separated by commas (,), can be configured to improve performance.
client.channels.static_log_channel.checkpointDir = Checkpoint storage path
client.channels.static_log_channel.maxFileSize = 2146435071
client.channels.static_log_channel.capacity = 1000000
client.channels.static_log_channel.transactionCapacity = 612000
client.channels.static_log_channel.minimumRequiredSpace = 524288000
client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
client.sinks.kafka_sink.kafka.topic = Topic to which data is written, for example, flume_test
client.sinks.kafka_sink.kafka.bootstrap.servers = XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number
client.sinks.kafka_sink.flumeBatchSize = 1000
client.sinks.kafka_sink.kafka.producer.type = sync
client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka domain name. This parameter is mandatory for a security cluster, for example, hadoop.xxx.com.
client.sinks.kafka_sink.requiredAcks = 0
client.sources.static_log_source.channels = static_log_channel
client.sinks.kafka_sink.channel = static_log_channel
.. note::
- **client.sinks.kafka_sink.kafka.topic**: Topic to which data is written. If the topic does not exist in Kafka, it is automatically created by default.
- **client.sinks.kafka_sink.kafka.bootstrap.servers**: List of Kafka Brokers, which are separated by commas (,). By default, the port is **21007** for a security cluster and **9092** for a normal cluster.
- **client.sinks.kafka_sink.kafka.security.protocol**: The value is **SASL_PLAINTEXT** for a security cluster and **PLAINTEXT** for a normal cluster.
- **client.sinks.kafka_sink.kafka.kerberos.domain.name**:
You do not need to set this parameter for a normal cluster. For a security cluster, the value of this parameter is the value of **kerberos.domain.name** in the Kafka cluster.
For versions earlier than MRS 1.9.2, obtain the value by checking **${BIGDATA_HOME}/FusionInsight/etc/1\_**\ *X*\ **\_Broker/server.properties** on the node where the broker instance resides.
For MRS 1.9.2 to versions earlier than MRS 3.\ *x*, obtain the value by checking **${BIGDATA_HOME}/MRS_Current/1\_**\ *X*\ **\_Broker/etc/server.properties** on the node where the broker instance resides.
In the preceding paths, **X** indicates a random number. Change it based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
#. After the parameters are set and saved, the Flume client automatically loads the content configured in **properties.properties**. When new log files are generated in the monitored **spoolDir** directory, the Flume client sends them to Kafka, where they can be consumed by Kafka consumers.
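To verify the flow end to end, you can consume the topic on a node where the Kafka client is installed. A minimal sketch for a security cluster, assuming topic **flume_test** and a broker at **192.168.0.11**; the paths, IP address, and topic are example values, and Kerberos authentication (**kinit**) must be completed first:

.. code-block::

   # Run in the Kafka client directory. All values below are examples.
   bin/kafka-console-consumer.sh --topic flume_test --bootstrap-server 192.168.0.11:21007 --consumer.config config/consumer.properties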
Using the Flume Client (MRS 3.x or Later)
-----------------------------------------
.. note::
You do not need to perform :ref:`2 <mrs_01_0397__li81278495417>` to :ref:`6 <mrs_01_0397__li31329494415>` for a normal cluster.
#. Install the Flume client.
Install the Flume client in a directory, for example, **/opt/Flumeclient**, on the node where logs are generated by referring to :ref:`Installing the Flume Client on MRS 3.x or Later Clusters `. The Flume client installation directories in the following steps are only examples. Change them to the actual installation directories.
#. .. _mrs_01_0397__li81278495417:
Copy the configuration file of the authentication server from the Master1 node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the node where the Flume client is installed.
The full file path is **${BIGDATA_HOME}/FusionInsight\_**\ **BASE\_**\ *XXX*\ **/1\_**\ *X*\ **\_KerberosClient/etc/kdc.conf**. In the preceding path, **XXX** indicates the product version number. **X** indicates a random number. Replace them based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
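As in the earlier-version procedure, the file can be copied with **scp**. A minimal sketch, run on the Master1 node; the product version number, the random number **8**, the IP address, and the installation directory are example values:

.. code-block::

   # Run on the Master1 node. XXX, "8", the IP address, and the directories
   # are example values; replace them with the actual ones.
   scp ${BIGDATA_HOME}/FusionInsight_BASE_XXX/1_8_KerberosClient/etc/kdc.conf root@192.168.0.100:/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/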
#. Check the service IP address of any node where the Flume role is deployed.
Log in to FusionInsight Manager. For details, see :ref:`Accessing FusionInsight Manager (MRS 3.x or Later) `. Choose **Cluster > Services > Flume > Instance**. Check the service IP address of any node where the Flume role is deployed.
#. .. _mrs_01_0397__li4130849748:
Copy the user authentication file from this node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the Flume client node.
The full file path is **${BIGDATA_HOME}/FusionInsight_Porter\_**\ *XXX*\ **/install/FusionInsight-Flume-**\ *Flume component version number*\ **/flume/conf/flume.keytab**.
In the preceding paths, **XXX** indicates the product version number. Change it based on the site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
#. Copy the **jaas.conf** file from this node to the **conf** directory on the Flume client node.
The full file path is **${BIGDATA_HOME}/FusionInsight_Current/1\_**\ *X*\ **\_Flume/etc/jaas.conf**.
In the preceding path, **X** indicates a random number. Change it based on the site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
#. .. _mrs_01_0397__li31329494415:
Log in to the Flume client node and go to the client installation directory. Run the following command to modify the file:
**vi conf/jaas.conf**
Set the full path of the user authentication file defined by **keyTab** to the **flume.keytab** file saved in :ref:`4 <mrs_01_0397__li4130849748>`, that is, *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/flume.keytab**. Then save the modification and exit.
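For example, with the client installed in **/opt/FlumeClient** and Flume 1.9.0 (example values only), the **keyTab** line would read:

.. code-block::

   keyTab="/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/flume.keytab"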
#. Run the following command to modify the **flume-env.sh** configuration file of the Flume client:
**vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/flume-env.sh**
Add the following information after **-XX:+UseCMSCompactAtFullCollection**:
.. code-block::
-Djava.security.krb5.conf=Flume client installation directory/fusioninsight-flume-1.9.0/conf/kdc.conf -Djava.security.auth.login.config=Flume client installation directory/fusioninsight-flume-1.9.0/conf/jaas.conf -Dzookeeper.request.timeout=120000
Example: **"-XX:+UseCMSCompactAtFullCollection -Djava.security.krb5.conf=/opt/FlumeClient**/**fusioninsight-flume-**\ *Flume component version number*\ **/conf/kdc.conf -Djava.security.auth.login.config=/opt/FlumeClient**/**fusioninsight-flume-**\ *Flume component version number*\ **/conf/jaas.conf -Dzookeeper.request.timeout=120000"**
Change *Flume client installation directory* to the actual installation directory. Then save and exit.
#. Run the following command to restart the Flume client:
**cd** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/bin**
**./flume-manage.sh restart**
Example:
**cd /opt/FlumeClient/fusioninsight-flume-**\ *Flume component version number*\ **/bin**
**./flume-manage.sh restart**
#. Configure jobs based on actual service scenarios.
- For MRS 3.\ *x* or later, some parameters can be configured on Manager.
- Set the parameters in the **properties.properties** file. The following uses SpoolDir Source+File Channel+Kafka Sink as an example.
Run the following command on the node where the Flume client is installed to open the Flume client configuration file **properties.properties**. Configure and save jobs in the file based on actual service requirements:
**vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/properties.properties**
.. code-block::
#########################################################################################
client.sources = static_log_source
client.channels = static_log_channel
client.sinks = kafka_sink
#########################################################################################
#LOG_TO_HDFS_ONLINE_1
client.sources.static_log_source.type = spooldir
client.sources.static_log_source.spoolDir = Monitoring directory
client.sources.static_log_source.fileSuffix = .COMPLETED
client.sources.static_log_source.ignorePattern = ^$
client.sources.static_log_source.trackerDir = Metadata storage path during transmission
client.sources.static_log_source.maxBlobLength = 16384
client.sources.static_log_source.batchSize = 51200
client.sources.static_log_source.inputCharset = UTF-8
client.sources.static_log_source.deserializer = LINE
client.sources.static_log_source.selector.type = replicating
client.sources.static_log_source.fileHeaderKey = file
client.sources.static_log_source.fileHeader = false
client.sources.static_log_source.basenameHeader = true
client.sources.static_log_source.basenameHeaderKey = basename
client.sources.static_log_source.deletePolicy = never
client.channels.static_log_channel.type = file
client.channels.static_log_channel.dataDirs = Data cache path. Multiple paths, separated by commas (,), can be configured to improve performance.
client.channels.static_log_channel.checkpointDir = Checkpoint storage path
client.channels.static_log_channel.maxFileSize = 2146435071
client.channels.static_log_channel.capacity = 1000000
client.channels.static_log_channel.transactionCapacity = 612000
client.channels.static_log_channel.minimumRequiredSpace = 524288000
client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
client.sinks.kafka_sink.kafka.topic = Topic to which data is written, for example, flume_test
client.sinks.kafka_sink.kafka.bootstrap.servers = XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number
client.sinks.kafka_sink.flumeBatchSize = 1000
client.sinks.kafka_sink.kafka.producer.type = sync
client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka domain name. This parameter is mandatory for a security cluster, for example, hadoop.xxx.com.
client.sinks.kafka_sink.requiredAcks = 0
client.sources.static_log_source.channels = static_log_channel
client.sinks.kafka_sink.channel = static_log_channel
.. note::
- **client.sinks.kafka_sink.kafka.topic**: Topic to which data is written. If the topic does not exist in Kafka, it is automatically created by default.
- **client.sinks.kafka_sink.kafka.bootstrap.servers**: List of Kafka Brokers, which are separated by commas (,). By default, the port is **21007** for a security cluster and **9092** for a normal cluster.
- **client.sinks.kafka_sink.kafka.security.protocol**: The value is **SASL_PLAINTEXT** for a security cluster and **PLAINTEXT** for a normal cluster.
- **client.sinks.kafka_sink.kafka.kerberos.domain.name**:
You do not need to set this parameter for a normal cluster. For a security cluster, the value of this parameter is the value of **kerberos.domain.name** in the Kafka cluster.
For versions earlier than MRS 1.9.2, obtain the value by checking **${BIGDATA_HOME}/FusionInsight/etc/1\_**\ *X*\ **\_Broker/server.properties** on the node where the broker instance resides.
For MRS 1.9.2 to versions earlier than MRS 3.\ *x*, obtain the value by checking **${BIGDATA_HOME}/MRS_Current/1\_**\ *X*\ **\_Broker/etc/server.properties** on the node where the broker instance resides.
In the preceding paths, **X** indicates a random number. Change it based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
#. After the parameters are set and saved, the Flume client automatically loads the content configured in **properties.properties**. When new log files are generated in the monitored **spoolDir** directory, the Flume client sends them to Kafka, where they can be consumed by Kafka consumers.
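To generate test data and verify the flow, you can place a log file in the monitored directory and then consume the topic on a node where the Kafka client is installed. A minimal sketch for a security cluster; **/tmp/spool** stands for the configured monitoring directory, and the paths, IP address, and topic are example values. Kerberos authentication (**kinit**) must be completed first:

.. code-block::

   # Drop a test log into the spoolDir monitored by the source (example path).
   echo "test log line" > /tmp/spool/test.log
   # Consume the topic from the Kafka client directory. Example values only.
   bin/kafka-console-consumer.sh --topic flume_test --bootstrap-server 192.168.0.11:21007 --consumer.config config/consumer.properties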