问题导读:
1. 如何安装flume?
2. 如何使用flume将本地文件发送到kafka?
接上篇:about云日志分析项目准备8:Kafka集群安装
这篇主要讲解Flume集群的安装和配置
一、Flume安装
1. 压缩安装包
- tar -zxvf ~/jar/apache-flume-1.6.0-bin.tar.gz -C /data
- mv /data/apache-flume-1.6.0-bin/ /data/flume-1.6.0 # 重命名
复制代码
2. 配置环境变量
- echo -e "export FLUME_HOME=/data/flume-1.6.0\nexport PATH=\$FLUME_HOME/bin:\$PATH" >> ~/.bashrc
- source ~/.bashrc
复制代码
3. 配置flume
- cp flume-env.sh.template flume-env.sh修改JAVA_HOME
- export JAVA_HOME= /data/jdk1.8.0_111
复制代码
4. 验证安装
二、Flume使用
一个agent由source、channel、sink组成。这儿我们使用Spooling Directory Source、File Channel、Kafka Sink。
1. 单节点的agent
1) 增加配置文件
- cd $FLUME_HOME/conf
- vim single_agent.conf
复制代码
将以下内容拷贝进去
- # agent的名称为a1
- a1.sources = source1
- a1.channels = channel1
- a1.sinks = sink1
- # set source
- a1.sources.source1.type = spooldir
- a1.sources.source1.spoolDir=/data/aboutyunlog
- a11.sources.source1.fileHeader = false
- # set sink
- a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
- a1.sinks.sink1.topic= aboutyunlog
- # set channel
- a1.channels.channel1.type = file
- a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint
- a1.channels.channel1.dataDirs= /data/flume_data/data
- # bind
- a1.sources.source1.channels = channel1
- a1.sinks.sink1.channel = channel1
复制代码 修改:a11.sources.source1.fileHeader = false
需修改为a1
2. 创建所需文件
- mkdir -p /data/aboutyunlog
- mkdir -p /data/flume_data/checkpoint
- mkdir -p /data/flume_data/data
复制代码
3. 查看kafka现有的topic
- kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list
复制代码
4. 在kafka上创建名为aboutyunlog的topic
- kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3
复制代码
5. 启动flume
- flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
复制代码 启动过程中控制台会输出很多日志。
6. 创建一个kafka的consumer
- kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic aboutyunlog --from-beginning
复制代码 这条命令的意思是说创建aboutyunlog这个topic下的消费者,消费时从最开始的一条信息开始消费。
上图说明该消费者创建成功,由于本地/data/aboutyunlog目录下没有新文件加入,造成aboutyunlog这个topic没有信息输入,所以消费者没有得到一条信息。
7. 添加文件到flume source目录
- echo -e "this is a test file! \nhttp://www.aboutyun.com"
- mv log.1 /data/aboutyunlog/
复制代码 为:echo -e "this is a test file! \nhttp://www.aboutyun.com">log.1
将一个文件添加到flume的监控目录之后,flume会将改文件重命名为filename.COMPLETED。这一信息可以在启动flume的shell界面中看到
8. 再次查看kafka consumer
切换到创建kafka consumer的shell界面,会看到我们log.1中文件的内容被打印在屏幕上。
上图说明我们已经成功使用flume监控/data/aboutyunlog目录,并将监控目录中的内容发送到kafka的aboutyunlog主题中。
|