wordcounts.foreachRDD(new VoidFunction <JavaPairRDD<String, Integer> > (){
private static final long serialVersionUID = 1L;
@Override
public void call(JavaPairRDD<String, Integer> wordcountsRDD)
throws Exception {
wordcountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Integer>>>(){
private static final long serialVersionUID = 1L;
@Override
public void call(Iterator<Tuple2<String, Integer>> wordcounts)
throws Exception {
Connection conn = ConnectionPool.getConnection();
Tuple2<String,Integer> wordcount = null;
while(wordcounts.hasNext()){
wordcount = wordcounts.next();
String sql = "insert into wordcount(word,count) "
+ "values('" + wordcount._1 + "'," + wordcount._2 + ")";
Statement stmt = conn.createStatement();
stmt.executeUpdate(sql);
}
ConnectionPool.returnConnection(conn);
}
});
}
});
报错:The method foreachRDD(Function<JavaPairRDD<String,Integer>,Void>) in the type
AbstractJavaDStreamLike<Tuple2<String,Integer>,JavaPairDStream<String,Integer>,JavaPairRDD<Stri
ng,Integer>> is not applicable for the arguments (new VoidFunction<JavaPairRDD<String,Integer>>()
{})
|
|