阿飞 发表于 2021-2-19 23:18:21

重磅!Flink源码解析环境准备及提交流程之环境准备


问题导读

1.程序的起点包含哪些内容?
2.程序入口类是哪个?
3.选择创建哪种类型的客户端?
4.客户端有哪种类型?



Flink Yarn-per-job模式提交流程如图所示:




1.程序起点



1. flink\bin\flink

=> exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}"-classpath "`manglePathList"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

2. flink/bin/config.sh(相关环境配置都在这里)

       => JAVA_RUN=java

       => JVM_ARGS=""      => # Use conf/flink-conf.yaml

=>INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

3. 执行java -cp 就会开启JVM虚拟机,在虚拟机上开启CliFrontend进程,然后开始执行main方法

说明:java -cp和 -classpath一样,是指定类运行所依赖其他类的路径。

       java -cp =》开启JVM虚拟机 =》开启Process(CliFrontend)=》程序入口CliFrontend.main

4. Flink提交任务的入口类为CliFrontend。找到这个类的main方法:

在IDEA中全局查找(ctrl + n):org.apache.flink.client.cli.CliFrontend,找到CliFrontend类,并找到main方法



/**
* Submits the job based on the arguments.(根据参数提交作业)
*/
public static void main(final String[] args) {
       ... ...
       final CliFrontend cli = new CliFrontend(
                            configuration,
                            customCommandLines);
       ... ...
}


2.程序入口

CliFrontend.java

public static void main(final String[] args) {
         ... ...
         final CliFrontend cli = new CliFrontend(
                                 configuration,
                                 customCommandLines);
         ... ...
      int retCode =SecurityUtils.getInstalledContext()
                            .runSecured(() -> cli.parseParameters(args));
         ... ...
}




public int parseParameters(String[] args) {
         ... ...
         // get action
         String action = args;

         // remove action from parameters
         final String[] params =Arrays.copyOfRange(args, 1, args.length);
      
                  // do action
                  switch (action) {
                        case ACTION_RUN:
                                 run(params);
                                 return 0;
                        case ACTION_LIST:
                                 list(params);
                                 return 0;
                        case ACTION_INFO:
                                 info(params);
                                 return 0;
                        case ACTION_CANCEL:
                                 cancel(params);
                                 return 0;
                        case ACTION_STOP:
                                 stop(params);
                                 return 0;
                        case ACTION_SAVEPOINT:
                                 savepoint(params);
                                 return 0;
                        ……

                  }
         ... ...
}

3.解析输入参数

CliFrontend.java
protected void run(String[] args) throws Exception {
       ... ...
       //获取默认的运行参数
       final Options commandOptions =CliFrontendParser.getRunCommandOptions();
         // 解析参数,返回commandLine
       final CommandLine commandLine = getCommandLine(commandOptions, args, true);
       ... ...
}



public CommandLine getCommandLine(final OptionscommandOptions, final String[] args, final boolean stopAtNonOptions) throwsCliArgsException {
         final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions,customCommandLineOptions);
         return CliFrontendParser.parse(commandLineOptions, args,stopAtNonOptions);
}

DefaultParser.java
public class CliFrontendParser {
         // 选项列表
         static final Option HELP_OPTION = new Option("h", "help", false,
                        "Show the helpmessage for the CLI Frontend or the action.");

         static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

         static final Option CLASS_OPTION = new Option("c", "class", true,
                        "Class with theprogram entry point (\"main()\" method). Only needed if the " +
                        "JAR file doesnot specify the class in its manifest.");
... ...
}


DefaultParser.java

public CommandLine parse(Options options, String[] arguments,Properties properties, boolean stopAtNonOption)
            throws ParseException
{
    ... ...
    if (arguments != null)
    {
      for (String argument : arguments)
      {
            handleToken(argument);
      }
    }
... ...
}



private void handleToken(String token) throwsParseException
{
    currentToken = token;

    if (skipParsing)
    {
      cmd.addArg(token);
    }
    else if ("--".equals(token))
    {
      skipParsing = true;
    }
    else if (currentOption != null &¤tOption.acceptsArg() && isArgument(token))
    {
      currentOption.addValueForProcessing(Util.stripLeadingAndTrailingQuotes(token));
    }
    else if (token.startsWith("--"))
{
         // 解析--形式的参数
      handleLongOption(token);
    }
    else if (token.startsWith("-") && !"-".equals(token))
{
         // 解析 -形式的参数
      handleShortAndLongOption(token);
    }
    else
    {
      handleUnknownToken(token);
    }

    if (currentOption != null && !currentOption.acceptsArg())
    {
      currentOption = null;
    }
}


private void handleLongOption(String token) throwsParseException
{
    if (token.indexOf('=') == -1)
{
         //解析–L、-L、--l、-l形式的参数(不包含=)
      handleLongOptionWithoutEqual(token);
    }
    else
{
         // 解析--L=V、-L=V、--l=V、-l=V形式的参数(包含=)
      handleLongOptionWithEqual(token);
    }
}


各种情况的解析,逻辑大体相同:去除-或--前缀,校验参数,以其中一个为例

private void handleLongOptionWithoutEqual(String token)throws ParseException
{
         // 校验参数是否合法
    List<String> matchingOpts = options.getMatchingOptions(token);
    if (matchingOpts.isEmpty())
    {
      handleUnknownToken(currentToken);
    }
    else if (matchingOpts.size() > 1)
    {
      throw newAmbiguousOptionException(token, matchingOpts);
    }
    else
{
// 参数添加到执行命令
      handleOption(options.getOption(matchingOpts.get(0)));
    }
}


Options.java:

public List<String> getMatchingOptions(String opt)
{
         // 去除 -或 -- 前缀
    opt = Util.stripLeadingHyphens(opt);
   
    List<String> matchingOpts = newArrayList<String>();

    // for a perfect match return the singleoption only
    if (longOpts.keySet().contains(opt))
    {
      return Collections.singletonList(opt);
    }

    for (String longOpt : longOpts.keySet())
    {
      if (longOpt.startsWith(opt))
      {
            matchingOpts.add(longOpt);
      }
    }
   
    return matchingOpts;
}


DefaultParser.java

private void handleOption(Option option) throwsParseException
{
    // check the previous option beforehandling the next one
    checkRequiredArgs();

    option = (Option) option.clone();

    updateRequiredOptions(option);

    cmd.addOption(option);

    if (option.hasArg())
    {
      currentOption = option;
    }
    else
    {
      currentOption = null;
    }
}


4.选择创建哪种类型的客户端

CliFrontend.java

public static void main(final String[] args) {
         ... ...
         final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
                        configuration,
                        configurationDirectory);
         ... ...
         final CliFrontend cli = new CliFrontend(
                                 configuration,
                                 customCommandLines);
         ... ...
}


这里依次添加了 Yarn和Default(Standalone)两种客户端(后面根据isActive()选择):

public static List<CustomCommandLine> loadCustomCommandLines(Configurationconfiguration, String configurationDirectory) {
         List<CustomCommandLine>customCommandLines = new ArrayList<>();
         customCommandLines.add(newGenericCLI(configuration, configurationDirectory));

         //       Commandline interface of the YARN session, with a special initialization here
         //       toprefix all options with y/yarn.
         final String flinkYarnSessionCLI ="org.apache.flink.yarn.cli.FlinkYarnSessionCli";
         try {
                  customCommandLines.add(
                        loadCustomCommandLine(flinkYarnSessionCLI,
                                 configuration,
                                 configurationDirectory,
                                 "y",
                                 "yarn"));
         } catch (NoClassDefFoundError |Exception e) {
                  final StringerrorYarnSessionCLI ="org.apache.flink.yarn.cli.FallbackYarnSessionCli";
                  try {
                        LOG.info("LoadingFallbackYarnSessionCli");
                        customCommandLines.add(
                                          loadCustomCommandLine(errorYarnSessionCLI,configuration));
                  } catch (Exception exception){
                        LOG.warn("Couldnot load CLI class {}.", flinkYarnSessionCLI, e);
                  }
         }

         //       Tips:DefaultCLI must be added at last, because getActiveCustomCommandLine(..) willget the
         //             active CustomCommandLine in order andDefaultCLI isActive always return true.
         customCommandLines.add(new DefaultCLI(configuration));

         return customCommandLines;
}


在run()里面,进行客户端的选择:

protected void run(String[] args) throws Exception {
       ... ...
       final CustomCommandLine activeCommandLine =
                                 validateAndGetActiveCommandLine(checkNotNull(commandLine));
... ...
}



public CustomCommandLine validateAndGetActiveCommandLine(CommandLinecommandLine) {
... ...
         for (CustomCommandLine cli :customCommandLines) {
         ... ...
         //在FlinkYarnSessionCli为active时优先返回FlinkYarnSessionCli。
                  //对于DefaultCli,它的isActive方法总是返回true。
                  if (cli.isActive(commandLine)) {
                        return cli;
                  }
         }
... ...
}

FlinkYarnSessionCli.java => Yarn客户端isActive的判断逻辑:
public boolean isActive(CommandLine commandLine) {
         final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(),null);
         //是否指定为per-job模式,即指定”-m yarn-cluster”; ID = "yarn-cluster"
         final boolean yarnJobManager =ID.equals(jobManagerOption);
         // 是否存在flink在yarn的appID,即yarn-session模式是否启动
         final boolean hasYarnAppId =commandLine.hasOption(applicationId.getOpt())
                        ||configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
         // executor的名字为"yarn-session" 或 "yarn-per-job"
         final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET))
                         ||YarnJobClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET));
         //
      return hasYarnExecutor || yarnJobManager|| hasYarnAppId || (isYarnPropertiesFileMode(commandLine) &&yarnApplicationIdFromYarnProperties != null);
}


5.获取有效配置
CliFrontend.java
protected void run(String[] args) throws Exception {
       ... ...
       final Configuration effectiveConfiguration = getEffectiveConfiguration(
                                 activeCommandLine,commandLine, programOptions, jobJars);
... ...
}



private Configuration getEffectiveConfiguration(
                  final CommandLine commandLine,
                  final ProgramOptionsprogramOptions,
                  final List<URL> jobJars)throws FlinkException {
... ...
         final Configuration executorConfig =checkNotNull(activeCustomCommandLine)
                                 .applyCommandLineOptionsToConfiguration(commandLine);
... ...
}

FlinkYarnSessionCli.java

public Configuration applyCommandLineOptionsToConfiguration(CommandLinecommandLine) throws FlinkException {
         // we ignore the addressOption becauseit can only contain "yarn-cluster"
         final ConfigurationeffectiveConfiguration = new Configuration(configuration);

         applyDescriptorOptionToConfig(commandLine,effectiveConfiguration);

         final ApplicationId applicationId =getApplicationId(commandLine);
         if (applicationId != null) {
                  final StringzooKeeperNamespace;
                  if (commandLine.hasOption(zookeeperNamespace.getOpt())){
                        zooKeeperNamespace =commandLine.getOptionValue(zookeeperNamespace.getOpt());
                  } else {
                        zooKeeperNamespace =effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
                  }

                  effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
                  effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID,ConverterUtils.toString(applicationId));
                  // TARGET 就是execution.target,目标执行器
                  //决定后面什么类型的执行器提交任务:yarn-session、yarn-per-job
                  effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
         } else {
               effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
         }

         if (commandLine.hasOption(jmMemory.getOpt())) {
                  String jmMemoryVal =commandLine.getOptionValue(jmMemory.getOpt());
                  if(!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
                        jmMemoryVal +="m";
                  }
                  effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY,jmMemoryVal);
         }

         if (commandLine.hasOption(tmMemory.getOpt())) {
                  String tmMemoryVal =commandLine.getOptionValue(tmMemory.getOpt());
                  if(!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
                        tmMemoryVal +="m";
                  }
                  effectiveConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse(tmMemoryVal));
         }

         if (commandLine.hasOption(slots.getOpt())) {
                  effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
         }
... ...
}


6.调用用户代码的main方法
CliFrontend.java

protected void run(String[] args) throws Exception {
       ... ...
       executeProgram(effectiveConfiguration,program);
... ...
}


protected void executeProgram(final Configurationconfiguration, final PackagedProgram program) throws ProgramInvocationException{
         ClientUtils.executeProgram(newDefaultExecutorServiceLoader(), configuration, program, false, false);
}

ClientUtils.java

public static void executeProgram(
                  PipelineExecutorServiceLoader executorServiceLoader,
                  Configuration configuration,
                  PackagedProgram program,
                  boolean enforceSingleJobExecution,
                  boolean suppressSysout) throws ProgramInvocationException {
         checkNotNull(executorServiceLoader);
         final ClassLoader userCodeClassLoader =program.getUserCodeClassLoader();
         final ClassLoader contextClassLoader =Thread.currentThread().getContextClassLoader();
         try {
                  //设置当前的classloader为用户代码的classloader
                  Thread.currentThread().setContextClassLoader(userCodeClassLoader);

                  LOG.info("Startingprogram (detached: {})",!configuration.getBoolean(DeploymentOptions.ATTACHED));
         //用户代码中的getExecutionEnvironment会返回该Environment
                  ContextEnvironment.setAsContext(
                        executorServiceLoader,
                        configuration,
                        userCodeClassLoader,
                        enforceSingleJobExecution,
                        suppressSysout);

                  StreamContextEnvironment.setAsContext(
                        executorServiceLoader,
                        configuration,
                        userCodeClassLoader,
                        enforceSingleJobExecution,
                        suppressSysout);

                  try {
                        //调用用户代码的main方法
                        program.invokeInteractiveModeForExecution();
                  } finally {
                        ContextEnvironment.unsetAsContext();
                        StreamContextEnvironment.unsetAsContext();
                  }
         } finally {
                  Thread.currentThread().setContextClassLoader(contextClassLoader);
         }
}


PackagedProgram.java
public void invokeInteractiveModeForExecution() throws ProgramInvocationException{
         callMainMethod(mainClass, args);
}



private static void callMainMethod(Class<?> entryClass,String[] args) throws ProgramInvocationException {
         ... ...
         mainMethod = entryClass.getMethod("main", String[].class);
         ... ...
       //反射调用main函数
         mainMethod.invoke(null, (Object) args);
         ... ...
}

7.调用执行环境的execute方法
StreamExecutionEnvironment.java

public JobExecutionResult execute() throwsException {
         return execute(DEFAULT_JOB_NAME);
}



public JobExecutionResult execute(String jobName)throws Exception {
         ... ...
         return execute(getStreamGraph(jobName));
}


public JobExecutionResult execute(StreamGraphstreamGraph) throws Exception {
         final JobClient jobClient = executeAsync(streamGraph);
         ... ...
}


public JobClient executeAsync(StreamGraph streamGraph)throws Exception {
         ... ...
         //根据提交模式选择匹配的factory
         final PipelineExecutorFactoryexecutorFactory =
                  executorServiceLoader.getExecutorFactory(configuration);
... ...
         //选择合适的executor提交任务
         CompletableFuture<JobClient>jobClientFuture = executorFactory
                  .getExecutor(configuration)
                  .execute(streamGraph, configuration);
... ...
}


最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg


原文链接
https://mp.weixin.qq.com/s/hbwvvZBKnzto5Rb89xqcog
页: [1]
查看完整版本: 重磅!Flink源码解析环境准备及提交流程之环境准备