分享

Spark-1.3.1与Hive

desehawk 发表于 2015-5-27 19:07:18 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 23257
问题导读

1.Spark为什么减少了读写磁盘I/O操作带来的延时?
2.Spark+Hive整合是在执行hsql的时候是转换成RDD还是mapreduce?
3.Spark+Hive如何实现整合?





在大数据应用场景下,使用过Hive做查询统计分析的应该知道,计算的延迟性非常大,可能一个非常复杂的统计分析需求,需要运行1个小时以上,但是比之于使用MySQL之类关系数据库做分析,执行速度快很多很多。使用HiveQL写类似SQL的查询分析语句,最终经过Hive查询解析器,翻译成Hadoop平台上的MapReduce程序进行运行,这也是MapReduce计算引擎的特点带来的延迟问题:Map中间结果写文件。如果一个HiveQL语句非常复杂,会被翻译成多个MapReduce Job,那么就会有很多的Map输出中间结果数据到文件中,基本没有数据的共享。

如果使用Spark计算平台,基于Spark RDD数据集模型计算,可以减少计算过程中产生中间结果数据写文件的开销,Spark会把数据直接放到内存中供后续操作共享数据,减少了读写磁盘I/O操作带来的延时。另外,如果基于Spark on YARN部署模式,可以充分利用数据在Hadoop集群DataNode节点的本地性(Locality)特点,减少数据传输的通信开销。

软件准备
我把使用的相关软件的版本在这里列出来,以便测试验证,如下所示:
  • CentOS-6.6 (Final)
  • JDK-1.7.0_25
  • Maven-3.2.1
  • Hadoop-2.2.0
  • Spark-1.3.1
  • Hive-0.12.0
  • MySQL-Server-5.5.8
另外还要搭建好Hadoop集群,以及安装配置好Hive客户端,能够在Hive上正确执行查询分析,安装过程不再累述,可以参考网上很多文档。由于我们使用最新版本的Spark-1.3.1,为了使用我们现有2.2.0版本的Hadoop平台,所以需要重新编译构建Spark程序,接下来会做详细说明。

这里,给出使用的各个集群环境的结构拓扑,如下表所示:

Source节点
服务名称
说明
hadoop1Spark Master/Spark DriverSpark集群
hadoop2DataNode/NodeManagerHadoop集群
hadoop3DataNode/NodeManagerHadoop集群
hadoop4HiveHive客户端
hadoop5Spark WorkerSpark集群
hadoop6Spark Worker/NameNode/ResourceManager/Secondary NameNodeSpark集群/Hadoop集群
10.10.4.130MySQL用于存储Hive元数据

上述节点配置相同,因为是测试机,所以配置相对比较低。我们是分别将Spark集群和Hadoop集群的Worker和NodeManager/DataNode分开部署了,在使用Spark做计算的时候,就没有数据本地性(Locality)的特性,所以如果基于Spark on YARN的模式,可能会获得更好地计算性能的提升。

Spark编译安装配置
首先从官网下在Spark源码文件:

[mw_shl_code=bash,true]cd ~/
wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1.tgz
tar xvzf spark-1.3.1.tgz
mv spark-1.3.1 spark-1.3.1-bin-hadoop2.2[/mw_shl_code]


我的环境是JDK 1.7,使用Maven构建,执行如下命令行:
[mw_shl_code=bash,true]export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
cd /home/spark/spark-1.3.1-bin-hadoop2.2/
mvn -Pyarn -Dyarn.version=2.2.0 -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package[/mw_shl_code]

