This section describes how to use Flume to connect to Hive (version 3.1.0) in the cluster.
Flume and Hive have been correctly installed in the cluster, the services are running properly, and no alarms are reported.
Obtain the required JAR packages from the Hive installation directory, copy them to the Flume library directory, and restart the Flume process so that the JAR packages are loaded into the running environment.
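For illustration only, the copy step can be sketched as follows. The directories and JAR names below are mock stand-ins created in a temporary location; on a real node, substitute the actual Hive client lib directory and Flume lib directory:

```shell
# Mock directories standing in for the real Hive and Flume lib paths
# (assumptions for this sketch -- use your cluster's actual paths).
HIVE_LIB="$(mktemp -d)"
FLUME_LIB="$(mktemp -d)"
# Mock Hive dependency JARs (on a real node these already exist).
touch "$HIVE_LIB/hive-exec-3.1.0.jar" "$HIVE_LIB/hive-metastore-3.1.0.jar"
# Copy the Hive JARs into Flume's classpath; afterwards, restart the
# Flume process so the new JARs are picked up.
cp "$HIVE_LIB"/hive-*.jar "$FLUME_LIB"/
ls "$FLUME_LIB"
```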
On FusionInsight Manager, choose Cluster > Name of the desired cluster > Services > Hive > Configurations > All Configurations > HiveServer > Customization > hive.server.customized.configs.
Example configurations:
| Name | Value |
|---|---|
| hive.support.concurrency | true |
| hive.exec.dynamic.partition.mode | nonstrict |
| hive.txn.manager | org.apache.hadoop.hive.ql.lockmgr.DbTxnManager |
| hive.compactor.initiator.on | true |
| hive.compactor.worker.threads | 1 |
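For reference, the same parameters in hive-site.xml property form (a sketch only; on FusionInsight Manager they are set through the customization page above, not by editing the file directly):

```xml
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```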
Run the following commands on the client to configure environment variables and authenticate the user:
source bigdata_env
kinit flume_hive
create table flume_multi_type_part(id string, msg string)
  partitioned by (country string, year_month string, day string)
  clustered by (id) into 5 buckets
  stored as orc
  TBLPROPERTIES('transactional'='true');
At this point, the newly created table contains no data records.
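The records fed to Flume must match the DELIMITED serializer settings used in the sink configuration below (semicolon delimiter, field names id,msg). A minimal sketch that stages such a file for the spooling-directory source; the directory here is a temporary stand-in for the configured /tmp/testflume:

```shell
# Create a staging directory (stand-in for the configured spoolDir).
SPOOL_DIR="$(mktemp -d)"
# Write three sample records in the id;msg layout expected by the
# sink's DELIMITED serializer (delimiter ";", fieldnames id,msg).
for i in 1 2 3; do
  echo "${i};message-${i}"
done > "$SPOOL_DIR/data-001.txt"
cat "$SPOOL_DIR/data-001.txt"
```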
Copy the core-site.xml and hive-site.xml configuration files of the cluster to the directory on the Flume node that the sink configuration references (for example, /opt/hivesink-conf/). Ensure that the Flume running user omm has the read and write permissions on the directory where the configuration files are stored. After the Flume agent starts, data is loaded into Hive in sequence.
On the Hive client, run the select * from table_name; command to check whether the corresponding data has been written to the Hive table.
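For example, for the table created earlier (the count should match the number of records fed through the spooling directory):

```sql
-- Run on the Hive client (beeline); table name from this example.
select * from default.flume_multi_type_part;
select count(*) from default.flume_multi_type_part;
```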
server.sources = spool_source
server.channels = mem_channel
server.sinks = Hive_Sink

#config the source
server.sources.spool_source.type = spooldir
server.sources.spool_source.spoolDir = /tmp/testflume
server.sources.spool_source.montime =
server.sources.spool_source.fileSuffix = .COMPLETED
server.sources.spool_source.deletePolicy = never
server.sources.spool_source.trackerDir = .flumespool
server.sources.spool_source.ignorePattern = ^$
server.sources.spool_source.batchSize = 20
server.sources.spool_source.inputCharset = UTF-8
server.sources.spool_source.selector.type = replicating
server.sources.spool_source.fileHeader = false
server.sources.spool_source.fileHeaderKey = file
server.sources.spool_source.basenameHeaderKey = basename
server.sources.spool_source.deserializer = LINE
server.sources.spool_source.deserializer.maxBatchLine = 1
server.sources.spool_source.deserializer.maxLineLength = 2048
server.sources.spool_source.channels = mem_channel

#config the channel
server.channels.mem_channel.type = memory
server.channels.mem_channel.capacity = 10000
server.channels.mem_channel.transactionCapacity = 2000
server.channels.mem_channel.channelfullcount = 10
server.channels.mem_channel.keep-alive = 3
server.channels.mem_channel.byteCapacity =
server.channels.mem_channel.byteCapacityBufferPercentage = 20

#config the sink
server.sinks.Hive_Sink.type = hive
server.sinks.Hive_Sink.channel = mem_channel
server.sinks.Hive_Sink.hive.metastore = thrift://${any MetaStore service IP address}:21088
server.sinks.Hive_Sink.hive.hiveSite = /opt/hivesink-conf/hive-site.xml
server.sinks.Hive_Sink.hive.coreSite = /opt/hivesink-conf/core-site.xml
server.sinks.Hive_Sink.hive.metastoreSite = /opt/hivesink-conf/hivemetastore-site.xml
server.sinks.Hive_Sink.hive.database = default
server.sinks.Hive_Sink.hive.table = flume_multi_type_part
server.sinks.Hive_Sink.hive.partition = Tag,%Y-%m,%d
server.sinks.Hive_Sink.hive.txnsPerBatchAsk = 100
server.sinks.Hive_Sink.hive.autoCreatePartitions = true
server.sinks.Hive_Sink.useLocalTimeStamp = true
server.sinks.Hive_Sink.batchSize = 1000
server.sinks.Hive_Sink.hive.kerberosPrincipal = super1
server.sinks.Hive_Sink.hive.kerberosKeytab = /opt/mykeytab/user.keytab
server.sinks.Hive_Sink.round = true
server.sinks.Hive_Sink.roundValue = 10
server.sinks.Hive_Sink.roundUnit = minute
server.sinks.Hive_Sink.serializer = DELIMITED
server.sinks.Hive_Sink.serializer.delimiter = ";"
server.sinks.Hive_Sink.serializer.serdeSeparator = ';'
server.sinks.Hive_Sink.serializer.fieldnames = id,msg