分享

Flume+Spark Steaming初探

breaking 2016-4-14 14:08:31 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 18240
本帖最后由 breaking 于 2016-4-14 14:40 编辑
问题导读:
1.怎么去测试Flume?
2.结合Flume怎么用Spark Streaming去测试?
3.怎么用Flume发送数据给Spark Streaming?




公司业务准备上流数据处理了。由于之前基础平台选用了CDH,而CDH自带Spark,且由于数据源是每隔几分钟发一组数据文件的形式来传送数据,所以最终选取用Spark Steaming来做流数据处理。
下面记录初步使用Spark Steaming和Flume的一些过程。

第一个测试:Flume(spooldir to hdfs)
原始数据通过ftp每隔几分钟拉取一批数据到本地某文件夹。于是测试了下flume监控文件夹并将新加入的文件写入hdfs的功能。
配置文件如下:
[mw_shl_code=bash,true]a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/%Y-%m-%d/%H%M%S
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels.c1.type = file
根据官方文档hdfs.fileType[/mw_shl_code]

根据官方文档hdfs.fileType默认是SequenceFile,这里选用DataStream将不压缩输出文件。
若不设置hdfs.useLocalTimeStamp为true则会报下面的错误,暂时不知为何。

[mw_shl_code=bash,true]
15/06/05 17:16:45 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:224)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:420)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:388)
        ... 3 more
[/mw_shl_code]

期间还遇到了下面这个错误,原因很简单,当时想把路径设置成为yyyy-MM-dd hh:mm:ss这样的形式,但HDFS不允许创建带有“:”符号的文件夹,所以报错。

[mw_shl_code=bash,true]15/06/08 11:21:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.IllegalArgumentException: Pathname /user/flume/2015-06-08/1121:04/FlumeData.1433733664360.tmp from /user/flume/2015-06-08/1121:04/FlumeData.1433733664360.tmp is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:396)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:392)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:392)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:336)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)[/mw_shl_code]

启动命令:
[mw_shl_code=bash,true]bin/flume-ng agent --conf conf --conf-file dir2hdfs.conf --name a1 -Dflume.root.logger=INFO,cons[/mw_shl_code]


第二个测试:执行Spark Steaming任务
测试程序:

[mw_shl_code=scala,true]public final class FlumeTest {
        private FlumeTest() {
        }

        public static void main(String[] args) {
                if (args.length != 2) {
                        System.err.println("Usage: FlumeTest <host> <port>");
                        System.exit(1);
                }

                String host = args[0];
                int port = Integer.parseInt(args[1]);

                Duration batchInterval = new Duration(2000);
                SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
                JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
                JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

                flumeStream.count();
                flumeStream.count().map(new Function<Long, String>() {
                        @Override
                        public String call(Long in) {
                                return "Received " + in + " flume events.";
                        }
                }).print();

                ssc.start();
                ssc.awaitTermination();
        }
}[/mw_shl_code]
打jar包后放在集群上跑,结果第一次报错VERSION版本号不对,然后看了看本机编译的时候采用的是JAVA 8,而集群都是JAVA 7,于是重新用JAVA 7打包。第二次报错,大概意思是找不到org.apache.spark.streaming下面的方法,这个就无语了,尝试了多次后突然想到可能是所下载的spark lib包是java 8编译的,于是尝试了用CDH集群的spark lib包,好使,果然是lib包的问题。
出现各种找不到方法的错误,如下:

[mw_shl_code=bash,true]Exception in thread "Driver" scala.MatchError: java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/FlumeUtils (of class java.lang.NoClassDefFoundError)
        at org.apache.spark.deploy.yarn.ApplicationMaster <span class='MathJax_Preview'>\(anon$2.run(ApplicationMaster.scala:432)[/mw_shl_code]

[mw_shl_code=bash,true]Exception in thread "Thread-48" java.lang.NoClassDefFoundError: org/apache/avro/ipc/Responder
        at org.apache.spark.streaming.flume.FlumeInputDStream.getReceiver(FlumeInputDStream.scala:55)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher\)</span><script type='math/tex'>anon$2.run(ApplicationMaster.scala:432)[/mw_shl_code]

[mw_shl_code=bash,true]15/06/08 13:22:20 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-14] shutting down ActorSystem [sparkDriver]
java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2499)
    at java.lang.Class.getDeclaredField(Class.java:1951)
    at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)[/mw_shl_code]

[mw_shl_code=bash,true]15/06/08 13:22:20 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-14] shutting down ActorSystem [sparkDriver]
java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2499)
    at java.lang.Class.getDeclaredField(Class.java:1951)
    at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)[/mw_shl_code]

解决方法1:启动命令里面添加缺少的各种jar包
[mw_shl_code=bash,true]spark-submit --class com.mobicloud.test.FlumeTest --jars /root/mrwork/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar,/root/mrwork/spark-streaming-flume-sink_2.10-1.3.1.jar,/root/mrwork/flume-ng-sdk-1.5.0-cdh5.3.0.jar,/root/mrwork/avro-ipc.jar --deploy-mode cluster --master yarn wc.jar master 33333
[/mw_shl_code]
解决方法2:程序中添加所需的各种jar包
[mw_shl_code=scala,true]SparkConf sparkConf = new SparkConf()
              .setAppName("FileStream")
              .setJars(new String[]{
                    "/root/mrwork/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar",
                    "/root/mrwork/spark-streaming-flume-sink_2.10-1.3.1.jar",
                    "/root/mrwork/flume-ng-sdk-1.5.0-cdh5.3.0.jar",
                    "/root/mrwork/avro-ipc.jar"
              });[/mw_shl_code]

第三个测试:Flume发送数据给Spark Streaming程序

Spark Streaming接收Flume以avro形式发送的数据。Flume配置文件如下:

[mw_shl_code=bash,true]a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 33333

a1.channels.c1.type = file[/mw_shl_code]

先启动Spark Steaming任务,在启动Flume-ng,否则会报以下错误:

[mw_shl_code=bash,true]15/06/05 17:35:12 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: master, port: 33333
org.apache.flume.FlumeException: NettyAvroRpcClient { host: slave07, port: 44443 }: RPC connection error
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:182)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:121)
        at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
        at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:89)
        at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)[/mw_shl_code]

尝试将一个文件复制到Flume监控的/root/data文件夹下,Spark任务就会在log日志中输出一行:
[mw_shl_code=bash,true]-------------------------------------------
Time: 1433729594000 ms
-------------------------------------------
Received 75 flume events.[/mw_shl_code]

但看了下放进去的文件行数是74,这里不知道为啥会多一行,不知道是不是文件头信息还是时间戳信息什么的,待后续分析。

参考资料
后续
开始看Spark Streaming的官方文档了,发现可以直接使用Spark Streaming中的Basic Sources的File Streams即可,这个玩意就能监控文件夹。看来用Flume有点多此一举了,具体如何取舍看实际业务需求。



已有(4)人评论

跳转到指定楼层
恋枫缩影 发表于 2016-4-15 00:17:23
不错,正好要把storm对接kafka还有flume的改成spark的,可以参考一下
回复

使用道具 举报

a_zhen 发表于 2016-4-15 09:53:50
写的还行吧,spark还没来得及学呢
回复

使用道具 举报

杰仕人生 发表于 2016-4-15 15:37:08
写的不错,解决了我之前遇到的一个小问题。哈哈
回复

使用道具 举报

yinuo2016 发表于 2016-6-12 16:56:56
flume+kafka+sprak stream,处理小文件数据流,每个文件是一个逻辑分析,这套合适么?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条