分享

Flume按文件的方式导入hdfs的问题

小小布衣 发表于 2014-8-28 14:57:44
问题解决了吗,问下,如果海外的采集机器,把日志采集到国内的hadoop集群上,要注意什么,我的一直报超时,还有提示org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flume/14082811/test-.1409195936631.tmp could only be replicated to 0 nodes instead of minReplication (=1).  There are 2 datanode(s) running and 2 node(s) are excluded in this operation但是国内的采集到集群上却是好的,,一个星期没有搞定,郁闷死了
回复

使用道具 举报

Yrpro 发表于 2014-10-29 13:54:48
不知道lz问题是否解决了,hdfs.retryInterval这个参数可以解决你的问题。
回复

使用道具 举报

wjhdtx 发表于 2014-10-30 18:25:22
借楼主宝地问个问题:
-------------------------------------------------------
我的需求是这样:
日志文件已经产生在硬盘上,文件量不是很大,每天一个日志大概有2个G,我想用flume将前一天生成好的日志拷贝到hdfs中,不合并,日志文件原封不动的放到hdfs中。
问题:
①这样的需求适合用flume吗?是不是跑个定时任务调用 hadoop fs -put就行了(当然日志所在机器就在hadoop集群中)?
②如果用hdfs,应该怎么配置属性文件?弄了半天都是追加到hfds中的一个文件中
回复

使用道具 举报

wjhdtx 发表于 2014-10-30 18:46:21
更直白的说就是,通过flume将文件原样拷贝到hdfs中,不做合并、聚合等任何操作。
回复

使用道具 举报

wjhdtx 发表于 2014-10-30 18:53:57
本帖最后由 wjhdtx 于 2014-10-30 18:55 编辑

以下是我目前的配置:
--------------------------------------------------
  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = spooldir
  7. a1.sources.r1.spoolDir = /home/hiver/spool
  8. a1.sources.r1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
  9. a1.sources.r1.batchsize = 5
  10. # Describe the sink
  11. a1.sinks.k1.type = hdfs
  12. a1.sinks.k1.hdfs.path = hdfs://68.211.37.160:8020/inputs/flume
  13. a1.sinks.k1.hdfs.filetype = DataStream
  14. a1.sinks.k1.hdfs.writeFormat = Text
  15. a1.sinks.k1.hdfs.batchSize = 1
  16. a1.sinks.k1.hdfs.rollInterval = 0
  17. a1.sinks.k1.hdfs.rollcount = 1
  18. a1.sinks.k1.hdfs.rollsize = 0
  19. a1.sinks.k1.hdfs.filePrefix = logFile.%Y-%m-%d
  20. a1.sinks.k1.hdfs.fileSuffix = .log
  21. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  22. a1.sinks.k1.hdfs.idleTimeout = 60000
  23. # Use a channel which buffers events in memory
  24. a1.channels.c1.type = memory
  25. a1.channels.c1.capacity = 1000
  26. a1.channels.c1.transactionCapacity = 100
  27. # Bind the source and sink to the channel
  28. a1.sources.r1.channels = c1
  29. a1.sinks.k1.channel = c1
复制代码


效果是我有两个文件a和b,最终写道hdfs中的一个文件了,而且有好多乱码(怎么消除?):

hdfs中的结果

hdfs中的结果


我想要的结果是文件a和b通过fluem都单独的保存在hdfs中。


回复

使用道具 举报

howtodown 发表于 2014-10-30 21:16:56
wjhdtx 发表于 2014-10-30 18:25
借楼主宝地问个问题:
-------------------------------------------------------
我的需求是这样:

首先如果日志是实时产生,并且是多个文件,可以使用flume,如果只是一个文件,就没有必要使用flume。如果有乱码,注意文件的编码与hadfs编码一致,最好设置utf8,如果已经设置了,看看是否生效。或则说需要找一下日志产生的编码问题。

至于flume传递文件,你本地是几个文件,flume就会传递几个文件。

配置可以参考

