分享

用spark和elasticsearch交互的方式

ld512870 发表于 2017-3-29 17:30:13 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 9398
有没有对es比较熟悉的,我想问一下es官方提供的和spark进行交互的包,向下面这种,是通过类似于jdbc那种查询出来的,还是直接从底层取的数据?效率怎么样

import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._
val conf = new SparkConf()   
.set("es.nodes","192.168.47.155")   
.set("es.port","9200")   
.setMaster("spark://...")   
.setAppName("es_hdfs")
val sc = new SparkConf(sc)
//查询合作方为abc的数据
val query = """{"query":{"match":{"activity.partnerCode": "abc"}}}"""
//将在es中的查询结果转化为rdd/dataFrame
val esRdd = sc.esRDD(s"index/type",query)
//直接读入全部数据
val esDf = sqlContext.esDF(s"index/type")
//对读入rdd/dataFrame进行操作
esRdd.map(r=>{...})esDf.flatMap(r=>{......})
//将dataFrame/rdd写入es
esRdd.saveToEs("index/type")resultDf.saveToEs("index/type")




已有(3)人评论

跳转到指定楼层
qcbb001 发表于 2017-3-29 18:05:36

感觉没有太清楚楼主的意思。下面说说个人的想法:

//查询合作方为abc的数据
val query = """{"query":{"match":{"activity.partnerCode": "abc"}}}"""
上面的查询是es的原理,执行的查询
//将在es中的查询结果转化为rdd/dataFrame
val esRdd = sc.esRDD(s"index/type",query)
上面是讲查询的数据转换为rdd,后面就是spark中rdd及df的操作
//直接读入全部数据
val esDf = sqlContext.esDF(s"index/type")
//对读入rdd/dataFrame进行操作
esRdd.map(r=>{...})esDf.flatMap(r=>{......})
//将dataFrame/rdd写入es
esRdd.saveToEs("index/type")resultDf.saveToEs("index/type")

回复

使用道具 举报

huangll 发表于 2017-3-30 21:52:04
本质是基于HttpClient封装了一个RestClient和ES集群通信,走的依然是9200/HTTP端口.
用ES存储数据的好处是ES的查询可以把不需要的数据过滤掉,只把有用的数据返回,避免spark做过滤和数据传输。
回复

使用道具 举报

ld512870 发表于 2017-3-31 09:25:15
huangll 发表于 2017-3-30 21:52
本质是基于HttpClient封装了一个RestClient和ES集群通信,走的依然是9200/HTTP端口.
用ES存储数据的好处是 ...

那样的话还是不好,如果数据量比较大的话,会严重影响es的性能吧。甚至崩溃

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条