问题导读
1.如何利用Spark将json存入hive库?
2.如何在Shell脚本中下载json数据?
3.如何在Shell脚本中执行Spark程序?
4.如何在Shell脚本中执行kylin restapi让kylin任务执行?
5.如何编写Azkaban的job?
1 Spark获取json数据,并将json数据存hive库hive表建立Demo
[mw_shl_code=sql,true]--如果存在hive表,直接删除这个hive表。
drop table if EXISTS tb_trade_info;
--创建hive表(第一次全量,后续增量)
CREATE TABLE IF NOT EXISTS tb_trade_info (
salesmanId VARCHAR(40) comment '发展业务员Id',
salesmanName VARCHAR(20) comment '发展店铺的业务员名称',
createDate bigint comment '交易订单创建天,时间格式为yyyyMMdd的integer值,分区时间'
)
partitioned by(pt_createDate integer comment '创建天,时间格式为yyyyMMdd的integer值,分区时间')
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;[/mw_shl_code]
hive视图建立Demo
[mw_shl_code=sql,true]--- 交易客单价对应的视图(第一次全量,后续增量)
DROP VIEW IF EXISTS trade_info_view;
CREATE VIEW IF NOT EXISTS trade_info_view
(
shopRegTime COMMENT '商户注册时间',
levelOne COMMENT '客单价 <10元',
pt_createDate COMMENT '创建天,时间格式为yyyyMMdd的integer值,分区时间'
) COMMENT '客单价视图'
AS
select
shopRegTime,
(case when (balanceFee + payFee) < 10.0 then 1 else 0 end) as levelOne,
pt_createDate
from
tb_trade_info;
[/mw_shl_code]
按照某个字段分组降序,获取最开始的第一条的hive视图demo
[mw_shl_code=sql,true]-- 广告主,流量主对应的 按照广告发布时间进行控制
DROP VIEW IF EXISTS advert_flowofmain_view;
CREATE VIEW IF NOT EXISTS advert_flowofmain_view
(
shopId COMMENT '店铺Id,主键唯一',
action COMMENT '动作类型:10,发布广告,20:流量任务(流量主)'
) COMMENT '广告主、流量主数量统计视图'
AS
select
t.shopId,
t.action
from
(
select
shopId,
action,
ROW_NUMBER() OVER(PARTITION BY advertId ORDER BY actionTime desc) AS rn
FROM
table_name
) t
where t.rn=1;
[/mw_shl_code]
代码示例:
[mw_shl_code=scala,true]import java.util.Date
import xxx.xxx.bigdata.common.utils.DateUtils
import org.apache.spark.sql.SparkSession
object TradeDataClean {
// def main(args: Array[String]): Unit = {
// val conf = new SparkConf().setAppName("TradeDataClean").setMaster("local")
// val sc = new SparkContext(conf)
// val input = sc.textFile("hdfs://bigdata1:9000/bplan/data-center/alitradelist.log.2018-06-21")
//
// input.collect().foreach(
// x => {
// println(x);
// val json = JSON.parseObject(x)
// println("====value====")
// println(json)
// println(json.getString("agentId"))
// }
// )
//
// sc.stop()
// }
/**
* 如果有参数,直接返回参数中的值,如果没有默认是前一天的时间
* @param args :系统运行参数
* @param pattern :时间格式
* @return
*/
// def gainDayByArgsOrSysCreate(args: Array[String],pattern: String):String = {
// //如果有参数,直接返回参数中的值,如果没有默认是前一天的时间
// if(args.length > 1) {
// args(1)
// } else {
// val previousDay = DateUtils.addOrMinusDay(new Date(), -1);
// DateUtils.dateFormat(previousDay, "yyyy-MM-dd");
// }
// }
/**
* args(0) :要处理的json文件路径
* @param args
*/
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("TradeDataClean")
//.master("local
")
.config("spark.sql.warehouse.dir","/user/hive/warehouse")
//为解决:Use the CROSS JOIN syntax to allow cartesian products between these relations
//.config("spark.sql.crossJoin.enabled",true)
//.config("spark.sql.warehouse.dir","hdfs://bigdata1:9000/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate();
//val previousDayStr = gainDayByArgsOrSysCreate(args,"yyyy-MM-dd")
//val df = spark.read.json("/bplan/data-center/tradeInfo/"+ previousDayStr +"/tradeInfo.json")
val df = spark.read.json(args(0))
//val df = spark.read.json("hdfs://bigdata1:9000/xxx/xxx/xxxx")
spark.sql("use data_center")
df.createOrReplaceTempView("tb_trade_info_temp");
val previousDay = DateUtils.addOrMinusDay(new Date(), -1)
//val tmepRdd = rs.rdd.saveAsTextFile("hdfs://bigdata1:9000/bplan/data-center/temp.txt")
val pt_createDate = DateUtils.dateFormat(previousDay, "yyyyMMdd");
spark.sql("INSERT INTO TABLE tb_trade_info partition(pt_createDate=" + pt_createDate + ") " +
"SELECT " +
" ttit.agentId as agentId, " +
" from_unixtime(ttit.payTimeUnix,'yyyyMMdd') as createDate " +
"FROM " +
" tb_sys_industry si, " +
" tb_shop ts," +
" tb_trade_info_temp ttit " +
"WHERE " +
" si.category_id = ts.industryId " +
" and ts.shopId = ttit.shopId" +
" and ts.storeType != 10");
spark.stop()
}
}
[/mw_shl_code]
spark处理数据入mysql数据库:
[mw_shl_code=scala,true]package xxx.shop
import java.sql.{Connection, DriverManager, PreparedStatement}
import xxxx.common.utils.snowflake.SnowflakeUtils
import org.apache.spark.sql.SparkSession
object ShopExtDataClean {
// /**
// * 如果有参数,直接返回参数中的值,如果没有默认是前一天的时间
// * @param args :系统运行参数
// * @param pattern :时间格式
// * @return
// */
// def gainDayByArgsOrSysCreate(args: Array[String],pattern: String):String = {
// //如果有参数,直接返回参数中的值,如果没有默认是前一天的时间
// if(args.length > 1) {
// args(1)
// } else {
// val previousDay = DateUtils.addOrMinusDay(new Date(), -1);
// DateUtils.dateFormat(previousDay, "yyyy-MM-dd");
// }
// }
/**
* args(0) :json数据
* args(1) :mysql的ip地址
* args(2) :mysql数据库的端口号
* args(3) :mysql数据库用户
* args(4) :mysql数据库密码
*
* @param args
*/
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("ShopDataClean")
//.master("local
")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate();
// val previousDayStr = gainDayByArgsOrSysCreate(args,"yyyy-MM-dd")
//val df = spark.read.json("/bplan/data-center/shop/" + previousDayStr + "/shop.json");
val df = spark.read.json(args(0));
spark.sql("use data_center");
df.createOrReplaceTempView("shop_ext_temp")
val df2 = spark.sql("SELECT " +
" st.areaName as areaName, " +
" st.areaCode as areaCode, " +
" st.agentId as agentId, " +
" st.agentName as agentName, " +
" st.rootCategoryId as rootCategoryId, " +
" st.parentCategoryId as parentCategoryId, " +
" st.industryId as industryId, " +
" st.industryName as industryName, " +
" set.shopId as shopId, " +
" set.businessType as businessType, " +
" set.addTime as addTime, " +
" set.num as num " +
"FROM " +
" shop_ext_temp set left join tb_shop st " +
"ON " +
" set.shopId = st.shopId and st.storeType in(1, 20)")
//" set.shopId = st.shopId and st.storeType = 1 or st.storeType = 20")
// val previousDay = DateUtils.addOrMinusDay(new Date(), -1);
// //将临时的数据存入到实际的tb_shop表中
// val pt_createDate = DateUtils.dateFormat(previousDay, "yyyyMMdd")
var conn: Connection = null;
var ps: PreparedStatement = null;
val sql = s"insert into tb_shop_ext(" +
s"id," +
s"area_name," +
s"area_code," +
s"agent_id," +
s"agent_name," +
s"root_category_id," +
s"parent_category_id," +
s"industry_id," +
s"industry_name," +
s"shop_id," +
s"business_type," +
s"add_time," +
s"num) " +
s"values (?,?,?,?,?,?,?,?,?,?,?,?,?)"
try {
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection(s"jdbc:mysql://" + args(1) + ":" + args(2) + "/data_center", args(3), args(4))
ps = conn.prepareStatement(sql)
//关闭自动提交,即开启事务
conn.setAutoCommit(false)
var i = 1;
df2.collect().foreach(
x => {
ps.setLong(1, SnowflakeUtils.getId)
ps.setString(2, x.get(0).toString)
ps.setString(3, x.get(1).toString)
ps.setString(4, x.get(2).toString)
ps.setString(5, x.get(3).toString)
ps.setString(6, x.get(4).toString)
ps.setString(7, x.get(5).toString)
ps.setString(8, x.get(6).toString)
ps.setString(9, x.get(7).toString)
ps.setString(10, x.get(8).toString)
ps.setInt(11, x.get(9).toString.toInt)
ps.setLong(12, x.get(10).toString.toLong)
ps.setInt(13, x.get(11).toString.toInt)
ps.addBatch()
i += 1;
if (i % 500 == 0) {
ps.executeBatch()
}
}
)
//最后不足500条的,直接执行批量更新操作
ps.executeBatch()
ps.close()
//执行完后,手动提交事务
conn.commit()
//再把自动提交打开
conn.setAutoCommit(true)
} catch {
case e: Exception => {
//先打印出异常
e.printStackTrace()
try {
//发生异常,事务回滚
if (conn != null && !conn.isClosed) {
conn.rollback()
conn.setAutoCommit(true)
}
} catch {
case ex: Exception => ex.printStackTrace()
}
}
} finally {
if (ps != null) {
try {
ps.close()
} catch {
//下面两行等价 case e : Exception => e.printStackTrace()
//case e : ClassNotFoundException => e.printStackTrace()
//case e : SQLException => e.printStackTrace()
case e: Exception => e.printStackTrace()
}
}
if (conn != null) {
try {
conn.close()
} catch {
case ex: Exception => ex.printStackTrace()
}
}
}
spark.stop();
//程序正常退出
System.exit(0)
}
}
[/mw_shl_code]
2 Shell脚本中下载json数据[mw_shl_code=shell,true]自己定义的env的脚本:
env.sh
#!/bin/bash
#定义接口请求url地址
export webUrl='http://xxx/xxx/xxx/xxxx'
export backUpWebUrl='http://xxxx/xxxx/xxxx/xxxx'
#echo ${webUrl}
#设置默认的数据类型,默认下载全量数据
export dataType="full"
#昨天时间(时间格式类:2018-10-24)
export yesterday=`date --date='1 days ago' +%Y-%m-%d`
export today=`date +%Y-%m-%d`
#1周前数据(用于保留7天数据)
export aweekAgo=`date --date='7 days ago' +%Y-%m-%d`
export aweekAgoFolder=
#echo $yesterday
#oss中json的位置
export ossUrl="https://ossurl"
#当前路径
export current=$PWD
#Spark运行所需的参数配置等信息
export sparkArgs="--jars /xxxx/apache-phoenix-4.14.1-HBase-1.4-bin/phoenix-spark-4.14.1-HBase-1.4.jar,/xxx/apache-phoenix-4.14.1-HBase-1.4-bin/phoenix-4.14.1-HBase-1.4-client.jar --master spark://xxxx:7077 --executor-memory 2g --total-executor-cores 6 --class "
#Spark程序所在位置
export programPrefixPath="/xxxx/program"
#kylin的参数
export kylinUserInfo="--user ADMIN:KYLIN"
export kylinCubeUrl="http://xxxx:7070/kylin/api/cubes/"
export kylinJobsUrl="http://xxxx:7070/kylin/api/jobs/"
export startTime="2015-01-01 00:00"
export startTimeTimeStamp=`date -d "$startTime" +%s`
export startTimeTimeStampMs=$(($startTimeTimeStamp * 1000))
export endTime=`date +%Y-%m-%d -d "+1days"`
export endTimeTimeStamp=`date -d "$endTime" +%s`
#将时间戳编程毫秒值
export endTimeTimeStampMs=$(($endTimeTimeStamp * 1000))
#phoenix对应的ZkUrl
#export phoenixZkUrl="jdbc:phoenix:ip地址:2181"
#########################################################
#3、订单交易数据请求参数(第一次全量,后续增量)
#请求地址:curl -d "dataName=tradeInfo&dataType=full&dataTime=2018-10-23" http://xxxxx/oss/selectList
export tradeInfoArgs="dataName=tradeInfo&dataType=" #$dataType"&dataTime="$yesterday
#json的url信息存储的文件路径
export tradeInfoJsonUrls=$current/tmpfile/tradeInfoJsonUrls
#json的url存储位置前缀
export tradeInfoJsonUrlPrefix=$current/tmpfile/tradeInfoJsonUrlPrefix
export tradeAnalyzeCubeName="tb_trade_analyze_cube"
export tradeCollectMoneyCubeName="tb_trade_collect_money_cube"
#用于存储是否下载了的变量文件
export tradeInfoVariableFile=$current/tmpfile/tradeInfoVariableFile
#!/bin/bash
source /etc/profile
#求结果中的url路径长度,如果是4,表示这里的值是一个控制了(下面这两行是等效的)
#urlLength=`echo ${urlInfo} |jq '.data.urls[1]' | awk '{print length($0)}'`
#urlLength=$(echo ${urlInfo} |jq '.data.urls[1]' | awk '{print length($0)}')
#echo $urlLength
#引用公共文件中定义的参数变量
source $PWD/env.sh
#定义变量
urlInfo=
#定义尝试次数
retryTimes=1
#json数据所在的文件目录(相对于脚本所在的相对路径)
urlPrefix=
#Json数据文件存放的实际目录
fileFolder=
#最新的数据下载位置
newUrlArgs=
#传递变量存储的路径的位置,返回当前当前数据类型
function resetArgs() {
#如果文件存在,读取相应的数据类型
if [[ `ls $tradeInfoVariableFile | grep tradeInfoVariableFile | grep -v grep` != "" ]];then
#存在这个文件的时候,返回存储在文件中的这个类型的值
#获取数据类型,然后读取出文件中dataType的值,将dataType=变成空值
dataType=`cat $tradeInfoVariableFile | grep dataType | sed 's/dataType=//g'`
newUrlArgs=$tradeInfoArgs$dataType"&dataTime="$yesterday
#并返回dataType
#return $dataType
else
mkdir -p $current/tmpfile
cd $current/tmpfile
#不存在这个文件的时候,返回0,并创建这个文件,将变量的类型的值写入到文件中
#将数据类型写入进去,表示后续都是按照增量的方式进行计算
echo "dataType=increment" > $tradeInfoVariableFile
newUrlArgs=$tradeInfoArgs"full&dataTime="$yesterday
#return "full"
fi
}
#获取代理商和区域的数据json url地址信息
function getUrlInfo() {
resetArgs
echo $newUrlArgs
#获取代理商的地址信息
urlInfo=`curl -d $newUrlArgs $webUrl`
}
#获取url参数
#返回值
#1:请求url的结果为200,且成功做了相关操作
#0:请求url的结果不为为200
function getUrlsArray() {
code=$(echo ${urlInfo} |jq '.code')
if [[ "$code" = 200 ]];then
echo "状态码为200"
mkdir -p $current/tmpfile
#删除上次生成的临时的json url地址
rm -rf $tradeInfoJsonUrls
rm -rf $tradeInfoJsonUrlPrefix
touch $tradeInfoJsonUrls
touch $tradeInfoJsonUrlPrefix
dataInfo=$(echo ${urlInfo} |jq '.data')
if [[ $dataInfo == "" ]];then
return 1
fi
#获取url的前缀
echo "===============开始获取 json url 路径前缀==========================="
echo ${urlInfo} |jq '.data.urlPrefix' > $tradeInfoJsonUrlPrefix
sed -i 's/"//g' $tradeInfoJsonUrlPrefix
echo "===============获取 json url 路径前缀结束==========================="
echo "===============开始获取 json url ==================================="
#do while方式获取url的列表,然后把结果存入新的数组中
#定义数组的角标
index=0
while :
do
#获取url
url=$(echo ${urlInfo} |jq '.data.urls['$index']')
#查看字符串中是否有指定字符串
hasBplan=$(echo $url | grep "bplan/data-center")
#如果url中有bplan/data-center这样的表示,将这些url存入到临时文件中
if [[ "$hasBplan" != "" ]]
then
echo $url >> $tradeInfoJsonUrls
index=`expr $index + 1`
else
break
fi
done
#将文本中的所有的字符串中的引号去除掉
sed -i 's/"//g' $tradeInfoJsonUrls
echo "===============获取 json url 成功==================================="
#如果最终成功,返回1
return 1
else
#如果没有得到url的值,返回0,表示失败
webUrl=$backUpWebUrl
return 0
fi
}
#如果获取url的过程失败,则一直失败重试,直到程序被处理好了
function getUrlRetry() {
while :
do
echo "开始执行第${retryTimes}次任务,结果如下:"
#调用方法
getUrlInfo
getUrlsArray
#判断本地执行是否成功
if [[ $? = 1 ]];then
echo "第${retryTimes}次执行之后,处理json数据成功了,接着处理后续任务"
break
else
echo "第${retryTimes}次执行程序失败,休眠5分钟后再次重试,知道144次之后停止"
#重试144次,即144 * 5 = 720min (半天)
if [[ "$retryTimes" -eq 144 ]];then
echo "已经执行了${retryTimes}次,程序达到预定次数,后续停止执行"
break
else
retryTimes=`expr $retryTimes + 1`
#休眠5分钟
sleep 5m
#再次执行这个函数
fi
fi
#为了让打印的日志显示好看一些,空3行
echo ""
echo ""
echo ""
done
}
#1、获取Json文件相对脚本的文件目录(相对路径)
#2、获取Json数据文件在磁盘上的绝对路径
function getJsonFolderPath() {
#查看指定文件是否存在
urlPrefix=`cat $tradeInfoJsonUrlPrefix`
#数据文件所在位置
fileFolder=$current$urlPrefix
}
#下载Json文件
function downloadJsons() {
#获取到url路径前缀
echo "开始下载Json文件"
#urlPrefix=`cat $tradeInfoJsonUrlPrefix`
#echo $current$urlPrefix
#最终下载的文件存放位置在下面
#fileFolder=$current$urlPrefix
getJsonFolderPath
if [[ $urlPrefix == "" ]];then
echo "当天没有数据文件,直接返回"
return 0;
fi
mkdir -p $fileFolder
#删除指定目录下的文件,然后删除
rm -rf $fileFolder/*
#进入$current$urlPrefix,开始循环下载json文件
cd $fileFolder
#开始循环文件,然后下载文件
for line in `cat $tradeInfoJsonUrls`
do
jsonOssPath=$ossUrl$line
echo $jsonOssPath
wget $jsonOssPath
echo "文件路径:"$current$line
newPath=`echo $line |sed 's/_//g'`
mv $current$line $current$newPath
done
#修改替换文件中文件名称
sed -i 's/_//g' $tradeInfoJsonUrls
echo "下载json文件结束"
}
#上传json文件到HDFS中
function putJsonFile2Hdfs() {
#上传数据文件到HDFS中
cd $current
getJsonFolderPath
if [[ $urlPrefix == "" ]];then
echo "当天没有数据文件,直接返回"
return 0;
fi
echo "hdfs中的文件路径"
echo $urlPrefix
hdfs dfs -rm -r $urlPrefix
hdfs dfs -mkdir -p $urlPrefix
#下面是上传文件到hdfs中
for line in `cat $tradeInfoJsonUrls`
do
echo $current$line
#将文件上传到指定的目录中
hdfs dfs -put $current$line $urlPrefix
#上传完成之后,删除留在本地的Json文件
rm -rf $current$line
done
echo "上传json文件到HDFS中完成"
}
#获取数据json文件路径,前缀等信息
getUrlRetry
#下载json数据到指定目录
downloadJsons
#上传数据文件到HDFS中
putJsonFile2Hdfs
#清理Linux系统中不用的垃圾暂用的内存
sync
echo 3 > /proc/sys/vm/drop_caches
[/mw_shl_code]
3、Shell脚本中执行Spark程序
[mw_shl_code=shell,true]#!/bin/bash
source /etc/profile
#求结果中的url路径长度,如果是4,表示这里的值是一个控制了(下面这两行是等效的)
#urlLength=`echo ${urlInfo} |jq '.data.urls[1]' | awk '{print length($0)}'`
#urlLength=$(echo ${urlInfo} |jq '.data.urls[1]' | awk '{print length($0)}')
#echo $urlLength
#引用公共文件中定义的参数变量
source $PWD/env.sh
#json数据所在的文件目录(相对于脚本所在的相对路径)
urlPrefix=
#Json数据文件存放的实际目录
fileFolder=
#1、获取Json文件相对脚本的文件目录(相对路径)
#2、获取Json数据文件在磁盘上的绝对路径
function getJsonFolderPath() {
#查看指定文件是否存在
urlPrefix=`cat $tradeInfoJsonUrlPrefix`
#数据文件所在位置
fileFolder=$current$urlPrefix
}
#是否执行过初始化程序了的控制逻辑
function isInited() {
#如果文件存在,读取相应的数据类型
if [[ `ls $tradeInfoVariableFile | grep tradeInfoVariableFile | grep -v grep` != "" ]];then
dataType=`cat $tradeInfoVariableFile | grep sparkInited | sed 's/sparkInited=//g'`
#如果没有,说明这个Spark程序还没有初始化过
if [[ $dataType == "" ]];then
echo -e "\n" >> $tradeInfoVariableFile
echo "sparkInited=inited" >> $tradeInfoVariableFile
return 0;
else
return 1;
fi
else
mkdir -p $current/tmpfile
cd $current/tmpfile
#如果没有这个文件,则是在这个文件中添加
echo "sparkInited=inited" > $tradeInfoVariableFile
return 0;
fi
}
function mergeFiles() {
#上传数据文件到HDFS中
cd $current
getJsonFolderPath
isInited
if [[ $? == 1 ]];then
echo "开始合并小文件为大文件"
hdfs dfs -getmerge $urlPrefix $PWD/tradeInfo
#删除$urlPrefix 下的文件
hdfs dfs -rm $urlPrefix/*
#将文件上传到指定的位置
hdfs dfs -put $PWD/tradeInfo $urlPrefix
echo $urlPrefix"tradeInfo" > $tradeInfoJsonUrls
echo "文件合并完成,并且已经将新文件路径写入文件"
rm -rf $PWD/tradeInfo
echo "删除存储在本地的文件"
fi
}
#Spark处理
function sparkHandler() {
#上传数据文件到HDFS中
cd $current
getJsonFolderPath
if [[ $urlPrefix == "" ]];then
echo "当天没有数据文件,直接返回"
return 0;
fi
isInited
if [[ $? == 0 ]];then
#由于是全量数据,在处理之前,删除hive库中的所有数据
echo '开始drop hive中的tb_trade_info表'
hive -e "
use data_center;
drop table if EXISTS tb_trade_info;
CREATE TABLE IF NOT EXISTS tb_trade_info (
createDate bigint comment '交易订单创建天,时间格式为yyyyMMdd的integer值,分区时间'
)
partitioned by(pt_createDate integer comment '创建天,时间格式为yyyyMMdd的integer值,分区时间')
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
"
echo 'drop hive中的tb_trade_info表 完成'
fi
#下面是上传文件到hdfs中
for line in `cat $tradeInfoJsonUrls`
do
#执行Spark程序来
echo $line
cd $SPARK_HOME
bin/spark-submit $sparkArgs xxx.xxx.xxx.xxx.xxx.TradeDataClean $programPrefixPath/trade-info/trade-info-1.0.1-SNAPSHOT.jar $line
done
echo "完成执行Spark程序"
}
mergeFiles
#上传数据文件到HDFS中
sparkHandler
#清理Linux系统中不用的垃圾暂用的内存
sync
echo 3 > /proc/sys/vm/drop_caches[/mw_shl_code]
4 Shell脚本中执行kylin restapi让kylin任务执行
[mw_shl_code=shell,true]env.sh 内容:
#!/bin/bash
#kylin的参数
export kylinUserInfo="--user ADMIN:KYLIN"
export kylinCubeUrl="http://xxx:7070/kylin/api/cubes/"
export kylinJobsUrl="http://xxxx:7070/kylin/api/jobs/"
export startTime="2015-01-01 00:00"
export startTimeTimeStamp=`date -d "$startTime" +%s`
export startTimeTimeStampMs=$(($startTimeTimeStamp * 1000))
export endTime=`date +%Y-%m-%d -d "+1days"`
export endTimeTimeStamp=`date -d "$endTime" +%s`
#将时间戳编程毫秒值
export endTimeTimeStampMs=$(($endTimeTimeStamp * 1000))
export tradeInfoArgs="dataName=tradeInfo&dataType=" #$dataType"&dataTime="$yesterday
#json的url信息存储的文件路径
export tradeInfoJsonUrls=$current/tmpfile/tradeInfoJsonUrls
#json的url存储位置前缀
export tradeInfoJsonUrlPrefix=$current/tmpfile/tradeInfoJsonUrlPrefix
export tradeAnalyzeCubeName="xxxx"
export tradeCollectMoneyCubeName="xxxx"
#用于存储是否下载了的变量文件
export tradeInfoVariableFile=$current/tmpfile/tradeInfoVariableFile
#!/bin/bash
source /etc/profile
#引用公共文件中定义的参数变量
source $PWD/env.sh
jobId=
#是否执行过初始化程序了的控制逻辑
function isInited() {
#如果文件存在,读取相应的数据类型
if [[ `ls $tradeInfoVariableFile | grep tradeInfoVariableFile | grep -v grep` != "" ]];then
dataType=`cat $tradeInfoVariableFile | grep kylinTradeAnalyzeCubeInited | sed 's/kylinTradeAnalyzeCubeInited=//g'`
#如果没有,说明这个Spark程序还没有初始化过
if [[ $dataType == "" ]];then
echo -e "\n" >> $tradeInfoVariableFile
echo "kylinTradeAnalyzeCubeInited=inited" >> $tradeInfoVariableFile
return 0;
else
return 1;
fi
else
mkdir -p $current/tmpfile
cd $current/tmpfile
#如果没有这个文件,则是在这个文件中添加
echo "kylinTradeAnalyzeCubeInited=inited" > $tradeInfoVariableFile
return 0;
fi
}
#Spark处理
function kylinHandler() {
isInited
if [[ $? == 0 ]];then
#上传数据文件到HDFS中
cd $current
#1、Disable Cube
curl -X PUT $kylinUserInfo -H "Content-Type: application/json;charset=utf-8" $kylinCubeUrl$tradeAnalyzeCubeName/disable
echo ""
echo ""
#2、Purge Cube
curl -X PUT $kylinUserInfo -H "Content-Type: application/json;charset=utf-8" $kylinCubeUrl$tradeAnalyzeCubeName/purge
echo ""
echo ""
#3、Enable Cube
curl -X PUT $kylinUserInfo -H "Content-Type: application/json;charset=utf-8" $kylinCubeUrl$tradeAnalyzeCubeName/enable
echo ""
echo ""
#4、Build cube
cubeBuildInfo=`curl -X PUT $kylinUserInfo -H "Content-Type: application/json;charset=utf-8" -d '{ "startTime":'$startTimeTimeStampMs',"endTime":'$endTimeTimeStampMs', "buildType": "BUILD"}' $kylinCubeUrl$tradeAnalyzeCubeName/build`
echo ""
echo ""
else
cubeBuildInfo=`curl -X PUT $kylinUserInfo -H "Content-Type: application/json;charset=utf-8" -d '{"endTime":'$endTimeTimeStampMs', "buildType": "BUILD"}' $kylinCubeUrl$tradeAnalyzeCubeName/rebuild`
echo ""
echo ""
fi
echo "cube build的状态结果:"
echo $cubeBuildInfo
echo ""
echo ""
#查看是否build好了,如果build好了,发现last_build_time变成了build的最后时间了。
jobId=$(echo $cubeBuildInfo |jq '.uuid')
echo $jobId > $jobId
sed -i 's/"//g' $jobId
realJobId=`cat $jobId`
echo $realJobId
rm -rf $jobId
echo ""
echo ""
while :
do
sleep 1m
cubeJobInfo=`curl -X GET --user ADMIN:KYLIN $kylinJobsUrl$realJobId`
echo "获取cube job运行的状态"
echo $cubeJobInfo
echo ""
echo ""
jobStatus=$(echo $cubeJobInfo | jq ".job_status")
echo "jobStatus"
echo $jobStatus > $realJobId
sed -i 's/"//g' $realJobId
realJobStatus=`cat $realJobId`
echo "$realJobStatus"
echo ""
if [[ $realJobStatus == "NEW" ]];then
echo "kylin cube build job status:NEW; sleep 1m;"
elif [[ $realJobStatus == "PENDING" ]];then
echo "kylin cube build job status:PENDING; sleep 1m;"
elif [[ $realJobStatus == "RUNNING" ]];then
echo "kylin cube build job status:RUNNING; sleep 1m;"
elif [[ $realJobStatus == "STOPPED" ]];then
echo "kylin cube build job status:STOPPED"
#如果stop了,停掉kylin脚本的运行
break;
elif [[ $realJobStatus == "FINISHED" ]];then
echo "kylin cube build job status:FINISHED"
break;
elif [[ $realJobStatus == "ERROR" ]];then
echo "kylin cube build job status:ERROR"
break;
elif [[ $realJobStatus == "DISCARDED" ]];then
echo "kylin cube build job status:DISCARDED"
break;
else
echo "kylin cube build job status:OTHER UNKNOWN STATUS"
break;
fi
done
#删除文件
rm -rf $realJobId
}
#上传数据文件到HDFS中
kylinHandler
#清理Linux系统中不用的垃圾暂用的内存
sync
echo 3 > /proc/sys/vm/drop_caches
[/mw_shl_code]
5 编写Azkaban的job目的:编写一个类似下面图能够并行执行任务,串行往下执行任务,最终到一个结束任务。
最顶层的一个job脚本
[mw_shl_code=shell,true]#jsonHandler-all.job
type=command
command=sh /xxx/jsonHandler-all.sh
[/mw_shl_code]
对于下面一行并行的任务,其中的一个的写法如下:
[mw_shl_code=shell,true]#sparkHandler-advertiserFlowofmain
type=command
dependencies=sparkHandler-shop
command=sh /xxxx/sparkHandler-advertiserFlowofmain.sh
[/mw_shl_code]
注意上面的dependencies,这种写法之后,在上面的那种图中,这个job上只有一个sparkHandler-shop相关的任务 对于最底层的那个任务,需要依赖上面的多个任务的名称,类似如下: [mw_shl_code=shell,true]#sparkHandler-tradeInfo
type=command
dependencies=sparkHandler-couponCard,sparkHandler-memberCard
command=sh /data/workspace/bplan-data-center-job/sparkHandler-tradeInfo.sh
[/mw_shl_code]
这个写完之后,在sparkHandler-tradeInfo的上面就会存在2个任务job,分别是:sparkHandler-couponCard,sparkHandler-memberCard。sparkHandler-tradeInfo会在最底层。
最新经典文章,欢迎关注公众号
来源:CSDN 作者:toto1297488504 原文:《hive表,hive视图,spark处理数据入mysql,shell获取url数据下载json,Spark sql处理json,shell脚本执行kylin,azkaban任务调度》 https://blog.csdn.net/tototuzuoquan/article/details/88654771
|