
How do I get data from Kafka into HDFS or HBase via Flume?

yunge2016 posted on 2017-7-1 18:43:46
I'm working on a small project and need to collect data from Kafka into HDFS for storage. The data has already been collected from local files into Kafka by Flume. Is it true that the data in Kafka can only be seen by consuming it with a consumer? Is there a specific directory where the data is stored? How can I view the data that has been collected into Kafka? Many thanks.

17 replies

starrycheng posted on 2017-7-1 19:21:41
It's in Kafka's data directory (the location specified by log.dirs in the server configuration file), but you probably won't be able to make sense of the files there.
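If you want to see the data in readable form, the usual way is indeed to consume the topic, for example with the console consumer that ships with Kafka. A minimal sketch, assuming it is run from the Kafka installation directory; the broker address is a placeholder, and the topic name should be whatever Flume is writing to:

bin/kafka-console-consumer.sh --bootstrap-server <broker-host>:9092 --topic xiaohu --from-beginning

(On older Kafka releases the console consumer connects through ZooKeeper instead: --zookeeper <zk-host>:2181.)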

yunge2016 posted on 2017-7-1 19:41:13
The configuration file is finished now, but it throws an error on startup; the error is as follows:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.config.ConfigException: No bootstrap urls given in bootstrap.servers

einhep posted on 2017-7-1 19:48:25
Last edited by einhep on 2017-7-1 19:52
yunge2016 posted on 2017-7-1 19:41
The configuration file is finished now, but it throws an error on startup; the error is as follows:
Caused by: org.apache.kafka.common.KafkaException: Fail ...

If it errors out right at startup, it's basically a configuration problem. Post your configuration so we can take a look; bootstrap.servers is the connection string for reaching the Kafka cluster.
From:
Kafka: The Definitive Guide, Chapter 4, Sections 2 and 3: creating a Kafka consumer and subscribing to topics
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22016



yunge2016 posted on 2017-7-1 20:06:27
Configuration file contents:
# Configure sources, channels, and sinks
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
#auto.commit.enable = true  

## kerberos config ##  


# For each one of the sources, the type is defined  
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000  
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel


# The channel can be defined as follows.  
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.filePrefix = %{host}  
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs://192.168.137.3:8020/data/ds=%Y%m%d
## roll every hour (after gz)  
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300


#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip  
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream  
flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType=DataStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.writeFormat=Text


#Specify the channel the sink should use  
flumetohdfs_agent.sinks.hdfs_sink.channel = mem_channel
# Each channel's type is defined.  
flumetohdfs_agent.channels.mem_channel.type = memory


# Other config values specific to each type of channel(sink or source)  
# can be defined as well  
# In this case, it specifies the capacity of the memory channel  
flumetohdfs_agent.channels.mem_channel.capacity = 100000
flumetohdfs_agent.channels.mem_channel.transactionCapacity = 10000

Please take a look and see whether anything is wrong. Thanks a lot!

langke93 posted on 2017-7-1 20:35:38
There's quite a bit of duplication in there, and the Kafka source is never pointed at the broker list:
# For each one of the sources, the type is defined  
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
flumetohdfs_agent.sources.source_from_kafka.kafka.bootstrap.servers = host1:9092,host2:9092,host3:9092

#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000  
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
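After adding that line, restart the Flume agent so it picks up the change. A sketch of the launch command, assuming the agent configuration above is saved as flume-kafka-hdfs.conf (the file name and --conf directory are placeholders; the agent name must match the flumetohdfs_agent prefix used in the properties):

bin/flume-ng agent --conf conf --conf-file flume-kafka-hdfs.conf --name flumetohdfs_agent -Dflume.root.logger=INFO,console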


yunge2016 posted on 2017-7-2 10:59:23
Sorry, I pasted too much; I copied a lot of lines in vi. Yesterday I got the data collected into Kafka through Flume, so how can I view it in Kafka? Could you walk me through the process? What I did was: upload the data files into the directory that Flume is monitoring, and Flume then collected them into Kafka. I don't know how to view the result; is consuming with a Kafka consumer the only way to see the data? Also, I did not create a new topic in Kafka; I'm still using the old existing one, xiaohu.

langke93 posted on 2017-7-2 12:42:58
Last edited by langke93 on 2017-7-2 12:45
yunge2016 posted on 2017-7-2 10:59
Sorry, I pasted too much; I copied a lot of lines in vi. Yesterday I got the data collected into Kafka through Flume, so how can I ...

Post your Kafka-related configuration.

yunge2016 posted on 2017-7-2 16:54:16
Kafka configuration:
The three brokers have broker.id set to 0, 1, and 2 respectively; on this broker: broker.id=0
port=9092
log.dirs=/opt/modules/kafka/kafka-data   (the data storage directory)
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
zookeeper.connection.timeout.ms=6000
zookeeper.connect=Beicaicheng1:2181,Beicaicheng2:2181,Beicaicheng3:2181

I've only pasted the main settings; the commented-out sections are omitted.
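With these settings, one quick check is to list the topics and describe xiaohu with the kafka-topics tool, using the same ZooKeeper connection string; a sketch, assuming it is run from the Kafka installation directory:

bin/kafka-topics.sh --zookeeper Beicaicheng1:2181,Beicaicheng2:2181,Beicaicheng3:2181 --list
bin/kafka-topics.sh --zookeeper Beicaicheng1:2181,Beicaicheng2:2181,Beicaicheng3:2181 --describe --topic xiaohu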

langke93 posted on 2017-7-2 17:01:00
yunge2016 posted on 2017-7-2 16:54
Kafka configuration:
The three brokers have broker.id set to 0, 1, and 2 respectively; on this broker: broker.id=0
port=9092

Go look in that directory, but you probably won't recognize what's in the files. They should be readable with a program.
/opt/modules/kafka/kafka-data
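The files under that directory are Kafka's binary segment files, so they are not readable as plain text. If you really want to peek at them directly rather than through a consumer, Kafka ships a dump tool; a rough sketch (the partition directory xiaohu-0 and the segment file name are assumptions, use whatever actually exists on your broker):

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /opt/modules/kafka/kafka-data/xiaohu-0/00000000000000000000.log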


