pig2 发表于 2018-11-21 18:58:22

彻底明白Flink系统学习3:编程知识之Flink程序结构

本帖最后由 pig2 于 2018-11-21 19:01 编辑

问题导读

1.Flink程序结是什么结构?
2.Flink中source,sink分别是什么意思?
2.Flink数据源有哪些?
3.如何自定义Flink数据源?
4.Flink如何定义Sink?

关注最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg

上一篇
彻底明白Flink系统学习2:Flink分布式执行包括调度、通信机制、检查点等
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26357


这篇文章是对第二篇的一个补充,我们看到Flink的Source和Sink可能对他们有所误解,这里对他们做一个诠释说明。同时由于这个不能单独来说,因此这里另起一篇Flink编程结构的一个解说。
我们大概都知道任何程序都是需要有输入、处理、输出。
那么Flink同样也是,它的专业术语是什么?
Flink专业术语对应Source,map,Sink。
1.数据源(source)
也就是在在我们提到Flink程序的时候,我们会有Source数据源,然后map其实就是对输入的数据处理的意思,接着Sink就是落地数据,也就是我们存储数据到什么地方。
这里需要跟Flume的source和sink比较,这二者含义和作用基本相同,但是却不能混淆。Flink的source,虽然也是数据源,但是这些我们需要代码指定的,Flume直接通过配置文件指定。同样sink也是。而Flink中间还有对数据的处理,也就是map。这就是程序Flink的程序结构。

Flink的source到底是什么?为了更好地理解,我们这里给出下面一个简单典型的wordcount程序。
case class WordWithCount(word: String, count: Long)

val text = env.socketTextStream(host, port, '\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")

windowCounts.print()
上面不知我们能否看出哪里是是source?
//连接socket获取输入的数据
val text = env.socketTextStream(host, port, '\n')
这里便是读取数据源,也就是我们所说的source。Flink的source多种多样,比如我们可以自定义source,这里面不知你是否思考一个问题,为什么要定义source。比如我们读取数据,该如何读取,这时候我们就想到了socketTextStream,在比如我们想读取mysql的数据,我们就需要mysql的source。如果没有source该怎么做?自定义source。该如何自定义,不会啊。大家可参考下面文章Flink自定义一个简单source及mysqlsource实例

2.处理
上面source我们认识了,读取数据之后,我们就要什么了那?处理数据。wordcount里面,哪些是处理数据的?
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
flatMap 、map 、keyBy、timeWindow、sum那么这些就是对数据的处理。

3.保存数据(sink)
上面数据是如何保存的,相信不用说,大家也能看出来。
windowCounts.print()
这里输出可以说是非常简单的。而sink当然跟source一样也是可以自定义的。你是否想到,我们为什么要自定义sink?
很简单,Flink数据,保存到myslq,是不能直接保存的,所以需要自定义一个sink。不定义sink可以吗?可以的,那就是自己在写一遍,每次调用都复制一遍,这样造成大量的重复,所以我们需要自定义sink。
那么常见的sink有哪些?如下:
flink在批处理中常见的sink
1.基于本地集合的sink(Collection-based-sink)
2.基于文件的sink(File-based-sink)

1.基于本地集合的sink(Collection-based-sink)
package code.book.batch.sinksource.scala
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
object DataSink000 {
def main(args: Array): Unit = {
    //1.定义环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2.定义数据 stu(age,name,height)
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "zhangsan", 178.8),
      (17, "lisi", 168.8),
      (18, "wangwu", 184.8),
      (21, "zhaoliu", 164.8)
    )
    //3.sink到标准输出
    stu.print

    //3.sink到标准error输出
    stu.printToErr()

    //4.sink到本地Collection
    print(stu.collect())
}
}
2.基于文件的sink(File-based-sink)
flink支持多种存储设备上的文件,包括本地文件,hdfs文件,alluxio文件等。
flink支持多种文件的存储格式,包括text文件,CSV文件等。
package code.book.batch.sinksource.scala

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
object DataSink001 {
def main(args: Array): Unit = {
    //0.主意:不论是本地还是hdfs.若Parallelism>1将把path当成目录名称,若Parallelism=1将把path当成文件名。
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    //1.写入到本地,文本文档,NO_OVERWRITE模式下如果文件已经存在,则报错,OVERWRITE模式下如果文件已经存在,则覆盖
    ds1.setParallelism(1).writeAsText("file:///output/flink/datasink/test001.txt",
    WriteMode.OVERWRITE)
    env.execute()

    //2.写入到hdfs,文本文档,路径不存在则自动创建路径。
    ds1.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt",
    WriteMode.OVERWRITE)
    env.execute()

