hanyunsong 发表于 2020-9-10 16:45:49

Flink sql client如何实现远程提交Yarn




链接: https://pan.baidu.com/s/1mYzfF1ptNJzPAMqzMqCRaQ 提取码: jvwe



flink sql client
基于flink-1.9.1,支持flink在不同集群模式下的sql任务提交,区别于flink官方提供的sql client模块,官方提供的是需要单独执行的服务,这里提供的是一个sdk,可以无缝接入自己的流计算平台.

功能支持



quick start

standalone模式


[*]启动flink standalone集群,rest端口为8081
[*]运行com.github.mxb.flink.sql.cluster.StandaloneClusterTest中的kafkaToMysqlTest测试用例

Test用例


[*]com.github.mxb.flink.sql.cluster.StandaloneClusterTest.kafkaToMysqlTest测试用例

○ 完整SQL



○ 执行StandaloneClusterTest中的kafkaToMysqlTest测试用例



○ flink-standalone集群job信息



flink on yarn模式


[*]启动flink on yarn集群,获得applicationId
[*]运行com.github.mxb.flink.sql.cluster.YarnClusterClientTest中的kafkaToMysqlTest测试用例

本地Minicluster模式


[*]运行com.github.mxb.flink.sql.local.LocalClusterClientTest里面的测试用例(可用于本地调试分布式任务)

sdk quick start


[*]项目引入flink-sql-client
<dependency>
    <groupId>com.github.mxb</groupId>
    <artifactId>flink-sql-client</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

[*]实例化取出clusterClient并执行SQL job
public class Test{
    public ClusterClient getClusterClient(){
      FlinkResourceInfo standaloneResourceInfo = new FlinkResourceInfo();
      standaloneResourceInfo.setResourceType(ResourceType.STANDALONE);

      ClusterDescriptor clusterDescriptor = ClusterDescriptorFactory.createClusterDescriptor(standaloneResourceInfo);
      StandAloneClusterId standAloneClusterId = new StandAloneClusterId("127.0.0.1", 8081);
      ClusterClient clusterClient = clusterDescriptor.retrieve(standAloneClusterId);
    }

    public String executeSqlJob(String sql, List<File> dependencyJars){
      ClusterClient clusterClient = getClusterClient();
      JobRunConfig jobRunConfig = JobRunConfig.builder()
                        .jobName(getTestJobName())
                        .defaultParallelism(1)
                        .sourceParallelism(1)
                        .checkpointInterval(60_000L).build();
      ProgramTargetDescriptor programTargetDescriptor = clusterClient.executeSqlJob(jobRunConfig, dependencyJars, sql);
      return programTargetDescriptor.getJobId();
    }
}
问题处理

执行groupByTest用例时会出现InvalidClassException异常,local class incompatible serialVersionUID;解决:在flink-parent中修改对应的类并重新引入
@SerialVersionUID(value = 1)
abstract class ProcessFunctionWithCleanupState

文章作者:molsionmo
文章来源:https://github.com/molsionmo/flink-sql-client





页: [1]
查看完整版本: Flink sql client如何实现远程提交Yarn