Questions this post addresses:
1. How do you test Flume? 2. How do you test Spark Streaming together with Flume? 3. How do you get Flume to send data to Spark Streaming?
Our business is about to move onto stream processing. The base platform was already built on CDH, which ships with Spark, and the data source delivers a batch of data files every few minutes, so we settled on Spark Streaming for the stream processing. Below are my notes from getting started with Spark Streaming and Flume.
Test 1: Flume (spooldir to HDFS). The raw data is pulled by FTP into a local folder every few minutes, so I tested Flume's ability to watch that folder and write newly arrived files to HDFS. The configuration file is as follows: [mw_shl_code=bash,true]a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/%Y-%m-%d/%H%M%S
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = file
[/mw_shl_code]
According to the official documentation, hdfs.fileType defaults to SequenceFile; DataStream is chosen here so the output files are written uncompressed. If hdfs.useLocalTimeStamp is not set to true, the error below is thrown. The cause is that the escape sequences in hdfs.path (%Y-%m-%d and so on) are resolved from a timestamp header on each event, and the spooldir source does not add one, so without useLocalTimeStamp there is nothing to resolve them from.
[mw_shl_code=bash,true]
15/06/05 17:16:45 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:224)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:420)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:388)
... 3 more
[/mw_shl_code]
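For reference, rather than falling back to the sink's local clock, a timestamp header can be stamped onto each event at the source using Flume's built-in timestamp interceptor; a minimal sketch, added to the same agent:
[mw_shl_code=bash,true]a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp[/mw_shl_code]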
I also ran into the error below along the way. The cause is simple: I had wanted to format the path as yyyy-MM-dd hh:mm:ss, but HDFS does not allow path components containing ":", hence the failure.
[mw_shl_code=bash,true]15/06/08 11:21:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.IllegalArgumentException: Pathname /user/flume/2015-06-08/1121:04/FlumeData.1433733664360.tmp from /user/flume/2015-06-08/1121:04/FlumeData.1433733664360.tmp is not a valid DFS filename.
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:396)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:392)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:392)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:336)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)[/mw_shl_code]
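Replacing the colons with hyphens gives a pattern HDFS will accept, for example:
[mw_shl_code=bash,true]a1.sinks.k1.hdfs.path = /user/flume/%Y-%m-%d/%H-%M-%S[/mw_shl_code]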
Start command: [mw_shl_code=bash,true]bin/flume-ng agent --conf conf --conf-file dir2hdfs.conf --name a1 -Dflume.root.logger=INFO,console[/mw_shl_code]
Test 2: running a Spark Streaming job. The test program:
[mw_shl_code=java,true]import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class FlumeTest {
    private FlumeTest() {
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Usage: FlumeTest <host> <port>");
            System.exit(1);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // 2-second micro-batches
        Duration batchInterval = new Duration(2000);
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

        // Receiver that listens for events from Flume's Avro sink on host:port
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

        // Count the events in each batch and print the result
        flumeStream.count().map(new Function<Long, String>() {
            @Override
            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}[/mw_shl_code] I built the jar and ran it on the cluster. The first run failed with a class file version error; it turned out I had compiled locally with Java 8 while the cluster runs Java 7, so I rebuilt with Java 7. The second run failed with errors saying, roughly, that methods under org.apache.spark.streaming could not be found, which left me stumped. After several attempts it occurred to me that the Spark lib jars I had downloaded might themselves have been built against the wrong versions, so I switched to the Spark lib jars shipped with the CDH cluster, and that worked; the lib jars were indeed the problem. The various "method/class not found" errors looked like this:
[mw_shl_code=bash,true]Exception in thread "Driver" scala.MatchError: java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/FlumeUtils (of class java.lang.NoClassDefFoundError)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:432)[/mw_shl_code]
[mw_shl_code=bash,true]Exception in thread "Thread-48" java.lang.NoClassDefFoundError: org/apache/avro/ipc/Responder
at org.apache.spark.streaming.flume.FlumeInputDStream.getReceiver(FlumeInputDStream.scala:55)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$...[/mw_shl_code]
[mw_shl_code=bash,true]15/06/08 13:22:20 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-14] shutting down ActorSystem [sparkDriver]
java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2499)
at java.lang.Class.getDeclaredField(Class.java:1951)
at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)[/mw_shl_code]
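A quick way to confirm which Java release actually compiled the classes inside a jar (a sketch, assuming the jar and main class named above; major version 51 means Java 7, 52 means Java 8):
[mw_shl_code=bash,true]# print the class file version of the main class inside the jar
javap -verbose -classpath wc.jar com.mobicloud.test.FlumeTest | grep 'major version'[/mw_shl_code]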
Fix 1: add the missing jars to the submit command. [mw_shl_code=bash,true]spark-submit --class com.mobicloud.test.FlumeTest --jars /root/mrwork/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,/root/mrwork/spark-streaming-flume-sink_2.10-1.3.1.jar,/root/mrwork/flume-ng-sdk-1.5.0-cdh5.3.0.jar,/root/mrwork/avro-ipc.jar --deploy-mode cluster --master yarn wc.jar master 33333
[/mw_shl_code]
Fix 2: set the required jars from inside the program:
[mw_shl_code=java,true]SparkConf sparkConf = new SparkConf()
.setAppName("FileStream")
.setJars(new String[]{
"/root/mrwork/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar",
"/root/mrwork/spark-streaming-flume-sink_2.10-1.3.1.jar",
"/root/mrwork/flume-ng-sdk-1.5.0-cdh5.3.0.jar",
"/root/mrwork/avro-ipc.jar"
});[/mw_shl_code]
Test 3: Flume sending data to the Spark Streaming program
Spark Streaming receives the data that Flume sends in Avro form. The Flume configuration file:
[mw_shl_code=bash,true]a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 33333
a1.channels.c1.type = file[/mw_shl_code]
Start the Spark Streaming job first, then start flume-ng; otherwise the following error is thrown:
[mw_shl_code=bash,true]15/06/05 17:35:12 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: master, port: 33333
org.apache.flume.FlumeException: NettyAvroRpcClient { host: slave07, port: 44443 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:182)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:121)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:89)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)[/mw_shl_code]
Copying a file into the /root/data folder that Flume watches makes the Spark job print a line like this in its log:
[mw_shl_code=bash,true]-------------------------------------------
Time: 1433729594000 ms
-------------------------------------------
Received 75 flume events.[/mw_shl_code]
But the file I dropped in has 74 lines, so I am not sure where the extra event comes from; perhaps it is some header or timestamp record. To be analyzed later.
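One way to track down the extra event would be to print the event bodies instead of only counting them. A minimal sketch, as a hypothetical extension dropped into the Test 2 program before ssc.start() (Spark 1.x Java API; SparkFlumeEvent wraps the Avro event, whose body is a ByteBuffer):
[mw_shl_code=java,true]// print each event body as a UTF-8 string so stray events become visible
flumeStream.map(new Function<SparkFlumeEvent, String>() {
    @Override
    public String call(SparkFlumeEvent event) {
        java.nio.ByteBuffer body = event.event().getBody();
        byte[] bytes = new byte[body.remaining()];
        body.duplicate().get(bytes); // duplicate() leaves the original buffer position untouched
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}).print();[/mw_shl_code]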
Follow-up: after reading the official Spark Streaming documentation, I found that the File Streams under Basic Sources can do this directly; they monitor a folder on their own. So Flume is somewhat redundant for this particular job; which to use depends on the actual business requirements. A sketch of that alternative follows.
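A minimal sketch of the File Streams alternative (Spark 1.x Java API; the class name and directory are hypothetical, and the directory must be visible to the whole cluster, e.g. an HDFS path, with files moved in atomically):
[mw_shl_code=java,true]import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class FileStreamTest {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("FileStreamTest");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        // textFileStream watches the directory and reads each newly arrived file as text
        JavaDStream<String> lines = ssc.textFileStream("hdfs:///user/flume/incoming"); // hypothetical path
        lines.count().print();
        ssc.start();
        ssc.awaitTermination();
    }
}[/mw_shl_code]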