分享

集成Hive与Spark SQL及代码实现

fc013 2017-3-5 15:09:09 发表于 介绍解说 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 23024


问题导读:

1.怎样编译和配置HIVE?
2.怎样配置spark?
3.怎样开发spark?






小结
结构上Hive On Spark和SparkSQL都是一个翻译层,把一个SQL翻译成分布式可执行的Spark程序。Hive和SparkSQL都不负责计算。

hive编译
如果不是采用CDH在线自动安装和部署的话,可能需要对源码进行编译,使它能够兼容HIVE。

编译的方式也很简单,只需要在Spark_SRC_home(源码的home目录下)执行如下命令:

[mw_shl_code=shell,true]./make-distribution.sh --tgz -PHadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.6.0-cdh5.4.4 -Phive
[/mw_shl_code]

编译好了之后,会在lib目录下多几个jar包()

hive配置
编辑 $HIVE_HOME/conf/Hive-site.xml,增加如下内容:

[mw_shl_code=xml,true]<property>
  <name>hive.metastore.uris</name>
  <value>thrift://master:9083</value>
  <description>Thrift uri for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>[/mw_shl_code]

启动hive metastore

[mw_shl_code=shell,true]$hive --service metastore &


查看 metastore:
$jobs
[1]+  Running                 hive --service metastore &

关闭 metastore:
$kill %1
kill %jobid,1代表job id[/mw_shl_code]

spark配置
[mw_shl_code=text,true]将 $HIVE_HOME/conf/hive-site.xml copy或者软链 到 $SPARK_HOME/conf/
将 $HIVE_HOME/lib/mysql-connector-java-5.1.12.jar copy或者软链到$SPARK_HOME/lib/
copy或者软链$SPARK_HOME/lib/ 是方便spark standalone模式使用[/mw_shl_code]

启动spark-sql
1. standalone模式

[mw_shl_code=shell,true]./bin/spark-sql --master spark:master:7077 --jars /home/spark/spark-1.6.0/lib/mysql-connector-java-5.1.12.jar
[/mw_shl_code]

2. yarn-client模式

[mw_shl_code=shell,true]./bin/spark-sql --master yarn-client --jars /home/spark/spark-1.6.0/lib/mysql-connector-java-5.1.12.jar

执行 sql:
select count(*) from people;[/mw_shl_code]

3. yarn-cluster模式

Cluster deploy mode 不支持的

[mw_shl_code=shell,true]./bin/spark-sql --master yarn-cluster  --jars /home/spark/spark-1.6.0/lib/mysql-connector-java-5.1.12.jar
Error: Cluster deploy mode is not applicable to Spark SQL shell.
Run with --help for usage help or --verbose for debug output
...[/mw_shl_code]

启动 spark-shell
1. standalone模式

[mw_shl_code=shell,true]./bin/spark-shell --master spark:master:7077 --jars /home/spark/spark-1.6.0/lib/mysql-connector-java-5.1.12.jar
[/mw_shl_code]

2. yarn-client模式

[mw_shl_code=applescript,true]./bin/spark-shell --master yarn-client --jars /home/spark/spark-1.6.0/lib/mysql-connector-java-5.1.12.jar

sqlContext.sql("from people SELECT count(appkey,name1,name2)").collect().foreach(println)
[/mw_shl_code]

代码测试
代码:

[mw_shl_code=scala,true]import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SqlHive {

  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
    val sc=new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    /*
    sqlContext.sql("CREATE TABLE IF NOT EXISTS people (id INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ")  //这里需要注意数据的间隔符
    sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/people.txt' INTO TABLE people  ");
    sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)
    */

    //通过HiveContext.table方法来直接加载Hive中的表而生成DataFrame
    hiveContext.sql("use hive")
    hiveContext.sql("DROP TABLE IF EXISTS people")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS people(id INT, name STRING, age INT)")
    //当然也可以通过LOAD DATA INPATH去获得HDFS等上面的数据 到Hive(数据的移动)
    hiveContext.sql("LOAD DATA LOCAL INPATH '/home/sql/spark/people.txt' INTO TABLE people")

    //把本地数据加载到Hive中(数据的拷贝)
    hiveContext.sql("DROP TABLE IF EXISTS orders")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS orders(peopleId INT, orderNo STRING)")
    hiveContext.sql("LOAD DATA LOCAL INPATH '/home/sql/spark/orders.txt' INTO TABLE orders")
    //使用join
    val resultDF = hiveContext.sql("SELECT p.name, p.age, o.orderNo"
      + "FROM people p JOIN orders o ON p.id=o.peopleId WHERE p.age > 18")
    //当删除该表时,磁盘上的数据也会被删除
    hiveContext.sql("DROP TABLE IF EXISTS resultT")
    //通过saveAsTable的方式把DaraFrame中的数据保存到Hive数据仓库中,数据放在什么地方、元数据都是Hive管理的
    resultDF.saveAsTable("resultT")

    //通过HivewContext的Table方法去读Hive中的Table并生成DaraFrame
    //读取的数据就可以进行机器学习、图计算、各种复杂ETL等操作
    val dataFrameHive = hiveContext.table("resultT")
    dataFrameHive.show()

    sc.stop()
  }

}[/mw_shl_code]

shell脚本:

[mw_shl_code=shell,true]./bin/spark-submit --class SparkSQLByScala.SparkSQL2Hive --master spark://slq1:7077 /home/spark/SqlHive.jar
[/mw_shl_code]



来源:csdn
作者:bingo_liu


本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
yangyixin 发表于 2017-7-7 16:26:11
hello。那个脚本./make-distribution.sh ,我的目录下根本没有啊,是要怎么处理呀?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条