问题导读
1.怎样使用Flink实现WordCount程序?
2.怎样搭建Flink集群?
3.Flink怎样整合Kafka?
上一篇:Flink入门简介一
第二章 编程模型
一、第一个Flink程序-WordCount
步骤
1.在IDEA中创建Maven项目
2.导入pom.xml:
[mw_shl_code=xml,true] <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.7.1</flink.version>
</properties>
<dependencies>
<!-- Flink 依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka连接器的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- Flink Scala2.11 版本 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.lw.myflink.Streaming.FlinkSocketWordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
[/mw_shl_code]
3.创建被统计的文本
4.程序代码
[mw_shl_code=java,true]package ah.szxy.flink;
import akka.stream.impl.fusing.GroupBy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 创建Flink程序,统计单词数目
* 1.创建环境
* 批处理: ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
* 流处理: StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
* 2.在批处理中Flink处理的数据对象是DataSet
* 在流处理中Flink处理的数据对象是DataStream
* 3.代码流程必须符合 source ->transformation->sink
* transformation 都是懒执行, 需要最后使用env.execute()触发执行或者使用print(),count(),collect()触发执行
* 4.Flink编程不是K.V格式的编程, 通过某些方式来虚拟key
* 5.Flink中的tuple最多支持25个元素, 每个元素都是从0开始
*
* @author TimePause
* @create 2019-12-08 21:04
*/
public class WordCont {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> dataSource=env.readTextFile("./data/word.txt");
FlatMapOperator<String,String> words=dataSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String w, Collector<String> collector) throws Exception {
String[] split = w.split(" ");
for (String word : split) {
collector.collect(word);
}
}
});
MapOperator<String,Tuple2<String,Integer>> reduceWords =words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String w) throws Exception {
return new Tuple2<String, Integer>(w,1);
}
});
UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
//输出到控制台
AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
result.print();
//输出到文件中
// DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1);
// DataSink<Tuple2<String, Integer>> dataSink = dataSet.writeAsText("./data/reslut/r1");
// env.execute("myflink");
}
}
[/mw_shl_code]
5.执行结果
总结
Flink处理数据流程
Source -> Transformations ->Sink
数据源头 -> 数据转换 -> 数据输出
Flink程序的执行过程:
- 获取flink的执行环境(execution environment)
- 加载数据-- soure
- 对加载的数据进行转换 – transformation
- 对结果进行保存或者打印 --sink
- 触发flink程序的执行(execute(),count(),collect(),print()),例如:调用ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。
Flink中数据类型
Flink三种处理数据模型
Flink批处理中处理的是有界数据流 --Dataset
Flink流式处理中有界数据流也有无界数据流 --DataStream
有界数据流也有无界数据流
二、分区设置和排序
- 设置全局分区 env.setParallelism(1);
- 为某个算子设置分区 DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);
- 分区排序演示代码
[mw_shl_code=java,true]public class WordCont {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//设置全局分区
// env.setParallelism(1);
DataSource<String> dataSource = env.readTextFile("./data/word.txt");
FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String w, Collector<String> collector) throws Exception {
String[] split = w.split(" ");
for (String word : split) {
collector.collect(word);
}
}
});
MapOperator<String, Tuple2<String, Integer>> reduceWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String w) throws Exception {
return new Tuple2<String, Integer>(w, 1);
}
});
UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
//输出到控制台
// AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1); //为单个的算子设置分区
SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
result.print();
}
}
[/mw_shl_code]
三、设置 source和 sink
- 设置数据源头 DataSource<String> dataSource = env.readTextFile("./data/word.txt");
- 设置数据输出:
[mw_shl_code=java,true]DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1); //为单个的算子设置分区
SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
//csv文件, 生成的文件以指定分隔符分隔,默认为逗号
result.writeAsCsv("./data/result/r2", "\n", "&", FileSystem.WriteMode.OVERWRITE);
[/mw_shl_code]
四、Flink网址分析案例
实现过滤以 https:// 开头的网址
[mw_shl_code=java,true]/**
* 统计网站链接情况
*
* @author TimePause
* @create 2019-12-09 10:11
*/
public class Flink360site {
public static void main(String[] args) throws Exception {
//创建批处理环境
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
//设置分区
env.setParallelism(1);
//指定数据源
DataSource<String> dataSource = env.readTextFile("./data/360index");
//编写过滤规则
FilterOperator<String> filter= dataSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("https://");
}
});
//对符合过滤的数据计数
long count = filter.count();
//指定文件要写入的目的地
DataSink<String> stringDataSink = filter.writeAsText("./result/data/r2", FileSystem.WriteMode.OVERWRITE);
env.execute("统计网站链接");
}
}
[/mw_shl_code]
结果展示
五、计数器
[mw_shl_code=java,true]/**
* 利用计数器进行网站链接计数
*
* @author TimePause
* @create 2019-12-09 10:53
*/
public class FlinkAccumulator {
public static void main(String[] args) throws Exception {
//创建批处理环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//指定数据源
DataSource<String> dataSource = env.readTextFile("./data/360index");
//创建过滤器
FilterOperator<String> filter = dataSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("https://");
}
});
// 创建map操作器
MapOperator<String, String> map = filter.map(new RichMapFunction<String, String>() {
//创建计数器
private IntCounter intCount= new IntCounter();
//创建算子
@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator("myacc",intCount);
}
@Override
public String map(String s) throws Exception {
intCount.add(1);
return s;
}
});
// 指定文件输出的目的地
DataSink<String> dataSink = map.writeAsText("./data/ressult/accumulator/r1");
// 创建任务执行结果对象
JobExecutionResult mycounter = env.execute("mycounter");
// 创建计数器的计数结果
Integer myacc = mycounter.getAccumulatorResult("myacc");
System.out.println("myCounter value= " + myacc);
System.out.println("-----------------------------");
}
}
[/mw_shl_code]
六、Flink术语
Flink 使用 java 语言开发,提供了 scala 编程的接口。使用 java 或者 scala 开发 Flink 是需 要使用 jdk8 版本,如果使用 Maven,maven 版本需要使用 3.0.4 及以上。
DataFlows
parallel Dataflows
并行数据流
Task和算子链
JobManager、TaskManager和clients:
Flink 运行时包含两种类型的进程:
- JobManger:也叫作 masters,协调分布式执行,调度 task,协调 checkpoint,协调故障恢复。在 Flink 程序中至少有一个 JobManager,高可用可以设置多个 JobManager,其中 一个是 Leader,其他都是 standby 状态。
- TaskManager:也叫 workers,执行 dataflow 生成的 task,负责缓冲数据,及 TaskManager 之间的交换数据。Flink 程序中必须有一个 TaskManager. Flink 程序可以运行在 standalone 集群,Yarn 或者 Mesos 资源调度框架中。 clients不是Flink程序运行时的一部分,作用是向JobManager准备和发送dataflow,之后, 客户端可以断开连接或者保持连接。
TaskSlots 任务槽
每个Worker(TaskManager)是一个JVM进程,可以执行一个或者多个task,这些task可以运行在任务槽上,每个worker上至少有一个任务槽。每个任务槽都有固定的资源,例如:TaskManager有三个TaskSlots,那么每个TaskSlot会将TaskMananger中的内存均分,即每个任务槽的内存是总内存的1/3。任务槽的作用就是分离任务的托管内存,不会发生cpu隔离。
通过调整任务槽的数据量,用户可以指定每个TaskManager有多少任务槽,更多的任务槽意味着更多的task可以共享同一个JVM,同一个JVM中的task共享TCP连接和心跳信息,共享数据集和数据结构,从而减少TaskManager中的task开销。
- 总结:task slot的个数代表TaskManager可以并行执行的task数。
第三章 安装
一、集群搭建步骤
1.进入https://flink.apache.org/downloads.html 下载flink.
下载好Flink之后( 资料分享至底部 )上传到Master(node1)节点上解压。
2.配置…/conf/flink-conf.yaml jobmanager.rpc.address: node1指定主节点
配置…/conf/slaves (指定从节点)
node2
node3
3.将配置好的Flink发送到其他worker节点(node2,node3)上。
启动Flink集群 start-cluster.sh,访问webui。node1:8081
二、Flink读取Socket数据
1.编写java代码
[mw_shl_code=java,true]package ah.szxy.flink4;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 读取Socket数据(流式数据)
*
* @author TimePause
* @create 2019-12-09 14:56
*/
public class FlinkReadSocketData {
public static void main(String[] args) throws Exception {
//创建流式处理环境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
// 参数管理工具类,帮助我们管理集群相关参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String node = "";
Integer port = 0;
if (parameterTool.has("node") && parameterTool.has("port")) {
node = parameterTool.get("node");
port = Integer.valueOf(parameterTool.get("port"));
} else {
System.out.println("集群提交需要参数");
System.exit(1);
}
// 创建数据来源
DataStreamSource<String> dataStreamSource = env.socketTextStream(node, port);
// 切分单词
SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(s1);
}
}
});
// 规定输出格式
SingleOutputStreamOperator<Tuple3<String,String,Integer>> map = flatMap.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> map(String s) throws Exception {
return new Tuple3<>(s, s, 1);
}
});
// 统计单词
KeyedStream<Tuple3<String,String,Integer>,Tuple> objectTupleKeyedStream = map.keyBy(0, 1);
SingleOutputStreamOperator<Tuple3<String, String, Integer>> sum = objectTupleKeyedStream.sum(2);
sum.print();
env.execute("MySocketProject");
}
}
[/mw_shl_code]
2.在运行时, 编辑运行的参数
3.参数指定接收node4的 9999端口发来信息,图1, 然后运行程序
注意: 需要在node4上面下载网络工具netcat
[mw_shl_code=text,true]下载工具
yum install -y nc
启动阻塞式窗口, 可以在里面输入相关字符数据( 图2 )
nc -lk 9999[/mw_shl_code]
图1
图2
图3
上传Flink程序到集群中运行
将上个实例项目打包, 放到集群中运行
1.打包程序
先clean一下,目的是清除原有的jar(图1)
然后package进行打包, 然后将含有相关jar包的jar上传到虚拟机(jar)
图2
2.集群的运行jar 需要jdk 1.8.0_191或者更高,本人的是 1.8.0_11版本,因此在上传jar到Flink集群会报错
flink job 运行时报错 AskTimeoutException, 解决方案是升级JDK
[mw_shl_code=text,true]# 删除原来的版本, 通过下面的命令找到对应的jdk存放目录, 删除
which java
which javac
# 上传 高版本jdk,解压, 重新配置环境变量以后重新加载一下 profile文件即可[/mw_shl_code]
3.上传任务到flink集群
[mw_shl_code=shell,true]# flink run -c 全限定类名 jar所在目录 --node 需要监听的节点 --port 需要监听的端口
flink run -c ah.szxy.flink4.FlinkReadSocketData ~/chy/software/MyFlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar --node node4 --port 9999
[/mw_shl_code]
4.node4通过netcat发送消息数据, 然后通过flink的集群查看(图1,图2)
图1
图2
三、Flink窗口操作
前提: 需要在node4中开启netcat, 运行程序后,在五秒内输入随机数据, 查看控制台打印结果
[mw_shl_code=shell,true]nc -lk 9999[/mw_shl_code]
相关代码
[mw_shl_code=java,true]/**
* Flink窗口操作
*
* @author TimePause
* @create 2019-12-09 19:44
*/
public class FlinkWindowOperator {
public static void main(String[] args) throws Exception {
// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建数据源
DataStreamSource<String>dataStreamSource=env.socketTextStream("node4",9999);
// 切分单词
SingleOutputStreamOperator<String> flatmap=dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(s1);
}
}
});
// 规定输出格式
SingleOutputStreamOperator<Tuple2<String,Integer>> map=flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
// 统计并输出结果
KeyedStream<Tuple2<String, Integer>, Tuple> keyby = map.keyBy(0);
// 一个参数: 每隔n个时间单位计算数目, 两个参数, 每隔后一个时间单位计算前一个时间单位的数据
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyby.timeWindow(Time.seconds(15), Time.seconds(5));
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = timeWindow.sum(1);
sum.print();
env.execute("FlinkWindowsOperator");
}
}
[/mw_shl_code]
结果
四、整合Kafka
启动kafka集群后
通过Flink代码自动生成topic-ReadKafkaTopic,我们将这个topic作为生产者
[mw_shl_code=shell,true]kafka-console-producer.sh --broker-list node2:9092,node3:9092,node4:9092 --topic ReadKafkaTopic
[/mw_shl_code]
然后监听另一个topic-ResultKafkaTopic(Flink代码作用是将ReadKafkaTopic中数据传到ResultKafkaTopic )
[mw_shl_code=shell,true]kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic ResultKafkaTopic[/mw_shl_code]
Flink整合Kafka代码
[mw_shl_code=java,true]package ah.szxy.flink6.kafka;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
* 使用Flink读取Kafka中的数据
*
* @author TimePause
* @create 2019-12-09 20:27
*/
public class FlinkReadKafka {
public static void main(String[] args) throws Exception {
//创建流处理环境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
properties.setProperty("group.id", "myflink.group");
//在kafka中创建topic
FlinkKafkaConsumer011<String> readKafkaTopic = new FlinkKafkaConsumer011<String>("ReadKafkaTopic", new SimpleStringSchema(), properties);
//创建数据源
DataStreamSource<String> dataStreamSource = env.addSource(readKafkaTopic);
//分词
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(new Tuple2<>(s1,1));
}
}
});
//分组, 通过0号位置单词进行分组
KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = flatMap.keyBy(0);
//统计
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
//
FlinkKafkaProducer011<String> resultKafkaTopic = new FlinkKafkaProducer011<String>("ResultKafkaTopic", new SimpleStringSchema(), properties);
sum.map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + "#" + value.f1;
}
}).addSink(resultKafkaTopic);
env.execute("readKafka");
}
}[/mw_shl_code]
最新经典文章,欢迎关注公众号
---------------------
|
|