分享

spark全局计数器每次执行结果不一样

pandatyut 发表于 2016-11-29 21:34:20 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 2 10787
最近闲着无聊从网上下载了一个去年叫流行的2000W开房数据,数据格式如下:

基本数据格式

基本数据格式


对其中一个文件进行解析,解析的时候发现gender那一栏有时候没有数据,然后就想着如果gender一栏数据不符合要求就从身份证号对性别进行解析,我想统计符合要求的记录和不符合要求的记录,用了两个计数器进行计算,代码如下:

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.SaveMode
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
case class KFCustomer2(name: String, gender: String, ctfId: String, birthday: String,
                      address: String, constellation: String)

object AnalysisKF2 {

  val filePath = "/home/panda/2000W/last5000.csv"
  val conf = new SparkConf().setAppName("AnalysisKF").setMaster("local[3]")
  val sc = new SparkContext(conf)
  val useRec = sc.accumulator(0, "useful Records")
  val uselessRec = sc.accumulator(0, "useless Records")

  def main(args: Array[String]) {


    sc.setLogLevel("ERROR")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._


    val customer = sc.textFile(filePath).map(_.split(",")).filter(line => line.length > 7).filter(!_.contains("Name")).map(
      p => KFCustomer2(p(0), formatGender(p(4), p(5)), p(4), p(6), p(7), p(6))).toDF()
    customer.registerTempTable("customer")
    var result = sqlContext.sql("SELECT gender, count(gender) FROM customer where gender = 'F' or gender = 'M' group by gender")
    result.collect().foreach(println)

    println("************************************************")
    println(useRec.name.get + " " + useRec.value)

    println(uselessRec.name.get + " " + uselessRec.value)
    println("************************************************")


  }

  def formatGender(ctfId: String, gender: String): String = {
    var rt = "未知"
    if (gender.length != 1) {
      if (ctfId.length == 18) {
        try {
                             if ((ctfId.substring(16, 17).toInt) % 2 == 0)                              {
                                     rt = "M"
                             }
                             else {
                                     rt = "F"
                              }
                            useRec.add(1)
            }
           catch

             {
                 case _=> println("error ctfid is : " + ctfId)
                 uselessRec.add(1)
             }

      }
    else
     {
        uselessRec.add(1)
      }
    }
   else
    {
      useRec.add(1)
      rt = gender
    }
    rt
  }

}




运行两次Accumulator计算结果都不一样:
[F,17091]
[M,32239]
************************************************
useful Records 49777
useless Records 223
************************************************

[F,17091]
[M,32239]
************************************************
useful Records 49454
useless Records 223
************************************************

不知道什么原因,求高人指点





已有(2)人评论

跳转到指定楼层
langke93 发表于 2016-11-30 08:23:20
程序的逻辑没有错,最可能原因是数据丢失了。运行的过程中,楼主看看是不是有的客户端出问题了
回复

使用道具 举报

desehawk 发表于 2016-11-30 11:06:51
多运行几次看看什么结果
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条