hyj 发表于 2020-12-31 08:18:39

Flink如何通过反射强制动态加载udf

本帖最后由 hyj 于 2020-12-31 08:26 编辑

问题导读

1.如何实现flink的动态加载jar?
2.flink -C 参数的作用是什么?
3.源码pipeline.classpaths有什么发现?


背景
项目中想要把flink做到平台化,只需要编辑sql便能把任务跑起来,开发过程中遇到一个问题,就是如何能够自动的加载自定义的函数包,因为项目中已经把main打包成一个通用的jar, 使用时只需要把sql信息用参数形式传入就可以. 但是如果sql中需要使用到udf,那么就需要实现flink的动态加载jar

先说结论
在通用的jar main中通过反射使用类加载器,加载对应的jar包
通过反射设置StreamExecutionEnvironment中的configuration的confData中的pipeline.classpaths

具体代码例子如下

public static void main(final String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
      StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);


    String path = "https://...est/template-core-0.0.1-shaded.jar";
      loadJar(new URL(path));

      Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
      configuration.setAccessible(true);
      Configuration o = (Configuration)configuration.get(env);

      Field confData = Configuration.class.getDeclaredField("confData");
      confData.setAccessible(true);
      Map<String,Object> temp = (Map<String,Object>)confData.get(o);
      List<String> jarList = new ArrayList<>();
      jarList.add(path);
      temp.put("pipeline.classpaths",jarList);

      tableEnvironment.executeSql("CREATE FUNCTION ReturnSelf2 AS 'flinksql.function.udf.ReturnSelf2'");
      tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
                        " f_sequence INT,\n" +
                        " f_random INT,\n" +
                        " f_random_str STRING,\n" +
                        " ts AS localtimestamp,\n" +
                        " WATERMARK FOR ts AS ts\n" +
                        ") WITH (\n" +
                        " 'connector' = 'datagen',\n" +
                        " 'rows-per-second'='5',\n" +
                        "\n" +
                        " 'fields.f_sequence.kind'='sequence',\n" +
                        " 'fields.f_sequence.start'='1',\n" +
                        " 'fields.f_sequence.end'='1000',\n" +
                        "\n" +
                        " 'fields.f_random.min'='1',\n" +
                        " 'fields.f_random.max'='1000',\n" +
                        "\n" +
                        " 'fields.f_random_str.length'='10'\n" +
                        ")");
      tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
                        "    f_random_str STRING" +
                        ") WITH (\n" +
                        "    'connector' = 'print'\n" +
                        ")");
      tableEnvironment.executeSql(
                        "insert into sinktable " +
                              "select ReturnSelf2(f_random_str) " +
                              "from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
      //从URLClassLoader类加载器中获取类的addURL方法
      Method method = null;
      try {
                method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
      } catch (NoSuchMethodException | SecurityException e1) {
                e1.printStackTrace();
      }
      // 获取方法的访问权限
      boolean accessible = method.isAccessible();
      try {
                //修改访问权限为可写
                if (accessible == false) {
                        method.setAccessible(true);
                }
                // 获取系统类加载器
                URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
                //jar路径加入到系统url路径里
                method.invoke(classLoader, jarUrl);
      } catch (Exception e) {
                e.printStackTrace();
      } finally {
                method.setAccessible(accessible);
      }
}再说解决过程
idea中loadjar的尝试
首先是loadJar方法是需要,在idea中,只是用了loadJar是可以加载外部udf包的,正常运行,但是当我打包放到集群上,则会提是找不到对应的class,猜测是因为idea开发中,启动的是一个minicluster, 所以我在idea中启动的时候, main调用loadJar方法, 把udf加载到minicluster JVM进程中, JM跟TM都是在同个JVM进程中运行的, 所以是可以正常启动. 但是如果是在集群环境, 即使是local standalone , 执行jps命令可以发现是JM跟TM是不同的进程的




所以是main即使在JM中 加载了,但是TM进程中没有加载

-C 参数的发现
后面发现flink命令行启动的时候可以添加-C参数, 可以指定其他的jar文件

执行命令

flink --help


所以可以使用如下命令, 我的jar是上传到阿里云的oss上使用的
flink run -C "https://oss-cn-hangzhou.aliyuncs.com/test/cxytest/tanwan-function-0.1.jar..."-c cn.xuying.flink.stream.SimpleTest /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar但是我们项目平台是调用了集群的restapi来管理job的,而flink的restapi对应的接口是没有提供-C的之类的参数的, 也在flink中文邮件列表提了对应的问题,不过没有一个很好的答案


含泪跑源码pipeline.classpaths的发现
中途大佬的参与,发现了flink 客户端跑flinkjob的一些细节,

checkout源码,install,然后可以修改测试代码






找到测试的job打断点




运行test方法,可以发现env的里面的属性中多了-C参数url




然后我们不使用CliFrontendRunTest, 而是直接的执行那个TestJob的main函数, 断点可以发现没有了-C的配置




所以推测, flink 的client主要是解析你的命令行, 加一些参数配置, 然后执行你编写的flink main函数中的代码, 所以我们可以在我们自己的job代码中通过反射强制加入我们自己需要-C的配置 , 如文章开头的代码例子所示


最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg

原文链接
https://blog.csdn.net/u010034713/article/details/109114562

丈二青年~zZ 发表于 2021-1-4 08:50:08

JM TM 是什么意思?
页: [1]
查看完整版本: Flink如何通过反射强制动态加载udf