分享

整体认识flume:Flume介绍、分布式安装、常见问题及解决方案

问题导读
1.什么是flume?
2.flume包含哪些组件?
3.Flume在读取utf-8格式的文件时会出现解析不了时间戳,该如何解决?





Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

Flume的逻辑架构:

1.png


Flume逻辑上分三层架构:
agent,collector,storage

agent
用于采集数据,agent是flume中产生数据流的地方,同时,agent会将产生的数据流传输到collector。
collector
collector的作用是将多个agent的数据汇总后,加载到storage中。
storage
storage是存储系统,可以是一个普通file,也可以是HDFS,HIVE,HBase等。

Master
Master是管理协调agent和collector的配置等信息,是flume集群的控制器。
在Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。

2.png

对于agent数据流配置就是从哪得到数据,把数据发送到哪个collector。
对于collector是接收agent发过来的数据,把数据发送到指定的目标机器上。
Flume的特性
•         Reliability:Flume提供3中数据可靠性选项,包括End-to-end、Store on failure和Best effort。其中End-to-end使用了磁盘日志和接受端Ack的方式,保证Flume接受到的数据会最终到达目的,但是效率是最差的。Store on failure在目的不可用的时候,数据会保持在本地硬盘,效率会比end-to-end高,但是会出现日志丢失的情况。Best effort不做任何QoS保证,效率最高,日志记录没有保证。

•         Scalability:Flume的3大组件:collector、master和storage tier都是可伸缩的。需要注意的是,Flume中对事件的处理不需要带状态,它的Scalability可以很容易实现。

•         Manageability:master能够动态管理flume集群节点,多master情况,Flume利用ZooKeeper和gossip,保证配置数据的一致性。

•         Extensibility:基于Java,用户可以为Flume添加各种新的功能,如通过继承Source,用户可以实现自己的数据接入方式,实现Sink的子类,用户可以将数据写往特定目标,同时,通过SinkDecorator,用户可以对数据进行一定的预处理。

注:Flume框架对hadoop和zookeeper的依赖只是在jar包上,并不要求flume启动时必须将hadoop和zookeeper服务也启动。





Flume的分布式安装

                                        ————此为目前集群的flume安装过程
部署flume在集群上,按照如下步骤:
    在集群上的每台机器上安装flume
    选择一个或多个节点当做master
    修改静态配置文件
    在至少一台机器上启动一个master ,所有节点启动flume node
接下来的一章描述如何手动修改配置文件为集群上的节点指定master,如何为参数设置默认值,本章的后一部分描述对于大系统的数据流配置,如何通过增加collector来扩充系统容量,如何提高可靠性通过增加更多的master。注意:flume集群整个集群的网络环境要保证稳定,可靠,否则会出现一些莫名错误(比如:agent端发送不了数据到collector)。

集群每台机器上安装flume
场景:
操作系统版本:CentOS5.6
Hadoop版本:0.20.2
Jdk版本:jdk1.6.0_26
安装flume版本:flume-0.9.4

步骤1:
下载flume最新版本,现在服务器上安装的是flume-distribution-0.9.4的版本,下载地址是https://github.com/cloudera/flume/downloads 目前flume的安装包是放在/data/sysdir/install_tar文件件的下面。

步骤2:
解压flume安装包到/data/sysdir文件夹下面,
在命令行中输入 tar zxvf/data/sysdir/install_tar/flume-distributin-0.9.4.tar.gz -C /data/sysdir

步骤3:
修改etc/profile文件,加入:
export FLUME_HOME=/data/sysdir/flume-distribution-0.9.4
export PATH=.:$PATH::$FLUME_HOME/bin

步骤4:
验证安装及其他
安装完毕后,运行flume命令,会看到以下输出:
  
usage:  flume command [args...]
  
