分享

Flink入门简介二




问题导读

1.怎样使用Flink实现WordCount程序?
2.怎样搭建Flink集群?
3.Flink怎样整合Kafka?



上一篇:Flink入门简介一

第二章 编程模型

一、第一个Flink程序-WordCount

步骤

1.在IDEA中创建Maven项目


20191208204241494.png

2.导入pom.xml:

[mw_shl_code=xml,true] <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.7.1</flink.version>
    </properties>

    <dependencies>
        <!-- Flink 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka连接器的依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- Flink Scala2.11 版本 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
                    <!--<appendAssemblyId>false</appendAssemblyId>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.lw.myflink.Streaming.FlinkSocketWordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

[/mw_shl_code]

3.创建被统计的文本

20191208215941730.png

4.程序代码

[mw_shl_code=java,true]package ah.szxy.flink;

import akka.stream.impl.fusing.GroupBy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* 创建Flink程序,统计单词数目
*  1.创建环境
*      批处理: ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
*      流处理: StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
*  2.在批处理中Flink处理的数据对象是DataSet
*    在流处理中Flink处理的数据对象是DataStream
*  3.代码流程必须符合    source ->transformation->sink
*     transformation 都是懒执行, 需要最后使用env.execute()触发执行或者使用print(),count(),collect()触发执行
*  4.Flink编程不是K.V格式的编程, 通过某些方式来虚拟key
*  5.Flink中的tuple最多支持25个元素, 每个元素都是从0开始
*
* @author TimePause
* @create 2019-12-08 21:04
*/
public class WordCont {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource=env.readTextFile("./data/word.txt");
        FlatMapOperator<String,String> words=dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String w, Collector<String> collector) throws Exception {
                String[] split = w.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });
        MapOperator<String,Tuple2<String,Integer>> reduceWords =words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String w) throws Exception {
                return new Tuple2<String, Integer>(w,1);
            }
        });

        UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
        //输出到控制台
       AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
       result.print();
        //输出到文件中
       // DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1);
       // DataSink<Tuple2<String, Integer>> dataSink = dataSet.writeAsText("./data/reslut/r1");
       // env.execute("myflink");


    }
}

[/mw_shl_code]

5.执行结果

20191208220014237.png

总结

Flink处理数据流程

Source -> Transformations ->Sink
数据源头 -> 数据转换 -> 数据输出

Flink程序的执行过程:

  • 获取flink的执行环境(execution environment)
  • 加载数据-- soure
  • 对加载的数据进行转换 – transformation
  • 对结果进行保存或者打印 --sink
  • 触发flink程序的执行(execute(),count(),collect(),print()),例如:调用ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。


Flink中数据类型

  • 有界数据流
  • 无界数据流


Flink三种处理数据模型

  • Flink批处理


Flink批处理中处理的是有界数据流 --Dataset

  • Flink流式处理


Flink流式处理中有界数据流也有无界数据流 --DataStream

  • FlinkSQL处理


有界数据流也有无界数据流

二、分区设置和排序

  • 设置全局分区 env.setParallelism(1);
  • 为某个算子设置分区 DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);
  • 分区排序演示代码



[mw_shl_code=java,true]public class WordCont {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //设置全局分区
        // env.setParallelism(1);
        DataSource<String> dataSource = env.readTextFile("./data/word.txt");
        FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String w, Collector<String> collector) throws Exception {
                String[] split = w.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });
        MapOperator<String, Tuple2<String, Integer>> reduceWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String w) throws Exception {
                return new Tuple2<String, Integer>(w, 1);
            }
        });

        UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
        //输出到控制台
//       AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
        DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);  //为单个的算子设置分区
        SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
        result.print();
    }
}
[/mw_shl_code]

20191209112700951.png

三、设置 source和 sink

  • 设置数据源头 DataSource<String> dataSource = env.readTextFile("./data/word.txt");
  • 设置数据输出:



