本帖最后由 pig2 于 2018-11-21 19:01 编辑
问题导读
1.Flink程序结是什么结构?
2.Flink中source,sink分别是什么意思?
2 .Flink数据源有哪些?
3.如何自定义Flink数据源?
4.Flink如何定义Sink?
关注最新经典文章,欢迎关注公众号
上一篇
彻底明白Flink系统学习2:Flink分布式执行包括调度、通信机制、检查点等
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26357
这篇文章是对第二篇的一个补充,我们看到Flink的Source和Sink可能对他们有所误解,这里对他们做一个诠释说明。同时由于这个不能单独来说,因此这里另起一篇Flink编程结构的一个解说。
我们大概都知道任何程序都是需要有输入、处理、输出 。
那么Flink同样也是,它的专业术语是什么?
Flink专业术语对应Source,map,Sink 。
1.数据源(source)
也就是在在我们提到Flink程序的时候,我们会有Source数据源,然后map其实就是对输入的数据处理的意思,接着Sink就是落地数据,也就是我们存储数据到什么地方。
这里需要跟Flume的source和sink比较,这二者含义和作用基本相同,但是却不能混淆。Flink的source,虽然也是数据源,但是这些我们需要代码指定的,Flume直接通过配置文件指定。同样sink也是。而Flink中间还有对数据的处理,也就是map。 这就是程序Flink的程序结构。
Flink的source到底是什么?为了更好地理解,我们这里给出下面一个简单典型的wordcount程序。
case class WordWithCount(word: String, count: Long)
val text = env.socketTextStream(host, port, '\n')
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
windowCounts.print() 复制代码
上面不知我们能否看出哪里是是source?
//连接socket获取输入的数据
val text = env.socketTextStream(host, port, '\n')
这里便是读取数据源,也就是我们所说的source。Flink的source多种多样,比如我们可以自定义source,这里面不知你是否思考一个问题,为什么要定义source。比如我们读取数据,该如何读取,这时候我们就想到了socketTextStream, 在比如我们想读取mysql的数据,我们就需要mysql的source。如果没有source该怎么做?自定义source。该如何自定义,不会啊。大家可参考下面文章Flink自定义一个简单source及mysqlsource实例
2.处理
上面source我们认识了,读取数据之后,我们就要什么了那?处理数据。wordcount里面,哪些是处理数据的?
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
flatMap 、map 、keyBy、timeWindow、sum那么这些就是对数据的处理。
3.保存数据(sink)
上面数据是如何保存的,相信不用说,大家也能看出来。
windowCounts.print()
这里输出可以说是非常简单的。而sink当然跟source一样也是可以自定义的。你是否想到,我们为什么要自定义sink?
很简单,Flink数据,保存到myslq,是不能直接保存的,所以需要自定义一个sink。不定义sink可以吗?可以的,那就是自己在写一遍,每次调用都复制一遍,这样造成大量的重复,所以我们需要自定义sink。
那么常见的sink有哪些?如下:
flink在批处理中常见的sink
1.基于本地集合的sink(Collection-based-sink)
2.基于文件的sink(File-based-sink)
1.基于本地集合的sink(Collection-based-sink)
package code.book.batch.sinksource.scala
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
object DataSink000 {
def main(args: Array[String]): Unit = {
//1.定义环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.定义数据 stu(age,name,height)
val stu: DataSet[(Int, String, Double)] = env.fromElements(
(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)
)
//3.sink到标准输出
stu.print
//3.sink到标准error输出
stu.printToErr()
//4.sink到本地Collection
print(stu.collect())
}
} 复制代码
2.基于文件的sink(File-based-sink)
flink支持多种存储设备上的文件,包括本地文件,hdfs文件,alluxio文件等。
flink支持多种文件的存储格式,包括text文件,CSV文件等。
package code.book.batch.sinksource.scala
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
object DataSink001 {
def main(args: Array[String]): Unit = {
//0.主意:不论是本地还是hdfs.若Parallelism>1将把path当成目录名称,若Parallelism=1将把path当成文件名。
val env = ExecutionEnvironment.getExecutionEnvironment
val ds1: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
//1.写入到本地,文本文档,NO_OVERWRITE模式下如果文件已经存在,则报错,OVERWRITE模式下如果文件已经存在,则覆盖
ds1.setParallelism(1).writeAsText("file:///output/flink/datasink/test001.txt",
WriteMode.OVERWRITE)
env.execute()
//2.写入到hdfs,文本文档,路径不存在则自动创建路径。
ds1.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt",
WriteMode.OVERWRITE)
env.execute()
//3.写入到hdfs,CSV文档
//3.1读取csv文件
val inPath = "hdfs:///input/flink/sales.csv"
case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
val ds2 = env.readCsvFile[Sales](
filePath = inPath,
lineDelimiter = "\n",
fieldDelimiter = ",",
lenient = false,
ignoreFirstLine = true,
includedFields = Array(0, 1, 2, 3),
pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
)
//3.2将CSV文档写入到hdfs
val outPath = "hdfs:///output/flink/datasink/sales.csv"
ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n",
fieldDelimiter = "|", WriteMode.OVERWRITE)
env.execute()
}
} 复制代码
3.基于文件的sink(数据进行排序)
可以使用sortPartition对数据进行排序后再sink到外部系统。
执行程序
package code.book.batch.sinksource.scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
object DataSink002 {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
//stu(age,name,height)
val stu: DataSet[(Int, String, Double)] = env.fromElements(
(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)
)
//1.以age从小到大升序排列(0->9)
stu.sortPartition(0, Order.ASCENDING).print
//2.以naem从大到小降序排列(z->a)
stu.sortPartition(1, Order.DESCENDING).print
//3.以age升序,height降序排列
stu.sortPartition(0, Order.ASCENDING).sortPartition(2, Order.DESCENDING).print
//4.所有字段升序排列
stu.sortPartition("_", Order.ASCENDING).print
//5.以Student.name升序
//5.1准备数据
case class Student(name: String, age: Int)
val ds1: DataSet[(Student, Double)] = env.fromElements(
(Student("zhangsan", 18), 178.5),
(Student("lisi", 19), 176.5),
(Student("wangwu", 17), 168.5)
)
val ds2 = ds1.sortPartition("_1.age", Order.ASCENDING).setParallelism(1)
//5.2写入到hdfs,文本文档
val outPath1="hdfs:///output/flink/datasink/Student001.txt"
ds2.writeAsText(filePath = outPath1, WriteMode.OVERWRITE)
env.execute()
//5.3写入到hdfs,CSV文档
val outPath2="hdfs:///output/flink/datasink/Student002.csv"
ds2.writeAsCsv(filePath = outPath2,rowDelimiter = "\n",
fieldDelimiter = "|||",WriteMode.OVERWRITE)
env.execute()
}
}
复制代码
注释:我们知道由于Flink既有流处理,又有批处理。上面只是批处理的sink,当然也有流处理的sink。更多大家可以搜索或则参考官网。
4.扩展
为了更好的理解,下面我们与传统程序,spark等比较下。
传统程序
我们来想想传统程序应该是怎么样的。小的输入例如:数组、字符串。大的比如文件等,这就是我们传统程序的输入。输出当然比如输出到磁盘或则数据库
大数据程序
那么Spark和Mapreduce的输入是什么?Spark输入相当多,当然其实也就是Spark的数据源。Spark输出也比较多,比传统的多了分布式文件系统,比如hdfs等。
对于Spark数据源比较多,不做详细描述,感兴趣可参考下面内容:
Spark多数据源计算实践及其在GrowingIO的实践
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18615
spark json文件parquet文件,和常用的文件,jdbc等数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20573
spark2 sql去哪读取数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23466
solr作为spark sql的数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21888
sparksql多数据源透明访问
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22494
通过Spark DataSource API 如何实现Rest数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=19291
Spark SQL之External DataSource外部数据源(一)示例
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12657
Spark SQL之External DataSource外部数据源(二)源码分析
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12660