commands  include:
  
  dump            Takes a specified source and  dumps to console
  
  node            Start a Flume node/agent (with  watchdog)
  
  master          Start a Flume Master server (with  watchdog)
  
  version         Dump flume build version information
  
  node_nowatch    Start a flume node/agent (no watchdog)
  
  master_nowatch  Start a Flume Master server (no watchdog)
  
  class <class>   Run specified fully qualified class using  Flume environment (no watchdog)
  
                   for example: flume  com.cloudera.flume.agent.FlumeNode
  
  shell           Start the flume shell
  
  killmaster       Kill a running master
  














flume配置文件位置:$FLUME_HOME/conf 下

选择一个或多个节点当做master
对于master的选择情况,可以在集群上定义一个master,也可以为了提高可用性选择多个节点做为master,
单点master模式:容易管理,但在系统的容错和扩展性有缺陷
多点master模式:通常是运行3/5个master,能很好的容错
Standalone mode - this is where the Master runs ona single machine. This is easy to administer, and simple to set-up, but hasdisadvantages when it comes to scalability and fault-tolerance.
Distributed mode - this is where the Master isconfigured to run on several machines - usually three or five. This optionscales to serve many Flows, and also has good fault-toleranceproperties.
Flume master数量的选择原则

原文如下:
The distributed Flume Master will continue to workcorrectly as long as more than half the physical machines running it are stillworking and haven’t crashed. Therefore if you want to survive one fault, youneed three machines (because 3-1 = 2 > 3/2). For every extra fault you wantto tolerate, add another two machines, so for two faults you need fivemachines. Note that having an even number of machines doesn’t make the FlumeMaster any more fault-tolerant - four machines only tolerate one failure,because if two were to fail only two would be left functioning, which is notmore than half of four. Common deployments should be well served by three orfive machines.

分布式的master能够继续正常工作不会崩溃,的前提是正常工作的master数量超过总master数量的一半。
Flume master的作用主要有两个:

原文如下:
The Master has two main jobs to perform. The firstis to keep track of all the nodes in a Flume deployment and to keep theminformed of any changes to their configuration. The second is to trackacknowledgements from the end of a Flume flow that is operating in reliablemode so that the source at the top of that flow knows when to stop transmittingan event.

Master主要有两个工作,第一是跟踪各节点的配置情况,通知节点配置的改变,第二是跟踪来自flow的结尾操控在可靠的模式下(E2E)的信息,以至于让flow的源头知道什么时候停止传输event。
目前集群flume master的选择情况
10.168.0.174、10.168.0.181、10.168.0.188为flume master
修改静态配置文件
Site-specific设置对于flume节点和master通过在每一个集群节点的conf/flume-site.xml是可配置的,如果这个文件不存在,设置的属性默认的在conf/flume--conf.xml中,在接下来的例子中,在flume的节点上设置master名,让节点自己去寻找叫“master”的flume Master
conf/flume-conf.xml.  
  1. <?xml version="1.0"?>
  2. <?xml-stylesheet type="text/xsl"  href="configuration.xsl"?>
  3. <configuration>
  4. <property>
  5. <name>flume.master.servers</name>
  6. <value>master</value>
  7. </property>
  8. </configuration>
复制代码


在多master的情况下需要如下配置:
  1. <property>
  2.     <name>flume.master.servers</name>
  3.     <value>hadoopmaster.com,hadoopedge.com,datanode4.com</value>
  4.     <description>A comma-separated list of hostnames, one for each
  5.       machine in the Flume Master.
  6.     </description>
  7.   </property>
  8.   <property>
  9.     <name>flume.master.store</name>
  10.     <value>zookeeper</value>
  11.     <description>How the Flume Master stores node configurations. Must
  12.       be either 'zookeeper' or 'memory'.</description>
  13.   </property>
  14.   <property>
  15.     <name>flume.master.serverid</name>
  16.     <value>2</value>
  17.     <description>The unique identifier for a machine in a
  18.       Flume Master ensemble. Must be different on every
  19.       master instance.</description>
  20.   </property>
复制代码


