分享

spark sql如何动态设置并行度

yuwenge 2018-3-30 11:26:19 发表于 小知识点 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 12504



spark sql 的并行度 默认是200   一般是怎么设置并行度的。

回答:
通过设置partitions来实现。
如何通过动态实现
###############################
DolOriginToRunNew:
[mw_shl_code=java,true]/**
* Created by zhoushuilin140 on 2017/11/20.
*/
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DolOriginToRunNew
{
    public static void main(String[] args)
            throws Exception
    {
        String dol = args[0];
        String inputDir = args[1];
        String outputDir = args[2];

        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(inputDir + "/" + dol), "UTF-8"));
        FileWriter fw = new FileWriter(outputDir + "/" + dol + "_run");


        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://kafka003:3306/dispatch";
        String username = "root";
        String password = "root";
        Connection conn = DriverManager.getConnection(url, username, password);

        boolean noteFlag = false;
        boolean macroFlag = false;
        String sqlText = "";
        String line;
        while ((line = br.readLine()) != null) {
            if (line.trim().startsWith("##"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#set"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#foreach"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#end"))
            {
                fw.write(line + "\n");
                if (macroFlag) {
                    macroFlag = false;
                }
            }
            else if (line.trim().startsWith("#if"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#elseif"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#else"))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#macro"))
            {
                fw.write(line + "\n");
                macroFlag = true;
            }
            else if (line.trim().startsWith("use "))
            {
                fw.write(line + "\n");
            }
            else if (line.trim().startsWith("#**"))
            {
                noteFlag = true;
                fw.write(line + "\n");
            }
            else if (line.trim().endsWith("*#"))
            {
                noteFlag = false;
                fw.write(line + "\n");
            }
            else if ((noteFlag) || (macroFlag))
            {
                fw.write(line + "\n");
            }
            else
            {
                sqlText = sqlText + line + "\n";
                if (line.contains(";"))
                {
                    if (((sqlText.toLowerCase().contains("insert")) || (sqlText.toLowerCase().contains("create"))) && (sqlText.toLowerCase().contains("table")))
                    {
                        //System.out.println(sqlText.trim().hashCode());
                        String sql = "SELECT parallel FROM best_parallel WHERE dol = '" + dol + "' and sql_id =982000623";
                        System.out.println(sql);
                        PreparedStatement pstmt = conn.prepareStatement(sql);
                        ResultSet rs = pstmt.executeQuery();

                        int parallel = -2;
                        while (rs.next())
                        {
                            parallel = rs.getInt(1);
                            System.out.println(parallel);
                        }
                        if (parallel > 0) {
                            fw.write("set spark.sql.shuffle.partitions=" + parallel + ";\n");
                        }
                    }
                    fw.write(sqlText);
                    sqlText = "";
                }
            }
        }
        fw.write(sqlText);

        conn.close();
        fw.close();
        br.close();
    }
}[/mw_shl_code]

建表语句:
[mw_shl_code=sql,true]create database dispatch;

CREATE TABLE `best_parallel` (
  `dol` varchar(200) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
  `sql_id` int(11) NOT NULL DEFAULT '0',
  `parallel` int(11) DEFAULT NULL,
  `update_time` date DEFAULT NULL,
  PRIMARY KEY (`dol`,`sql_id`);

  +--------------+-----------+----------+-------------+
| dol          | sql_id    | parallel | update_time |
+--------------+-----------+----------+-------------+
| test_zsl.dol | 982000623 |       10 | 2017-11-20  |
+--------------+-----------+----------+-------------+[/mw_shl_code]

dol文件:
[mw_shl_code=sql,true]#**
[Subject    : 业务新分类->yqb交易明细前置表]
[Author     : caixianfen367 <@pingan.com.cn> ]
[Created    : 2017-05-21 开发程序(按新的业务线分类) ]
[Modified   : 2017-05-21 设置为跑17年
              2017-07-13 设置为跑当年,加上短信渠道]
[Comment :]
[source table :
              i8ji.test_zsl :  i8ji.test_zsl          交易事实表
]
*#


##切换到数据库
use i8ji;

##设置shuffle期间task的数目
set spark.sql.shuffle.partitions=1;

##设置日期变量

#set($PTDAY = $dt.minusDays("$env.date",0,'yyyyMMdd'))
#set($SDAY = $dt.minusDays("$env.date",0,'yyyy0101'))
#set($RUNDAY = $dt.minusDays("$env.date",0,'yyyy-MM-dd') )

##yqb交易明细前置表
drop table if exists i8ji.test_zsl_bak_bak;
create table i8ji.test_zsl_bak_bak as
select count(1) from i8ji.dw_cust_trans_info where day_id='$PTDAY';[/mw_shl_code]






###################################

动态设置spark.sql.shuffle.partitions参数
https://blog.csdn.net/zhoushuilin/article/details/78813207


内容来自7群552029443讨论,整理分享给大家
参与者:
黄瓜炖啤酒鸭

Titanic &#9834;  .&#9835;

感谢分享

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条