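Guiding Questions
1. What does the program's starting point consist of?
2. Which class is the program's entry point?
3. How is the type of client to create chosen?
4. What types of clients are there?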
The submission flow of Flink in Yarn per-job mode is shown in the figure:
1. Program starting point
1. flink/bin/flink
=> exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
2. flink/bin/config.sh (all related environment settings are defined here)
=> JAVA_RUN=java
=> JVM_ARGS="" # Use conf/flink-conf.yaml
=> INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
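3. Running java -cp starts a JVM, which runs the CliFrontend process and then begins executing its main method.
Note: java -cp is the same as java -classpath; it specifies the path of the other classes that the class being run depends on.
java -cp => start the JVM => start the process (CliFrontend) => program entry CliFrontend.main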
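4. The entry class for submitting a Flink job is CliFrontend. Find its main method:
In IDEA, search globally with Go to Class (Ctrl + N) for org.apache.flink.client.cli.CliFrontend, open the CliFrontend class, and locate its main method.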
```java
/**
 * Submits the job based on the arguments.
 */
public static void main(final String[] args) {
    // ... ...
    final CliFrontend cli = new CliFrontend(
        configuration,
        customCommandLines);
    // ... ...
}
```
2. Program entry point
CliFrontend.java
```java
public static void main(final String[] args) {
    // ... ...
    final CliFrontend cli = new CliFrontend(
        configuration,
        customCommandLines);
    // ... ...
    int retCode = SecurityUtils.getInstalledContext()
        .runSecured(() -> cli.parseParameters(args));
    // ... ...
}
```
```java
public int parseParameters(String[] args) {
    // ... ...
    // get action
    String action = args[0];

    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    // do action
    switch (action) {
        case ACTION_RUN:
            run(params);
            return 0;
        case ACTION_LIST:
            list(params);
            return 0;
        case ACTION_INFO:
            info(params);
            return 0;
        case ACTION_CANCEL:
            cancel(params);
            return 0;
        case ACTION_STOP:
            stop(params);
            return 0;
        case ACTION_SAVEPOINT:
            savepoint(params);
            return 0;
        // ……
    }
    // ... ...
}
```
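To make the dispatch concrete, here is a minimal, self-contained sketch of the same idea. ActionDispatchSketch is a hypothetical class: the action names mirror the ACTION_* cases above, but the handlers are stubbed out as descriptive strings instead of calling Flink's real run, list, and so on, and the error strings are placeholders.

```java
import java.util.Arrays;

// Hypothetical, simplified model of CliFrontend.parseParameters:
// the first argument is the action, the rest are handed to that action.
class ActionDispatchSketch {

    static String parseParameters(String[] args) {
        if (args.length < 1) {
            return "error: no action given"; // placeholder message
        }
        // get action
        String action = args[0];
        // remove action from parameters
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        // do action (handlers stubbed out as strings)
        switch (action) {
            case "run":
            case "list":
            case "info":
            case "cancel":
            case "stop":
            case "savepoint":
                return action + " " + Arrays.toString(params);
            default:
                return "error: invalid action " + action; // placeholder message
        }
    }
}
```

For example, flink run -m yarn-cluster app.jar reaches main with args [run, -m, yarn-cluster, app.jar]; the sketch returns "run [-m, yarn-cluster, app.jar]", showing that only the remaining parameters are passed on to the run branch.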
3. Parsing the input arguments
CliFrontend.java
```java
protected void run(String[] args) throws Exception {
    // ... ...
    // get the default run options
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    // parse the arguments and return a CommandLine
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    // ... ...
}
```
```java
public CommandLine getCommandLine(final Options commandOptions, final String[] args, final boolean stopAtNonOptions) throws CliArgsException {
    final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    return CliFrontendParser.parse(commandLineOptions, args, stopAtNonOptions);
}
```
CliFrontendParser.java
```java
public class CliFrontendParser {
    // list of options
    static final Option HELP_OPTION = new Option("h", "help", false,
        "Show the help message for the CLI Frontend or the action.");

    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    static final Option CLASS_OPTION = new Option("c", "class", true,
        "Class with the program entry point (\"main()\" method). Only needed if the " +
        "JAR file does not specify the class in its manifest.");
    // ... ...
}
```
DefaultParser.java
```java
public CommandLine parse(Options options, String[] arguments, Properties properties, boolean stopAtNonOption)
        throws ParseException
{
    // ... ...
    if (arguments != null)
    {
        for (String argument : arguments)
        {
            handleToken(argument);
        }
    }
    // ... ...
}
```
```java
private void handleToken(String token) throws ParseException
{
    currentToken = token;

    if (skipParsing)
    {
        cmd.addArg(token);
    }
    else if ("--".equals(token))
    {
        skipParsing = true;
    }
    else if (currentOption != null && currentOption.acceptsArg() && isArgument(token))
    {
        currentOption.addValueForProcessing(Util.stripLeadingAndTrailingQuotes(token));
    }
    else if (token.startsWith("--"))
    {
        // handle tokens of the form --xxx
        handleLongOption(token);
    }
    else if (token.startsWith("-") && !"-".equals(token))
    {
        // handle tokens of the form -xxx
        handleShortAndLongOption(token);
    }
    else
    {
        handleUnknownToken(token);
    }

    if (currentOption != null && !currentOption.acceptsArg())
    {
        currentOption = null;
    }
}
```
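The branch order of handleToken matters: once "--" has been seen every token is a plain argument, and a pending option that accepts a value claims the next token before any prefix check runs. The sketch below is a hypothetical, dependency-free classifier that follows that order. Its optionsWithArg set stands in for currentOption.acceptsArg(), and its last branch imitates, in simplified form, what commons-cli's handleUnknownToken does when stopAtNonOption is true (CliFrontend passes true via getCommandLine(commandOptions, args, true)). It does not validate option names.

```java
import java.util.Set;

// Hypothetical classifier following the branch order of commons-cli
// DefaultParser.handleToken; not the real implementation.
class TokenKindSketch {

    enum Kind { PLAIN_ARG, END_OF_OPTIONS, OPTION_VALUE, LONG_OPTION, SHORT_OPTION, UNKNOWN }

    private final Set<String> optionsWithArg; // names of options that take a value, e.g. "m"
    private final boolean stopAtNonOption;
    private boolean skipParsing = false;      // set by "--" or, with stopAtNonOption, by a non-option
    private boolean expectingValue = false;   // plays the role of currentOption.acceptsArg()

    TokenKindSketch(Set<String> optionsWithArg, boolean stopAtNonOption) {
        this.optionsWithArg = optionsWithArg;
        this.stopAtNonOption = stopAtNonOption;
    }

    Kind handleToken(String token) {
        if (skipParsing) {
            return Kind.PLAIN_ARG;
        } else if ("--".equals(token)) {
            skipParsing = true;
            return Kind.END_OF_OPTIONS;
        } else if (expectingValue && !token.startsWith("-")) {
            expectingValue = false;
            return Kind.OPTION_VALUE;
        } else if (token.startsWith("--")) {
            expectingValue = optionsWithArg.contains(token.substring(2));
            return Kind.LONG_OPTION;
        } else if (token.startsWith("-") && !"-".equals(token)) {
            expectingValue = optionsWithArg.contains(token.substring(1));
            return Kind.SHORT_OPTION;
        }
        // first non-option token (e.g. the job JAR): with stopAtNonOption,
        // everything after it is left unparsed
        if (stopAtNonOption) {
            skipParsing = true;
        }
        return Kind.UNKNOWN;
    }
}
```

Fed the tokens -m yarn-cluster app.jar --input x, the sketch yields SHORT_OPTION, OPTION_VALUE, UNKNOWN, PLAIN_ARG, PLAIN_ARG, which illustrates why the user program's own arguments after the JAR are not mistaken for Flink options.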
```java
private void handleLongOption(String token) throws ParseException
{
    if (token.indexOf('=') == -1)
    {
        // handle tokens of the form --L, -L, --l, -l (no '=')
        handleLongOptionWithoutEqual(token);
    }
    else
    {
        // handle tokens of the form --L=V, -L=V, --l=V, -l=V (with '=')
        handleLongOptionWithEqual(token);
    }
}
```
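All of these cases are parsed with broadly the same logic: strip the - or -- prefix, then validate the option. Taking one of them as an example: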
```java
private void handleLongOptionWithoutEqual(String token) throws ParseException
{
    // check whether the option is valid
    List<String> matchingOpts = options.getMatchingOptions(token);
    if (matchingOpts.isEmpty())
    {
        handleUnknownToken(currentToken);
    }
    else if (matchingOpts.size() > 1)
    {
        throw new AmbiguousOptionException(token, matchingOpts);
    }
    else
    {
        // add the option to the parsed command line
        handleOption(options.getOption(matchingOpts.get(0)));
    }
}
```
Options.java:
```java
public List<String> getMatchingOptions(String opt)
{
    // strip the - or -- prefix
    opt = Util.stripLeadingHyphens(opt);

    List<String> matchingOpts = new ArrayList<String>();

    // for a perfect match return the single option only
    if (longOpts.keySet().contains(opt))
    {
        return Collections.singletonList(opt);
    }

    for (String longOpt : longOpts.keySet())
    {
        if (longOpt.startsWith(opt))
        {
            matchingOpts.add(longOpt);
        }
    }

    return matchingOpts;
}
```
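Prefix matching is what lets a user abbreviate long options. The hypothetical sketch below reimplements the logic of getMatchingOptions with plain collections; the hyphen stripping is a simplified stand-in for Util.stripLeadingHyphens, and the option names in the usage note are example inputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical re-creation of commons-cli Options.getMatchingOptions.
class MatchingOptionsSketch {

    private final Set<String> longOpts;

    MatchingOptionsSketch(List<String> longOptNames) {
        // insertion order is kept so that results are predictable
        this.longOpts = new LinkedHashSet<>(longOptNames);
    }

    List<String> getMatchingOptions(String opt) {
        // strip a leading "--" or "-"
        if (opt.startsWith("--")) {
            opt = opt.substring(2);
        } else if (opt.startsWith("-")) {
            opt = opt.substring(1);
        }
        // a perfect match wins outright
        if (longOpts.contains(opt)) {
            return Collections.singletonList(opt);
        }
        // otherwise every long option starting with the prefix matches
        List<String> matching = new ArrayList<>();
        for (String longOpt : longOpts) {
            if (longOpt.startsWith(opt)) {
                matching.add(longOpt);
            }
        }
        return matching;
    }
}
```

With the long options class, classpath, and jarfile, --cla returns [class, classpath], which handleLongOptionWithoutEqual would reject as ambiguous; --jar returns just [jarfile]; --class is an exact match even though it is also a prefix of classpath; and an unmatched name yields an empty list, which ends up in handleUnknownToken.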
DefaultParser.java
```java
private void handleOption(Option option) throws ParseException
{
    // check the previous option before handling the next one
    checkRequiredArgs();

    option = (Option) option.clone();

    updateRequiredOptions(option);

    cmd.addOption(option);

    if (option.hasArg())
    {
        currentOption = option;
    }
    else
    {
        currentOption = null;
    }
}
```
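handleOption is where an option that takes an argument becomes currentOption, so that handleToken binds the next token to it. The following hypothetical mini-parser condenses that flow, plus the stop-at-first-non-option behavior, into a single loop. It is not commons-cli and skips details such as "--", the "=" syntax, and option validation; the exception it throws only plays the role of checkRequiredArgs().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical mini-parser illustrating the currentOption mechanism.
class MiniParserSketch {

    final Map<String, String> options = new LinkedHashMap<>(); // option name -> value ("" if none)
    final List<String> args = new ArrayList<>();                // leftover positional arguments

    MiniParserSketch(Set<String> optionsWithArg, String[] tokens) {
        String currentOption = null;
        boolean skipParsing = false;
        for (String token : tokens) {
            if (skipParsing) {
                args.add(token);
            } else if (currentOption != null && !token.startsWith("-")) {
                // the pending option accepts an argument: bind this token to it
                options.put(currentOption, token);
                currentOption = null;
            } else if (token.startsWith("-") && !"-".equals(token)) {
                if (currentOption != null) {
                    // like checkRequiredArgs(): the previous option never received its value
                    throw new IllegalArgumentException("Missing argument for option: " + currentOption);
                }
                String name = token.replaceFirst("^--?", "");
                options.put(name, "");
                // like handleOption: remember the option only if it has an argument
                currentOption = optionsWithArg.contains(name) ? name : null;
            } else {
                // first non-option token (e.g. the job JAR): stop parsing options
                args.add(token);
                skipParsing = true;
            }
        }
        if (currentOption != null) {
            throw new IllegalArgumentException("Missing argument for option: " + currentOption);
        }
    }
}
```

Parsing -m yarn-cluster -d app.jar --input x with m as the only value-taking option yields the options {m=yarn-cluster, d=} and the leftover arguments [app.jar, --input, x].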
4. Choosing which type of client to create
CliFrontend.java
```java
public static void main(final String[] args) {
    // ... ...
    final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
        configuration,
        configurationDirectory);
    // ... ...
    final CliFrontend cli = new CliFrontend(
        configuration,
        customCommandLines);
    // ... ...
}
```
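Here the command-line clients are added in order: GenericCLI, then the Yarn client (FlinkYarnSessionCli, or FallbackYarnSessionCli if it cannot be loaded), and finally the Default (Standalone) client DefaultCLI. One of them is selected later based on isActive():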
```java
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine> customCommandLines = new ArrayList<>();
    customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

    // Commandline interface of the YARN session, with a special initialization here
    // to prefix all options with y/yarn.
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        customCommandLines.add(
            loadCustomCommandLine(flinkYarnSessionCLI,
                configuration,
                configurationDirectory,
                "y",
                "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
        try {
            LOG.info("Loading FallbackYarnSessionCli");
            customCommandLines.add(
                loadCustomCommandLine(errorYarnSessionCLI, configuration));
        } catch (Exception exception) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }
    }

    // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
    // active CustomCommandLine in order and DefaultCLI isActive always return true.
    customCommandLines.add(new DefaultCLI(configuration));

    return customCommandLines;
}
```
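The YARN command line is loaded by class name, presumably so that the client keeps working when the YARN classes are not on the classpath, in which case FallbackYarnSessionCli is used. The hypothetical sketch below shows only that load-or-fall-back pattern with Class.forName. It records class names as strings instead of constructing real CustomCommandLine objects, and the exceptions it catches are a simplification of the NoClassDefFoundError | Exception in the original.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "load by class name, fall back if missing".
class FallbackLoadingSketch {

    static List<String> loadCommandLines(String preferredClassName, String fallbackName) {
        List<String> loaded = new ArrayList<>();
        loaded.add("GenericCLI");
        try {
            // fails if the class (e.g. from the flink-yarn module) is not on the classpath
            Class.forName(preferredClassName);
            loaded.add(preferredClassName);
        } catch (ClassNotFoundException | LinkageError e) {
            loaded.add(fallbackName);
        }
        // must stay last, because its isActive() always returns true
        loaded.add("DefaultCLI");
        return loaded;
    }
}
```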
The client is selected in run():
```java
protected void run(String[] args) throws Exception {
    // ... ...
    final CustomCommandLine activeCommandLine =
        validateAndGetActiveCommandLine(checkNotNull(commandLine));
    // ... ...
}
```
```java
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    // ... ...
    for (CustomCommandLine cli : customCommandLines) {
        // ... ...
        // FlinkYarnSessionCli comes before DefaultCLI in the list, so it is returned first whenever it is active.
        // DefaultCLI's isActive always returns true.
        if (cli.isActive(commandLine)) {
            return cli;
        }
    }
    // ... ...
}
```
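Because selection is simply "first active entry wins", the order built by loadCustomCommandLines decides the outcome. The hypothetical sketch below models each command line as a name plus an isActive predicate over the parsed options, with a Map standing in for CommandLine. The conditions for the generic and YARN entries are simplified assumptions, not Flink's full checks.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of validateAndGetActiveCommandLine.
class ActiveCommandLineSketch {

    // a command line reduced to its name and its isActive() check
    static final class Entry {
        final String name;
        final Predicate<Map<String, String>> isActive;

        Entry(String name, Predicate<Map<String, String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    static String validateAndGetActive(List<Entry> customCommandLines, Map<String, String> cliOptions) {
        for (Entry cli : customCommandLines) {
            // the first active entry wins, so list order is significant
            if (cli.isActive.test(cliOptions)) {
                return cli.name;
            }
        }
        throw new IllegalStateException("no active command line"); // placeholder message
    }

    static List<Entry> defaultOrder() {
        return List.of(
            // generic: assumed to be active when a target is given explicitly
            new Entry("generic", o -> o.containsKey("target")),
            // YARN: active for "-m yarn-cluster" (other conditions omitted)
            new Entry("yarn", o -> "yarn-cluster".equals(o.get("m"))),
            // default: always active, so it must be the last entry
            new Entry("default", o -> true));
    }
}
```

With -m yarn-cluster the sketch returns "yarn"; with no relevant options it falls through to "default". If the default entry were moved to the front, every call would return "default", which is exactly what the "DefaultCLI must be added at last" comment guards against.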
FlinkYarnSessionCli.java => how the Yarn client's isActive decides:
```java
public boolean isActive(CommandLine commandLine) {
    final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
    // is per-job mode requested, i.e. was "-m yarn-cluster" given? ID = "yarn-cluster"
    final boolean yarnJobManager = ID.equals(jobManagerOption);
    // is there a Flink application ID on YARN, i.e. is a yarn-session already running?
    final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())
        || configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();
    // is the executor name "yarn-session" or "yarn-per-job"?
    final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET))
        || YarnJobClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET));

    return hasYarnExecutor || yarnJobManager || hasYarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}
```
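The YARN check is a plain disjunction of four conditions. As a hypothetical, dependency-free restatement, each parameter below stands in for one lookup in the snippet above: the -m value, the application ID from the command line or configuration, execution.target, and the YARN properties-file state. The two executor names are the ones given in the comment above.

```java
// Hypothetical restatement of FlinkYarnSessionCli.isActive with plain inputs.
class YarnIsActiveSketch {

    static final String ID = "yarn-cluster";

    static boolean isActive(String jobManagerOption,        // value of -m, or null
                            String applicationIdOption,     // application ID from CLI or config, or null
                            String executionTarget,         // value of execution.target, or null
                            boolean yarnPropertiesFileMode, // stands in for isYarnPropertiesFileMode(...)
                            String appIdFromYarnProperties) {
        // per-job mode requested via "-m yarn-cluster"
        boolean yarnJobManager = ID.equals(jobManagerOption);
        // an existing YARN application (yarn-session) was specified
        boolean hasYarnAppId = applicationIdOption != null;
        // execution.target names one of the YARN executors
        boolean hasYarnExecutor = "yarn-session".equals(executionTarget)
                || "yarn-per-job".equals(executionTarget);
        return hasYarnExecutor || yarnJobManager || hasYarnAppId
                || (yarnPropertiesFileMode && appIdFromYarnProperties != null);
    }
}
```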
5. Obtaining the effective configuration
CliFrontend.java
```java
protected void run(String[] args) throws Exception {
    // ... ...
    final Configuration effectiveConfiguration = getEffectiveConfiguration(
        activeCommandLine, commandLine, programOptions, jobJars);
    // ... ...
}
```
```java
private Configuration getEffectiveConfiguration(
        final CustomCommandLine activeCustomCommandLine,
        final CommandLine commandLine,
        final ProgramOptions programOptions,
        final List<URL> jobJars) throws FlinkException {
    // ... ...
    final Configuration executorConfig = checkNotNull(activeCustomCommandLine)
        .applyCommandLineOptionsToConfiguration(commandLine);
    // ... ...
}
```
FlinkYarnSessionCli.java
```java
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
    // we ignore the addressOption because it can only contain "yarn-cluster"
    final Configuration effectiveConfiguration = new Configuration(configuration);

    applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);

    final ApplicationId applicationId = getApplicationId(commandLine);
    if (applicationId != null) {
        final String zooKeeperNamespace;
        if (commandLine.hasOption(zookeeperNamespace.getOpt())) {
            zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());
        } else {
            zooKeeperNamespace = effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());
        }

        effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
        effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
        // TARGET is execution.target, the target executor;
        // it decides which type of executor submits the job later: yarn-session or yarn-per-job
        effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
    } else {
        effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
    }

    if (commandLine.hasOption(jmMemory.getOpt())) {
        String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
        if (!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
            jmMemoryVal += "m";
        }
        effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jmMemoryVal);
    }

    if (commandLine.hasOption(tmMemory.getOpt())) {
        String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());
        if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
            tmMemoryVal += "m";
        }
        effectiveConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(tmMemoryVal));
    }

    if (commandLine.hasOption(slots.getOpt())) {
        effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
    }
    // ... ...
}
```
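Two rules in this method are easy to miss: the presence of an application ID switches execution.target between yarn-session and yarn-per-job, and a memory value without a unit gets "m" appended. The hypothetical sketch below reproduces just those two rules on a plain Map. The yarn.application.id and jm.memory keys are assumptions, and the unit check (a value ending in a digit has no unit) is a simplified stand-in for MemorySize.MemoryUnit.hasUnit, not Flink's Configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of two rules from applyCommandLineOptionsToConfiguration.
class EffectiveConfigSketch {

    static Map<String, String> apply(Map<String, String> baseConfig, String applicationId, String jmMemory) {
        // work on a copy, as new Configuration(configuration) does
        Map<String, String> effective = new HashMap<>(baseConfig);
        if (applicationId != null) {
            // an existing YARN application: submit into that session
            effective.put("yarn.application.id", applicationId); // assumed key name
            effective.put("execution.target", "yarn-session");
        } else {
            // no application ID: a dedicated cluster is started for this job
            effective.put("execution.target", "yarn-per-job");
        }
        if (jmMemory != null && !jmMemory.isEmpty()) {
            // simplified unit check: a trailing digit means "no unit given"
            boolean hasUnit = !Character.isDigit(jmMemory.charAt(jmMemory.length() - 1));
            effective.put("jm.memory", hasUnit ? jmMemory : jmMemory + "m"); // hypothetical key
        }
        return effective;
    }
}
```

So -yjm 1024 (no unit) would be stored as 1024m, while 2g is kept as is; and whether a per-job or session executor runs later depends only on whether an application ID was found.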
6. Invoking the main method of the user code
CliFrontend.java
```java
protected void run(String[] args) throws Exception {
    // ... ...
    executeProgram(effectiveConfiguration, program);
    // ... ...
}
```
```java
protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
    ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
```
ClientUtils.java
```java
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout) throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // make the user-code classloader the current thread's context classloader
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
        // getExecutionEnvironment() in the user code will return this environment
        ContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);

        StreamContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);

        try {
            // invoke the main method of the user code
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
```
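Saving, swapping, and restoring the thread context classloader is a general pattern worth isolating. The sketch below is a hypothetical helper, not part of Flink: it installs a user classloader only for the duration of a callback and restores the previous one in finally, even if the callback throws. Flink additionally sets and unsets the context environments in the same nested try/finally style, which the sketch leaves out.

```java
import java.util.function.Supplier;

// Hypothetical helper showing the swap-and-restore pattern used in ClientUtils.executeProgram.
class ContextClassLoaderSketch {

    static <T> T runWithClassLoader(ClassLoader userCodeClassLoader, Supplier<T> program) {
        final Thread thread = Thread.currentThread();
        final ClassLoader contextClassLoader = thread.getContextClassLoader();
        try {
            // code running inside program sees the user-code classloader
            thread.setContextClassLoader(userCodeClassLoader);
            return program.get();
        } finally {
            // always restore the original loader
            thread.setContextClassLoader(contextClassLoader);
        }
    }
}
```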
PackagedProgram.java
```java
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    callMainMethod(mainClass, args);
}
```
```java
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
    // ... ...
    mainMethod = entryClass.getMethod("main", String[].class);
    // ... ...
    // invoke the main method via reflection
    mainMethod.invoke(null, (Object) args);
    // ... ...
}
```
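A slightly fuller, self-contained version of the reflective call is sketched below. ReflectiveMainSketch and DemoUserJob are hypothetical, and the static check and exception wrapping are simplified assumptions rather than the exact checks in PackagedProgram. The (Object) args cast matters: without it, invoke would treat the String[] as its own varargs array and pass each element as a separate argument.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical sketch of invoking a program's main method via reflection.
class ReflectiveMainSketch {

    static void callMainMethod(Class<?> entryClass, String[] args) {
        final Method mainMethod;
        try {
            mainMethod = entryClass.getMethod("main", String[].class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(entryClass.getName() + " has no main(String[]) method", e);
        }
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException("main method of " + entryClass.getName() + " is not static");
        }
        try {
            // static method, so the receiver is null; args is passed as ONE argument
            mainMethod.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // unwrap the exception thrown by the user's main method
            throw new IllegalStateException("user main method failed", e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("main method is not accessible", e);
        }
    }
}

// A stand-in "user program" used only for this example.
class DemoUserJob {
    static String[] lastArgs;

    public static void main(String[] args) {
        lastArgs = args;
    }
}
```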
7. Calling the execute method of the execution environment
StreamExecutionEnvironment.java
```java
public JobExecutionResult execute() throws Exception {
    return execute(DEFAULT_JOB_NAME);
}
```
```java
public JobExecutionResult execute(String jobName) throws Exception {
    // ... ...
    return execute(getStreamGraph(jobName));
}
```
```java
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    final JobClient jobClient = executeAsync(streamGraph);
    // ... ...
}
```
```java
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // ... ...
    // pick the factory that matches the submission mode
    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);
    // ... ...
    // let the chosen executor submit the job
    CompletableFuture<JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        .execute(streamGraph, configuration);
    // ... ...
}
```
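Which executor actually submits the job is therefore decided by execution.target, the value set in step 5. The hypothetical sketch below models the lookup: a factory counts as compatible when its name matches execution.target, and exactly one compatible factory must exist. As we understand it, Flink's DefaultExecutorServiceLoader discovers the factories through Java's ServiceLoader; here they are simply passed in as a list, and the error messages are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of choosing a PipelineExecutorFactory by execution.target.
class ExecutorSelectionSketch {

    // stand-in for PipelineExecutorFactory; only the name matters here
    interface ExecutorFactory {
        String name();

        // compatible when the name equals execution.target (case-insensitive)
        default boolean isCompatibleWith(Map<String, String> configuration) {
            return name().equalsIgnoreCase(configuration.get("execution.target"));
        }
    }

    static ExecutorFactory getExecutorFactory(List<ExecutorFactory> discovered, Map<String, String> configuration) {
        List<ExecutorFactory> compatible = new ArrayList<>();
        for (ExecutorFactory factory : discovered) {
            if (factory.isCompatibleWith(configuration)) {
                compatible.add(factory);
            }
        }
        if (compatible.isEmpty()) {
            throw new IllegalStateException("no executor factory for target " + configuration.get("execution.target"));
        }
        if (compatible.size() > 1) {
            throw new IllegalStateException("more than one compatible executor factory");
        }
        return compatible.get(0);
    }
}
```

With execution.target set to yarn-per-job, as applyCommandLineOptionsToConfiguration does when no application ID is present, only the yarn-per-job factory matches, and its executor then submits the StreamGraph.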
Original article:
https://mp.weixin.qq.com/s/hbwvvZBKnzto5Rb89xqcog