本帖最后由 gaobangsheng 于 2014-7-15 11:59 编辑
项目中需要将日志文件导入到hdfs中进行处理,由于日志内容较复杂,中间可能存在有xml这种结构化内容,不能对数据截断到两个文件。
因此希望从本地上传hdfs过程中,文件是一个整体,一个文件对应一个文件。
试了很多办法,终于可以将文件上传上去了。
但是,问题:
每次最后一个文件上传都不能flush到hdfs,只能等下一个文件上传过来了,前面一个文件才会被flush更新。
比如,我最后一个文件上传是C,那么hdfs中就有一个C.tmp,大小为0; 直到我再上传一个文件D,C.tmp才会被更新成C,并且内容被提交。
这时,D也会上传成D.tmp,大小一直是0,直到下一个文件到来。
有没有什么好办法,能将上传的文件立即刷新的?我试了,将hdfs.batchsize设置成1,还是不行。
如果将hdfs.idleTimeout设成非0,过一会儿就会有一个空文件,也挺烦的。
请看配置如下:
#example command line: flume-ng agent -n agent1 -c conf -f conf/example
#agent1表示代理名称
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
#Spooling
#配置source1
agent1.sources.source1.type = spooldir
agent1.sources.source1.fileSuffix = .COMPLETED
agent1.sources.source1.deletePolicy = never
agent1.sources.source1.spoolDir = /home/hadoop/temp/flume_source
agent1.sources.source1.ignorePattern = ^$
agent1.sources.source1.consumeOrder = oldest
agent1.sources.source1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
agent1.sources.source1.batchsize = 5
agent1.sources.source1.channels = channel1
#配置sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://mycluster/flume/%Y-%m-%d
agent1.sinks.sink1.hdfs.fileType = SequenceFile
#每个事件提交一次hdfs
agent1.sinks.sink1.hdfs.batchSize = 1
agent1.sinks.sink1.hdfs.rollInterval = 0
agent1.sinks.sink1.hdfs.rollcount = 1
agent1.sinks.sink1.hdfs.rollsize = 0
agent1.sinks.sink1.hdfs.filePrefix = logFile.%Y-%m-%d
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.hdfs.idleTimeout = 60000
agent1.sinks.sink1.channel = channel1
#配置channel1
agent1.channels.channel1.type = memory
#agent1.channels.channel1.type = file
#agent1.channels.channel1.checkpointDir = /home/hadoop/temp/flume_backup
#agent1.channels.channel1.dataDirs = /home/hadoop/temp/flume_data 复制代码