分享

SparkSql会生成很多的小文件,怎么解决哦?

玉溪 发表于 2016-11-13 10:18:54 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 30002
SparkSql运行后,会生成很多小文件200个task就生成200个文件,有什么好办法解决?


补充内容 (2016-11-14 03:55):
package main.stock
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by root on 2016/11/9.
*/
object tm_stock_d {
    def main(args:Array[String]){
    val conf = new SparkConf().se...

已有(6)人评论

跳转到指定楼层
yuwenge 发表于 2016-11-13 17:32:20
w517424787 发表于 2016-11-13 15:51
或者是df的repartition函数都可以了 df.repartition(1)

楼主上对的
因为多线程并行往hdfs写造成的(因为每个DataFrame/RDD分成若干个Partition,这些partition可以被并行处理)。

其结果就是一个存下来的文件,其实是hdfs中一个目录,在这个目录下才是众多partition对应的文件,最坏的情况是出现好多size为0的文件。

如果确实想避免小文件,可以在save之前把DaraFrame的partition设为0: (当然,这必然影响程序效率)
1. 如果是Spark 1.3.x,  可以调用函数如下:

    【DataFrame2】=【DataFrame1】.repartition(1);

    【DataFrame2】.save(path);

2. 如果是Spark 1.4.0, 可以调用如下函数:

    【DataFrame2】=【DataFrame1】.coalecse(1, false)

回复

使用道具 举报

einhep 发表于 2016-11-13 12:25:23
输入的是什么文件,可以尝试从源头解决
回复

使用道具 举报

desehawk 发表于 2016-11-13 12:44:29
本帖最后由 desehawk 于 2016-11-13 12:45 编辑
einhep 发表于 2016-11-13 12:25
输入的是什么文件,可以尝试从源头解决

任务减少至100,输出的应该是100文件。以此类推这可能就不一定是小文件了。
回复

使用道具 举报

w517424787 发表于 2016-11-13 15:49:52
用rdd的repartition的函数来减小分区数量,就可以了
回复

使用道具 举报

w517424787 发表于 2016-11-13 15:51:56
或者是df的repartition函数都可以了 df.repartition(1)
回复

使用道具 举报

玉溪 发表于 2016-11-14 03:57:13
[mw_shl_code=applescript,true]package main.stock
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by root on 2016/11/9.
*/
object tm_stock_d {
    def main(args:Array[String]){
    val conf = new SparkConf().setMaster("local").setAppName("stock").set("spark executor memory","4g")
    //val conf = new SparkConf().setAppName("stock")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
      //hiveContext.sql("use hive")
      hiveContext.sql("SET spark.sql.shuffle.partitions=20")
      //hiveContext.sql("drop table tm_stock_d").show
      //hiveContext.sql("create external table if not exists tm_stock_d(stockcode string,t_date string,open float,high float,close float,low float,volume float,price_change float,p_change float,ma5 float,ma10 float,ma20 float,v_ma5 float,v_ma10 float,v_ma20 float,turnover float,status string)ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE")
      //hiveContext.sql("select * from tm_stock_d").show

      val sqlrdd = hiveContext.sql("select distinct regexp_replace(t_date,'-','') t_date from if_stock_d where regexp_replace(t_date,'-','') >= 20160104 and regexp_replace(t_date,'-','') <= 20160105 order by t_date")
      val arr = sqlrdd.collect

      //val t_date = arr(1).toString().substring(1,9).toInt
      for(i <- 0 to arr.length-1){
        val t_date = arr(i).toString().substring(1,9).toInt
        val y_date = t_date - 1
        // dw层
        val sql_text = ("insert into table tm_stock_d " +
          "select t1.stockcode,t1.t_date,t1.open,t1.high,t1.close,t1.low,t1.volume,t1.price_change,t1.p_change,t1.ma5,t1.ma10,t1.ma20,t1.v_ma5,t1.v_ma10,t1.v_ma20,t1.turnover," +
          "case when (t2.close - t1.close) > 0 then 1 else -1 end status from " +
          "(select * from tw_stock_d where t_date ="+ t_date + ") t1," +
          "(select * from tw_stock_d where t_date ="+ y_date + ") t2 " +
          "where t1.stockcode = t2.stockcode"
          )
        println("PrintSql:" + sql_text)
        hiveContext.sql(sql_text)
      }
      hiveContext.sql("select distinct t_date from tm_stock_d").show()
      //hiveContext.sql("select min(t_date) from tw_stock_d").show()
      //hiveContext.sql("select distinct regexp_replace(t_date,'-','') t_date from if_stock_d where regexp_replace(t_date,'-','') >= 20160104 and regexp_replace(t_date,'-','') <= 20160105 order by t_date").show()

      sc.stop()
  }
}
[/mw_shl_code]

用spark-sql 跑数据仓库的数据 是不行的,这么多空文件没法控制。还得用hive来跑合适。你们的数据仓库用什么跑的?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条