    //3.写入到hdfs,CSV文档
    //3.1读取csv文件
    val inPath = "hdfs:///input/flink/sales.csv"
    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
    val ds2 = env.readCsvFile(
      filePath = inPath,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3),
      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
    )
    //3.2将CSV文档写入到hdfs
    val outPath = "hdfs:///output/flink/datasink/sales.csv"
    ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n",
    fieldDelimiter = "|", WriteMode.OVERWRITE)
    env.execute()
}
}
3.基于文件的sink(数据进行排序)
可以使用sortPartition对数据进行排序后再sink到外部系统。
执行程序

package code.book.batch.sinksource.scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
object DataSink002 {
def main(args: Array): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    //stu(age,name,height)
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "zhangsan", 178.8),
      (17, "lisi", 168.8),
      (18, "wangwu", 184.8),
      (21, "zhaoliu", 164.8)
    )
    //1.以age从小到大升序排列(0->9)
    stu.sortPartition(0, Order.ASCENDING).print
    //2.以naem从大到小降序排列(z->a)
    stu.sortPartition(1, Order.DESCENDING).print
    //3.以age升序,height降序排列
    stu.sortPartition(0, Order.ASCENDING).sortPartition(2, Order.DESCENDING).print
    //4.所有字段升序排列
    stu.sortPartition("_", Order.ASCENDING).print
    //5.以Student.name升序
    //5.1准备数据
    case class Student(name: String, age: Int)
    val ds1: DataSet[(Student, Double)] = env.fromElements(
      (Student("zhangsan", 18), 178.5),
      (Student("lisi", 19), 176.5),
      (Student("wangwu", 17), 168.5)
    )
    val ds2 = ds1.sortPartition("_1.age", Order.ASCENDING).setParallelism(1)
    //5.2写入到hdfs,文本文档
    val outPath1="hdfs:///output/flink/datasink/Student001.txt"
    ds2.writeAsText(filePath = outPath1, WriteMode.OVERWRITE)
    env.execute()
    //5.3写入到hdfs,CSV文档
    val outPath2="hdfs:///output/flink/datasink/Student002.csv"
    ds2.writeAsCsv(filePath = outPath2,rowDelimiter = "\n",
    fieldDelimiter = "|||",WriteMode.OVERWRITE)
    env.execute()
}
}

注释:我们知道由于Flink既有流处理,又有批处理。上面只是批处理的sink,当然也有流处理的sink。更多大家可以搜索或则参考官网。

4.扩展
为了更好的理解,下面我们与传统程序,spark等比较下。

传统程序
我们来想想传统程序应该是怎么样的。小的输入例如:数组、字符串。大的比如文件等,这就是我们传统程序的输入。输出当然比如输出到磁盘或则数据库

大数据程序
那么Spark和Mapreduce的输入是什么?Spark输入相当多,当然其实也就是Spark的数据源。Spark输出也比较多,比传统的多了分布式文件系统,比如hdfs等。

对于Spark数据源比较多,不做详细描述,感兴趣可参考下面内容:
Spark多数据源计算实践及其在GrowingIO的实践
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18615


spark json文件parquet文件,和常用的文件,jdbc等数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20573


spark2 sql去哪读取数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23466


solr作为spark sql的数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21888


sparksql多数据源透明访问
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22494


通过Spark DataSource API 如何实现Rest数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=19291


Spark SQL之External DataSource外部数据源(一)示例
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12657


Spark SQL之External DataSource外部数据源(二)源码分析
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12660


jiewuzhe02 发表于 2018-11-22 16:35:33

关注关注

linux_oracle 发表于 2018-11-27 11:21:02

6666666666666666666

gk091620 发表于 2019-1-14 08:28:26

赞赞赞

devilmz 发表于 2019-6-19 16:43:08

感谢分享

若无梦何远方 发表于 2019-8-21 10:06:21

代码好像优点问题 ,总的来说问题不大

金瞳 发表于 2019-12-9 18:00:18

1.Flink程序结是什么结构?
- source -> map -> sink

2.Flink中source,sink分别是什么意思?
- source:map上游来源数据
- sink:map输出的保存数据

2.Flink数据源有哪些?
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)

3.如何自定义Flink数据源?
- 继承RichSourceFunction类,重载open,run,close方法

4.Flink如何定义Sink?
- 继承RichSourceFunction类,重载open,run,close方法
- 继承RichSinkFunction类,重载open,invoke,close方法
页: [1]
查看完整版本: 彻底明白Flink系统学习3:编程知识之Flink程序结构