[mw_shl_code=java,true]DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);  //为单个的算子设置分区
   SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
   //csv文件, 生成的文件以指定分隔符分隔,默认为逗号
   result.writeAsCsv("./data/result/r2", "\n", "&", FileSystem.WriteMode.OVERWRITE);

[/mw_shl_code]

四、Flink网址分析案例

实现过滤以 https:// 开头的网址


[mw_shl_code=java,true]/**
* 统计网站链接情况
*
* @author TimePause
* @create 2019-12-09 10:11
*/
public class Flink360site {
    public static void main(String[] args) throws Exception {
        //创建批处理环境
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //设置分区
        env.setParallelism(1);
        //指定数据源
        DataSource<String> dataSource = env.readTextFile("./data/360index");

        //编写过滤规则
        FilterOperator<String> filter= dataSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("https://");
            }
        });

        //对符合过滤的数据计数
        long count = filter.count();

        //指定文件要写入的目的地
        DataSink<String> stringDataSink = filter.writeAsText("./result/data/r2", FileSystem.WriteMode.OVERWRITE);
        env.execute("统计网站链接");
    }
}

[/mw_shl_code]

结果展示

20191209113322986.png

五、计数器

[mw_shl_code=java,true]/**
* 利用计数器进行网站链接计数
*
* @author TimePause
* @create 2019-12-09 10:53
*/
public class FlinkAccumulator {
    public static void main(String[] args) throws Exception {
        //创建批处理环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //指定数据源
        DataSource<String> dataSource = env.readTextFile("./data/360index");

        //创建过滤器
        FilterOperator<String> filter = dataSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("https://");
            }
        });

        // 创建map操作器
        MapOperator<String, String> map = filter.map(new RichMapFunction<String, String>() {
            //创建计数器
            private IntCounter intCount= new IntCounter();

            //创建算子
            @Override
            public void open(Configuration parameters) throws Exception {
                getRuntimeContext().addAccumulator("myacc",intCount);
            }

            @Override
            public String map(String s) throws Exception {
                intCount.add(1);
                return s;
            }
        });

        // 指定文件输出的目的地
        DataSink<String> dataSink = map.writeAsText("./data/ressult/accumulator/r1");
        // 创建任务执行结果对象
        JobExecutionResult mycounter = env.execute("mycounter");
        // 创建计数器的计数结果
        Integer myacc = mycounter.getAccumulatorResult("myacc");
        System.out.println("myCounter value= " + myacc);
        System.out.println("-----------------------------");
    }
}

[/mw_shl_code]

六、Flink术语

Flink 使用 java 语言开发,提供了 scala 编程的接口。使用 java 或者 scala 开发 Flink 是需 要使用 jdk8 版本,如果使用 Maven,maven 版本需要使用 3.0.4 及以上。

DataFlows


20191209115059363.png

parallel Dataflows

并行数据流


20191209115215929.png

Task和算子链

20191209115443579.png

JobManager、TaskManager和clients:

20191209115457896.png

Flink 运行时包含两种类型的进程:

  • JobManger:也叫作 masters,协调分布式执行,调度 task,协调 checkpoint,协调故障恢复。在 Flink 程序中至少有一个 JobManager,高可用可以设置多个 JobManager,其中 一个是 Leader,其他都是 standby 状态。
  • TaskManager:也叫 workers,执行 dataflow 生成的 task,负责缓冲数据,及 TaskManager 之间的交换数据。Flink 程序中必须有一个 TaskManager. Flink 程序可以运行在 standalone 集群,Yarn 或者 Mesos 资源调度框架中。 clients不是Flink程序运行时的一部分,作用是向JobManager准备和发送dataflow,之后, 客户端可以断开连接或者保持连接。


TaskSlots 任务槽

  • TaskSlots 任务槽:


