本帖最后由 levycui 于 2020-6-3 18:50 编辑
问题导读:
1、如何使用spark连接ES?
2、如何使用TransportClient往ES批量导入数据?
3、在编写代码中踩了哪些坑?
4、ES中如何创建索引?
他们之前把数据导入ES是通过单机的程序导的,或者通过logstash从kafka往ES导,但当数据量很大的时候就会变得很低效,我这两天调研了一下把数据从hdfs直接通过spark导入ES的方法,当然,也适合spark Streaming程序;
这里指出版本号是有必要的,spark版本:1.6.2 ES版本:5.2.1,由于ES的API变动比较频繁,因此最好参考官网文档。
连接ES的方法列举
使用TransportClient往ES批量导入的方法
样例代码如下:
[mw_shl_code=java,true]import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
* Author: wangxiaogang
* Date: 2017/7/11
* Email: Adamyuanyuan@gmail.com
* hdfs 中的数据根据格式写到ES中
*/
object HdfsToEs {
def main(args: Array[String]) {
if (args.length < 5) {
System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
System.exit(1)
}
val hdfsInputPath: String = args(0)
println("hdfsInputPath: " + hdfsInputPath)
val conf = new SparkConf().setAppName("HdfsToEs")
val sc = new SparkContext(conf)
//插入相关,索引 类型 id相关 以args方式提供接口。
val esIndex: String = args(1)
val esType: String = args(2)
val partition: Int = args(3).toInt
val bulkNum: Int = args(4).toInt
val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
val startTime: Long = System.currentTimeMillis
println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
hdfsRdd.foreachPartition {
eachPa => {
// 生产环境
val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
.put("transport.type", "netty3").put("http.type", "netty3").build
val client: TransportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
var bulkRequest: BulkRequestBuilder = null
var flag = true
var lineNum = 0
for (eachLine <- eachPa) {
// 每个bulk是10-15M为宜,数据封装为bulk后会较原来的数据略有增大,如果每行数据约为 1.5KB,则每 10000 行为一个bulk
if (flag) {
bulkRequest = client.prepareBulk
flag = false
}
val strArray: Array[String] = eachLine.split("###")
if (strArray.length != 25) {
// 表示这行数据又问题,为了不影响整体,则跳过
println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
} else {
// LinkedHashMap让ES中的数据变得有序
val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
val id: String = strArray(0)
esDataMap.put("msisdn", id)
// 数据合并后的格式为: msisdn###w0的前三###w1的前三###如果为空的话就是null...###w23的前三,共25列
for (i <- 1 to 24) {
val locTimesListStr = strArray(i)
val esDataKey = "w" + (i - 1)
if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
esDataMap.put(esDataKey, "")
} else {
esDataMap.put(esDataKey, locTimesListStr)
}
}
bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
lineNum += 1
if (lineNum % bulkNum == 0) {
val endTime: Long = System.currentTimeMillis
println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
val bbq: BulkResponse = bulkRequest.execute.actionGet()
flag = true
if (bbq.hasFailures) {
println("bbq.hasFailures: " + bbq.toString)
bulkRequest.execute.actionGet
}
}
}
}
if (bulkRequest != null) {
bulkRequest.execute().actionGet()
}
client.close()
val endTime: Long = System.currentTimeMillis
println("ths time is: " + (endTime - startTime) / 1000 + "s ")
}
}
sc.stop()
}
}[/mw_shl_code]
踩坑说明:在编写代码中踩了如下坑:
- 依赖冲突的问题: ES5.2与Spark1.6有如下包会产生依赖: netty-all:io.netty,com.fasterxml.jackson.core:jackson-core, org.apache.logging.log4j:log4j-core.
- 解决方案:
- 通过 mvn dependency:tree -Dverbose -Dincludes=com.fasterxml.jackson.core 命令查出依赖原因,然后在pom.xml中增加所需的相关依赖的最高版本;
- 每个bulk的大小,根据网上的经验是10M-15M为宜,大概计算一下就好了;
- 后来在单机测试通过,但在集群模式中还是会出现 netty4的依赖冲突:
[mw_shl_code=java,true] 17/07/17 10:21:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][management][T#1],5,main]
java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
at org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:488)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:527)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]
有一种解决方案我没有尝试成功,就是在pom中将冲突的依赖包exclusions掉,各位感兴趣可以尝试,成功了麻烦告知我一下。参考链接:https://www.elastic.co/blog/to-shade-or-not-to-shade, 使用 maven-shade-plugin 工具打包。
上个方法我尝试几次不成功后,使用了比较暴力的方法,直接将ES的netty参数由netty4改成了netty3,
.put("transport.type", "netty3").put("http.type", "netty3").build
好了,打包好之后,程序就可以完美运行了。
ES中创建索引
就算如果ES中是自动创建索引的,也希望你能手动创建索引和字段属性,因为默认的字段属性是Text,ES会自动对它进行分词相关的操作,如果ES中存的字符串你不想让它被分隔的话,就用keyword替代为Text类型,命令如下:
[mw_shl_code=java,true]PUT /weekend-20170718
{
"settings" : {
"index" : {
"number_of_shards" : 5,
"number_of_replicas" : 1,
"refresh_interval" : "60s"
},
"index.routing.allocation.include.zone": "light"
},
"mappings": {
"offline": {
"properties": {
"msisdn": {
"type": "keyword"
},"w0": {
"type": "keyword"
} ...后面省略
}
}
}
}[/mw_shl_code]
创建好索引后检查一下:
GET /weekend-20170718/_mapping
集群中运行
这个比较简单,只需要注意以下几点就好了:
- 使用jdk1.8版本;
- 注意内存的申请,可能会出现跑了一段时间后,内存不够用导致程序退出的情况;
- 观测好ES集群的状态,一段时间后,ES机器的GC比较高
- 最好别一下子跑所有数据,分几批跑,这样就算出问题,只需要重跑那一部分就好了
数据:通过观察,导入的速度随着时间的增长呈下降趋势,整体来说,ES集群隔离的小集群共有五台物理机,共2.23亿条,751G的数据导入用了约4.5小时,平均速度为 45M/s, 1.38W条/s。
作者:http://flume.cn
来源:http://flume.cn/2017/07/17/spark ... %E6%96%B9%E6%B3%95/
最新经典文章,欢迎关注公众号 |
|