注意:flume.master.serverid属性的配置主要是针对master,如在集群上目前是181,174,188为master,这三台机器的$FLUME_HOME/conf/flume-conf.xml文件中flume.master.serverid必须是不能相同的,该属性的值以0开始。
当使用agent角色时,你可以通过添加下面的配置文件在flume-conf.xml中,来设置默认的collector主机:
...
<property>
<name>flume.collector.event.host</name>
<value>collector</value>
<description>This is the host name of thedefault "remote"  collector.
</description>
</property>
<property>
<name>flume.collector.port</name>
<value>35853</value>
<description>This default tcp port that thecollector listens to in order to receive events it is collecting.
</description>
</property>
...
修改配置文件时,根据物理主机在不同的flume逻辑层,修改相应的属性的值即可。
在agent上,需要修改flume.collector.event.host属性,来指定此agent默认发送到的collector
在cllector上,不需要修改特定的属性
在master上,flume.master.serverid属性的配置主要是针对master,如在集群上目前是181,174,188为master,这三台机器的$FLUME_HOME/conf/flume-conf.xml文件中flume.master.serverid必须是不能相同的,该属性的值以0开始。
为简便起见,其他属性保持一致。


其中目前集群调整的配置文件属性:
flume.agent.logdir 该属性是配置agent临时存放文件的路径,尽可能不要放到/tmp下
flume.agent.logdir.maxage该属性是agent日志文件收集信息的时长,根据集群情况可适当调整。
flume.collector.roll.millis该属性hdfs文件切换(关闭后新建)的时长,控制在hdfs上生成的文件大小,如果调整该属性需要同时调整flume.agent.logdir.retransmit的属性,一般情况下最多是flume.agent.logdir.retransmit的一半。
flume.collector.output.format该属性是collector发送数据格式avro, avrojson(默认), avrodata…,目前集群是raw

启动Flume集群节点
集群上节点启动:
1        在命令行输入:flume master 启动master节点
2        在命令行输入:flume node –n nodeName 启动其他节点,nodeName最好根据集群逻辑的划分来取名子,这样在master进行配置的时候比较清晰。
如目前集群上的181 和188 是collector 所以nodeName可以选择collector1 collector2
名字规则自己定义,方便记忆和动态配置即可(后续会有介绍动态配置)


Flume集群动态配置
    Flume的动态配置指的是在flume shell下进行配置agent和collector,具体步骤如下:
    在已经启动的master节点新开窗口输入 依次输入”flume shell”è”connect localhost ”此时可以进行动态的配置。

