问题导读:
1.怎样把不同天的数据分离导入到不同天的es索引里面? 2.有哪几种方式可以不同天的数据分离导入到不同天的es索引里面? 3.怎样使用scala+Spark SQL读取Hive表按日期分组,然后借助es-hadoop框架把每一组的数据导入es里面?
Spark sql on hive的一个强大之处就是能够嵌在编程语言内执行,比如在Java或者Scala,Python里面,正是因为这样的特性,使得spark sql开发变得更加有趣。 比如我们想做一个简单的交互式查询,我们可以直接在Linux终端直接执行spark sql查询Hive来分析,也可以开发一个jar来完成特定的任务。
有些时候单纯的使用sql开发可能功能有限,比如我有下面的一个功能:
一张大的hive表里面有许多带有日期的数据,现在一个需求是能够把不同天的数据分离导入到不同天的es索引里面,方便按时间检索,提高检索性能。 有几种方式可以完成:
(1)写一个MapReduce程序,遍历这个表每一条数据,插入到es里面。 (2)使用Hive按日期分区,生成n个日期分区表,再借助es-Hadoop框架,通过shell封装将n个表的数据批量导入到es里面不同的索引里面 (3)使用scala+Spark SQL读取Hive表按日期分组,然后借助es-hadoop框架把每一组的数据导入es里面。
优缺点:
方式一:开发量最大,导入性能最差 方式二:开发量次之,导入性能一般 方式三:开发量小,性能最优
总结分析:
方式一:
直接使用MapReduce读取表数据,然后每一行add一次,插入性能非常低效,有人会说可以批使用list批量插入,但是不要忘记我们现在是每一天的数据插入到不同的索引里面,一个list是不能放不同日期的数据,所以如果想要批量还要维护一个不同日期的list,并放在Map里面,最后提交完清空集合,整体复杂度增加而且维护调试都比较麻烦。
方式二:
直接使用Hive,提前将数据构建成多个分区表,然后借助官方的es-hadoop框架,直接将每一个分区表的数据,导入到对应的索引里面,这种方式直接使用大批量的方式导入,性能比方式一好,但由于Hive生成多个分区表以及导入时还要读取每个分区表的数据涉及的落地IO次数比较多,所以性能一般
方式三:
在scala中使用spark sql操作hive数据,然后分组后取出每一组的数据集合,转化成DataFrame最后借助es-hadoop框架,将每组数据直接批量插入到es里面,注意此种方式对内存依赖比较大,因为最终需要将数据拉回spark的driver端进行插入操作。
关于方式一和方式二就不再细说了,有兴趣的朋友可以自己尝试下,下篇文章会把基于第三种方式实现的例子分享出来,可以直接在spark的local模式下模拟运行。
首下看下用到的依赖包有哪些:
下面看相关的代码,代码可直接在跑在win上的idea中,使用的是local模式,数据是模拟造的:
分析下,代码执行过程:
(1)首先创建了一个SparkSession对象,注意这是新版本的写法,然后加入了es相关配置 (2)导入了隐式转化的es相关的包 (3)通过Seq+Tuple创建了一个DataFrame对象,并注册成一个表 (4)导入spark sql后,执行了一个sql分组查询 (5)获取每一组的数据 (6)处理组内的Struct结构 (7)将组内的Seq[Row]转换为rdd,最终转化为df (8)执行导入es的方法,按天插入不同的索引里面 (9)结束
需要注意的是必须在执行collect方法后,才能在循环内使用sparkContext,否则会报错的,在服务端是不能使用sparkContext的,只有在Driver端才可以。
来源:woshigcs 作者:woshigcs
|