问题导读: 1.node向master发送心跳之后等待反馈的最大时长由哪个参数来决定,默认多长时间? 2.当primary sink(可以认为是第一collector)故障后,重启primary sink的一个延迟时间,在此期间,agent将把数据发送到secondary sink(可能是第二collector)由哪个参数来决定? 3.collector的默认发送目录通过哪个参数可以配置? 4.flume command执行命令的方式有几种?
Flume配置文件(flume-site.conf)
1、 watchdog watchdog.restarts.max | watchdog每分钟重启的最大数??? |
2、 common node flume.config.heartbeat.period | node发送心跳周期,默认5000(毫秒) | flume.node.status.port | node web端口 | flume.node.heartbeat.backoff.ceiling | node向master发送心跳之后等待反馈的最大时长,默认60000(毫秒) | flume.node.http.autofindport | 如果已有node启动,允许第二个node自动选择一个未使用的端口做web服务。多个node的界面端口从35862、35863向后延续 |
3、agent flume.agent.logdir | agent日志路径 | flume.agent.logdir.maxage | 当前处于打开状态agent日志文件收集信息的时长,在这之后该日志文件将会被关闭,并将数据发送到网络,默认10000(毫秒) | flume.agent.logdir.retransmit | 在end-to-end模式下agent向collector发送数据失败后再次发送的间隔时长,默认60000(毫秒),建议至少是flume.collector.roll.millis的两倍 | flume.agent.failover.backoff.initial | 当primary sink(可以认为是第一collector)故障后,重启primary sink的一个延迟时间,在此期间,agent将把数据发送到secondary sink(可能是第二collector) | flume.agent.failover.backoff.max | 在一定时限内尝试链接故障节点失败后,agent将把数据转发向备用节点 |
4、collector flume.collector.event.host | 默认collector地址 | flume.collector.port | 默认collector端口 | flume.collector.dfs.dir | 最终数据发向目录(默认),可以是本地,可以是hdfs,默认是/tmp | flume.collector.dfs.compress.codec | 压缩格式GzipCodec, DefaultCodec (deflate), BZip2Codec,默认是None | flume.collector.roll.millis | hdfs文件切换(关闭后新建)的时长 | flume.collector.output.format | collector发送数据格式avro, avrojson(默认), avrodata… |
5、master flume.master.servers | 用逗号分隔多个master地址列表 | flume.master.store | master配置存储方式(zookeeper/memory) zookeeper保证master的配置在多master节点之间同步,memory则保存在内存中,其配置随着master宕机而丢失 | flume.master.serverid | master的唯一标识 | flume.master.http.port | http端口 | flume.master.heartbeat.missed.max | 判断节点失效的最大未达心跳数 | flume.master.savefile | 当前flume配置文件的路径,默认conf/current.flume | flume.master.savefile.autoload | 启动时是否加载current.flume,默认false | flume.master.gossip.period | master通信周期(毫秒) | flume.master.heartbeat.rpc | THRIFT/AVRO | flume.event.rpc | THRIFT/AVRO | flume.report.server.rpc.type | THRIFT/AVRO |
6、zookeeper flume.master.zk.logdir | zookeeper日志路径 |
7、thrift flume.thrift.socket.timeout.ms | thrift网络连接超时时间(毫秒) |
1.command shell(flume command)
help | 帮助 | connect master:port | 登录master | config logicalnode source sink | 为逻辑节点配置一个source到sink的映射 | getnodestatus | 获得节点状态(HELLO, CONFIGURING, ACTIVE, IDLE, ERROR, DECOMMISSIONED, LOST ) HELLO, node启动时 CONFIGURING, node被配置后 ACTIVE, 一个event从source送达到sink IDLE, source中所有evnet发送完毕后 ERROR, 节点故障退出,数据没有flush DECOMMISSIONED, node被master移除 LOST, master长时间未收到node心跳 | getconfigs | 获得配置 | getmappings [physical node] | 如果physical node参数被省略,将显示所有logical node到physical node的映射关系 | exec | 同步执行命令 | Source file | 执行脚本. | submit | 异步执行命令 | wait ms [cmdid] | 设定一个时间,周期检查命令进程的状态(success or failure) | waitForNodesActive ms node1 [node2 […]] | 设定一个时间,检查node是否处于使用(configuring, active)状态 | waitForNodesDone ms node1 [node2 […]] | 设定一个时间,检查node是否处于未用(IDLE, ERROR, LOST)状态 | quit | 退出 |
2.command shell(exec & submit command) 双引号 | 包含转义字符的java string | 单引号 | 能引住除单引号之外的所有字符 | noop | touch master, 不做操作 | config logicalnode source sink | 为逻辑节点配置source到sink的映射 | multiconfig flumespec | | unconfig logicalnode | 取消逻辑节点的配置,影响master调整故障转移列表(failover list) | refreshAll logicalnode | 刷新 | save filename | 保存current configuration到master硬盘 | load filename | 从master中加载current configuration | map physicalnode logicalnode | 配置物理节点到逻辑节点的映射关系,master的配置将被同步到logicalnode | spawn physicalnode logicalnode | 恢复 | decommission logicalnode | | unmap physicalnode logicalnode | 取消映射 | unmapAll | 全部取消 | purge logicalnode | 清除状态,类似重启一个logical node, 适用于(DECOMMISSIONED、 LOST)状态 | purgeAll | 清除所有logical node的状态 |
Flume Source
1、Flume’s Tiered Event Sources collectorSource[(port)] | Collector source,监听端口汇聚数据 | autoCollectorSource | 通过master协调物理节点自动汇聚数据 | logicalSource | 逻辑source,由master分配端口并监听rpcSink |
2、Flume’s Basic Sources null | | console | 监听用户编辑历史和快捷键输入,只在node_nowatch模式下可用 | stdin | 监听标准输入,只在node_nowatch模式下可用,每行将作为一个event source | rpcSource(port) | 由rpc框架(thrift/avro)监听tcp端口 | text("filename") | 一次性读取一个文本,每行为一个event | tail("filename"[, startFromEnd=false]) | 每行为一个event。监听文件尾部的追加行,如果startFromEnd为true,tail将从文件尾读取,如果为false,tail将从文件开始读取全部数据 | multitail("filename"[, file2 [,file3… ] ]) | 同上,同时监听多个文件的末尾 | tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]]) | 监听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递归监听其下子目录的深度 | seqfile("filename") | 监听hdfs的sequencefile,全路径 | syslogUdp(port) | 监听Udp端口 | syslogTcp(port) | 监听Tcp端口 | syslogTcp1(port) | 只监听Tcp端口的一个链接 | execPeriodic("cmdline", ms) | 周期执行指令,监听指令的输出,整个输出都被作为一个event | execStream("cmdline") | 执行指令,监听指令的输出,输出的每一行被作为一个event | exec("cmdline"[,aggregate=false[,restart=false[,period=0]]]) | 执行指令,监听指令的输出,aggregate如果为true,整个输出作为一个event如果为false,则每行作为一个event。如果restart为true,则按period为周期重新运行 | synth(msgCount,msgSize) | 随即产生字符串event,msgCount为产生数量,msgSize为串长度 | synthrndsize(msgCount,minSize,maxSize) | 同上,minSize – maxSize | nonlsynth(msgCount,msgSize) | | asciisynth(msgCount,msgSize) | Ascii码字符 | twitter("username","pw"[,"url"]) | 尼玛twitter的插件啊 | irc("server",port, "nick","chan") | | scribe[(+port)] | Scribe插件 | report[(periodMillis)] | 生成所有physical node报告为事件源 |
Flume Sinks
1、Flume’s Collector Tier Event Sinks collectorSink( "fsdir","fsfileprefix",rollmillis) | collectorSink,数据通过collector汇聚之后发送到hdfs, fsdir 是hdfs目录,fsfileprefix为文件前缀码 |
2、Flume’s Agent Tier Event Sinks agentSink[("machine"[,port])] | Defaults to agentE2ESink,如果省略,machine参数,默认使用flume.collector.event.host与flume.collector.event.port作为默认collecotr(以下同此) | agentE2ESink[("machine"[,port])] | 执着的agent,如果agent发送event没有收到collector成功写入的状态码,该event将被agent重复发送,直到接到成功写入的状态码 | agentDFOSink[("machine" [,port])] | 本地热备agent,agent发现collector节点故障后,不断检查collector的存活状态以便重新发送event,在此间产生的数据将缓存到本地磁盘中 | agentBESink[("machine"[,port])] | 不负责的agent,如果collector故障,将不做任何处理,它发送的数据也将被直接丢弃 | agentE2EChain("m1[:_p1_]" [,"m2[:_p2_]"[,…]]) | 指定多个collector提高可用性。 当向主collector发送event失效后,转向第二个collector发送,当所有的collector失败后,它会非常执着的再来一遍... | agentDFOChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]]) | 同上,当向所有的collector发送事件失效后,他会将event缓存到本地磁盘,并检查collector状态,尝试重新发送 | agentBEChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]]) | 同上,当向所有的collector发送事件失效后,他会将event丢弃 | autoE2EChain | 无需指定collector, 由master协调管理event的流向 | autoDFOChain | 同上 | autoBEChain | 同上 |
3、Flume’s Logical Sinks logicalSink("logicalnode") | |
4、Flume’s Basic Sinks 在不使用collector收集event的情况下,可将source直接发向basic sinks null | null | console[("formatter")] | 转发到控制台 | text("txtfile" [,"formatter"]) | 转发到文本文件 | seqfile("filename") | 转发到seqfile | dfs("hdfspath") | 转发到hdfs | customdfs("hdfspath"[, "format"]) | 自定义格式dfs | +escapedCustomDfs("hdfspath", "file", "format") | | rpcSink("host"[, port]) | Rpc框架 | syslogTcp("host"[,port]) | 发向网络地址 | irc("host",port, "nick", "chan") | |
|