如执行 exec config a1 ‘tailDir(“/data/logfile”)’ ‘agentSink’
表示nodeName 是a1的机器监听/data/logfile文件夹下的日志,发送信息到flume-conf.xml配置的flume.collector.event.host 的flume.collector.port端口。
如执行配置collector的命令: exec configc1 ‘collecotrSource’ ‘collectorSink(“hdfs://hadoopmaster.com:9000/flume/webdata/%Y-%m-%d/%H”,”syslog”)

表示nodeName是c1的机器监听配置文件中flume.collector.port的端口,发送信息到hdfs上
其中/%Y-%m-%d/%H表示/年-月-日/时 建立发送文件到达的文件夹,这个时间以agent读取日志时产生的文件时间为准,syslog是文件名,创建文件的时间是flume.collector.roll.millis属性决定。

接下来的例子是 六个agent 一个collector

3.png

显式的配置:
agentA : src |agentSink("collector",35853);
agentB : src |agentSink("collector",35853);
agentC : src |agentSink("collector",35853);
agentD : src |agentSink("collector",35853);
agentE : src |agentSink("collector",35853);
agentF : src |agentSink("collector",35853);
collector : collectorSource(35853) |collectorSink("hdfs://namenode/flume/","srcdata");

可靠性模式
你可以运行agent的可靠性级别,简单的配置不同的agentSink就可以,下面有三个级别的配置
agentE2ESink[("machine"[,port])]
    end to end,这个级别是WAL,relies on an acknowledgement, and willretry if no acknowledgement is received.
agentDFOSink[("machine"[,port])]
    DFO,当agent发现在collector操作失败的时候,agent写入到本地硬盘上,如果出现存储在硬盘上的数据,agent重新进行网络连接,并试图重新发送数据。
agentBESink[("machine"[,port])]
    效率最好,agent不写入到本地任何数据,如果在collector 发现处理失败,直接删除消息。
AgentSink 是agentE2ESink 的别名
补充说明
多个collector能够增加日志收集的吞吐量,提高collector的有效性能够提高数据的传输速度,数据的收集是可并行的,此外,来自多个agent的数据能够分配到多个collector上加载。

多collector下agent的划分
前面的图展示flume节点典型的拓扑结构和数据流,为了可靠的传输,当collector停止运行或是失去与agents的联系的时候,agents将会存储他们的events在各自的本地硬盘上,这些agents试图重新连接collector,因为collector的宕机,任何处理和分析的数据流都被阻塞。

4.png

当你有多个collector如上图所示,即使在collector宕机的情况下,数据处理仍然能够进行下去,如果collector b 宕机了,agent a,agent b,ageng e,和agentf会分别继续传送events通过collector a 和collector c,agent c 和agent d 的不得不排在其他agent的后面等待日志的处理直到collector b重新上线。
接下来的配置划分agents在多collector,这个例子是每一个collector由同一个输出的dfs路径和文件的前缀名,聚合所有的日志到同一个目录下
agentA : src |agentE2ESink("collectorA",35853);
agentB : src |agentE2ESink("collectorA",35853);
agentC : src |agentE2ESink("collectorB",35853);
agentD : src |agentE2ESink("collectorB",35853);
agentE : src |agentE2ESink("collectorC",35853);
agentF : src |agentE2ESink("collectorC",35853);
collectorA : collectorSource(35853) |collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) |collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) |collectorSink("hdfs://...","src");

5.png

当多个collector写入到相同的存储位置时,你可以设置agent c  agent d 当他们的collector出现错误时转向其他的collector,可以分别设置agent c 和agent d 转移到collectora 和collector c 上。

用agents的failover chains。和指向单独的collector(agentSink)类似,failover chains也有三个可靠性的级别agentE2EChain,agentDFOChain, and agentBEChain.
下面的例子,手动设置失败转移链表用agentE2EChain,有多个失败转移collector的agent的end-to-end的可靠性级别,agentA默认的将数据发送到collectorAd端口35853,第二个参数在agentA 's sink是指定备用的collector。你可以定义任意数量的collector,(至少一个以上)。

agentA : src |agentE2EChain("collectorA:35853","collectorB:35853");
agentB : src |agentE2EChain("collectorA:35853","collectorC:35853");
agentC : src |agentE2EChain("collectorB:35853","collectorA:35853");
agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");
agentE : src |agentE2EChain("collectorC:35853","collectorA:35853");
agentF : src |agentE2EChain("collectorC:35853","collectorB:35853");
collectorA : collectorSource(35853) |collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) |collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) |collectorSink("hdfs://...","src");
        注意:这章中,agent[A-F] 和 collector[A-B] 是物理节点的名字
        注意:自动失败转移链表功能还不能用在多master的集群。





Flume集群测试以及节点失败后的处理
Flume集群出现错误的节点按照逻辑划分,分别属于agent,collector,master,这三层的逻辑架构的节点出错可以使单独一个节点出错,也可以是组合出错。
目前集群测试情况
目前集群测试环境:
说明:目前集群的测试主要通过上传文件到hdfs上然后通过hive查询文件的记录条数。
场景一:三台master(174/176,181,188),两台collector(181,188),一台agent(235),传送的文件有据说有3000000条记录,agent配置188-181。
Session1:不做任何异常测试,hive建立表是logflumeSession1,结果是:数据条数:300万
Session2:两个collector都宕机,数据条数:3427971

