链接: https://pan.baidu.com/s/1mYzfF1ptNJzPAMqzMqCRaQ 提取码: jvwe
flink sql client
基于flink-1.9.1,支持flink在不同集群模式下的sql任务提交,区别于flink官方提供的sql client模块,官方提供的是需要单独执行的服务,这里提供的是一个sdk,可以无缝接入自己的流计算平台.
功能支持
quick start
standalone模式
- 启动flink standalone集群,rest端口为8081
- 运行com.github.mxb.flink.sql.cluster.StandaloneClusterTest中的kafkaToMysqlTest测试用例
Test用例
- com.github.mxb.flink.sql.cluster.StandaloneClusterTest.kafkaToMysqlTest测试用例
○ 完整SQL
○ 执行StandaloneClusterTest中的kafkaToMysqlTest测试用例
○ flink-standalone集群job信息
flink on yarn模式
- 启动flink on yarn集群,获得applicationId
- 运行com.github.mxb.flink.sql.cluster.YarnClusterClientTest中的kafkaToMysqlTest测试用例
本地Minicluster模式
- 运行com.github.mxb.flink.sql.local.LocalClusterClientTest里面的测试用例(可用于本地调试分布式任务)
sdk quick start
- <dependency>
- <groupId>com.github.mxb</groupId>
- <artifactId>flink-sql-client</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
复制代码
- 实例化取出clusterClient并执行SQL job
- public class Test{
- public ClusterClient getClusterClient(){
- FlinkResourceInfo standaloneResourceInfo = new FlinkResourceInfo();
- standaloneResourceInfo.setResourceType(ResourceType.STANDALONE);
-
- ClusterDescriptor clusterDescriptor = ClusterDescriptorFactory.createClusterDescriptor(standaloneResourceInfo);
- StandAloneClusterId standAloneClusterId = new StandAloneClusterId("127.0.0.1", 8081);
- ClusterClient clusterClient = clusterDescriptor.retrieve(standAloneClusterId);
- }
-
- public String executeSqlJob(String sql, List<File> dependencyJars){
- ClusterClient clusterClient = getClusterClient();
- JobRunConfig jobRunConfig = JobRunConfig.builder()
- .jobName(getTestJobName())
- .defaultParallelism(1)
- .sourceParallelism(1)
- .checkpointInterval(60_000L).build();
- ProgramTargetDescriptor programTargetDescriptor = clusterClient.executeSqlJob(jobRunConfig, dependencyJars, sql);
- return programTargetDescriptor.getJobId();
- }
- }
复制代码
问题处理
执行groupByTest用例时会出现InvalidClassException异常,local class incompatible serialVersionUID;解决:在flink-parent中修改对应的类并重新引入
- @SerialVersionUID(value = 1)
- abstract class ProcessFunctionWithCleanupState
复制代码
文章作者:molsionmo
文章来源:https://github.com/molsionmo/flink-sql-client
|