每个Worker(TaskManager)是一个JVM进程,可以执行一个或者多个task,这些task可以运行在任务槽上,每个worker上至少有一个任务槽。每个任务槽都有固定的资源,例如:TaskManager有三个TaskSlots,那么每个TaskSlot会将TaskMananger中的内存均分,即每个任务槽的内存是总内存的1/3。任务槽的作用就是分离任务的托管内存,不会发生cpu隔离。
通过调整任务槽的数据量,用户可以指定每个TaskManager有多少任务槽,更多的任务槽意味着更多的task可以共享同一个JVM,同一个JVM中的task共享TCP连接和心跳信息,共享数据集和数据结构,从而减少TaskManager中的task开销。

  • 总结:task slot的个数代表TaskManager可以并行执行的task数。



20191209115600350.png

第三章 安装

一、集群搭建步骤

1.进入https://flink.apache.org/downloads.html 下载flink.
下载好Flink之后( 资料分享至底部 )上传到Master(node1)节点上解压。

2.配置…/conf/flink-conf.yaml jobmanager.rpc.address: node1指定主节点
配置…/conf/slaves (指定从节点)

node2
node3

3.将配置好的Flink发送到其他worker节点(node2,node3)上。
启动Flink集群 start-cluster.sh,访问webui。node1:8081


20191209144348331.png

二、Flink读取Socket数据

1.编写java代码


[mw_shl_code=java,true]package ah.szxy.flink4;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* 读取Socket数据(流式数据)
*
* @author TimePause
* @create 2019-12-09 14:56
*/
public class FlinkReadSocketData {
    public static void main(String[] args) throws Exception {
        //创建流式处理环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        // 参数管理工具类,帮助我们管理集群相关参数
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String node = "";
        Integer port = 0;
        if (parameterTool.has("node") && parameterTool.has("port")) {
            node = parameterTool.get("node");
            port = Integer.valueOf(parameterTool.get("port"));
        } else {
            System.out.println("集群提交需要参数");
            System.exit(1);
        }
        // 创建数据来源
        DataStreamSource<String> dataStreamSource = env.socketTextStream(node, port);
        // 切分单词
        SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(s1);
                }
            }
        });

        // 规定输出格式
        SingleOutputStreamOperator<Tuple3<String,String,Integer>> map =  flatMap.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String s) throws Exception {
                return new Tuple3<>(s, s, 1);
            }
        });
        // 统计单词
        KeyedStream<Tuple3<String,String,Integer>,Tuple> objectTupleKeyedStream = map.keyBy(0, 1);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> sum = objectTupleKeyedStream.sum(2);
        sum.print();

        env.execute("MySocketProject");
    }
}

[/mw_shl_code]

2.在运行时, 编辑运行的参数

20191209153934406.png

3.参数指定接收node4的 9999端口发来信息,图1, 然后运行程序

注意: 需要在node4上面下载网络工具netcat


[mw_shl_code=text,true]下载工具
yum install -y nc

启动阻塞式窗口, 可以在里面输入相关字符数据( 图2 )
nc -lk 9999[/mw_shl_code]

图1

20191209154058131.png

图2

20191209155131915.png

图3

20191209155253452.png

上传Flink程序到集群中运行

将上个实例项目打包, 放到集群中运行

1.打包程序

先clean一下,目的是清除原有的jar(图1)

然后package进行打包, 然后将含有相关jar包的jar上传到虚拟机(jar)


20191209174229347.png

图2

20191209174332706.png

2.集群的运行jar 需要jdk 1.8.0_191或者更高,本人的是 1.8.0_11版本,因此在上传jar到Flink集群会报错

flink job 运行时报错 AskTimeoutException, 解决方案是升级JDK

[mw_shl_code=text,true]# 删除原来的版本, 通过下面的命令找到对应的jdk存放目录, 删除
which java
which javac

# 上传 高版本jdk,解压, 重新配置环境变量以后重新加载一下 profile文件即可[/mw_shl_code]

3.上传任务到flink集群

