如何把kafka的数据通过flume采集到hdfs中呢或Hbase中
现在做个小项目。需要把kafka的数据采集到hdfs存储。数据从本地由flume采集到了kafka中。是不是kafka中的 数据只能通过消费者消费才能看到呢。有没有具体的 数据存储目录呢。如何查看采集到kafka中的 数据呀。非常感谢Kafka的数据目录中(由server配置文件中的log.dirs指定的)。但是可能看不懂。
现在配置文件修改好了,启动就报错; 错误如下:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.config.ConfigException: No bootstrap urls given in bootstrap.servers
本帖最后由 einhep 于 2017-7-1 19:52 编辑
yunge2016 发表于 2017-7-1 19:41
现在配置文件修改好了,启动就报错; 错误如下:
Caused by: org.apache.kafka.common.KafkaException: Fail ...
启动报错,那就基本的配置问题了。把配置贴出来看下bootstrap.servers是到kafka集群的连接字符串来自:
kafka权威指南 第四章第2、3节 创建kafka消费者并订阅Topics
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22016
配置文件内容:
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
#auto.commit.enable = true
## kerberos config ##
# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
# The channel can be defined as follows.
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.filePrefix = %{host}
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs://192.168.137.3:8020/data/ds=%Y%m%d
## roll every hour (after gz)
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
# 配置sources
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
#auto.commit.enable = true
## kerberos config ##
# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
# The channel can be defined as follows.
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.filePrefix = %{host}
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs://192.168.137.3:8020/data/ds=%Y%m%d
## roll every hour (after gz)
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
# 配置sources
flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink
#auto.commit.enable = true
## kerberos config ##
# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
# The channel can be defined as follows.
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.filePrefix = %{host}
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs://192.168.137.3:8020/data/ds=%Y%m%d
## roll every hour (after gz)
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType=DataStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.writeFormat=Text
#Specify the channel the sink should use
flumetohdfs_agent.sinks.hdfs_sink.channel = mem_channel
# Each channel's type is defined.
flumetohdfs_agent.channels.mem_channel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
flumetohdfs_agent.channels.mem_channel.capacity = 100000
flumetohdfs_agent.channels.mem_channel.transactionCapacity = 10000
帮助看看有没有问题,感谢啦 重复的有点多啊,数据源没有连接brokerlist
# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.topic = xiaohu
flumetohdfs_agent.sources.source_from_kafka.kafka.bootstrap.servers = host1:9092,host2:9092,host3:9092
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel
粘贴多了。vi下复制了好多行。昨天把数据通过flume采集到kafka了,如何才能在kafka里查看到呢。能不能给我说说这个流程。我是这样做的: 数据文件上传到flume监听的目录,然后采集到了kafka. 不知如何查看,是不是只有kafka消费者消费才能看到数据呢。另外就是kafka并没有创建topic那些用的是原来的旧的,就是那个xiaohu. 本帖最后由 langke93 于 2017-7-2 12:45 编辑
yunge2016 发表于 2017-7-2 10:59
粘贴多了。vi下复制了好多行。昨天把数据通过flume采集到kafka了,如何才能在kafka里查看到呢。能不能给我 ...
把你的kafka相关配置贴出来。
kafka配置:
三台broker的id 分别是0,1,2broker.id=0
port=9092
log.dirs=/opt/modules/kafka/kafka-data 数据存放目录
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
zookeeper.connection.timeout.ms=6000
zookeeper.connect=Beicaicheng1:2181,Beicaicheng2:2181,Beicaicheng3:2181
把主要内容粘贴了,其他注释部分没有粘贴出来。
yunge2016 发表于 2017-7-2 16:54
kafka配置:
三台broker的id 分别是0,1,2broker.id=0
port=9092
到这个目录去看,但是你可能不认识。应该可以通过程序读取。
/opt/modules/kafka/kafka-data
页:
[1]
2