编译构建完成以后,可以看到如下内容:
[mw_shl_code=bash,true]/home/spark/spark-1.3.1-bin-hadoop2.2/assembly/target/scala-2.10/spark-assembly-1.3.1-hadoop2.2.0.jar
/home/spark/spark-1.3.1-bin-hadoop2.2/lib_managed/*.jar[/mw_shl_code]


如果网络状况不好,可能无法构建成功。
另外,也可以使用sbt构建,执行如下命令:

[mw_shl_code=bash,true]cd /home/spark/spark-1.3.1-bin-hadoop2.2/
build/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 -Phive-thriftserver assembly[/mw_shl_code]

如果失败,多试几次可能会以成功。
使用Maven构建与使用sbt构建,都要耗费很长时间,而且最终生成的文件可能会有所不同。

下面,我们配置Spark集群,首先在Spark Master节点上配置,修改配置文件conf/slaves,将Worker节点主机名加入进去,一行一个,内容如下所示:
[mw_shl_code=bash,true]hadoop5
hadoop6[/mw_shl_code]

修改Spark环境变量配置文件conf/spark-env.sh,增加如下配置行:
[mw_shl_code=bash,true]SPARK_MASTER_IP=hadoop1
[/mw_shl_code]

登录到Hive安装的节点,将Hive的配置文件拷贝到Spark安装目录下的conf目录下面,执行如下命令:
[mw_shl_code=bash,true]scp /usr/local/hive/conf/hive-site.xml spark@hadoop1:/home/spark/spark-1.3.1-bin-hadoop2.2/conf/
[/mw_shl_code]
最后分发Spark安装文件到Spark Worker节点上:

[mw_shl_code=bash,true]sudo scp -r /home/spark/spark-1.3.1-bin-hadoop2.2 spark@hadoop5:/home/spark/spark-1.3.1-bin-hadoop2.2/
sudo scp -r /home/spark/spark-1.3.1-bin-hadoop2.2 spark@hadoop6:/home/spark/spark-1.3.1-bin-hadoop2.2/[/mw_shl_code]

为了方便启动Spark集群,可以配置Spark Master到Workers的ssh免密码登录,然后只需要在Master中执行如下脚本即可:


可以查看Spark各个节点的服务启动情况,也可以通过Spark UI链接进入页面查看http://hadoop1:8080/,默认是8080端口,如果8080端口已经被占用,Spark会自动选择端口号数字加1,如http://hadoop1:8081/

Spark+Hive整合
我们知道,在使用Hive进行查询的时候,到底层MapReduce计算层会将HiveQL翻译成MapReduce程序,在Hadoop平台上执行计算,这使得计算的延迟比较大。我们整合Spark和Hive,就是通过Spark平台来计算Hive查询,也就是Hive不再使用它默认的MapReduce计算引擎,Spark会直接读取Hive的元数据存储,将Hive数据转换成Spark RDD数据,通过Spark提供的计算操作来实现(Transformation和Action)。
我们首先在Hive中创建一个数据库event_db,执行如下命令:

[mw_shl_code=sql,true]CREATE DATABASE event_db;
[/mw_shl_code]

在创建一个Hive外部表user_event,执行DDL脚本:
[mw_shl_code=sql,true]CREATE EXTERNAL TABLE event_db.user_event(
  appid string,
  event_code string,
  udid string,
  uid string,
  install_id string,
  session_id string,
  play_id string,
  page string,
  timestamp string,
  action string,
  network string,
  operator string,
  lon string,
  lat string,
  imsi string,
  speed string,
  event_id string,
  type string,
  result string,
  refer string,
  radio_id bigint,
  audio_id bigint,
  play_time bigint,
  duration bigint,
  start_time string,
  end_time string,
  request_agent string,
  request_referer string,
  device_id string,
  model_id string,
  area_tag string,
  remarks4 string,
  remarks5 string,
  ip bigint,
  area_code int,
  create_time string)
PARTITIONED BY (
  create_date string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://hadoop6:8020/hive/event_db/user_event';[/mw_shl_code]

我选择了一天的用户事件数据(大概有5G左右,13824560条记录),将数据加载到Hive的分区中,执行如下LOAD命令行:

[mw_shl_code=bash,true]LOAD DATA LOCAL INPATH '/home/shirdrn/data/user_event_20150511.log' OVERWRITE INTO TABLE event_db.user_event PARTITION (create_date='2015-05-11');[/mw_shl_code]

  • Standalone模式
我们可以通过指定SPARK_CLASSPATH变量,将需要访问Hive的元数据存储MySQL的驱动包加入进去,然后直接启动Spark SQL Shell即可。这里,使用Spark默认的集群管理模式Standalone,启动Shell时需要指定master选项为Spark Master服务连接:

[mw_shl_code=bash,true]SPARK_CLASSPATH="$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
bin/spark-sql --master spark://hadoop1:7077[/mw_shl_code]

这样我们可以直接在Spark SQL Shell上输入Hive查询语句就可以执行查询分析计算。
另外,还可以通过Spark Shell进行操作,不过需要了解Spark SQL支持的Scala API,启动Spark Shell,执行如下命令:
[mw_shl_code=bash,true]SPARK_CLASSPATH="$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
bin/spark-shell --master spark://hadoop1:7077[/mw_shl_code]

然后,创建一个org.apache.spark.sql.hive.HiveContext对象,用来执行Hive查询:

[mw_shl_code=bash,true]scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@6dcc664b[/mw_shl_code]

接着可以执行查询:
[mw_shl_code=bash,true]scala> sqlContext.sql("SELECT area_code,event_code,COUNT(udid) AS user_cnt FROM event_db.user_event WHERE create_date='2015-05-11' GROUP BY area_code,event_code LIMIT 10").collect().foreach(println)[/mw_shl_code]

可以看到查询结果。
  • yarn-client模式
如果基于YARN模式运行(与Hive整合只支持yarn-client模式,不支持yarn-cluster),需要指定Hadoop集群的环境变量(在当前Driver节点上必须有Hadoop的安装文件),如下所示:
[mw_shl_code=bash,true]export HADOOP_HOME=/usr/local/hadoop-2.2.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop[/mw_shl_code]

然后启动Spark SQL Shell,执行如下命令:

[mw_shl_code=bash,true]SPARK_CLASSPATH="$SPARK_CLASSPATH:/home/spark/spark-1.3.0-bin-hadoop2.2/lib_managed/jars/mysql-connector-java-5.1.34.jar"
bin/spark-sql --master yarn-client[/mw_shl_code]

  • 查询结果耗时比较
我们使用Hive,以及上面提到的两种模式分别执行如下HiveQL查询统计语句:
[mw_shl_code=bash,true]SELECT area_code,event_code,COUNT(DISTINCT udid) AS user_cnt FROM event_db.user_event WHERE create_date='2015-05-11' AND (create_time BETWEEN '2015-05-11 17:00:00' AND '2015-05-11 23:30:00') GROUP BY area_code,event_code ORDER BY user_cnt DESC LIMIT 10[/mw_shl_code]

可以看到查询结果,结果如下所示:

[mw_shl_code=bash,true]156000000     100003     8290
110000     100003     7832
440100     100003     4956
110000     100010     3850
440300     100003     3709
320100     100003     3683
410100     100003     3669
110000     101014     3479
110000     200004     3455
110000     100011     3423[/mw_shl_code]

对比耗时,如下表所示:
运行模式
花费时间(秒)
Hive189.695
Spark Standalone82.895
Spark yarn-client104.259
可见,无论是Spark Standalone模式还是Spark yarn-client模式,耗时都比直接执行Hive查询要少得多。我们执行Spark计算,2个Worker节点上各用了一个Executor,每个Executor使用512M内存,如果增加Executor个数,或者调大内存,应该比上面运行耗时更少,例如,启动Spark SQL Shell并指定相关参数:
[mw_shl_code=bash,true]bin/spark-sql --master spark://hadoop1:7077 --driver-memory 1G --driver-cores 2 --executor-memory 4G[/mw_shl_code]

或者:

[mw_shl_code=bash,true]bin/spark-sql --master yarn-client --driver-memory 1G --driver-cores 2 --executor-cores 4 --num-executors 8 --executor-memory 4G[/mw_shl_code]


总结
根据上面我们实践的整合Spark+Hive,在执行复杂统计分析时,完全可以使用Spark SQL来替代Hive,至少会提高几倍的速度,对于一些基于Hive统计应用,可能每天晚上要执行6个小时以上的统计计算,导致第二天结果数据都无法出来,如果统计需求再次增加,可能时间还会更长。除了对Hive查询语句进行优化之外,应该说优化空间不大,所以这个时候可以考虑使用Spark平台来实现统计分析,而且,Spark集群可以线性扩展,对于一些调优也更容易一些。
另外,Spark的发展超级迅猛,新版本频繁发布,而且在后期的版本中还会在性能方面进行大幅改进。Tungsten项目将是Spark自诞生以来内核级别的最大改动,以大幅度提升Spark应用程序的内存和CPU利用率为目标,旨在最大程度上压榨新时代硬件性能。Tungsten项目包括了3个方面的努力:
  • Memory Management和Binary Processing:利用应用的语义(Application Semantics)来更明确地管理内存,同时消除JVM对象模型和垃圾回收开销。
  • Cache-aware Computation(缓存友好的计算):使用算法和数据结构来实现内存分级结构(Memory Hierarchy)。
  • 代码生成(Code Generation):使用代码生成来利用新型编译器和CPU。
Tungsten将大幅度提升Spark的核心引擎,在Spark 1.4版本,会包括Dataframe API中聚合操作的内存管理,以及定制化序列化器。在Spark 1.5版本中,会有部分项目(基于DataFrame模型)包括二进制内存管理的扩展和Cache-aware数据结构。




已有(1)人评论

跳转到指定楼层
小秦琼 发表于 2015-6-11 09:50:46
这篇文章不错。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条