fc013 发表于 2019-12-27 22:01:44

Flink入门简介二




问题导读

1.怎样使用Flink实现WordCount程序?
2.怎样搭建Flink集群?
3.Flink怎样整合Kafka?


上一篇:Flink入门简介一

第二章 编程模型

一、第一个Flink程序-WordCount

步骤

1.在IDEA中创建Maven项目




2.导入pom.xml:

<properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
      <flink.version>1.7.1</flink.version>
    </properties>

    <dependencies>
      <!-- Flink 依赖 -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <!-- Flink Kafka连接器的依赖 -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
      </dependency>

      <!-- Flink Scala2.11 版本 -->
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.1</version>
      </dependency>

      <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
      </dependency>

      <!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
      <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
      </dependency>
      <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
      </dependency>
      <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
      </dependency>
      <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
      </dependency>
      <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
      </dependency>
    </dependencies>

    <build>
      <plugins>
            <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                  <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                  </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                  <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
                  <!--<appendAssemblyId>false</appendAssemblyId>-->
                  <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                  </descriptorRefs>
                  <archive>
                        <manifest>
                            <mainClass>com.lw.myflink.Streaming.FlinkSocketWordCount</mainClass>
                        </manifest>
                  </archive>
                </configuration>
                <executions>
                  <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                  </execution>
                </executions>
            </plugin>
      </plugins>
    </build>



3.创建被统计的文本



4.程序代码

package ah.szxy.flink;

import akka.stream.impl.fusing.GroupBy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* 创建Flink程序,统计单词数目
*1.创建环境
*      批处理: ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
*      流处理: StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
*2.在批处理中Flink处理的数据对象是DataSet
*    在流处理中Flink处理的数据对象是DataStream
*3.代码流程必须符合    source ->transformation->sink
*   transformation 都是懒执行, 需要最后使用env.execute()触发执行或者使用print(),count(),collect()触发执行
*4.Flink编程不是K.V格式的编程, 通过某些方式来虚拟key
*5.Flink中的tuple最多支持25个元素, 每个元素都是从0开始
*
* @author TimePause
* @create 2019-12-08 21:04
*/
public class WordCont {
    public static void main(String[] args) throws Exception {
      ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
      DataSource<String> dataSource=env.readTextFile("./data/word.txt");
      FlatMapOperator<String,String> words=dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String w, Collector<String> collector) throws Exception {
                String[] split = w.split(" ");
                for (String word : split) {
                  collector.collect(word);
                }
            }
      });
      MapOperator<String,Tuple2<String,Integer>> reduceWords =words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String w) throws Exception {
                return new Tuple2<String, Integer>(w,1);
            }
      });

      UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
      //输出到控制台
       AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
       result.print();
      //输出到文件中
       // DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1);
       // DataSink<Tuple2<String, Integer>> dataSink = dataSet.writeAsText("./data/reslut/r1");
       // env.execute("myflink");


    }
}



5.执行结果



总结

Flink处理数据流程

Source -> Transformations ->Sink
数据源头 -> 数据转换 -> 数据输出

Flink程序的执行过程:


[*]获取flink的执行环境(execution environment)
[*]加载数据-- soure
[*]对加载的数据进行转换 – transformation
[*]对结果进行保存或者打印 --sink
[*]触发flink程序的执行(execute(),count(),collect(),print()),例如:调用ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。


Flink中数据类型


[*]有界数据流
[*]无界数据流


Flink三种处理数据模型


[*]Flink批处理


Flink批处理中处理的是有界数据流 --Dataset


[*]Flink流式处理


Flink流式处理中有界数据流也有无界数据流 --DataStream


[*]FlinkSQL处理


有界数据流也有无界数据流

二、分区设置和排序


[*]设置全局分区 env.setParallelism(1);
[*]为某个算子设置分区 DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);
[*]分区排序演示代码



