本帖最后由 SuperDove 于 2017-2-22 16:19 编辑
原贴为Spark SQL 统计分析案例介绍
现截出部分代码
[mw_shl_code=applescript,true]val spark = SparkSession.builder().appName("count201702211631").getOrCreate()
val txtrdd = spark.sparkContext.textFile("hdfs://master:8020/tmp/download/peoplecount.txt",10)
val rowRdd = txtrdd.map(x => x.split("\t")).map(x => Row(x(0),x(1),x(2).toInt))
rowRdd.cache()
println("cache................")
val schema = StructType(
Seq(
StructField("id",StringType,true)
,StructField("sex",StringType,true)
,StructField("hight",IntegerType,true)
)
)
val peopleDF = spark.createDataFrame(rowRdd,schema)
//正式创建people表
peopleDF.createOrReplaceTempView("people")
println("计算所有人数")
spark.sparkContext.parallelize(Array(txtrdd.count())).saveAsTextFile("hdfs://master:8020/tmp/download/allcount")
println("1.男性中身高超过 180cm 的人数。")
spark.sql(
"""
|select count(*) as total2
| from people
| where sex = 'M'
| and hight > 180
""".stripMargin).repartition(1).write.json("hdfs://master:8020/tmp/download/M180")
println("2.女性中身高超过 170cm 的人数。")
spark.sql(
"""
|select count(*) as total3
| from people
| where sex = 'F'
| and hight > 170
""".stripMargin).repartition(1).write.json("hdfs://master:8020/tmp/download/M170")[/mw_shl_code]
提交到spark的standlone模式下运行 结果运行的jobs任务在UI下如下,一共有四个JOB
sparj-jobs
但是详细进入到每个job下的stage下的task界面时,发现数据都是以ANY的形式加载读取进来的,如下
task
话说spark是用的内存计算的,一个rdd被加载之后应该不会重新再加载,也就是JOB2和JOB3的rdd数据应该不是以ANY的形式加载,而是PROCESS_LOCAL的模式加载啊
难道我对spark 的 RDD做内存计算有误解?有哪位能帮忙点通一下,请指教!
|
|