分享

Flume按文件的方式导入hdfs的问题

本帖最后由 gaobangsheng 于 2014-7-15 11:59 编辑

项目中需要将日志文件导入到hdfs中进行处理,由于日志内容较复杂,中间可能存在有xml这种结构化内容,不能对数据截断到两个文件。
因此希望从本地上传hdfs过程中,文件是一个整体,一个文件对应一个文件。
试了很多办法,终于可以将文件上传上去了。
但是,问题:
每次最后一个文件上传都不能flush到hdfs,只能等下一个文件上传过来了,前面一个文件才会被flush更新。
比如,我最后一个文件上传是C,那么hdfs中就有一个C.tmp,大小为0; 直到我再上传一个文件D,C.tmp才会被更新成C,并且内容被提交。
这时,D也会上传成D.tmp,大小一直是0,直到下一个文件到来。

有没有什么好办法,能将上传的文件立即刷新的?我试了,将hdfs.batchsize设置成1,还是不行。
如果将hdfs.idleTimeout设成非0,过一会儿就会有一个空文件,也挺烦的。

请看配置如下:

  1. #example command line: flume-ng agent -n agent1 -c conf -f conf/example
  2. #agent1表示代理名称
  3. agent1.sources=source1
  4. agent1.sinks=sink1
  5. agent1.channels=channel1
  6. #Spooling
  7. #配置source1
  8. agent1.sources.source1.type = spooldir
  9. agent1.sources.source1.fileSuffix = .COMPLETED
  10. agent1.sources.source1.deletePolicy = never
  11. agent1.sources.source1.spoolDir = /home/hadoop/temp/flume_source
  12. agent1.sources.source1.ignorePattern = ^$
  13. agent1.sources.source1.consumeOrder = oldest
  14. agent1.sources.source1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
  15. agent1.sources.source1.batchsize = 5
  16. agent1.sources.source1.channels = channel1
  17. #配置sink1
  18. agent1.sinks.sink1.type = hdfs
  19. agent1.sinks.sink1.hdfs.path = hdfs://mycluster/flume/%Y-%m-%d
  20. agent1.sinks.sink1.hdfs.fileType = SequenceFile
  21. #每个事件提交一次hdfs
  22. agent1.sinks.sink1.hdfs.batchSize = 1
  23. agent1.sinks.sink1.hdfs.rollInterval = 0
  24. agent1.sinks.sink1.hdfs.rollcount = 1
  25. agent1.sinks.sink1.hdfs.rollsize = 0
  26. agent1.sinks.sink1.hdfs.filePrefix = logFile.%Y-%m-%d
  27. agent1.sinks.sink1.hdfs.fileSuffix = .log
  28. agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
  29. agent1.sinks.sink1.hdfs.idleTimeout = 60000
  30. agent1.sinks.sink1.channel = channel1
  31. #配置channel1
  32. agent1.channels.channel1.type = memory
  33. #agent1.channels.channel1.type = file
  34. #agent1.channels.channel1.checkpointDir = /home/hadoop/temp/flume_backup
  35. #agent1.channels.channel1.dataDirs = /home/hadoop/temp/flume_data
复制代码

本帖被以下淘专辑推荐:

已有(54)人评论

跳转到指定楼层
hyj 发表于 2014-7-15 13:40:26
#Spooling
#配置source1
agent1.sources.source1.type = spooldir
agent1.sources.source1.fileSuffix = .COMPLETED
agent1.sources.source1.deletePolicy = never
agent1.sources.source1.spoolDir = /home/hadoop/temp/flume_source
agent1.sources.source1.ignorePattern = ^$
agent1.sources.source1.consumeOrder = oldest
agent1.sources.source1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
agent1.sources.source1.batchsize = 5
agent1.sources.source1.channels = channel1
把上面红字部分改成random或则youngest ,尝试下。
回复

使用道具 举报

sstutu 发表于 2014-7-15 17:53:25
我的每次都能上传成功,你是不是做特殊设置了
回复

使用道具 举报

pig2 发表于 2014-7-15 19:22:41
hdfs.batchsize设置成0试一下
回复

使用道具 举报

nettman 发表于 2014-7-16 03:39:43



官方文档

官方文档有下面一句话:
The file in use will have the name mangled to include ”.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory.

flume hdfs sink.png


或许你能从这里找到答案
回复

使用道具 举报

howtodown 发表于 2014-7-16 04:04:56
本帖最后由 howtodown 于 2014-7-16 04:12 编辑
个人认为:
这是flume机制,这个文件其实已经上传到hdfs上了,只不过文件格式后面加上.tmp
他的作用就是一个标识,标识这是最后一个文件,如果目录过大,可以把其他已上传的给删除掉。

回复

使用道具 举报

gaobangsheng 发表于 2014-7-16 08:34:52
sstutu 发表于 2014-7-15 17:53
我的每次都能上传成功,你是不是做特殊设置了


能给个你配置的样例看看吗?我这边没有什么特殊处理方式,所有的配置都在上面显示的配置文件中。

点评

我觉得这个不是问题,如果真是的环境的话,这个反而还有些好处  发表于 2014-7-16 12:02
回复

使用道具 举报

gaobangsheng 发表于 2014-7-16 08:35:52
pig2 发表于 2014-7-15 19:22
hdfs.batchsize设置成0试一下

如果设置成0的话,文件都不会被提交了,数据都不会被更新到hdfs中。
回复

使用道具 举报

howtodown 发表于 2014-7-16 18:42:03
gaobangsheng 发表于 2014-7-16 08:35
如果设置成0的话,文件都不会被提交了,数据都不会被更新到hdfs中。
不行,就自己开发个插件。
回复

使用道具 举报

lbwahoo 发表于 2014-7-17 21:35:43
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条