public class WordCont {
    public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      //设置全局分区
      // env.setParallelism(1);
      DataSource<String> dataSource = env.readTextFile("./data/word.txt");
      FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String w, Collector<String> collector) throws Exception {
                String[] split = w.split(" ");
                for (String word : split) {
                  collector.collect(word);
                }
            }
      });
      MapOperator<String, Tuple2<String, Integer>> reduceWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String w) throws Exception {
                return new Tuple2<String, Integer>(w, 1);
            }
      });

      UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
      //输出到控制台
//       AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
      DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);//为单个的算子设置分区
      SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
      result.print();
    }
}




三、设置 source和 sink


[*]设置数据源头 DataSource<String> dataSource = env.readTextFile("./data/word.txt");
[*]设置数据输出:



DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);//为单个的算子设置分区
   SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
   //csv文件, 生成的文件以指定分隔符分隔,默认为逗号
   result.writeAsCsv("./data/result/r2", "\n", "&", FileSystem.WriteMode.OVERWRITE);



四、Flink网址分析案例

实现过滤以 https:// 开头的网址


/**
* 统计网站链接情况
*
* @author TimePause
* @create 2019-12-09 10:11
*/
public class Flink360site {
    public static void main(String[] args) throws Exception {
      //创建批处理环境
      ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
      //设置分区
      env.setParallelism(1);
      //指定数据源
      DataSource<String> dataSource = env.readTextFile("./data/360index");

      //编写过滤规则
      FilterOperator<String> filter= dataSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("https://");
            }
      });

      //对符合过滤的数据计数
      long count = filter.count();

      //指定文件要写入的目的地
      DataSink<String> stringDataSink = filter.writeAsText("./result/data/r2", FileSystem.WriteMode.OVERWRITE);
      env.execute("统计网站链接");
    }
}



结果展示



五、计数器

/**
* 利用计数器进行网站链接计数
*
* @author TimePause
* @create 2019-12-09 10:53
*/
public class FlinkAccumulator {
    public static void main(String[] args) throws Exception {
      //创建批处理环境
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      //指定数据源
      DataSource<String> dataSource = env.readTextFile("./data/360index");

      //创建过滤器
      FilterOperator<String> filter = dataSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("https://");
            }
      });

      // 创建map操作器
      MapOperator<String, String> map = filter.map(new RichMapFunction<String, String>() {
            //创建计数器
            private IntCounter intCount= new IntCounter();

            //创建算子
            @Override
            public void open(Configuration parameters) throws Exception {
                getRuntimeContext().addAccumulator("myacc",intCount);
            }

            @Override
            public String map(String s) throws Exception {
                intCount.add(1);
                return s;
            }
      });

      // 指定文件输出的目的地
      DataSink<String> dataSink = map.writeAsText("./data/ressult/accumulator/r1");
      // 创建任务执行结果对象
      JobExecutionResult mycounter = env.execute("mycounter");
      // 创建计数器的计数结果
      Integer myacc = mycounter.getAccumulatorResult("myacc");
      System.out.println("myCounter value= " + myacc);
      System.out.println("-----------------------------");
    }
}



六、Flink术语

Flink 使用 java 语言开发,提供了 scala 编程的接口。使用 java 或者 scala 开发 Flink 是需 要使用 jdk8 版本,如果使用 Maven,maven 版本需要使用 3.0.4 及以上。

DataFlows




parallel Dataflows

并行数据流




Task和算子链



JobManager、TaskManager和clients:



Flink 运行时包含两种类型的进程:


[*]JobManger:也叫作 masters,协调分布式执行,调度 task,协调 checkpoint,协调故障恢复。在 Flink 程序中至少有一个 JobManager,高可用可以设置多个 JobManager,其中 一个是 Leader,其他都是 standby 状态。
[*]TaskManager:也叫 workers,执行 dataflow 生成的 task,负责缓冲数据,及 TaskManager 之间的交换数据。Flink 程序中必须有一个 TaskManager. Flink 程序可以运行在 standalone 集群,Yarn 或者 Mesos 资源调度框架中。 clients不是Flink程序运行时的一部分,作用是向JobManager准备和发送dataflow,之后, 客户端可以断开连接或者保持连接。


