yuwenge 发表于 2018-3-30 11:26:19

spark sql如何动态设置并行度




spark sql 的并行度 默认是200   一般是怎么设置并行度的。

回答:
通过设置partitions来实现。
如何通过动态实现
###############################
DolOriginToRunNew:
/**
* Created by zhoushuilin140 on 2017/11/20.
*/
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DolOriginToRunNew
{
    public static void main(String[] args)
            throws Exception
    {
      String dol = args;
      String inputDir = args;
      String outputDir = args;

      BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(inputDir + "/" + dol), "UTF-8"));
      FileWriter fw = new FileWriter(outputDir + "/" + dol + "_run");


      Class.forName("com.mysql.jdbc.Driver");
      String url = "jdbc:mysql://kafka003:3306/dispatch";
      String username = "root";
      String password = "root";
      Connection conn = DriverManager.getConnection(url, username, password);

      boolean noteFlag = false;
      boolean macroFlag = false;
      String sqlText = "";
      String line;
      while ((line = br.readLine()) != null) {
            if (line.trim().startsWith("##"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#set"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#foreach"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#end"))
            {
                fw.write(line + "\n");
                if (macroFlag) {
                  macroFlag = false;
                }
            }
            else if (line.trim().startsWith("#if"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#elseif"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#else"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#macro"))
            {
                fw.write(line + "\n");
                macroFlag = true;
            }
            else if (line.trim().startsWith("use "))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#**"))
            {
                noteFlag = true;
                fw.write(line + "\n");
            }
            else if (line.trim().endsWith("*#"))
            {
                noteFlag = false;
                fw.write(line + "\n");
            }
            else if ((noteFlag) || (macroFlag))
            {
                fw.write(line + "\n");
            }
            else
            {
                sqlText = sqlText + line + "\n";
                if (line.contains(";"))
                {
                  if (((sqlText.toLowerCase().contains("insert")) || (sqlText.toLowerCase().contains("create"))) && (sqlText.toLowerCase().contains("table")))
                  {
                        //System.out.println(sqlText.trim().hashCode());
                        String sql = "SELECT parallel FROM best_parallel WHERE dol = '" + dol + "' and sql_id =982000623";
                        System.out.println(sql);
                        PreparedStatement pstmt = conn.prepareStatement(sql);
                        ResultSet rs = pstmt.executeQuery();

                        int parallel = -2;
                        while (rs.next())
                        {
                            parallel = rs.getInt(1);
                            System.out.println(parallel);
                        }
                        if (parallel > 0) {
                            fw.write("set spark.sql.shuffle.partitions=" + parallel + ";\n");
                        }
                  }
                  fw.write(sqlText);
                  sqlText = "";
                }
            }
      }
      fw.write(sqlText);

      conn.close();
      fw.close();
      br.close();
    }
}

建表语句:
create database dispatch;

CREATE TABLE `best_parallel` (
`dol` varchar(200) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
`sql_id` int(11) NOT NULL DEFAULT '0',
`parallel` int(11) DEFAULT NULL,
`update_time` date DEFAULT NULL,
PRIMARY KEY (`dol`,`sql_id`);

+--------------+-----------+----------+-------------+
| dol          | sql_id    | parallel | update_time |
+--------------+-----------+----------+-------------+
| test_zsl.dol | 982000623 |       10 | 2017-11-20|
+--------------+-----------+----------+-------------+

dol文件:
#**



[Modified   : 2017-05-21 设置为跑17年
            2017-07-13 设置为跑当年,加上短信渠道]

[source table :
            i8ji.test_zsl :i8ji.test_zsl          交易事实表
]
*#


##切换到数据库
use i8ji;

##设置shuffle期间task的数目
set spark.sql.shuffle.partitions=1;

##设置日期变量

#set($PTDAY = $dt.minusDays("$env.date",0,'yyyyMMdd'))
#set($SDAY = $dt.minusDays("$env.date",0,'yyyy0101'))
#set($RUNDAY = $dt.minusDays("$env.date",0,'yyyy-MM-dd') )

##yqb交易明细前置表
drop table if exists i8ji.test_zsl_bak_bak;
create table i8ji.test_zsl_bak_bak as
select count(1) from i8ji.dw_cust_trans_info where day_id='$PTDAY';






###################################

动态设置spark.sql.shuffle.partitions参数
https://blog.csdn.net/zhoushuilin/article/details/78813207


内容来自7群552029443讨论,整理分享给大家
参与者:
黄瓜炖啤酒鸭

Titanic ♪.♫

感谢分享

页: [1]
查看完整版本: spark sql如何动态设置并行度