How to dynamically set Spark SQL parallelism
Spark SQL's shuffle parallelism defaults to 200; how is the parallelism usually set?
Answer:
By setting spark.sql.shuffle.partitions.
And how can it be set dynamically?
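In a Spark job the value can be set statically when the session is created, or changed mid-session with a set statement. A minimal Java sketch (assuming Spark 2.x; the app name and the values 10/20 are placeholders):
import org.apache.spark.sql.SparkSession;
public class SetPartitionsExample
{
    public static void main(String[] args)
    {
        // Fix the shuffle parallelism for the whole session at creation time.
        SparkSession spark = SparkSession.builder()
            .appName("set-partitions-example")
            .config("spark.sql.shuffle.partitions", "10")
            .getOrCreate();
        // Or change it mid-session; statements executed afterwards use 20 tasks.
        spark.sql("set spark.sql.shuffle.partitions=20");
        spark.stop();
    }
}
A single static value is rarely right for every statement, though. The converter below goes further: it looks up a tuned value per statement from MySQL and injects the set line automatically.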
###############################
DolOriginToRunNew.java:
/**
 * Created by zhoushuilin140 on 2017/11/20.
 *
 * Reads <inputDir>/<dol>, passes Velocity directives and comments through
 * unchanged, and for every INSERT/CREATE TABLE statement looks up a tuned
 * shuffle parallelism in the MySQL table best_parallel, injecting a
 * "set spark.sql.shuffle.partitions=N;" line just before the statement.
 * The rewritten script is written to <outputDir>/<dol>_run.
 */
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DolOriginToRunNew
{
    public static void main(String[] args)
        throws Exception
    {
        String dol = args[0];       // dol file name, e.g. test_zsl.dol
        String inputDir = args[1];  // directory containing the dol file
        String outputDir = args[2]; // directory for the generated <dol>_run file

        BufferedReader br = new BufferedReader(new InputStreamReader(
            new FileInputStream(inputDir + "/" + dol), "UTF-8"));
        FileWriter fw = new FileWriter(outputDir + "/" + dol + "_run");

        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://kafka003:3306/dispatch";
        String username = "root";
        String password = "root";
        Connection conn = DriverManager.getConnection(url, username, password);

        boolean noteFlag = false;   // inside a #** ... *# block comment
        boolean macroFlag = false;  // inside a #macro ... #end definition
        String sqlText = "";        // SQL statement being accumulated
        String line;
        while ((line = br.readLine()) != null) {
            String trimmed = line.trim();
            if (trimmed.startsWith("##") || trimmed.startsWith("#set")
                || trimmed.startsWith("#foreach") || trimmed.startsWith("#if")
                || trimmed.startsWith("#else") // also covers #elseif
                || trimmed.startsWith("use "))
            {
                // Velocity comments/directives and the "use db" line pass through.
                fw.write(line + "\n");
            }
            else if (trimmed.startsWith("#end"))
            {
                fw.write(line + "\n");
                if (macroFlag) {
                    macroFlag = false; // #end closes an open #macro
                }
            }
            else if (trimmed.startsWith("#macro"))
            {
                fw.write(line + "\n");
                macroFlag = true;
            }
            else if (trimmed.startsWith("#**"))
            {
                noteFlag = true; // block comment opens
                fw.write(line + "\n");
            }
            else if (trimmed.endsWith("*#"))
            {
                noteFlag = false; // block comment closes
                fw.write(line + "\n");
            }
            else if (noteFlag || macroFlag)
            {
                fw.write(line + "\n"); // body of a block comment or macro
            }
            else
            {
                // Accumulate the statement until its terminating semicolon.
                sqlText = sqlText + line + "\n";
                if (line.contains(";"))
                {
                    String lower = sqlText.toLowerCase();
                    if ((lower.contains("insert") || lower.contains("create")) && lower.contains("table"))
                    {
                        // Statements are keyed by the hash of their trimmed text,
                        // i.e. sqlText.trim().hashCode(); this demo hardcodes the
                        // id of the one statement in the sample dol file below.
                        String sql = "SELECT parallel FROM best_parallel WHERE dol = '" + dol + "' and sql_id = 982000623";
                        System.out.println(sql);
                        PreparedStatement pstmt = conn.prepareStatement(sql);
                        ResultSet rs = pstmt.executeQuery();
                        int parallel = -2; // stays negative when no row matches
                        while (rs.next())
                        {
                            parallel = rs.getInt(1);
                            System.out.println(parallel);
                        }
                        rs.close();
                        pstmt.close();
                        // Inject the tuned parallelism just before the statement.
                        if (parallel > 0) {
                            fw.write("set spark.sql.shuffle.partitions=" + parallel + ";\n");
                        }
                    }
                    fw.write(sqlText);
                    sqlText = "";
                }
            }
        }
        fw.write(sqlText); // flush a trailing statement without a semicolon
        conn.close();
        fw.close();
        br.close();
    }
}
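To run the converter (with the MySQL JDBC driver on the classpath):
java DolOriginToRunNew test_zsl.dol <inputDir> <outputDir>
It reads <inputDir>/test_zsl.dol and writes the rewritten script to <outputDir>/test_zsl.dol_run.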
Table DDL:
create database dispatch;
CREATE TABLE `best_parallel` (
  `dol` varchar(200) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '',
  `sql_id` int(11) NOT NULL DEFAULT '0',
  `parallel` int(11) DEFAULT NULL,
  `update_time` date DEFAULT NULL,
  PRIMARY KEY (`dol`,`sql_id`)
);
+--------------+-----------+----------+-------------+
| dol          | sql_id    | parallel | update_time |
+--------------+-----------+----------+-------------+
| test_zsl.dol | 982000623 | 10       | 2017-11-20  |
+--------------+-----------+----------+-------------+
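How such a row gets registered is not shown in the thread. A hypothetical sketch, assuming sql_id is the Java hashCode of the trimmed statement text (the keying the converter's lookup comment describes) and reusing the demo connection settings:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class RegisterParallel
{
    public static void main(String[] args)
        throws Exception
    {
        // Assumption: sql_id is the hashCode of the trimmed SQL text.
        String sqlText = "create table i8ji.test_zsl_bak_bak as\n"
            + "select count(1) from i8ji.dw_cust_trans_info where day_id='$PTDAY';";
        int sqlId = sqlText.trim().hashCode();
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
            "jdbc:mysql://kafka003:3306/dispatch", "root", "root");
        // REPLACE keeps one tuned value per (dol, sql_id) primary key.
        PreparedStatement ps = conn.prepareStatement(
            "REPLACE INTO best_parallel (dol, sql_id, parallel, update_time)"
            + " VALUES (?, ?, ?, CURDATE())");
        ps.setString(1, "test_zsl.dol");
        ps.setInt(2, sqlId);
        ps.setInt(3, 10); // the tuned partition count for this statement
        ps.executeUpdate();
        ps.close();
        conn.close();
    }
}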
dol file (test_zsl.dol):
#**
[Modified : 2017-05-21 set to run for 2017
2017-07-13 set to run for the current year, with the SMS channel added]
[source table :
i8ji.test_zsl :i8ji.test_zsl transaction fact table
]
*#
##switch to the database
use i8ji;
##set the number of tasks during shuffle
set spark.sql.shuffle.partitions=1;
##set date variables
#set($PTDAY = $dt.minusDays("$env.date",0,'yyyyMMdd'))
#set($SDAY = $dt.minusDays("$env.date",0,'yyyy0101'))
#set($RUNDAY = $dt.minusDays("$env.date",0,'yyyy-MM-dd') )
##yqb transaction detail staging table
drop table if exists i8ji.test_zsl_bak_bak;
create table i8ji.test_zsl_bak_bak as
select count(1) from i8ji.dw_cust_trans_info where day_id='$PTDAY';
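Given the sample best_parallel row above (parallel = 10), the generated test_zsl.dol_run should match the input except for a tuned set line injected immediately before the create table statement, overriding the earlier blanket setting of 1; the tail of the output would look roughly like:
...
##yqb transaction detail staging table
drop table if exists i8ji.test_zsl_bak_bak;
set spark.sql.shuffle.partitions=10;
create table i8ji.test_zsl_bak_bak as
select count(1) from i8ji.dw_cust_trans_info where day_id='$PTDAY';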
###################################
Dynamically setting the spark.sql.shuffle.partitions parameter
https://blog.csdn.net/zhoushuilin/article/details/78813207
Compiled from the discussion in group 7 (552029443) and shared here.
Participants:
黄瓜炖啤酒鸭
Titanic ♪.♫
Thanks for sharing.