6.png
Session3:宕机一个cllector,和一个master,然后重新启动collector,master,数据量:

7.png
Session4:宕机agent,数据会重新发送
场景n:三台master(174/176,181,188),一台collector(188),一台agent(235),传送的文件有6000000条记录,colletcor和agent不在174节点的master上
出现的状况:174宕机
错误情况:不能正常传送文件到hdfs上,collector报错信息如图
8.png

Hdfs无法写入数据

9.png

解决办法:重新启动collector
结果:数据正常传输
10.png
数据的总行数:

11.png
数据会增加。
单一逻辑层的节点失败及处理
单一逻辑层的节点出错,情况有四种agent,collector,master,storage,storage。 storage层的失败和collector层的失败是一样的,只要数据放不到最终的位置,就认为节点是失败的。
agent失败
Flume数据安全级别的配置主要Agent的配置上,Agent提供三种级别发送数据到collector:E2E、DFO、BF。
E2E:   agent会先将收集到的数据写到agent本地硬盘上,然后发送  collector,collector会将数据发送到最后数据的节点。确定数据已经成功保存到目标机器之后,collector通知agent删除硬盘上文件。
DFO:   当数据无法正常保存到目标主机时,agent会将收集到的数    据保到agent安装时指定的目录下,错误恢复之后,将记录的数据发送到collector,进入目标机器。
BF:    agent不做任何的日志记录,如果目标地址不可达则数据丢失。
agent节点监控日志文件夹下的所有文件,每一个agent最多监听1024个文件,每一个文件在agent的都会有一个类似游标的东西,记录监听文件读取的位置,这样每次文件有新的记录产生,那么游标就会读取增量记录,根据agent配置发送到collector的安全层级属性有E2E,DFO。如果是E2E的情况那么agent节点会首先把文件写入到agent节点的文件夹下,然后发送给collector,如果最终数据最终成功存储到storage层,那么agent删除之前写入的文件,如果没有收到成功的信息,那么就保留信息。
如果agent节点出现问题,那么相当于所有的记录信息都消失了,如果直接重新启动,agent会认为日志文件夹下的所有文件都是没有监听过的,没有文件记录的标示,所以会重新读取文件,这样,日志就会有重复,具体恢复办法如下
      将agent节点上监听的日志文件夹下已经发送的日志文件移出,处理完
      故障重新启动agent即可。
注:在agent节点失败的情况下,按照失败的时间点,将时间点之前的数据文件移出,将flume.agent.logdir配置的文件夹清空,重新启动agent。
collector失败  
     collector端做数据的合并,并且将数据发送到目标机器上去,如果collector端失败那么不用担心,因为agent的E2E模式会将没有发送成功的数据保存到agent的本地,等到collector恢复之后,数据会重新发送到collector上,数据不会丢失。
除了恢复故障节点之外不用做其他的额外处理,collector的性能影响
      的是整个flume集群的数据吞吐量,所以collector最好单独部署。
master失败
      master宕机,整个集群将不能工作,在重新启动集群,将agent监听的日志文件夹下的所有文件
       移出,然后重新启动master即可。
      在多master节点情况下,只要集群上正常工作的master大于总master数量的一半,集群就能正常工作,那么只要恢复其中宕机的master即可。
      
多逻辑层节点组合失败及处理
两个逻辑层组合出错,情况有agent-collector,collector-master,agent-master,实际上agent
agent-collector组合
    agent和collector组合出现错误,collector和stroage出错一样,姑且将collector和storage放在一起考虑。
collector-master组合agent-master组合






Flume集群遇到的问题
1,  Flume在agent端采集数据的时候默认会在/tmp/flume-{user}下生成临时的目录用于存放agent自己截取的日志文件,如果文件过大导致磁盘写满那么agent端会报出
Error closing logicalNode a2-18 sink: No space lefton device,所以在配置agent端的时候需要注意
<property>
   <name>flume.agent.logdir</name>
   <value>/data/tmp/flume-${user.name}/agent</value>
  </property>
