
Flume and Kafka Integration, a Reliable Tutorial, Part 2: Installing Flume and Integrating It with Kafka

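Guiding questions

1. Which directory is the Flume source directory?
2. What role does Flume play in relation to Kafka?
3. How do you test whether the configuration works?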



Continued from the previous part, Flume and Kafka Integration, a Reliable Tutorial, Part 1: Installing Kafka
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22138


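Installing Flume is not complicated, but when integrating it with Kafka many people find that the consumer receives no messages. There can be many causes, but they usually fall into two categories:
1. Not really understanding Flume and Kafka and just copying configurations. This easily introduces mistakes such as configuration errors, a wrong agent name, or outdated properties.
2. Not understanding the pipeline as a whole. For example, sometimes there is no error at all, only a few lines of output, and the likely cause is that Kafka has not been started. Many people have also hit this situation: the network is reachable and the firewall is off, yet the connection is still refused. Quite possibly the service was simply never started.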

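Keeping these two points in mind should solve most problems; beyond that, you need to understand the components yourself and look at where the error actually occurs.
For a deeper understanding of Flume configuration, I recommend
Questions to think about when using Flume
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22102
That post is only meant to deepen your understanding of Flume.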

Now let's install Flume and integrate it with Kafka.
I. Installing Flume


1. Extract the installation package

[mw_shl_code=bash,true]tar -zxvf ~/jar/apache-flume-1.6.0-bin.tar.gz -C /data
mv /data/apache-flume-1.6.0-bin/ /data/flume-1.6.0 # rename[/mw_shl_code]
Baidu Netdisk download:
Link: http://pan.baidu.com/s/1bBnF5O Password: xoll

2. Configure environment variables

Edit ~/.bashrc (it is your own file, so sudo is not needed):
vim ~/.bashrc
and add the following lines:
[mw_shl_code=bash,true]export FLUME_HOME=/data/flume-1.6.0
export PATH=$FLUME_HOME/bin:$PATH
[/mw_shl_code]
Then reload it:
[mw_shl_code=bash,true]source ~/.bashrc
[/mw_shl_code]
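A quick way to confirm that the two exports took effect is a small check like the one below. It is only a sketch: check_flume_env is a hypothetical helper (not part of Flume), and it assumes flume-ng should resolve to $FLUME_HOME/bin/flume-ng.

```bash
# Hypothetical helper: verify FLUME_HOME is set and that the flume-ng found
# on PATH is the one inside $FLUME_HOME/bin.
check_flume_env() {
  if [ -z "${FLUME_HOME:-}" ]; then
    echo "FLUME_HOME is not set (did you run: source ~/.bashrc ?)" >&2
    return 1
  fi
  if [ ! -d "$FLUME_HOME" ]; then
    echo "FLUME_HOME=$FLUME_HOME is not a directory" >&2
    return 1
  fi
  local found
  found=$(command -v flume-ng) || {
    echo "flume-ng is not on PATH" >&2
    return 1
  }
  # another flume-ng earlier on PATH would shadow this installation
  if [ "$found" != "$FLUME_HOME/bin/flume-ng" ]; then
    echo "flume-ng on PATH is $found, not \$FLUME_HOME/bin/flume-ng" >&2
    return 1
  fi
  echo "OK: $found"
}
```

Run `check_flume_env` in a shell where ~/.bashrc has been sourced; if it reports a different flume-ng path, an older installation earlier on PATH is being used instead.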

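3. Configure Flume
Copy the environment template and set JAVA_HOME in it:
[mw_shl_code=bash,true]cd $FLUME_HOME/conf
cp flume-env.sh.template flume-env.sh
# edit flume-env.sh and set JAVA_HOME (no space after '='):
export JAVA_HOME=/data/jdk1.8.0_111[/mw_shl_code]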
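Written as `export JAVA_HOME= /data/jdk1.8.0_111` (with a space after `=`), the shell exports JAVA_HOME as an empty string and then reports `/data/jdk1.8.0_111` as "not a valid identifier". If you prefer to set the value without opening an editor, a sketch like the following works; set_java_home is a hypothetical helper name, and it assumes GNU sed (`sed -i` behaves differently on BSD/macOS).

```bash
# Hypothetical helper: set JAVA_HOME in flume-env.sh non-interactively.
# Replaces an existing (possibly commented-out) "export JAVA_HOME=" line,
# otherwise appends one. Assumes GNU sed for -i.
set_java_home() {
  local env_file=$1 java_home=$2
  if grep -qE '^[[:space:]]*#?[[:space:]]*export[[:space:]]+JAVA_HOME=' "$env_file"; then
    # note: the value follows '=' directly, with no space
    sed -i -E "s|^[[:space:]]*#?[[:space:]]*export[[:space:]]+JAVA_HOME=.*|export JAVA_HOME=${java_home}|" "$env_file"
  else
    echo "export JAVA_HOME=${java_home}" >> "$env_file"
  fi
}
```

Usage: `set_java_home $FLUME_HOME/conf/flume-env.sh /data/jdk1.8.0_111`.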

4. Verify the installation
[mw_shl_code=bash,true]flume-ng version
[/mw_shl_code]

[Image: flume.jpg, output of flume-ng version]


II. Using Flume

1. A single-node agent

1) Add a configuration file
cd $FLUME_HOME/conf
vim single_agent.conf

Copy the following content into it:
[mw_shl_code=bash,true]# the agent is named a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1

# set source
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir=/data/aboutyunlog
a1.sources.source1.fileHeader = false

# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= aboutyunlog
# Flume 1.6 property names (kafka.flumeBatchSize and kafka.producer.acks are the Flume 1.7+ names)
a1.sinks.sink1.batchSize = 20
a1.sinks.sink1.requiredAcks = 1
# Flume 1.7+ producer settings; Flume 1.6 does not apply them under these names
#a1.sinks.sink1.kafka.producer.linger.ms = 1
#a1.sinks.sink1.kafka.producer.compression.type = snappy

# set channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /data/flume_data/data

# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
[/mw_shl_code]
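In the configuration above, the line
#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
is commented out and replaced with
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
According to the official documentation, the a1.sinks.sink1.brokerList property is deprecated, but using a1.sinks.sink1.kafka.bootstrap.servers instead produces this error: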

[mw_shl_code=bash,true]ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
        at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
        at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
        at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)[/mw_shl_code]
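So, regardless of what the official documentation says, use a1.sinks.sink1.brokerList with this version. The reason is that the online user guide describes the newest Flume release: kafka.bootstrap.servers was introduced, and brokerList deprecated, in Flume 1.7.0, whereas the KafkaSink in the Flume 1.6.0 used here reads brokerList (together with topic, batchSize and requiredAcks).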
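Because this mismatch only shows up when the agent starts, it can help to check the properties file beforehand. The sketch below is a rough, hypothetical helper (check_kafka_sink is not a Flume tool); it only understands simple one-line `key = value` properties, assumes the Flume 1.6 property names described above, and skips commented-out lines.

```bash
# Hypothetical helper: for every KafkaSink of the given agent, report a missing
# brokerList (required by Flume 1.6) and any Flume 1.7+ kafka.bootstrap.servers key.
# Usage: check_kafka_sink <conf-file> <agent-name>
check_kafka_sink() {
  local conf=$1 agent=$2 status=0 sink
  # collect names of sinks whose type is the Kafka sink (comment lines never match)
  for sink in $(sed -nE "s/^[[:space:]]*${agent}\.sinks\.([^.[:space:]]+)\.type[[:space:]]*=[[:space:]]*org\.apache\.flume\.sink\.kafka\.KafkaSink[[:space:]]*$/\1/p" "$conf"); do
    if ! grep -qE "^[[:space:]]*${agent}\.sinks\.${sink}\.brokerList[[:space:]]*=[[:space:]]*[^[:space:]]" "$conf"; then
      echo "sink ${sink}: no brokerList (required by Flume 1.6 KafkaSink)"
      status=1
    fi
    if grep -qE "^[[:space:]]*${agent}\.sinks\.${sink}\.kafka\.bootstrap\.servers[[:space:]]*=" "$conf"; then
      echo "sink ${sink}: kafka.bootstrap.servers is a Flume 1.7+ key; Flume 1.6 does not use it as the broker list"
      status=1
    fi
  done
  return $status
}
```

Usage: `check_kafka_sink $FLUME_HOME/conf/single_agent.conf a1` prints nothing and returns 0 for the configuration in this tutorial.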

2) Create the required directories
[mw_shl_code=bash,true]mkdir -p /data/aboutyunlog
mkdir -p /data/flume_data/checkpoint
mkdir -p /data/flume_data/data[/mw_shl_code]
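After creating them, make sure the user that runs Flume has permission to read and write them; the simplest (though very permissive) option is to chmod them to 777.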
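A minimal sketch of this step with a permission check. prepare_dirs is a hypothetical helper, and it assumes you run it as the same user that will later run flume-ng (run as root, the check always passes). As a narrower alternative to 777, you could give that user ownership instead, e.g. `sudo chown -R <user> /data/aboutyunlog /data/flume_data`, where `<user>` is a placeholder.

```bash
# Hypothetical helper: create each directory and confirm the current user
# can read, write and enter it. Returns non-zero if any directory fails.
prepare_dirs() {
  local d status=0
  for d in "$@"; do
    mkdir -p "$d" || { status=1; continue; }
    if [ ! -r "$d" ] || [ ! -w "$d" ] || [ ! -x "$d" ]; then
      echo "not readable/writable by $(id -un): $d" >&2
      status=1
    fi
  done
  return $status
}

# Example with the paths from this tutorial:
# prepare_dirs /data/aboutyunlog /data/flume_data/checkpoint /data/flume_data/data
```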

3) Create a topic named aboutyunlog in Kafka

[mw_shl_code=bash,true]kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3[/mw_shl_code]

4) Start Flume

[mw_shl_code=bash,true]flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
[/mw_shl_code]
[Image: 1.jpg]

[Image: 2.jpg]


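5) Create a Kafka consumer

Starting Flume effectively starts the producer, so now we create a consumer. Where should it run? Here I create it on slave1.

[Image: slave1.jpg, consumer running on slave1]

The consumer output above still shows messages from earlier runs; as a test, we now add one more.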

6) Add a file to the Flume source directory

Run this on master:
echo -e "this is a test file! \nhttp://www.aboutyun.com\n20170710">log.1
mv log.1 /data/aboutyunlog/
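The spooling directory source expects each file to be complete when it appears in the directory and not to be modified afterwards, and it stops with an error if a file name is reused. That is why log.1 is written elsewhere and then moved in. The sketch below wraps that pattern; spool_drop is a hypothetical helper, it assumes Flume's default `.COMPLETED` suffix for processed files, and it creates the temporary file in the spool directory's parent rather than inside the spool directory, where Flume could pick it up half-written.

```bash
# Hypothetical helper: write CONTENT to a temp file next to the spool directory,
# then move it in under NAME so Flume only ever sees a complete file.
# Usage: spool_drop <spool-dir> <file-name> <content>   (content may contain \n escapes)
spool_drop() {
  local spool_dir=${1%/} name=$2 content=$3
  local target="${spool_dir}/${name}"
  local tmp

  # Flume rejects reused names; processed files get the default ".COMPLETED" suffix
  if [ -e "$target" ] || [ -e "${target}.COMPLETED" ]; then
    echo "spool_drop: name '${name}' already used in ${spool_dir}" >&2
    return 1
  fi

  # parent directory is normally the same filesystem, so mv is a plain rename
  tmp=$(mktemp "$(dirname "$spool_dir")/.spool_drop.XXXXXX") || return 1
  printf '%b\n' "$content" > "$tmp"   # %b expands \n escapes like echo -e
  chmod 644 "$tmp"                     # mktemp creates mode 0600; let the Flume user read it
  mv "$tmp" "$target"
}

# Equivalent to the echo/mv commands above:
# spool_drop /data/aboutyunlog log.1 "this is a test file! \nhttp://www.aboutyun.com\n20170710"
```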

[Image: master.jpg, commands run on master]

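7) The message is received on slave1

[Image: message received on slave1]

Sometimes it is slow; you may need to wait a few seconds.

With that, the integration is working.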

#############################################

Problems encountered:
1. Failed to find leader for Set
Details:
[2017-07-05 11:26:04,077] WARN [console-consumer-27231_slave1-1499223048151-471ca15a-leader-finder-thread], Failed to find leader for Set([aboutyunlog,2], [aboutyunlog,1], [aboutyunlog,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(aboutyunlog)] from broker [ArrayBuffer()] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

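Cause: Kafka was not started.
Solution: start Kafka.
Other symptoms of Kafka not running:
If Kafka is not running, other errors appear as well. For example, when you create a consumer you may see no error at all, only a warning: WARN [console-consumer-90733_master-1498548695990-7aaba945], no brokers found when trying to rebalance. (kafka.consumer.ZookeeperConsumerConnector)
Another symptom is that producers and consumers get "connection refused" on the broker port, which leads many people to suspect a network problem.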
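To tell "service not running" apart from a real network problem, you can check from the client machine whether anything is listening on the broker and ZooKeeper ports. The sketch below relies on bash's /dev/tcp redirection (a bash feature, not plain sh) and coreutils `timeout`; port_open and check_endpoints are hypothetical helper names, and the host:port list in the example comment is the one used in this tutorial.

```bash
# Hypothetical helper: succeed if a TCP connection to host:port can be opened
# within 3 seconds. Uses bash's /dev/tcp pseudo-device.
port_open() {
  local host=$1 port=$2
  timeout 3 bash -c "</dev/tcp/${host}/${port}" 2>/dev/null
}

# Hypothetical helper: check several host:port endpoints and print the result.
check_endpoints() {
  local ep status=0
  for ep in "$@"; do
    if port_open "${ep%:*}" "${ep##*:}"; then
      echo "open:   $ep"
    else
      echo "CLOSED: $ep"
      status=1
    fi
  done
  return $status
}

# Example:
# check_endpoints master:9092 slave1:9092 slave2:9092 master:2181 slave1:2181 slave2:2181
```

A CLOSED result for a host you can ping usually means nothing is listening on that port, i.e. the broker or ZooKeeper is not running there.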

Error 2

Agent configuration invalid for agent 'a11'. It will be removed.
17/06/27 15:46:02 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
17/06/27 15:46:02 INFO node.AbstractConfigurationProvider: Creating channels
17/06/27 15:46:02 INFO channel.DefaultChannelFactory: Creating instance of channel channel1 type file
17/06/27 15:46:02 INFO node.AbstractConfigurationProvider: Created channel channel1
17/06/27 15:46:02 INFO source.DefaultSourceFactory: Creating instance of source source1, type spooldir
17/06/27 15:46:02 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: org.apache.flume.sink.kafka.KafkaSink
17/06/27 15:46:02 INFO kafka.KafkaSink: Using the static topic: aboutyunlog this may be over-ridden by event headers
17/06/27 15:46:02 INFO kafka.KafkaSinkUtil: context={ parameters:{kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092, channel=channel1, topic=aboutyunlog, type=org.apache.flume.sink.kafka.KafkaSink} }
17/06/27 15:46:02 ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
        at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
        at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
        at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/06/27 15:46:02 INFO node.AbstractConfigurationProvider: Channel channel1 connected to [source1]
17/06/27 15:46:02 INFO node.Application: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:Spool Directory source source1: { spoolDir: /data/aboutyunlog } }} sinkRunners:{} channels:{channel1=FileChannel channel1 { dataDirs: [/data/flume_data/data] }} }
17/06/27 15:46:02 INFO node.Application: Starting Channel channel1



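The log above shows two problems.
First: the agent name is inconsistent (some properties are prefixed with a11 instead of a1), which produces
Agent configuration invalid for agent 'a11'. It will be removed.

Solution:
change the prefix to a1 instead of a11.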
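An a11 prefix, or a missing dot as in `a1sources.source1.fileHeader`, both amount to a property that does not begin with `<agent>.`, so Flume never applies it to agent a1. A hypothetical lint helper for this (check_agent_prefix is not part of Flume) can list every non-comment line lacking the expected prefix; it assumes one `key = value` property per line.

```bash
# Hypothetical helper: print "line-number: text" for every non-blank,
# non-comment line that does not start with "<agent>.".
# Returns non-zero if any such line exists.
# Usage: check_agent_prefix <conf-file> <agent-name>
check_agent_prefix() {
  local conf=$1 agent=$2
  awk -v prefix="${agent}." '
    BEGIN { bad = 0 }
    /^[ \t]*#/ { next }   # comment line
    /^[ \t]*$/ { next }   # blank line
    {
      line = $0
      sub(/^[ \t]+/, "", line)
      if (index(line, prefix) != 1) { printf "%d: %s\n", NR, line; bad = 1 }
    }
    END { exit bad }
  ' "$conf"
}
```

Usage: `check_agent_prefix $FLUME_HOME/conf/single_agent.conf a1`, where the agent name should match the `--name` passed to flume-ng.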
Second:
ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker

This happened because the official documentation says brokerList is deprecated, so instead of
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
the following property was used:
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
Flume 1.6.0 does not recognize that property, hence the error. Changing it back to a1.sinks.sink1.brokerList fixes it.

Next part:
Collecting logs with Flume: how to handle log files that are continuously appended to and a steadily growing number of files
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22521



