问题导读
1.从哪个版本开始Elasticseach支持spark sql?
2.如何查找Elasticseach和spark sql对应版本?
3.如何实现Spark Structured Streaming数据保存到Elasticseach?
关注最新经典文章,欢迎关注公众号
Spark为流数据提供了两类API,一个是Spark Streaming,它是Spark提供的独立库。 另一个是基于Spark-SQL库构建的结构化流。今天我们将专注于使用Spark Structured Streaming将流数据保存到Elasticseach。 Elasticsearch在版本6.0.0版本的“Elasticsearch For Apache Hadoop”依赖项中添加了对Spark Structured Streaming 2.2.0的支持。 我们将使用这些或更高版本来构建我们的sbt-scala项目。
准备
首先,添加 “Spark SQL” 依赖:
- libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
复制代码
找到对应版本elasticsearch
- libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.4.1"
复制代码
你可以到“Elasticsearch For Apache Hadoop”中找到合适的版本,但是需要注意,Elasticsearch 至少是6.0版本,Spark-SQL至少是2.2.0版本或则更高版本
如下图,我们打开连接看到如下内容:
开始写代码
我们将在此代码中读取JSON文件并将其数据保存到elasticsearch。 但首先让我们通过基本代码将JSON文件读取作为DataFrame,为此,我们需要为JSON创建一个schema 。
- val jsonSchema = StructType(
- Seq(
- StructField("name", StringType, true),
- StructField("age", IntegerType, true),
- StructField("nationality", StringType, true),
- StructField("skills", ArrayType(StringType, true), true)
- )
- )
复制代码
现在我们用下面代码实现使用schema以流模式读取JSON文件:
- val sparkSession = SparkSession.builder()
- .master("local[*]")
- .appName("sample-structured-streaming")
- .getOrCreate()
-
- val streamingDF: DataFrame = sparkSession
- .readStream
- .schema(jsonSchema)
- .json("src/main/resources/json-resources/")
复制代码
使用streamingDF,我们将数据输出到控制台。 使用控制台来验证数据。
- streamingDF
- .writeStream
- .outputMode("append")
- .format("console")
- .start().awaitTermination()
复制代码
这里.format(“console”)实现结果在控制台上打印。输出结果如下:
- +----+---+-----------+--------------------+
- |name|age|nationality| skills|
- +----+---+-----------+--------------------+
- |Anuj| 24| Indian|[Scala, Spark, Akka]|
- +----+---+-----------+--------------------+
复制代码
这只是一个简单的结构化流代码,其中JSON文件是源,数据输出到控制台。
输出数据到Elasticsearch
现在Spark Structured Streaming输出数据保存到elasticsearch,我们需要在SparkSession的对象中添加elasticsearch的配置:
- val sparkSession = SparkSession.builder()
- .config(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, "username")
- .config(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, "password")
- .config(ConfigurationOptions.ES_NODES, "127.0.0.1")
- .config(ConfigurationOptions.ES_PORT, "9200")
- .master("local[*]")
- .appName("sample-structured-streaming")
- .getOrCreate()
复制代码
我们在此处使用elasticsearch节点和端口添加身份验证凭据。 目前,该地址用于本地机器进行测试。
现在使用前面的代码,将数据写到elasticsearch:
- streamingDF.writeStream
- .outputMode("append")
- .format("org.elasticsearch.spark.sql")
- .option("checkpointLocation", "path-to-checkpointing")
- .start("index-name/doc-type).awaitTermination()
复制代码
上面我们看到改变format由控制台(console )变为org.elasticsearch.spark.sql,它实现了将数据写到elasticsearch
现在代码已经完成了。 运行后,这会将JSON数据保存到elasticsearch。 要确保使用curl:
curl http://localhost:9200/index-name/_search 这显示所有文档保存index,index-name。 以下内容已保存到Elasticsearch:
整个代码下载:
structured-streaming-examples-master.zip
(5.93 KB, 下载次数: 12)
|