分享

Spark Streaming使用sortByKey报错NullPointerException

zstu 发表于 2017-2-27 11:21:57 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 15875
用Spark Streaming对数据进行处理时,使用了sortByKey进行排序,但报了一个NullPointerException异常
[mw_shl_code=java,true]userlog.foreachRDD { rdd =>
      val data = rdd.repartition(1).map(log => (log(3), log))
        .sortByKey().map(x => x._2)[/mw_shl_code]
异常信息如下:
[mw_shl_code=java,true]17/02/24 16:41:00 INFO scheduler.JobScheduler: Starting job streaming job 1487925660000 ms.1 from job set of time 1487925660000 ms
17/02/24 16:41:00 ERROR scheduler.JobScheduler: Error running job streaming job 1487925660000 ms.0
java.lang.NullPointerException
        at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1609)
        at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:178)
        at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$6$$anonfun$apply$2.apply(CoalescedRDD.scala:195)
        at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$6$$anonfun$apply$2.apply(CoalescedRDD.scala:194)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        at org.apache.spark.rdd.PartitionCoalescer$LocationIterator.<init>(CoalescedRDD.scala:189)
        at org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:240)
        at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:341)
        at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:87)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.OrderedRDDFunctions.sortByKey$default$2(OrderedRDDFunctions.scala:59)
        at com.test.bigdata.DataProcess$$anonfun$calDetailIndex$1.apply(DataProcess.scala:41)[/mw_shl_code]

已有(10)人评论

跳转到指定楼层
nextuser 发表于 2017-2-27 13:50:37
spark里有sortByKey,Spark Streaming里面应该是没有的,下面是它能执行的操作

Transformation操作
Transformation
Meaning
map(func)
对DStream中的各个元素进行func函数操作,然后返回一个新的DStream.
flatMap(func)
与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter(func)
过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
repartition(numPartitions)
增加或减少DStream中的分区数,从而改变DStream的并行度
union(otherStream)
将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count()
通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func)
对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue()
对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks])
利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks])
输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
cogroup(otherStream, [numTasks])
输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
transform(func)
通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey(func)
根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream

回复

使用道具 举报

zstu 发表于 2017-2-27 14:30:48
[mw_shl_code=java,true]val userlog = kafkaDStream.filter(x => x._2.split(",", -1).size >= 7).map{ line =>
      val log = line._2.split(",", -1)
      log
    }[/mw_shl_code]
接收kafka数据的时候就已经用filter过滤掉了切分后的size小于7的,那在根据log(3)作为key,进行sortByKey时,key应该不是空啊。
回复

使用道具 举报

easthome001 发表于 2017-2-27 15:40:24
本帖最后由 easthome001 于 2017-2-27 15:50 编辑
zstu 发表于 2017-2-27 14:30
[mw_shl_code=java,true]val userlog = kafkaDStream.filter(x => x._2.split(",", -1).size >= 7).map{ li ...
map可能出问题
map(x => x._2)这是啥意思?获取元组?
回复

使用道具 举报

sstutu 发表于 2017-2-27 15:57:52
userlog.foreachRDD { rdd =>
      val data = rdd.repartition(1).map(log => (log(3), log))
        .sortByKey().map(x => x._2)
建议,可以一步步调试,先看看rdd是否有数据,然后map转换,然后排序,后面map是否成功。

回复

使用道具 举报

zstu 发表于 2017-2-27 18:00:16
easthome001 发表于 2017-2-27 15:40
map可能出问题
map(x => x._2)这是啥意思?获取元组?

map中x 是(String, Array[String])。x._1是数组中的一个元素,map(x => x._2)获取数组
回复

使用道具 举报

zstu 发表于 2017-2-27 18:06:09
sstutu 发表于 2017-2-27 15:57
userlog.foreachRDD { rdd =>
      val data = rdd.repartition(1).map(log => (log(3), log))
         ...

rdd为空,后面的操作repartition map sortByKey就不会执行了吧,实时的调试不太好调,也不知道什么时候报错,觉得NullpointerException是由于sortByKey的key是空引起的,但前面已经过滤掉size <7的数据l
回复

使用道具 举报

easthome001 发表于 2017-2-27 18:06:12
zstu 发表于 2017-2-27 18:00
map中x 是(String, Array[String])。x._1是数组中的一个元素,map(x => x._2)获取数组

对这个理解有所不同,个人认为这是map的key和value。而不应该采用元组的方式
回复

使用道具 举报

zstu 发表于 2017-2-27 18:08:26
easthome001 发表于 2017-2-27 18:06
对这个理解有所不同,个人认为这是map的key和value。而不应该采用元组的方式

你觉得应该怎么写好些
回复

使用道具 举报

easthome001 发表于 2017-2-27 18:20:03
本帖最后由 easthome001 于 2017-2-27 18:55 编辑
zstu 发表于 2017-2-27 18:08
你觉得应该怎么写好些

userlog.foreachRDD { rdd =>
      val data = rdd.repartition(1).map(log => (log(3), log))
        .sortByKey().map(x => x.values)或则
userlog.foreachRDD { rdd =>
      val data = rdd.repartition(1).map(log => (log(3), log))
        .sortByKey().map(x => x.keys)

推荐参考
Scala中如何获取map中的keys和values值
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21097

回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条