This post was last edited by 为啥呢? on 2016-1-25 21:51
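Question 1: How do I resolve the error that appears during execution?
Question 2: How can I see the log output I added inside the lines.flatMap(new FlatMapFunction<String, String>() method? (I want to get hold of the received data so I can run business logic on it.)
Question 3: How do I submit the job so that it runs in the background, with its logs written to a file?
I have just moved from .NET to Java to learn Hadoop and Spark. I found a Streaming example, JavaDirectKafkaWordCount, and got it running. I then added some log output to the code, as follows: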
[mw_shl_code=java,true]public static void main(String[] args) {
    // `logger` and `SPACE` are static fields of the enclosing class (not shown in this post).
    logger.info("-------------Start Streaming--------------");
    if (args.length < 2) {
        System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n"
                + " <brokers> is a list of one or more Kafka brokers\n"
                + " <topics> is a list of one or more kafka topics to consume from\n\n");
        System.exit(1);
    }
    logger.info("-------------args--------------,args[0]:" + args[0]
            + ",args[1]:" + args[1]);
    StreamingExamples.setStreamingLogLevels();
    String brokers = args[0];
    String topics = args[1];

    // Create context with a 10-second batch interval
    SparkConf sparkConf = new SparkConf()
            .setAppName("JavaDirectKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
            Durations.seconds(10));
    logger.info("-------------jssc OK --------------");

    // Topics arrive as a comma-separated list; brokers go into the Kafka parameters
    HashSet<String> topicsSet = new HashSet<String>(
            Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
    logger.info("-------------kafkaParams OK --------------");

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils
            .createDirectStream(jssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams,
                    topicsSet);
    logger.info("-------------JavaPairInputDStream messages OK,messages="
            + messages.toString() + " --------------");

    // Get the lines (the value of each Kafka message)
    JavaDStream<String> lines = messages
            .map(new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });
    logger.info("-------------JavaDStream lines OK,lines="
            + lines.toString() + " --------------");

    // Split each line into words, logging every message received
    JavaDStream<String> words = lines
            .flatMap(new FlatMapFunction<String, String>() {
                public Iterable<String> call(String x) {
                    logger.info("---------------------------Get Message :"
                            + x);
                    return Lists.newArrayList(SPACE.split(x));
                }
            });
    logger.info("-------------words OK,words=" + words.context()
            + " --------------");

    // Count the words in each batch and print the counts
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
    logger.info("-------------wordCounts OK,wordCounts="
            + wordCounts.context() + " --------------");
    wordCounts.print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
}[/mw_shl_code]
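For anyone who wants to check the per-batch logic without a cluster, here is a minimal plain-Java sketch of what the flatMap / mapToPair / reduceByKey chain above computes for a single batch. It is only an illustration: it assumes SPACE is a single-space pattern (as in the stock Spark example), and the class name BatchWordCountSketch and the method countWords are hypothetical names, not part of Spark.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class BatchWordCountSketch {
    // Assumption: mirrors the SPACE field used by the code in the post.
    private static final Pattern SPACE = Pattern.compile(" ");

    // Counts the words found in the lines of one batch.
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Equivalent of the flatMap step: one line becomes many words.
            for (String word : SPACE.split(line)) {
                // Equivalent of mapToPair (word, 1) followed by reduceByKey (sum).
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical batch contents, standing in for messages read from the topics.
        List<String> batch = Arrays.asList("hello spark", "hello kafka");
        System.out.println(countWords(batch));
    }
}
```

Business processing of the received data would take the place of the inner loop body; in the streaming job itself that logic lives inside the function objects passed to the DStream operations.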
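The run output is as follows.
Start of execution:
[Screenshots: start of execution, parts 1 and 2]
Then I noticed that an error had appeared:
[Screenshot: start of execution, including the error]
But it does not seem to have affected the run.
Data sent to the topic:
[Screenshot: topic input data]
The topic data was received and the job ran:
[Screenshot: data received from the topic]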
The command used to submit the job is:
./spark-submit --class foo.JavaDirectKafkaWordCount --master spark://YK2TDW015:7077 --executor-memory 512M --num-executors 4 /home/sfdp/qiaoym/1221zhstreaming.jar YK2TDW014:9092,YK2TDW015:9092,YK2TDW016:9092 topic1,topic2
Addendum (2016-1-26 15:25):
One more thing: I want to log output every time data is read from the topic, but the output only happened once.
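A possible explanation for the "only once" behaviour, offered as a sketch rather than a diagnosis: statements written directly in main only define the streaming pipeline and run a single time on the driver, while the body of a function passed to map or flatMap runs once per record when a batch is actually processed (in Spark, on the executors, so those messages go to the executor logs rather than the driver console). The plain-Java analogy below uses java.util.stream, which is also lazy, to show the same ordering. It is not Spark code, and the class name SetupVsPerRecordSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SetupVsPerRecordSketch {
    // Collected log lines, so the order of events is easy to inspect.
    public static final List<String> LOG = new ArrayList<>();

    public static List<String> run(List<String> records) {
        LOG.clear();
        // Defining the pipeline does not process any record yet.
        Stream<String> pipeline = records.stream().map(r -> {
            // Runs once per record, and only when the pipeline is executed.
            LOG.add("record: " + r);
            return r.toUpperCase();
        });
        // Runs exactly once, like the logger.info calls placed directly in main.
        LOG.add("pipeline defined");
        // Executing the pipeline triggers the per-record function.
        return pipeline.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        run(Arrays.asList("a", "b"));
        for (String line : LOG) {
            System.out.println(line);
        }
    }
}
```

In this analogy "pipeline defined" is logged before any "record: ..." line, and it is logged only once no matter how many records flow through afterwards.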