Flink如何通过反射强制动态加载udf
本帖最后由 hyj 于 2020-12-31 08:26 编辑问题导读
1.如何实现flink的动态加载jar?
2.flink -C 参数的作用是什么?
3.源码pipeline.classpaths有什么发现?
背景
项目中想要把flink做到平台化,只需要编辑sql便能把任务跑起来,开发过程中遇到一个问题,就是如何能够自动的加载自定义的函数包,因为项目中已经把main打包成一个通用的jar, 使用时只需要把sql信息用参数形式传入就可以. 但是如果sql中需要使用到udf,那么就需要实现flink的动态加载jar
先说结论
在通用的jar main中通过反射使用类加载器,加载对应的jar包
通过反射设置StreamExecutionEnvironment中的configuration的confData中的pipeline.classpaths
具体代码例子如下
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
String path = "https://...est/template-core-0.0.1-shaded.jar";
loadJar(new URL(path));
Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map<String,Object> temp = (Map<String,Object>)confData.get(o);
List<String> jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);
tableEnvironment.executeSql("CREATE FUNCTION ReturnSelf2 AS 'flinksql.function.udf.ReturnSelf2'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
" f_random_str STRING" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select ReturnSelf2(f_random_str) " +
"from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}再说解决过程
idea中loadjar的尝试
首先是loadJar方法是需要,在idea中,只是用了loadJar是可以加载外部udf包的,正常运行,但是当我打包放到集群上,则会提是找不到对应的class,猜测是因为idea开发中,启动的是一个minicluster, 所以我在idea中启动的时候, main调用loadJar方法, 把udf加载到minicluster JVM进程中, JM跟TM都是在同个JVM进程中运行的, 所以是可以正常启动. 但是如果是在集群环境, 即使是local standalone , 执行jps命令可以发现是JM跟TM是不同的进程的
所以是main即使在JM中 加载了,但是TM进程中没有加载
-C 参数的发现
后面发现flink命令行启动的时候可以添加-C参数, 可以指定其他的jar文件
执行命令
flink --help
所以可以使用如下命令, 我的jar是上传到阿里云的oss上使用的
flink run -C "https://oss-cn-hangzhou.aliyuncs.com/test/cxytest/tanwan-function-0.1.jar..."-c cn.xuying.flink.stream.SimpleTest /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar但是我们项目平台是调用了集群的restapi来管理job的,而flink的restapi对应的接口是没有提供-C的之类的参数的, 也在flink中文邮件列表提了对应的问题,不过没有一个很好的答案
含泪跑源码pipeline.classpaths的发现
中途大佬的参与,发现了flink 客户端跑flinkjob的一些细节,
checkout源码,install,然后可以修改测试代码
找到测试的job打断点
运行test方法,可以发现env的里面的属性中多了-C参数url
然后我们不使用CliFrontendRunTest, 而是直接的执行那个TestJob的main函数, 断点可以发现没有了-C的配置
所以推测, flink 的client主要是解析你的命令行, 加一些参数配置, 然后执行你编写的flink main函数中的代码, 所以我们可以在我们自己的job代码中通过反射强制加入我们自己需要-C的配置 , 如文章开头的代码例子所示
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg
原文链接
https://blog.csdn.net/u010034713/article/details/109114562
JM TM 是什么意思?
页:
[1]