TaskSlots 任务槽


[*]TaskSlots 任务槽:


每个Worker(TaskManager)是一个JVM进程,可以执行一个或者多个task,这些task可以运行在任务槽上,每个worker上至少有一个任务槽。每个任务槽都有固定的资源,例如:TaskManager有三个TaskSlots,那么每个TaskSlot会将TaskMananger中的内存均分,即每个任务槽的内存是总内存的1/3。任务槽的作用就是分离任务的托管内存,不会发生cpu隔离。
通过调整任务槽的数据量,用户可以指定每个TaskManager有多少任务槽,更多的任务槽意味着更多的task可以共享同一个JVM,同一个JVM中的task共享TCP连接和心跳信息,共享数据集和数据结构,从而减少TaskManager中的task开销。


[*]总结:task slot的个数代表TaskManager可以并行执行的task数。





第三章 安装

一、集群搭建步骤

1.进入https://flink.apache.org/downloads.html 下载flink.
下载好Flink之后( 资料分享至底部 )上传到Master(node1)节点上解压。

2.配置…/conf/flink-conf.yaml jobmanager.rpc.address: node1指定主节点
配置…/conf/slaves (指定从节点)

node2
node3

3.将配置好的Flink发送到其他worker节点(node2,node3)上。
启动Flink集群 start-cluster.sh,访问webui。node1:8081




二、Flink读取Socket数据

1.编写java代码


package ah.szxy.flink4;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* 读取Socket数据(流式数据)
*
* @author TimePause
* @create 2019-12-09 14:56
*/
public class FlinkReadSocketData {
    public static void main(String[] args) throws Exception {
      //创建流式处理环境
      StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
      // 参数管理工具类,帮助我们管理集群相关参数
      ParameterTool parameterTool = ParameterTool.fromArgs(args);
      String node = "";
      Integer port = 0;
      if (parameterTool.has("node") && parameterTool.has("port")) {
            node = parameterTool.get("node");
            port = Integer.valueOf(parameterTool.get("port"));
      } else {
            System.out.println("集群提交需要参数");
            System.exit(1);
      }
      // 创建数据来源
      DataStreamSource<String> dataStreamSource = env.socketTextStream(node, port);
      // 切分单词
      SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                  collector.collect(s1);
                }
            }
      });

      // 规定输出格式
      SingleOutputStreamOperator<Tuple3<String,String,Integer>> map =flatMap.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String s) throws Exception {
                return new Tuple3<>(s, s, 1);
            }
      });
      // 统计单词
      KeyedStream<Tuple3<String,String,Integer>,Tuple> objectTupleKeyedStream = map.keyBy(0, 1);
      SingleOutputStreamOperator<Tuple3<String, String, Integer>> sum = objectTupleKeyedStream.sum(2);
      sum.print();

      env.execute("MySocketProject");
    }
}



2.在运行时, 编辑运行的参数



3.参数指定接收node4的 9999端口发来信息,图1, 然后运行程序

注意: 需要在node4上面下载网络工具netcat


下载工具
yum install -y nc

启动阻塞式窗口, 可以在里面输入相关字符数据( 图2 )
nc -lk 9999

图1



图2



图3



上传Flink程序到集群中运行

将上个实例项目打包, 放到集群中运行

1.打包程序

先clean一下,目的是清除原有的jar(图1)

然后package进行打包, 然后将含有相关jar包的jar上传到虚拟机(jar)




图2



2.集群的运行jar 需要jdk 1.8.0_191或者更高,本人的是 1.8.0_11版本,因此在上传jar到Flink集群会报错

flink job 运行时报错 AskTimeoutException, 解决方案是升级JDK