让你快速认识flume及安装和使用flume1.5传输数据(日志)到hadoop2.2
回复

使用道具 举报

howtodown 发表于 2014-10-30 21:18:05
wjhdtx 发表于 2014-10-30 18:53
以下是我目前的配置:
--------------------------------------------------
有乱码的应该是汉字
回复

使用道具 举报

wjhdtx 发表于 2014-10-31 07:42:30
本帖最后由 wjhdtx 于 2014-10-31 07:48 编辑
howtodown 发表于 2014-10-30 21:16
首先如果日志是实时产生,并且是多个文件,可以使用flume,如果只是一个文件,就没有必要使用flume。如果 ...

先谢谢您的回复。1. 需求大概是这样:日志每天都会实时产生,而且在不同目录,我的想法将日志原封不动的拷贝到hdfs中,但是到拷贝前一天产生的日志文件,不是实时拷贝当天产生的日志。


2. 我的这种需求适合用flume吗?但如果以后要是做一次日志归并也许就用上了。


3. 如果不用flume,我觉得有两种方案:
①使用java调用hdfs的api将日志文件写入hdfs
②使用java或shell调用hadoop fs -put来将文件写入hdfs
以上两种策略哪种效率会高点?或者有其他好的策略吗?


4. 关于乱码的问题,我的原文件中没有汉字,我就在本机随便录入些字符做测试,写入hdfs中的文件头部都有“SEQ!org.apache.hadoop.io.LongWritableorg.apache.hadoop.io.Text”这样的前缀,不明白咋回事。。。


回复

使用道具 举报

wjhdtx 发表于 2014-10-31 07:55:13
本帖最后由 wjhdtx 于 2014-10-31 08:05 编辑

1. 乱码解决了,配置文件中的属性区分大小写,hdfs.fileType,我是拷贝其他人的,hdfs.filetype,刚仔细减产了一遍,改了好了
2. 文件也能单独生成,可是最后文件拷贝到hdfs中后,扩展名还是。tmp,非得产生下一个事件,这个文件才不被使用(就是楼主遇到的问题了)



回复

使用道具 举报

wjhdtx 发表于 2014-10-31 10:50:16
大概就是这样,楼主说的最后一个上传到hdfs的.tmp文件大小为0我这没有出现,但我配置的是一个文件一个事件,能不能传上去就改名把.tmp去掉,不知道哪位知道?   下面是我的配置文件
  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = spooldir
  7. a1.sources.r1.spoolDir = /home/hiver/spool
  8. a1.sources.r1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
  9. a1.sources.r1.batchSize = 5
  10. a1.sources.r1.fileHeader = false
  11. # Describe the sink
  12. a1.sinks.k1.type = hdfs
  13. a1.sinks.k1.hdfs.path = hdfs://75.69.17.10:8020/inputs/flume
  14. a1.sinks.k1.hdfs.fileType = DataStream
  15. a1.sinks.k1.hdfs.writeFormat = Text
  16. a1.sinks.k1.hdfs.batchSize = 1
  17. a1.sinks.k1.hdfs.rollInterval = 0
  18. a1.sinks.k1.hdfs.rollCount = 1
  19. a1.sinks.k1.hdfs.rollSize = 0
  20. a1.sinks.k1.hdfs.filePrefix = logFile.%Y-%m-%d
  21. a1.sinks.k1.hdfs.fileSuffix = .log
  22. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  23. a1.sinks.k1.hdfs.idleTimeout = 60000
  24. a1.sinks.k1.hdfs.retryInterval = 180
  25. # Use a channel which buffers events in memory
  26. a1.channels.c1.type = memory
  27. a1.channels.c1.capacity = 1000
  28. a1.channels.c1.transactionCapacity = 100
  29. # Bind the source and sink to the channel
  30. a1.sources.r1.channels = c1
  31. a1.sinks.k1.channel = c1
复制代码


另外,我再问下,上传到hdfs的文件名想用上传的源文件名,能办到吗?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条