:original_name: mrs_01_0397.html

.. _mrs_01_0397:

Using Flume from Scratch
========================

Scenario
--------

You can use Flume to import collected log information to Kafka.

Prerequisites
-------------

- A streaming cluster with Kerberos authentication enabled has been created.
- The Flume client has been installed on the node where logs are generated, for example, in **/opt/Flumeclient**. The client directory in the following operations is only an example. Change it to the actual installation directory.
- The streaming cluster can properly communicate with the node where logs are generated.

Using the Flume Client
----------------------

.. note::

   You do not need to perform :ref:`2 <mrs_01_0397__en-us_topic_0000001173789216_li81278495417>` to :ref:`6 <mrs_01_0397__en-us_topic_0000001173789216_li31329494415>` for a normal cluster.

#. Install the client. For details, see :ref:`Installing the Flume Client on Clusters `.

#. .. _mrs_01_0397__en-us_topic_0000001173789216_li81278495417:

   Copy the configuration file of the authentication server from the Master1 node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the node where the Flume client resides.

   The full file path is ${BIGDATA_HOME}/FusionInsight_BASE\_\ *XXX*/1\_\ *X*\ \_KerberosClient/etc/kdc.conf. In the preceding path, **XXX** indicates the product version number and **X** indicates a random number. Change them based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.

#. Check the service IP address of any node where the Flume role is deployed.

   Log in to FusionInsight Manager (for details, see :ref:`Accessing FusionInsight Manager `) and choose **Cluster** > **Services** > **Flume** > **Instance** to view the service IP address.

#. .. _mrs_01_0397__en-us_topic_0000001173789216_li4130849748:

   Copy the user authentication file from this node to the *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** directory on the Flume client node.

   The full file path is ${BIGDATA_HOME}/FusionInsight_Porter\_\ *XXX*/install/FusionInsight-Flume-\ *Flume component version number*/flume/conf/flume.keytab. In the preceding path, **XXX** indicates the product version number. Change it based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.

#. Copy the **jaas.conf** file from this node to the **conf** directory on the Flume client node.

   The full file path is ${BIGDATA_HOME}/FusionInsight_Current/1\_\ *X*\ \_Flume/etc/jaas.conf. In the preceding path, **X** indicates a random number. Change it based on site requirements. The file must be saved by the user who installs the Flume client, for example, user **root**.

#. .. _mrs_01_0397__en-us_topic_0000001173789216_li31329494415:

   Log in to the Flume client node and go to the client installation directory. Run the following command to open the file:

   **vi conf/jaas.conf**

   Set the **keyTab** parameter to the full path of the user authentication file saved to *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf** in :ref:`4 <mrs_01_0397__en-us_topic_0000001173789216_li4130849748>`. Then save the modification and exit.
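
   For reference, the edited file typically resembles the following minimal sketch. The section name, principal, and realm shown here are placeholders and assumptions, not values generated by your cluster; the **keyTab** path is the only value you change:

   .. code-block::

      KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/opt/FlumeClient/fusioninsight-flume-1.9.0/conf/flume.keytab"
      principal="flume@EXAMPLE.COM"
      useTicketCache=false
      storeKey=true
      debug=false;
      };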
#. Run the following command to modify the **flume-env.sh** configuration file of the Flume client:

   **vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/flume-env.sh**

   Add the following information after **-XX:+UseCMSCompactAtFullCollection**:

   .. code-block::

      -Djava.security.krb5.conf=Flume client installation directory/fusioninsight-flume-1.9.0/conf/kdc.conf -Djava.security.auth.login.config=Flume client installation directory/fusioninsight-flume-1.9.0/conf/jaas.conf -Dzookeeper.request.timeout=120000

   For example: **"-XX:+UseCMSCompactAtFullCollection -Djava.security.krb5.conf=**\ *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/kdc.conf -Djava.security.auth.login.config=**\ *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/jaas.conf -Dzookeeper.request.timeout=120000"**

   Change *Flume client installation directory* to the actual installation directory. Then save the file and exit.

#. Run the following commands to restart the Flume client:

   **cd** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/bin**

   **./flume-manage.sh restart**

   Example:

   **cd /opt/FlumeClient/fusioninsight-flume-**\ *Flume component version number*\ **/bin**

   **./flume-manage.sh restart**
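
   To confirm that the agent came back up, you can run a quick check on the client node. This is a generic Linux check, not an MRS-specific command:

   .. code-block::

      # The Flume agent runs as a JVM whose main class is
      # org.apache.flume.node.Application; a matching process
      # indicates that the client restarted successfully.
      ps -ef | grep "org.apache.flume" | grep -v grep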
#. Configure jobs based on actual service scenarios.

   - Some parameters of MRS 3.\ *x* or later can be configured on Manager. For details, see :ref:`Non-Encrypted Transmission ` or :ref:`Encrypted Transmission `.

   - Set the parameters in the **properties.properties** file. The following uses SpoolDir Source + File Channel + Kafka Sink as an example.

     Run the following command on the node where the Flume client is installed to configure and save a job in **properties.properties** (the Flume client configuration file) based on service requirements. For details, see :ref:`Flume Service Configuration Guide `.

     **vi** *Flume client installation directory*\ **/fusioninsight-flume-**\ *Flume component version number*\ **/conf/properties.properties**

     .. code-block::

        #########################################################################################
        client.sources = static_log_source
        client.channels = static_log_channel
        client.sinks = kafka_sink
        #########################################################################################
        #LOG_TO_HDFS_ONLINE_1

        client.sources.static_log_source.type = spooldir
        client.sources.static_log_source.spoolDir = Monitoring directory
        client.sources.static_log_source.fileSuffix = .COMPLETED
        client.sources.static_log_source.ignorePattern = ^$
        client.sources.static_log_source.trackerDir = Metadata storage path during transmission
        client.sources.static_log_source.maxBlobLength = 16384
        client.sources.static_log_source.batchSize = 51200
        client.sources.static_log_source.inputCharset = UTF-8
        client.sources.static_log_source.deserializer = LINE
        client.sources.static_log_source.selector.type = replicating
        client.sources.static_log_source.fileHeaderKey = file
        client.sources.static_log_source.fileHeader = false
        client.sources.static_log_source.basenameHeader = true
        client.sources.static_log_source.basenameHeaderKey = basename
        client.sources.static_log_source.deletePolicy = never

        client.channels.static_log_channel.type = file
        client.channels.static_log_channel.dataDirs = Data cache path. Multiple paths, separated by commas (,), can be configured to improve performance.
        client.channels.static_log_channel.checkpointDir = Checkpoint storage path
        client.channels.static_log_channel.maxFileSize = 2146435071
        client.channels.static_log_channel.capacity = 1000000
        client.channels.static_log_channel.transactionCapacity = 612000
        client.channels.static_log_channel.minimumRequiredSpace = 524288000

        client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
        client.sinks.kafka_sink.kafka.topic = Topic to which data is written, for example, flume_test
        client.sinks.kafka_sink.kafka.bootstrap.servers = XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number,XXX.XXX.XXX.XXX:Kafka port number
        client.sinks.kafka_sink.flumeBatchSize = 1000
        client.sinks.kafka_sink.kafka.producer.type = sync
        client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
        client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka domain name. This parameter is mandatory for a security cluster, for example, hadoop.xxx.com.
        client.sinks.kafka_sink.requiredAcks = 0

        client.sources.static_log_source.channels = static_log_channel
        client.sinks.kafka_sink.channel = static_log_channel

     .. note::

        - **client.sinks.kafka_sink.kafka.topic**: Topic to which data is written. If the topic does not exist in Kafka, it is automatically created by default.
        - **client.sinks.kafka_sink.kafka.bootstrap.servers**: List of Kafka brokers, separated by commas (,). By default, the port is **21007** for a security cluster and **9092** for a normal cluster.
        - **client.sinks.kafka_sink.kafka.security.protocol**: Set this parameter to **SASL_PLAINTEXT** for a security cluster and **PLAINTEXT** for a normal cluster.
        - **client.sinks.kafka_sink.kafka.kerberos.domain.name**: You do not need to set this parameter for a normal cluster. For a security cluster, set it to the value of **kerberos.domain.name** in the Kafka cluster.

#. After the parameters are set and saved, the Flume client automatically loads the job configured in **properties.properties**. When new log files are generated in the directory monitored by **spoolDir**, Flume sends their contents to Kafka, where they can be consumed by Kafka consumers. For details, see :ref:`Managing Messages in Kafka Topics `.
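
   To verify the pipeline end to end, you can drop a test file into the monitored directory and read it back from Kafka. The following is a minimal sketch that assumes a normal (non-Kerberos) cluster, **/var/log/flume-spool** as the monitoring directory, and **flume_test** as the topic; adjust the paths, broker addresses, and security settings to your environment:

   .. code-block::

      # Write a test log file into the directory watched by the SpoolDir source.
      # Flume renames the file with the .COMPLETED suffix after ingesting it.
      echo "hello from flume" > /var/log/flume-spool/test.log

      # Read the events back with the standard Apache Kafka console consumer
      # (run from the Kafka client's bin directory; on a security cluster,
      # additional SASL consumer settings are required).
      ./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic flume_test --from-beginning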