本帖最后由 hyj 于 2020-12-31 08:26 编辑
问题导读
1.如何实现flink的动态加载jar?
2.flink -C 参数的作用是什么?
3.源码pipeline.classpaths有什么发现?
背景
项目中想要把flink做到平台化,只需要编辑sql便能把任务跑起来,开发过程中遇到一个问题,就是如何能够自动的加载自定义的函数包,因为项目中已经把main打包成一个通用的jar, 使用时只需要把sql信息用参数形式传入就可以. 但是如果sql中需要使用到udf,那么就需要实现flink的动态加载jar
先说结论
在通用的jar main中通过反射使用类加载器,加载对应的jar包
通过反射设置StreamExecutionEnvironment中的configuration的confData中的pipeline.classpaths
具体代码例子如下
- public static void main(final String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
- StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
-
-
- String path = "https://...est/template-core-0.0.1-shaded.jar";
- loadJar(new URL(path));
-
- Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
- configuration.setAccessible(true);
- Configuration o = (Configuration)configuration.get(env);
-
- Field confData = Configuration.class.getDeclaredField("confData");
- confData.setAccessible(true);
- Map<String,Object> temp = (Map<String,Object>)confData.get(o);
- List<String> jarList = new ArrayList<>();
- jarList.add(path);
- temp.put("pipeline.classpaths",jarList);
-
- tableEnvironment.executeSql("CREATE FUNCTION ReturnSelf2 AS 'flinksql.function.udf.ReturnSelf2'");
- tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
- " f_sequence INT,\n" +
- " f_random INT,\n" +
- " f_random_str STRING,\n" +
- " ts AS localtimestamp,\n" +
- " WATERMARK FOR ts AS ts\n" +
- ") WITH (\n" +
- " 'connector' = 'datagen',\n" +
- " 'rows-per-second'='5',\n" +
- "\n" +
- " 'fields.f_sequence.kind'='sequence',\n" +
- " 'fields.f_sequence.start'='1',\n" +
- " 'fields.f_sequence.end'='1000',\n" +
- "\n" +
- " 'fields.f_random.min'='1',\n" +
- " 'fields.f_random.max'='1000',\n" +
- "\n" +
- " 'fields.f_random_str.length'='10'\n" +
- ")");
- tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
- " f_random_str STRING" +
- ") WITH (\n" +
- " 'connector' = 'print'\n" +
- ")");
- tableEnvironment.executeSql(
- "insert into sinktable " +
- "select ReturnSelf2(f_random_str) " +
- "from sourceTable");
- }
- //动态加载Jar
- public static void loadJar(URL jarUrl) {
- //从URLClassLoader类加载器中获取类的addURL方法
- Method method = null;
- try {
- method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
- } catch (NoSuchMethodException | SecurityException e1) {
- e1.printStackTrace();
- }
- // 获取方法的访问权限
- boolean accessible = method.isAccessible();
- try {
- //修改访问权限为可写
- if (accessible == false) {
- method.setAccessible(true);
- }
- // 获取系统类加载器
- URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
- //jar路径加入到系统url路径里
- method.invoke(classLoader, jarUrl);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- method.setAccessible(accessible);
- }
- }
复制代码
再说解决过程
idea中loadjar的尝试
首先是loadJar方法是需要,在idea中,只是用了loadJar是可以加载外部udf包的,正常运行,但是当我打包放到集群上,则会提是找不到对应的class,猜测是因为idea开发中,启动的是一个minicluster, 所以我在idea中启动的时候, main调用loadJar方法, 把udf加载到minicluster JVM进程中, JM跟TM都是在同个JVM进程中运行的, 所以是可以正常启动. 但是如果是在集群环境, 即使是local standalone , 执行jps命令可以发现是JM跟TM是不同的进程的
所以是main即使在JM中 加载了,但是TM进程中没有加载
-C 参数的发现
后面发现flink命令行启动的时候可以添加-C参数, 可以指定其他的jar文件
执行命令
复制代码
所以可以使用如下命令, 我的jar是上传到阿里云的oss上使用的
- flink run -C "https://oss-cn-hangzhou.aliyuncs.com/test/cxytest/tanwan-function-0.1.jar..." -c cn.xuying.flink.stream.SimpleTest /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
复制代码
但是我们项目平台是调用了集群的restapi来管理job的,而flink的restapi对应的接口是没有提供-C的之类的参数的, 也在flink中文邮件列表提了对应的问题,不过没有一个很好的答案
含泪跑源码pipeline.classpaths的发现
中途大佬的参与,发现了flink 客户端跑flinkjob的一些细节,
checkout源码,install,然后可以修改测试代码
找到测试的job打断点
运行test方法,可以发现env的里面的属性中多了-C参数url
然后我们不使用CliFrontendRunTest, 而是直接的执行那个TestJob的main函数, 断点可以发现没有了-C的配置
所以推测, flink 的client主要是解析你的命令行, 加一些参数配置, 然后执行你编写的flink main函数中的代码, 所以我们可以在我们自己的job代码中通过反射强制加入我们自己需要-C的配置 , 如文章开头的代码例子所示
最新经典文章,欢迎关注公众号
原文链接
https://blog.csdn.net/u010034713/article/details/109114562
|