分享

spark的初学Stream开发遇到问题请教

本帖最后由 为啥呢? 于 2016-1-25 21:51 编辑

问题1:执行过程中出现的错误如何解决?
问题2:如何输出我在lines.flatMap(new FlatMapFunction<String, String>() 方法中的相关日志?(我想拿到获取到的数据进行业务处理)
问题3:运行时如何提交到后台,相关日志输出到文件中?


刚刚从.NET转到Java学习Hadoop Spark,找了一个Streaming的示例,JavaDirectKafkaWordCount的示例,并且已经可以运行,但是在其中代码段加了输出,代码如下
[mw_shl_code=java,true]public static void main(String[] args) {
                logger.info("-------------Start Streaming--------------");
                if (args.length < 2) {
                        System.err
                                        .println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n"
                                                        + "  <brokers> is a list of one or more Kafka brokers\n"
                                                        + "  <topics> is a list of one or more kafka topics to consume from\n\n");
                        System.exit(1);
                }
                logger.info("-------------args--------------,args[0]:" + args[0]
                                + ",args[1]:" + args[1]);

                StreamingExamples.setStreamingLogLevels();

                String brokers = args[0];
                String topics = args[1];

                // Create context with a 2 seconds batch interval
                SparkConf sparkConf = new SparkConf()
                                .setAppName("JavaDirectKafkaWordCount");
                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                                Durations.seconds(10));
                logger.info("-------------jssc OK --------------");
                HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics
                                .split(",")));
                HashMap<String, String> kafkaParams = new HashMap<String, String>();
                kafkaParams.put("metadata.broker.list", brokers);
                logger.info("-------------kafkaParams OK --------------");
                // Create direct kafka stream with brokers and topics
                JavaPairInputDStream<String, String> messages = KafkaUtils
                                .createDirectStream(jssc, String.class, String.class,
                                                StringDecoder.class, StringDecoder.class, kafkaParams,
                                                topicsSet);
                logger.info("-------------JavaPairInputDStream messages OK,messages="
                                + messages.toString() + " --------------");
                // Get the lines, split them into words, count the words and print
               
                JavaDStream<String> lines = messages
                                .map(new Function<Tuple2<String, String>, String>() {
                                        public String call(Tuple2<String, String> tuple2) {
                                                return tuple2._2();
                                        }
                                });
                logger.info("-------------JavaDStream lines OK,lines="
                                + lines.toString() + " --------------");
                JavaDStream<String> words = lines
                                .flatMap(new FlatMapFunction<String, String>() {
                                        public Iterable<String> call(String x) {
                                                logger.info("---------------------------Get Message :"
                                                                + x);
                                                return Lists.newArrayList(SPACE.split(x));
                                        }
                                });
                logger.info("-------------words OK,words=" + words.context()
                                + " --------------");

                JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                                new PairFunction<String, String, Integer>() {
                                        public Tuple2<String, Integer> call(String s) {

                                                return new Tuple2<String, Integer>(s, 1);
                                        }
                                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                        public Integer call(Integer i1, Integer i2) {

                                return i1 + i2;
                        }
                });
                logger.info("-------------wordCounts OK,wordCounts="
                                + wordCounts.context() + " --------------");
                wordCounts.print();

                // Start the computation
                jssc.start();
                jssc.awaitTermination();
        }[/mw_shl_code]



运行结果如下


开始执行:
1

开始执行1

开始执行1


开始执行2

开始执行2


然后发现出现了错误

开始执行包含

开始执行包含


但是好像没有影响到运行

topic输入数据

topic输入数据

topic输入数据


获取到topic数据并运行了

获取到topic的数据

获取到topic的数据


提交运行的命令如下:
./spark-submit --class foo.JavaDirectKafkaWordCount  --master spark://YK2TDW015:7077   --executor-memory 512M  --num-executors 4 /home/sfdp/qiaoym/1221zhstreaming.jar YK2TDW014:9092,YK2TDW015:9092,YK2TDW016:9092 topic1,topic2



