分享

彻底明白Flink系统学习19:【Flink1.7】开发之本地与集群执行详解【附例子】

pig2 2019-1-7 10:29:43 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 19771
问题导读

1.本地执行分为那两种?
2.本地执行在程序方面有什么区别?
3.集群执行,如何连接master?
4.本地执行和集群执行代码方面有什么区别?


上一篇
彻底明白Flink系统学习18:【Flink1.7】如何在Flink中使用Hadoop MapReduce代码
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26568

在单个Java虚拟机中,Flink可以在一台机器上运行。这允许用户在本地测试和调试Flink程序。本文概述了本地执行机制。

本地环境和执行程序允许在本地Java虚拟机中运行Flink程序,或在任何JVM中作为现有程序的一部分运行。只需点击IDE的“运行”按钮,即可在本地启动大多数demoi。

1.本地执行
Flink支持两种不同的本地执行:
1.LocalExecutionEnvironment启动完整的Flink运行时,包括JobManager和TaskManager。这些包括内存管理和在群集模式下执行的所有内部算法。
2.CollectionEnvironment在Java集合上执行Flink程序。此模式不会启动完整的Flink运行时,因此执行的开销和轻量级都非常低。例如,DataSet.map() - 将通过将map()函数应用于Java列表中的所有元素来执行转换。

调试
如果在本地运行Flink程序,也可以像调试任何其他Java程序一样调试程序。 可以使用System.out.println()写出一些内部变量,也可以使用调试器。 可以在map(),reduce()和所有其它方法中设置断点。

Maven依赖
如果在Maven项目中开发程序,则必须使用此依赖项添加flink-clients模块:
[mw_shl_code=scala,true]<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.7.0</version>
</dependency>[/mw_shl_code]

本地环境
LocalEnvironment是Flink程序本地执行的句柄。 使用它在本地JVM中运行程序 - 独立或嵌入其它程序。

通过ExecutionEnvironment.createLocalEnvironment()方法实例化本地环境。 默认情况下,根据cpu的core,使用尽可能多的本地线程来执行。 也可以指定所需的并行度。 可以使用enableLogging()/ disableLogging()将本地环境配置为是否将日志输出到控制台。

在大多数情况下,推荐调用ExecutionEnvironment.getExecutionEnvironment()方法。 当程序在本地启动时(在命令行界面之外),该方法返回LocalEnvironment。在集群执行环境下,在命令行界面调用程序时返回预先配置的环境信息。

[mw_shl_code=scala,true]public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

    DataSet<String> data = env.readTextFile("file:///path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("file:///path/to/result");

    JobExecutionResult res = env.execute();
}[/mw_shl_code]
执行完成后返回的jobExecutionResult对象包含程序runtime 和累加器结果。

localenvironment还允许将自定义配置值传递给flink。

[mw_shl_code=scala,true]Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);[/mw_shl_code]
注意本地执行环境不启动任何Web前端来监视执行。

Collection环境
使用CollectionEnvironment在Java Collections上执行是一种执行Flink程序的低开销方法。 此模式的典型用例是自动测试,调试和代码重用。

对于更具交互性的情况,用户也可以使用为批处理实现的算法。 可以在Java Application Server中使用更改的Flink程序变体来处理传入的请求。

基于集合的执行的框架
[mw_shl_code=scala,true]public static void main(String[] args) throws Exception {
    // initialize a new Collection-based execution environment
    final ExecutionEnvironment env = new CollectionEnvironment();

    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);

    /* Data Set transformations ... */

    // retrieve the resulting Tuple2 elements into a ArrayList.
    Collection<...> result = new ArrayList<...>();
    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));

    // kick off execution.
    env.execute();

    // Do some work with the resulting ArrayList (=Collection).
    for(... t : result) {
        System.err.println("Result = "+t);
    }
}[/mw_shl_code]
flink-examples-batch模块包含一个名为CollectionExecutionExample的完整示例。

请注意,基于集合的Flink程序的执行仅适用于适合JVM堆的小数据。 集合上的执行不是多线程的,只使用一个线程。


2.群集执行
Flink程序可以在许多机器的集群上分布式运行。 将程序发送到集群以执行有两种方法:

命令行
命令行允许将打包程序(JAR)提交到群集(或单机设置)。

远程环境
远程环境允许直接在集群上执行Flink 程序。 远程环境指向要在其上执行程序的群集。
[mw_shl_code=scala,true]<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.7.0</version>
</dependency>[/mw_shl_code]

例子:
以下说明RemoteEnvironment的使用:
[mw_shl_code=scala,true]public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment
        .createRemoteEnvironment("flink-master", 8081, "/home/user/udfs.jar");

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    env.execute();
}[/mw_shl_code]
注意:该程序包含自定义用户代码,因此需要一个附加代码类的JAR文件。 远程环境的构造函数将路径传递给JAR文件。

最新经典文章,欢迎关注公众号

本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条