分享

Spark Structured Streaming如何将流数据保存到Elasticseach【附代码】


问题导读

1.从哪个版本开始Elasticseach支持spark sql?
2.如何查找Elasticseach和spark sql对应版本?
3.如何实现Spark Structured Streaming数据保存到Elasticseach?

关注最新经典文章,欢迎关注公众号


Spark为流数据提供了两类API,一个是Spark Streaming,它是Spark提供的独立库。 另一个是基于Spark-SQL库构建的结构化流。今天我们将专注于使用Spark Structured Streaming将流数据保存到Elasticseach。 Elasticsearch在版本6.0.0版本的“Elasticsearch For Apache Hadoop”依赖项中添加了对Spark Structured Streaming 2.2.0的支持。 我们将使用这些或更高版本来构建我们的sbt-scala项目。

准备
首先,添加 “Spark SQL” 依赖:
  1. libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
复制代码

找到对应版本elasticsearch
  1. libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.4.1"
复制代码

你可以到“Elasticsearch For Apache Hadoop”中找到合适的版本,但是需要注意,Elasticsearch 至少是6.0版本,Spark-SQL至少是2.2.0版本或则更高版本
如下图,我们打开连接看到如下内容:
1.png


开始写代码
我们将在此代码中读取JSON文件并将其数据保存到elasticsearch。 但首先让我们通过基本代码将JSON文件读取作为DataFrame,为此,我们需要为JSON创建一个schema 。
  1. val jsonSchema = StructType(
  2.   Seq(
  3.     StructField("name", StringType, true),
  4.     StructField("age", IntegerType, true),
  5.     StructField("nationality", StringType, true),
  6.     StructField("skills", ArrayType(StringType, true), true)
  7.   )
  8. )
复制代码

现在我们用下面代码实现使用schema以流模式读取JSON文件:
  1. val sparkSession = SparkSession.builder()
  2.   .master("local[*]")
  3.   .appName("sample-structured-streaming")
  4.   .getOrCreate()
  5. val streamingDF: DataFrame = sparkSession
  6.   .readStream
  7.   .schema(jsonSchema)
  8.   .json("src/main/resources/json-resources/")
复制代码

使用streamingDF,我们将数据输出到控制台。 使用控制台来验证数据。
  1. streamingDF
  2.   .writeStream
  3.   .outputMode("append")
  4.   .format("console")
  5.   .start().awaitTermination()
复制代码

这里.format(“console”)实现结果在控制台上打印。输出结果如下:
  1. +----+---+-----------+--------------------+
  2. |name|age|nationality|              skills|
  3. +----+---+-----------+--------------------+
  4. |Anuj| 24|     Indian|[Scala, Spark, Akka]|
  5. +----+---+-----------+--------------------+
复制代码

这只是一个简单的结构化流代码,其中JSON文件是源,数据输出到控制台。

输出数据到Elasticsearch
现在Spark Structured Streaming输出数据保存到elasticsearch,我们需要在SparkSession的对象中添加elasticsearch的配置:
  1. val sparkSession = SparkSession.builder()
  2.   .config(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, "username")
  3.   .config(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, "password")
  4.   .config(ConfigurationOptions.ES_NODES, "127.0.0.1")
  5.   .config(ConfigurationOptions.ES_PORT, "9200")
  6.   .master("local[*]")
  7.   .appName("sample-structured-streaming")
  8.   .getOrCreate()
复制代码

我们在此处使用elasticsearch节点和端口添加身份验证凭据。 目前,该地址用于本地机器进行测试。

现在使用前面的代码,将数据写到elasticsearch:
  1. streamingDF.writeStream
  2.   .outputMode("append")
  3.   .format("org.elasticsearch.spark.sql")
  4.   .option("checkpointLocation", "path-to-checkpointing")
  5.   .start("index-name/doc-type).awaitTermination()
复制代码

上面我们看到改变format由控制台(console )变为org.elasticsearch.spark.sql,它实现了将数据写到elasticsearch

现在代码已经完成了。 运行后,这会将JSON数据保存到elasticsearch。 要确保使用curl:
curl http://localhost:9200/index-name/_search
这显示所有文档保存index,index-name。 以下内容已保存到Elasticsearch:
screenshot-from-2018-09-23-19-42-52.png

整个代码下载:
structured-streaming-examples-master.zip (5.93 KB, 下载次数: 12)

已有(2)人评论

跳转到指定楼层
jiangzi 发表于 2018-9-28 12:41:58
使用Spark Structured Streaming将流数据保存到Elasticseach!!!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条