分享

Flink如何通过反射强制动态加载udf

本帖最后由 hyj 于 2020-12-31 08:26 编辑

问题导读

1.如何实现flink的动态加载jar?
2.flink -C 参数的作用是什么?
3.源码pipeline.classpaths有什么发现?


背景
项目中想要把flink做到平台化,只需要编辑sql便能把任务跑起来,开发过程中遇到一个问题,就是如何能够自动的加载自定义的函数包,因为项目中已经把main打包成一个通用的jar, 使用时只需要把sql信息用参数形式传入就可以. 但是如果sql中需要使用到udf,那么就需要实现flink的动态加载jar

先说结论
在通用的jar main中通过反射使用类加载器,加载对应的jar包
通过反射设置StreamExecutionEnvironment中的configuration的confData中的pipeline.classpaths

具体代码例子如下

  1. public static void main(final String[] args) throws Exception {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  4.         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
  5.     String path = "https://...est/template-core-0.0.1-shaded.jar";
  6.         loadJar(new URL(path));
  7.         Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
  8.         configuration.setAccessible(true);
  9.         Configuration o = (Configuration)configuration.get(env);
  10.         Field confData = Configuration.class.getDeclaredField("confData");
  11.         confData.setAccessible(true);
  12.         Map<String,Object> temp = (Map<String,Object>)confData.get(o);
  13.         List<String> jarList = new ArrayList<>();
  14.         jarList.add(path);
  15.         temp.put("pipeline.classpaths",jarList);
  16.         tableEnvironment.executeSql("CREATE FUNCTION ReturnSelf2 AS 'flinksql.function.udf.ReturnSelf2'");
  17.         tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
  18.                         " f_sequence INT,\n" +
  19.                         " f_random INT,\n" +
  20.                         " f_random_str STRING,\n" +
  21.                         " ts AS localtimestamp,\n" +
  22.                         " WATERMARK FOR ts AS ts\n" +
  23.                         ") WITH (\n" +
  24.                         " 'connector' = 'datagen',\n" +
  25.                         " 'rows-per-second'='5',\n" +
  26.                         "\n" +
  27.                         " 'fields.f_sequence.kind'='sequence',\n" +
  28.                         " 'fields.f_sequence.start'='1',\n" +
  29.                         " 'fields.f_sequence.end'='1000',\n" +
  30.                         "\n" +
  31.                         " 'fields.f_random.min'='1',\n" +
  32.                         " 'fields.f_random.max'='1000',\n" +
  33.                         "\n" +
  34.                         " 'fields.f_random_str.length'='10'\n" +
  35.                         ")");
  36.         tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
  37.                         "    f_random_str STRING" +
  38.                         ") WITH (\n" +
  39.                         "    'connector' = 'print'\n" +
  40.                         ")");
  41.         tableEnvironment.executeSql(
  42.                         "insert into sinktable " +
  43.                                 "select ReturnSelf2(f_random_str) " +
  44.                                 "from sourceTable");
  45. }
  46. //动态加载Jar
  47. public static void loadJar(URL jarUrl) {
  48.         //从URLClassLoader类加载器中获取类的addURL方法
  49.         Method method = null;
  50.         try {
  51.                 method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
  52.         } catch (NoSuchMethodException | SecurityException e1) {
  53.                 e1.printStackTrace();
  54.         }
  55.         // 获取方法的访问权限
  56.         boolean accessible = method.isAccessible();
  57.         try {
  58.                 //修改访问权限为可写
  59.                 if (accessible == false) {
  60.                         method.setAccessible(true);
  61.                 }
  62.                 // 获取系统类加载器
  63.                 URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
  64.                 //jar路径加入到系统url路径里
  65.                 method.invoke(classLoader, jarUrl);
  66.         } catch (Exception e) {
  67.                 e.printStackTrace();
  68.         } finally {
  69.                 method.setAccessible(accessible);
  70.         }
  71. }
复制代码
再说解决过程
idea中loadjar的尝试
首先是loadJar方法是需要,在idea中,只是用了loadJar是可以加载外部udf包的,正常运行,但是当我打包放到集群上,则会提是找不到对应的class,猜测是因为idea开发中,启动的是一个minicluster, 所以我在idea中启动的时候, main调用loadJar方法, 把udf加载到minicluster JVM进程中, JM跟TM都是在同个JVM进程中运行的, 所以是可以正常启动. 但是如果是在集群环境, 即使是local standalone , 执行jps命令可以发现是JM跟TM是不同的进程的


1.png

所以是main即使在JM中 加载了,但是TM进程中没有加载

-C 参数的发现
后面发现flink命令行启动的时候可以添加-C参数, 可以指定其他的jar文件

执行命令

  1. flink --help
复制代码
1.png

所以可以使用如下命令, 我的jar是上传到阿里云的oss上使用的
  1. flink run -C "https://oss-cn-hangzhou.aliyuncs.com/test/cxytest/tanwan-function-0.1.jar..."  -c cn.xuying.flink.stream.SimpleTest /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
复制代码
但是我们项目平台是调用了集群的restapi来管理job的,而flink的restapi对应的接口是没有提供-C的之类的参数的, 也在flink中文邮件列表提了对应的问题,不过没有一个很好的答案


含泪跑源码pipeline.classpaths的发现
中途大佬的参与,发现了flink 客户端跑flinkjob的一些细节,

checkout源码,install,然后可以修改测试代码


1.png

1.png

找到测试的job打断点


1.png

运行test方法,可以发现env的里面的属性中多了-C参数url


1.png

然后我们不使用CliFrontendRunTest, 而是直接的执行那个TestJob的main函数, 断点可以发现没有了-C的配置


1.png

所以推测, flink 的client主要是解析你的命令行, 加一些参数配置, 然后执行你编写的flink main函数中的代码, 所以我们可以在我们自己的job代码中通过反射强制加入我们自己需要-C的配置 , 如文章开头的代码例子所示


最新经典文章,欢迎关注公众号


原文链接
https://blog.csdn.net/u010034713/article/details/109114562

已有(2)人评论

跳转到指定楼层
丈二青年~zZ 发表于 2021-1-4 08:50:08
JM TM 是什么意思?

点评

JobManager和TaskManager的简写  发表于 2021-1-5 13:59
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条