[mw_shl_code=scala,true]package main.stock
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by root on 2016/11/9.
*/
object tm_stock_d {
  def main(args: Array[String]): Unit = {
    // Note: the config key must be dot-separated; "spark executor memory" (with spaces) is never read by Spark
    val conf = new SparkConf().setMaster("local").setAppName("stock").set("spark.executor.memory", "4g")
    //val conf = new SparkConf().setAppName("stock")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    //hiveContext.sql("use hive")
    hiveContext.sql("SET spark.sql.shuffle.partitions=20")
    //hiveContext.sql("drop table tm_stock_d").show
    //hiveContext.sql("create external table if not exists tm_stock_d(stockcode string,t_date string,open float,high float,close float,low float,volume float,price_change float,p_change float,ma5 float,ma10 float,ma20 float,v_ma5 float,v_ma10 float,v_ma20 float,turnover float,status string)ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE")
    //hiveContext.sql("select * from tm_stock_d").show
    // Collect the distinct trade dates in the range, normalized from yyyy-MM-dd to yyyyMMdd
    val sqlrdd = hiveContext.sql("select distinct regexp_replace(t_date,'-','') t_date from if_stock_d where regexp_replace(t_date,'-','') >= 20160104 and regexp_replace(t_date,'-','') <= 20160105 order by t_date")
    val arr = sqlrdd.collect
    //val t_date = arr(1).toString().substring(1,9).toInt
    for (i <- arr.indices) {
      // Row.toString gives "[20160104]"; strip the brackets and parse the date
      val t_date = arr(i).toString().substring(1, 9).toInt
      // Previous day, assuming the prior trading day is the literal previous calendar date
      // (weekends and holidays will make this join come back empty)
      val y_date = t_date - 1
      // DW layer: join each day with the previous day to derive the status flag (1 or -1) from the close-to-close change
      val sql_text = "insert into table tm_stock_d " +
        "select t1.stockcode,t1.t_date,t1.open,t1.high,t1.close,t1.low,t1.volume,t1.price_change,t1.p_change,t1.ma5,t1.ma10,t1.ma20,t1.v_ma5,t1.v_ma10,t1.v_ma20,t1.turnover," +
        "case when (t2.close - t1.close) > 0 then 1 else -1 end status from " +
        "(select * from tw_stock_d where t_date = " + t_date + ") t1," +
        "(select * from tw_stock_d where t_date = " + y_date + ") t2 " +
        "where t1.stockcode = t2.stockcode"
      println("PrintSql:" + sql_text)
      hiveContext.sql(sql_text)
    }
hiveContext.sql("select distinct t_date from tm_stock_d").show()
//hiveContext.sql("select min(t_date) from tw_stock_d").show()
//hiveContext.sql("select distinct regexp_replace(t_date,'-','') t_date from if_stock_d where regexp_replace(t_date,'-','') >= 20160104 and regexp_replace(t_date,'-','') <= 20160105 order by t_date").show()
sc.stop()
}
}
[/mw_shl_code]
Running data-warehouse loads with spark-sql like this just doesn't work out: it produces so many empty files that there's no way to keep them under control. Hive is still the better fit for these jobs. What do you all use to run your data warehouses?
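For what it's worth, the empty files come from `spark.sql.shuffle.partitions=20`: every `insert into ... select` writes one file per shuffle partition, and with a single day's worth of rows most partitions are empty. Below is a minimal sketch of one workaround, assuming Spark 1.4+ (for DataFrameWriter) and the same tables as above: run only the SELECT, coalesce the result to a single partition, then append it with insertInto. The object name and the hard-coded example date are mine, not from the original job.
[mw_shl_code=scala,true]import org.apache.spark.{SparkConf, SparkContext}

object tm_stock_d_coalesced {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("stock"))
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    val t_date = 20160105 // example trade date (yyyyMMdd), as produced by the loop above
    val y_date = t_date - 1

    // Run only the SELECT half of the statement; keep the INSERT out of the SQL string
    val select_text =
      "select t1.stockcode,t1.t_date,t1.open,t1.high,t1.close,t1.low,t1.volume," +
      "t1.price_change,t1.p_change,t1.ma5,t1.ma10,t1.ma20,t1.v_ma5,t1.v_ma10,t1.v_ma20,t1.turnover," +
      "case when (t2.close - t1.close) > 0 then 1 else -1 end status from " +
      "(select * from tw_stock_d where t_date = " + t_date + ") t1," +
      "(select * from tw_stock_d where t_date = " + y_date + ") t2 " +
      "where t1.stockcode = t2.stockcode"
    val df = hiveContext.sql(select_text)

    // coalesce(1) collapses the 20 shuffle partitions into one before the write,
    // so each day's load lands as a single file instead of 20 mostly-empty ones
    df.coalesce(1).write.mode("append").insertInto("tm_stock_d")

    sc.stop()
  }
}
[/mw_shl_code]
The `coalesce(1)` assumes one day's rows fit comfortably in a single task; for bigger loads a small repartition count, or simply `SET spark.sql.shuffle.partitions=1` for this job, achieves much the same thing.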