[mw_shl_code=shell,true]# flink run -c 全限定类名 jar所在目录 --node 需要监听的节点 --port 需要监听的端口
flink run -c ah.szxy.flink4.FlinkReadSocketData ~/chy/software/MyFlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar --node node4 --port 9999

[/mw_shl_code]

4.node4通过netcat发送消息数据, 然后通过flink的集群查看(图1,图2)

图1

20191209190449674.png

图2

20191209190606468.png

三、Flink窗口操作

前提: 需要在node4中开启netcat, 运行程序后,在五秒内输入随机数据, 查看控制台打印结果


[mw_shl_code=shell,true]nc -lk 9999[/mw_shl_code]

相关代码

[mw_shl_code=java,true]/**
* Flink窗口操作
*
* @author TimePause
* @create 2019-12-09 19:44
*/
public class FlinkWindowOperator {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建数据源
        DataStreamSource<String>dataStreamSource=env.socketTextStream("node4",9999);
        // 切分单词
        SingleOutputStreamOperator<String> flatmap=dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(s1);
                }
            }
        });
        // 规定输出格式
        SingleOutputStreamOperator<Tuple2<String,Integer>> map=flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });
        // 统计并输出结果
        KeyedStream<Tuple2<String, Integer>, Tuple> keyby = map.keyBy(0);
        // 一个参数: 每隔n个时间单位计算数目, 两个参数, 每隔后一个时间单位计算前一个时间单位的数据
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyby.timeWindow(Time.seconds(15), Time.seconds(5));
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = timeWindow.sum(1);
        sum.print();
        env.execute("FlinkWindowsOperator");

    }
}

[/mw_shl_code]

结果

20191209201946631.png

20191209201924735.png

四、整合Kafka

启动kafka集群后

通过Flink代码自动生成topic-ReadKafkaTopic,我们将这个topic作为生产者


[mw_shl_code=shell,true]kafka-console-producer.sh --broker-list node2:9092,node3:9092,node4:9092 --topic ReadKafkaTopic
[/mw_shl_code]

然后监听另一个topic-ResultKafkaTopic(Flink代码作用是将ReadKafkaTopic中数据传到ResultKafkaTopic )

[mw_shl_code=shell,true]kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic ResultKafkaTopic[/mw_shl_code]

Flink整合Kafka代码

[mw_shl_code=java,true]package ah.szxy.flink6.kafka;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
* 使用Flink读取Kafka中的数据
*
* @author TimePause
* @create 2019-12-09 20:27
*/
public class FlinkReadKafka {
    public static void main(String[] args) throws Exception {
        //创建流处理环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
        properties.setProperty("group.id", "myflink.group");

        //在kafka中创建topic
        FlinkKafkaConsumer011<String> readKafkaTopic = new FlinkKafkaConsumer011<String>("ReadKafkaTopic", new SimpleStringSchema(), properties);
        //创建数据源
        DataStreamSource<String> dataStreamSource = env.addSource(readKafkaTopic);
        //分词
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(new Tuple2<>(s1,1));
                }

            }
        });
        //分组, 通过0号位置单词进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = flatMap.keyBy(0);
        //统计
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
        //
        FlinkKafkaProducer011<String> resultKafkaTopic = new FlinkKafkaProducer011<String>("ResultKafkaTopic", new SimpleStringSchema(), properties);

        sum.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + "#" + value.f1;
            }
        }).addSink(resultKafkaTopic);

        env.execute("readKafka");
    }
}[/mw_shl_code]



最新经典文章,欢迎关注公众号



---------------------

作者:时间静止不是简史
来源:csdn
原文:Flink——运行在数据流上的有状态计算框架和处理引擎

已有(2)人评论

跳转到指定楼层
琅琊榜尾 发表于 2019-12-30 14:20:29
界面好骚啊,天天盯着人家屁股的么?哈哈哈,感谢分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条