问题1:执行过程中出现的错误如何解决?
问题2:如何输出我在lines.flatMap(new FlatMapFunction<String, String>() 方法中的相关日志?(我想拿到获取到的数据进行业务处理)
问题3:运行时如何提交到后台,相关日志输出到文件中?

补充内容 (2016-1-26 15:25):
还有就是:我想在读取到topic的数据时候输出,这个是只进行了一次输出

已有(16)人评论

跳转到指定楼层
wscl1213 发表于 2016-1-25 22:15:05
今天太晚了,明天帮楼主看看
回复

使用道具 举报

为啥呢? 发表于 2016-1-26 08:50:48
wscl1213 发表于 2016-1-25 22:15
今天太晚了,明天帮楼主看看

感谢兄弟回复,坐等~~~
回复

使用道具 举报

when30 发表于 2016-1-26 12:15:29
为啥呢? 发表于 2016-1-26 08:50
感谢兄弟回复,坐等~~~

第一个问题:
很多人都遇到过,应该不是什么大的问题。看看提交spark版本与集群运行的版本是否一致
第二个问题跟第三个问题应该是一样的,想输出日志,直接使用log4j的api就能办到了
回复

使用道具 举报

为啥呢? 发表于 2016-1-26 12:30:11
when30 发表于 2016-1-26 12:15
第一个问题:
很多人都遇到过,应该不是什么大的问题。看看提交spark版本与集群运行的版本是否一致
第 ...

感谢兄弟的回复,
第一个问题先不考虑的话,第二个第三个,如何修改呢?能否给出代码段?

自己增加的输出的内容位置是在哪里呢?
还请指教,多谢。

回复

使用道具 举报

when30 发表于 2016-1-26 13:03:00
为啥呢? 发表于 2016-1-26 12:30
感谢兄弟的回复,
第一个问题先不考虑的话,第二个第三个,如何修改呢?能否给出代码段?

日志楼主是可以自定义的比如下面:
logger.info("-------------jssc OK --------------");
同样楼主也可以在lines.flatMap(new FlatMapFunction<String, String>() 中加入。
想获取他的数据,直接参数赋值即可。

log4j,可以输出到文件,控制台等。
也可以配置
Appenders
  禁用与使用日志请求只是Log4j 其中的一个小小的地方,Log4j 日志系统允许把日志输出到不同的地方,如控制台(Console )、文件(Files )、根据天数或者文件大小产生新的文件、以流的形式发送到其它地方等等。
  
  其语法表示为:
  
  org.apache.log4j.ConsoleAppender (控制台)
  org.apache.log4j.FileAppender (文件)
  org.apache.log4j.DailyRollingFileAppender (每天产生一个日志文件)
    org.apache.log4j.RollingFileAppender (文件大小到达指定尺寸的时候产生一个新的文件)
  org.apache.log4j.WriterAppender (将日志信息以流格式发送到任意指定的地方)
  
  配置时使用方式为:
  log4j.appender.appenderName = fully.qualified.name.of.appender.class
  log4j.appender.appenderName.option1 = value1
  …
    log4j.appender.appenderName.option = valueN
  这样就为日志的输出提供了相当大的便利。


来源
Log4j日志入门
http://www.aboutyun.com/thread-17185-1-1.html





回复

使用道具 举报

为啥呢? 发表于 2016-1-26 15:23:30
when30 发表于 2016-1-26 13:03
日志楼主是可以自定义的比如下面:
logger.info("-------------jssc OK --------------");
同样楼主也 ...

感谢兄弟的回复

配置了logger.error方式还是无法输出相关内容,问题在于我想在读取到topic的数据时候输出,这个是只进行了一次输出,并没有输出相关我想输出的东西,应该不是这个的原因,非常感谢
回复

使用道具 举报

为啥呢? 发表于 2016-1-26 19:21:25
自己UP一下,求各路大神
回复

使用道具 举报

w123aw 发表于 2016-1-26 20:53:35
本帖最后由 w123aw 于 2016-1-26 20:56 编辑
为啥呢? 发表于 2016-1-26 19:21
自己UP一下,求各路大神

可能并不是不行,而是楼主进入死胡同。
只要你能读取topic,那么你可以输出。
建议先从简单的测试,在进行复杂的。
回复

使用道具 举报

arsenduan 发表于 2016-1-26 21:02:35
只输出一次,说明日志已经成功了。
那么为什么会输出一次?
难道是第二次没有执行??不可能的,楼主建议详细描述下,或则自己看看是不是程序只执行了一次
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条