属性,只要保证flume在7*24小时运行过程agent端不会使该路径flume.agent.logdir磁盘写满即可。

2,  Flume在启动时候会去寻找hadoop-core-*.jar的文件,需要修改标准版的hadoop核心jar包的名字 将hadoop-*-core.jar改成hadoop-core-*.jar。

3,  Flume集群中的flume必须版本一致。否则会出现莫名其妙的错误。

4,  Flume集群收集的日志发送到hdfs上建立文件夹的时间依据是根据event的时间,在源代码上是Clock.unixTime(),所以如果想要根据日志生成的时间来生成文件的话,需要对
com.cloudera.flume.core.EventImpl类的构造函数
publicEventImpl(byte[] s, long timestamp, Priority pri, long nanoTime,
      String host, Map<String,byte[]> fields)重新写,解析数组s的内容取出时间,赋给timestamp。
注意:flume的框架会构造s内容是空的数组,用来发送类似简单验证的event,所以需要注意s内容为空的时候timestamp的问题。

5,  Flume在读取utf-8格式的文件时会出现解析不了时间戳,因为utf-8的文件格式在文件头会加上utf-8的文件标识,
解决办法:
flume-core-0.9.4-modify.jar包修改了 EventImpl类
使文件创建的时间是根据日志中的时间来确定的,修改的代码部分在FlumeSource中
修改的部分是在构造函数上,支持utf-8的文件格式,也支持其他普通的文件格式的文件
主要代码如下:
System.arraycopy(s, 3, tmpByte, 0, 13);
     tmpStr= new String(tmpByte);
     m =p.matcher(tmpStr);
     if(m.matches())//表示符合utf-8的文件格式
     this.timestamp= Long.parseLong(tmpStr);
     else
     this.timestamp=Long.parseLong(new String(s).split("\t")[0].toString());
     
性能没有测试。

6,   与5不同的是时间戳不是long的,而是字符串如:“2011-10-21 10:12:65.125”,修改办法如下: System.arraycopy(s, 3, tmpByte, 0, 25);
     tmpStr = new String(tmpByte);
     try {
            //this.timestamp=sf.parse(tmpStr.split("\t")[0].toString()).getTime();
            this.timestamp=sf.parse(tmpStr.substring(0,tmpStr.indexOf("\t"))).getTime();
        }catch (ParseException e) {
            e.printStackTrace();
        }
7,  如果collector和agent不在一个网段的话会发生闪断的现象,这样的话,就会造成agent端不能传送数据个collector所以,在部署agent和collector最好在一个网段。

8,  如果在启动master时出现:“试着启动hostname,但是hostname不在master列表里的错误“,这是需要检查是否主机地址和hostname配置的正确与否。




12.png

已有(6)人评论

跳转到指定楼层
quenlang 发表于 2014-9-30 13:29:13
学习一下 ,感谢楼主的好帖
回复

使用道具 举报

hahaxixi 发表于 2014-10-24 11:27:45
感谢楼主的好帖,very good!
回复

使用道具 举报

hbbtym 发表于 2014-12-2 16:31:30
你好楼主,是不是Flume1.x就没有了master的概念
回复

使用道具 举报

howtodown 发表于 2014-12-4 13:01:28
hbbtym 发表于 2014-12-2 16:31
你好楼主,是不是Flume1.x就没有了master的概念
Flume NG 1.x 是Flume 0.9.x的重构版本,基本面目全非了,Master和zookeeper没有了,collector没有了,Web console没有了,只有
source  
sink  
channl

回复

使用道具 举报

hbbtym 发表于 2014-12-5 14:04:46
howtodown 发表于 2014-12-4 13:01
Flume NG 1.x 是Flume 0.9.x的重构版本,基本面目全非了,Master和zookeeper没有了,collector没有了,Web ...

恩 这几天查资料弄明白了,感谢楼主
回复

使用道具 举报

若曦难得 发表于 2016-9-22 14:56:18
好文档,谢谢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条