# 删除原来的版本, 通过下面的命令找到对应的jdk存放目录, 删除
which java
which javac

# 上传 高版本jdk,解压, 重新配置环境变量以后重新加载一下 profile文件即可

3.上传任务到flink集群

# flink run -c 全限定类名 jar所在目录 --node 需要监听的节点 --port 需要监听的端口
flink run -c ah.szxy.flink4.FlinkReadSocketData ~/chy/software/MyFlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar --node node4 --port 9999



4.node4通过netcat发送消息数据, 然后通过flink的集群查看(图1,图2)

图1



图2



三、Flink窗口操作

前提: 需要在node4中开启netcat, 运行程序后,在五秒内输入随机数据, 查看控制台打印结果


nc -lk 9999

相关代码

/**
* Flink窗口操作
*
* @author TimePause
* @create 2019-12-09 19:44
*/
public class FlinkWindowOperator {
    public static void main(String[] args) throws Exception {
      // 创建流处理环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 创建数据源
      DataStreamSource<String>dataStreamSource=env.socketTextStream("node4",9999);
      // 切分单词
      SingleOutputStreamOperator<String> flatmap=dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                  collector.collect(s1);
                }
            }
      });
      // 规定输出格式
      SingleOutputStreamOperator<Tuple2<String,Integer>> map=flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
      });
      // 统计并输出结果
      KeyedStream<Tuple2<String, Integer>, Tuple> keyby = map.keyBy(0);
      // 一个参数: 每隔n个时间单位计算数目, 两个参数, 每隔后一个时间单位计算前一个时间单位的数据
      WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyby.timeWindow(Time.seconds(15), Time.seconds(5));
      SingleOutputStreamOperator<Tuple2<String, Integer>> sum = timeWindow.sum(1);
      sum.print();
      env.execute("FlinkWindowsOperator");

    }
}



结果





四、整合Kafka

启动kafka集群后

通过Flink代码自动生成topic-ReadKafkaTopic,我们将这个topic作为生产者


kafka-console-producer.sh --broker-list node2:9092,node3:9092,node4:9092 --topic ReadKafkaTopic


然后监听另一个topic-ResultKafkaTopic(Flink代码作用是将ReadKafkaTopic中数据传到ResultKafkaTopic )

kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic ResultKafkaTopic

Flink整合Kafka代码

package ah.szxy.flink6.kafka;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
* 使用Flink读取Kafka中的数据
*
* @author TimePause
* @create 2019-12-09 20:27
*/
public class FlinkReadKafka {
    public static void main(String[] args) throws Exception {
      //创建流处理环境
      StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
      properties.setProperty("group.id", "myflink.group");

      //在kafka中创建topic
      FlinkKafkaConsumer011<String> readKafkaTopic = new FlinkKafkaConsumer011<String>("ReadKafkaTopic", new SimpleStringSchema(), properties);
      //创建数据源
      DataStreamSource<String> dataStreamSource = env.addSource(readKafkaTopic);
      //分词
      SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                  collector.collect(new Tuple2<>(s1,1));
                }

            }
      });
      //分组, 通过0号位置单词进行分组
      KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = flatMap.keyBy(0);
      //统计
      SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
      //
      FlinkKafkaProducer011<String> resultKafkaTopic = new FlinkKafkaProducer011<String>("ResultKafkaTopic", new SimpleStringSchema(), properties);

      sum.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + "#" + value.f1;
            }
      }).addSink(resultKafkaTopic);

      env.execute("readKafka");
    }
}



最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg

---------------------
作者:时间静止不是简史
来源:csdn
原文:Flink——运行在数据流上的有状态计算框架和处理引擎

美丽天空 发表于 2019-12-30 12:55:20

感谢分享

琅琊榜尾 发表于 2019-12-30 14:20:29

界面好骚啊,天天盯着人家屁股的么?哈哈哈,感谢分享
页: [1]
查看完整版本: Flink入门简介二