:original_name: mrs_01_0397.html
.. _mrs_01_0397:
Using Flume from Scratch
========================
Scenario
--------
You can use Flume to import collected log information to Kafka.
Prerequisites
-------------
- A streaming cluster with Kerberos authentication enabled has been created.
- The Flume client has been installed on the node where logs are generated, for example, in **/opt/FlumeClient**. The client directory used in the following operations is only an example; change it to the actual installation directory.
- The streaming cluster can properly communicate with the node where logs are generated.
Using the Flume Client
----------------------
.. note::
You do not need to perform :ref:`2 ` to :ref:`6 ` for a normal cluster.
#. Install the client.
For details, see :ref:`Installing the Flume Client on Clusters `.
#. .. _mrs_01_0397__en-us_topic_0000001173789216_li81278495417:
Copy the configuration file of the authentication server from the Master1 node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the node where the Flume client resides.
The full file path is ${BIGDATA_HOME}/FusionInsight_BASE\_\ *XXX*/1\_\ *X*\ \_KerberosClient/etc/kdc.conf, where **XXX** indicates the product version number and **X** indicates a random number. Change them based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
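For example, the file can be copied from the Master1 node with **scp**; the target IP address **192.168.0.100**, the installation directory **/opt/FlumeClient**, and the version **1.9.0** below are examples only, and the glob patterns stand in for *XXX* and *X*:
.. code-block::

   # Run on the Master1 node. The target IP address and directories are examples only.
   scp ${BIGDATA_HOME}/FusionInsight_BASE_*/1_*_KerberosClient/etc/kdc.conf root@192.168.0.100:/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/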
#. Check the service IP address of any node where the Flume role is deployed.
Log in to FusionInsight Manager. For details, see :ref:`Accessing FusionInsight Manager `. Choose **Cluster > Services > Flume > Instance**. Check the service IP address of any node where the Flume role is deployed.
#. .. _mrs_01_0397__en-us_topic_0000001173789216_li4130849748:
Copy the user authentication file from this node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the Flume client node.
The full file path is ${BIGDATA_HOME}/FusionInsight_Porter\_\ *XXX*/install/FusionInsight-Flume-\ *Flume component version number*/flume/conf/flume.keytab.
In the preceding path, **XXX** indicates the product version number. Change it based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
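The keytab can be copied in the same way as in :ref:`2 `; the IP address, installation directory, and version below are examples only, and the glob patterns stand in for the version numbers:
.. code-block::

   # Run on the Flume role node. The target IP address and directories are examples only.
   scp ${BIGDATA_HOME}/FusionInsight_Porter_*/install/FusionInsight-Flume-*/flume/conf/flume.keytab root@192.168.0.100:/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/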
#. Copy the **jaas.conf** file from this node to the **conf** directory on the Flume client node.
The full file path is ${BIGDATA_HOME}/FusionInsight_Current/1\_\ *X*\ \_Flume/etc/jaas.conf.
In the preceding path, **X** indicates a random number. Change it based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.
#. .. _mrs_01_0397__en-us_topic_0000001173789216_li31329494415:
Log in to the Flume client node and go to the client installation directory. Run the following command to modify the file:
**vi conf/jaas.conf**
Change the full path of the user authentication file defined by **keyTab** to the path under which the file was saved in :ref:`4 `, that is, *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/flume.keytab**. Then save the modification and exit.
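For example, after the modification the **keyTab** line might read as follows, assuming the client is installed in **/opt/FlumeClient** (an example directory; all other entries in the file stay unchanged):
.. code-block::

   keyTab="/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/flume.keytab"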
#. Run the following command to modify the **flume-env.sh** configuration file of the Flume client:
**vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/flume-env.sh**
Add the following information after **-XX:+UseCMSCompactAtFullCollection**:
.. code-block::
-Djava.security.krb5.conf=Flume client installation directory/fusioninsight-flume-1.9.0/conf/kdc.conf -Djava.security.auth.login.config=Flume client installation directory/fusioninsight-flume-1.9.0/conf/jaas.conf -Dzookeeper.request.timeout=120000
For example, **"-XX:+UseCMSCompactAtFullCollection -Djava.security.krb5.conf=\ Flume client installation directory/fusioninsight-flume-*Flume component version number*/conf/kdc.conf -Djava.security.auth.login.config=**\ *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/jaas.conf -Dzookeeper.request.timeout=120000"**
Change *Flume client installation directory* to the actual installation directory. Then save and exit.
#. Run the following command to restart the Flume client:
**cd** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/bin**
**./flume-manage.sh restart**
Example:
**cd /opt/FlumeClient/fusioninsight-flume-**\ *Flume component version number*\ **/bin**
**./flume-manage.sh restart**
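To verify that the restart succeeded, you can optionally check that the Flume processes are running, for example:
.. code-block::

   ps -ef | grep flume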
#. Configure jobs based on actual service scenarios.
- For MRS 3.\ *x* or later, some parameters can be configured on Manager. For details, see :ref:`Non-Encrypted Transmission ` or :ref:`Encrypted Transmission `.
- Set the parameters in the **properties.properties** file. The following uses SpoolDir Source+File Channel+Kafka Sink as an example.
On the node where the Flume client is installed, run the following command to open the Flume client configuration file **properties.properties**, and then configure and save a job based on service requirements by referring to :ref:`Flume Service Configuration Guide `:
**vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/properties.properties**
.. code-block::
#########################################################################################
client.sources = static_log_source
client.channels = static_log_channel
client.sinks = kafka_sink
#########################################################################################
#LOG_TO_HDFS_ONLINE_1
client.sources.static_log_source.type = spooldir
client.sources.static_log_source.spoolDir = Monitoring directory
client.sources.static_log_source.fileSuffix = .COMPLETED
client.sources.static_log_source.ignorePattern = ^$
client.sources.static_log_source.trackerDir = Metadata storage path during transmission
client.sources.static_log_source.maxBlobLength = 16384
client.sources.static_log_source.batchSize = 51200
client.sources.static_log_source.inputCharset = UTF-8
client.sources.static_log_source.deserializer = LINE
client.sources.static_log_source.selector.type = replicating
client.sources.static_log_source.fileHeaderKey = file
client.sources.static_log_source.fileHeader = false
client.sources.static_log_source.basenameHeader = true
client.sources.static_log_source.basenameHeaderKey = basename
client.sources.static_log_source.deletePolicy = never
client.channels.static_log_channel.type = file
client.channels.static_log_channel.dataDirs = Data cache path. Multiple paths, separated by commas (,), can be configured to improve performance.
client.channels.static_log_channel.checkpointDir = Checkpoint storage path
client.channels.static_log_channel.maxFileSize = 2146435071
client.channels.static_log_channel.capacity = 1000000
client.channels.static_log_channel.transactionCapacity = 612000
client.channels.static_log_channel.minimumRequiredSpace = 524288000
client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
client.sinks.kafka_sink.kafka.topic = Topic to which data is written, for example, flume_test
client.sinks.kafka_sink.kafka.bootstrap.servers = XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number
client.sinks.kafka_sink.flumeBatchSize = 1000
client.sinks.kafka_sink.kafka.producer.type = sync
client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka domain name. This parameter is mandatory for a security cluster, for example, hadoop.xxx.com.
client.sinks.kafka_sink.requiredAcks = 0
client.sources.static_log_source.channels = static_log_channel
client.sinks.kafka_sink.channel = static_log_channel
.. note::
- **client.sinks.kafka_sink.kafka.topic**: Topic to which data is written. If the topic does not exist in Kafka, it is automatically created by default.
- **client.sinks.kafka_sink.kafka.bootstrap.servers**: List of Kafka Brokers, which are separated by commas (,). By default, the port is **21007** for a security cluster and **9092** for a normal cluster.
- **client.sinks.kafka_sink.kafka.security.protocol**: The value is **SASL_PLAINTEXT** for a security cluster and **PLAINTEXT** for a normal cluster.
- **client.sinks.kafka_sink.kafka.kerberos.domain.name**:
You do not need to set this parameter for a normal cluster. For a security cluster, the value of this parameter is the value of **kerberos.domain.name** in the Kafka cluster.
#. After the parameters are set and saved, the Flume client automatically loads the configuration in **properties.properties**. When new log files are generated in the monitored directory (**spoolDir**), the Flume client sends them to Kafka, where they can be read by Kafka consumers. For details, see :ref:`Managing Messages in Kafka Topics `.
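For a quick end-to-end check, you can consume the topic from a node where the Kafka client is installed; the topic name **flume_test**, the broker IP address, and the security-cluster port **21007** below are examples (for a normal cluster, use port **9092** and the **PLAINTEXT** protocol):
.. code-block::

   # Run in the Kafka client installation directory of a security cluster.
   bin/kafka-console-consumer.sh --topic flume_test --bootstrap-server 192.168.0.101:21007 --consumer.config config/consumer.properties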