howtodown posted on 2014-10-13 17:35:46

Hadoop 2.x YARN Job Submission (Server Side)

This post was last edited by howtodown on 2014-10-13 18:33

Guiding questions:
1. The server that handles job submission is ClientRMService. Which port does it listen on by default?
2. How is the job ID obtained?



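When the RM receives a job submission request from a client, it answers through an RPC server; the client, for its part, submits the job through the RPC client of ApplicationClientProtocol. During submission the client first calls getNewApplication to obtain a GetNewApplicationResponse, which carries the application's ApplicationId and the scheduler's resource information. Note that the RM runs several RPC servers. The one that serves job submission is ClientRMService, which listens on port 8032 by default; the address can be changed through yarn.resourcemanager.address. Below is the server-side getNewApplication, found in ClientRMService.java: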

@Override
public GetNewApplicationResponse getNewApplication(
    GetNewApplicationRequest request) throws YarnException {
// Build the GetNewApplicationResponse object
GetNewApplicationResponse response = recordFactory
      .newRecordInstance(GetNewApplicationResponse.class);
// Set the job ID
response.setApplicationId(getNewApplicationId());
// With the job ID set, fill in the scheduler resource information, which
// currently covers CPU and memory. The related configuration properties are
// yarn.scheduler.minimum-allocation-mb, yarn.scheduler.minimum-allocation-vcores,
// yarn.scheduler.maximum-allocation-mb and yarn.scheduler.maximum-allocation-vcores;
// their values are specified when the scheduler starts.
response.setMaximumResourceCapability(scheduler
      .getMaximumResourceCapability());

return response;
}
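
As a rough client-side illustration (it is not part of ClientRMService), a client that has received the maximum resource capability can cap its own request before submitting. The sketch below uses plain Java with no Hadoop dependency; ResourceSketch and capAt are hypothetical names invented for this example, and the numbers are made up.

```java
// Hypothetical stand-in for a resource record: memory in MB plus virtual cores.
final class ResourceSketch {
    final int memoryMb;
    final int vcores;

    ResourceSketch(int memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }

    // Clamp a requested resource so that neither dimension exceeds the maximum.
    static ResourceSketch capAt(ResourceSketch requested, ResourceSketch max) {
        return new ResourceSketch(
            Math.min(requested.memoryMb, max.memoryMb),
            Math.min(requested.vcores, max.vcores));
    }

    public static void main(String[] args) {
        // Pretend this maximum came back in the getNewApplication response.
        ResourceSketch max = new ResourceSketch(8192, 4);
        ResourceSketch wanted = new ResourceSketch(16384, 2);
        ResourceSketch capped = capAt(wanted, max);
        System.out.println(capped.memoryMb + " MB, " + capped.vcores + " vcores");
    }
}
```

Capping on the client side simply avoids sending a request that the RM would have to reject or adjust later.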
The job ID comes from getNewApplicationId, which derives it from the cluster start timestamp and a counter:
ApplicationId getNewApplicationId() {
ApplicationId applicationId = org.apache.hadoop.yarn.server.utils.BuilderUtils
      .newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(),
          applicationCounter.incrementAndGet());
LOG.info("Allocated new applicationId: " + applicationId.getId());
return applicationId;
}
The function that builds the job ID (ApplicationId.newInstance):
public static ApplicationId newInstance(long clusterTimestamp, int id) {
ApplicationId appId = Records.newRecord(ApplicationId.class);
appId.setClusterTimestamp(clusterTimestamp);
appId.setId(id);
appId.build();
return appId;
}
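
The "timestamp plus counter" idea can be tried out without a cluster. The following is a minimal sketch in plain Java: AppIdSketch is a hypothetical class, the timestamp is an arbitrary value, and the textual form "application_<timestamp>_<id padded to 4 digits>" is assumed here to match what the RM prints for an ApplicationId.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

final class AppIdSketch {
    // Stand-in for ResourceManager.getClusterTimeStamp(): fixed once at startup.
    private final long clusterTimestamp;
    // Stand-in for applicationCounter: thread-safe, shared by all callers.
    private final AtomicInteger counter = new AtomicInteger(0);

    AppIdSketch(long clusterTimestamp) {
        this.clusterTimestamp = clusterTimestamp;
    }

    // Returns the next id as text; the counter makes ids unique within one RM run,
    // the timestamp keeps them distinct across RM restarts.
    String next() {
        int id = counter.incrementAndGet();
        return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, id);
    }

    public static void main(String[] args) {
        AppIdSketch ids = new AppIdSketch(1413192946000L);
        System.out.println(ids.next()); // application_1413192946000_0001
        System.out.println(ids.next()); // application_1413192946000_0002
    }
}
```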
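Once the client receives the response, it knows its job ID and the maximum resource allocation. Next comes the submission phase, which is still handled in ClientRMService: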
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
// Get the submission context
ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
// Get the job ID
ApplicationId applicationId = submissionContext.getApplicationId();


// ApplicationSubmissionContext needs to be validated for safety - only
// those fields that are independent of the RM's configuration will be
// checked here, those that are dependent on RM configuration are validated
// in RMAppManager.
// Start of the validation checks
String user = null;
try {
    // Resolve the submitting user; the submission fails if this cannot be determined
    user = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
      ie.getMessage(), "ClientRMService",
      "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
}


// Though duplication will checked again when app is put into rmContext,
// but it is good to fail the invalid submission as early as possible.
// Check whether the job ID already exists
if (rmContext.getRMApps().get(applicationId) != null) {
    String message = "Application with id " + applicationId +
      " is already present! Cannot add a duplicate!";
    LOG.warn(message);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
      message, "ClientRMService", "Exception in submitting application",
      applicationId);
    throw RPCUtil.getRemoteException(message);
}
// Fall back to the default queue if none was given
if (submissionContext.getQueue() == null) {
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
// Fall back to the default job name if none was given
if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
      YarnConfiguration.DEFAULT_APPLICATION_NAME);
}
// Fall back to the default job type if none was given
if (submissionContext.getApplicationType() == null) {
    submissionContext
      .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
} else {
    // Truncate the job type to at most APPLICATION_TYPE_LENGTH characters
    if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
      submissionContext.setApplicationType(submissionContext
      .getApplicationType().substring(0,
          YarnConfiguration.APPLICATION_TYPE_LENGTH));
    }
}


try {
    // call RMAppManager to submit application directly
    // Start submitting the job
    rmAppManager.submitApplication(submissionContext,
      System.currentTimeMillis(), user, false, null);


    LOG.info("Application with id " + applicationId.getId() +   
      " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
      "ClientRMService", applicationId);
} catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
      applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
      e.getMessage(), "ClientRMService",
      "Exception in submitting application", applicationId);
    throw e;
}


SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
return response;
}
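
The defaulting and truncation steps in the method above can be sketched in isolation as follows. This is plain Java with no Hadoop dependency; SubmissionDefaultsSketch is a hypothetical class, and the constant values are assumptions made for the example (check YarnConfiguration in your Hadoop version for the real ones).

```java
final class SubmissionDefaultsSketch {
    // Assumed values for illustration only; the real constants live in YarnConfiguration.
    static final String DEFAULT_QUEUE_NAME = "default";
    static final String DEFAULT_APPLICATION_NAME = "N/A";
    static final String DEFAULT_APPLICATION_TYPE = "YARN";
    static final int APPLICATION_TYPE_LENGTH = 20;

    // A missing queue falls back to the default queue.
    static String queueOrDefault(String queue) {
        return queue == null ? DEFAULT_QUEUE_NAME : queue;
    }

    // A missing name falls back to the default name.
    static String nameOrDefault(String name) {
        return name == null ? DEFAULT_APPLICATION_NAME : name;
    }

    // A missing type falls back to the default; an overlong type is cut to the limit.
    static String normalizeType(String type) {
        if (type == null) {
            return DEFAULT_APPLICATION_TYPE;
        }
        return type.length() > APPLICATION_TYPE_LENGTH
            ? type.substring(0, APPLICATION_TYPE_LENGTH)
            : type;
    }

    public static void main(String[] args) {
        System.out.println(queueOrDefault(null));
        System.out.println(nameOrDefault(null));
        System.out.println(normalizeType("ABCDEFGHIJKLMNOPQRSTUVWXY"));
    }
}
```

Truncating instead of rejecting keeps an otherwise valid submission alive while still bounding the length of the stored type string.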


The submission itself is carried out by RMAppManager.submitApplication:
@SuppressWarnings("unchecked")
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime,
    String user, boolean isRecovered, RMState state) throws YarnException {
// Get the job ID
ApplicationId applicationId = submissionContext.getApplicationId();
// Create the app, add it to the RM context and register its ACLs
RMAppImpl application =
      createAndPopulateNewRMApp(submissionContext, submitTime, user);
// Check whether the app is being recovered
if (isRecovered) {
    recoverApplication(state, application);
    RMAppState rmAppState =
      state.getApplicationState().get(applicationId).getState();
    if (isApplicationInFinalState(rmAppState)) {
      // We are synchronously moving the application into final state so that
      // momentarily client will not see this application in NEW state. Also
      // for finished applications we will avoid renewing tokens.
      application
          .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
      return;
    }
}
   
if (UserGroupInformation.isSecurityEnabled()) {
    Credentials credentials = null;
    try {
      credentials = parseCredentials(submissionContext);
    } catch (Exception e) {
      LOG.warn(
          "Unable to parse credentials.", e);
      // Sending APP_REJECTED is fine, since we assume that the
      // RMApp is in NEW state and thus we haven't yet informed the
      // scheduler about the existence of the application
      assert application.getState() == RMAppState.NEW;
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppRejectedEvent(applicationId, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
    this.rmContext.getDelegationTokenRenewer().addApplication(
      applicationId, credentials,
      submissionContext.getCancelTokensWhenComplete(), isRecovered);
} else {
    // Fire the app START event (or RECOVER when the app is being recovered)
    this.rmContext.getDispatcher().getEventHandler()
      .handle(new RMAppEvent(applicationId,
            isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
}
}
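
The last branch of the method above picks the event type from isRecovered and hands the event to a handler. A minimal sketch of that pattern in plain Java is shown here; StartEventSketch, AppEvent and RecordingHandler are hypothetical names, and the handler only records events instead of driving a real state machine.

```java
import java.util.ArrayList;
import java.util.List;

final class StartEventSketch {
    enum EventType { START, RECOVER }

    // Hypothetical event: which application, and what should happen to it.
    static final class AppEvent {
        final String appId;
        final EventType type;

        AppEvent(String appId, EventType type) {
            this.appId = appId;
            this.type = type;
        }
    }

    // Synchronous stand-in for the dispatcher's event handler: it just records events.
    static final class RecordingHandler {
        final List<AppEvent> handled = new ArrayList<>();

        void handle(AppEvent event) {
            handled.add(event);
        }
    }

    // Mirrors the ternary in the listing: RECOVER for recovered apps, START otherwise.
    static void fire(RecordingHandler handler, String appId, boolean isRecovered) {
        EventType type = isRecovered ? EventType.RECOVER : EventType.START;
        handler.handle(new AppEvent(appId, type));
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        fire(handler, "application_1413192946000_0001", false);
        fire(handler, "application_1413192946000_0002", true);
        for (AppEvent e : handler.handled) {
            System.out.println(e.appId + " -> " + e.type);
        }
    }
}
```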
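The application is created and added to the relevant collection in the function below; once it has been added successfully, the job counts as submitted.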
private RMAppImpl createAndPopulateNewRMApp(
    ApplicationSubmissionContext submissionContext,
    long submitTime, String user)
    throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
validateResourceRequest(submissionContext);
// Build the app; submissionContext carries most of an app's information
RMAppImpl application =
      new RMAppImpl(applicationId, rmContext, this.conf,
          submissionContext.getApplicationName(), user,
          submissionContext.getQueue(),
          submissionContext, this.scheduler, this.masterService,
          submitTime, submissionContext.getApplicationType());


// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
// influence each other
// Check again whether the job exists; if not, put it into the map. A successful put means the job has been submitted.
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
      null) {
    String message = "Application with id " + applicationId
      + " is already present! Cannot add a duplicate!";
    LOG.warn(message);
    throw RPCUtil.getRemoteException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
      submissionContext.getAMContainerSpec().getApplicationACLs());
return application;
}
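
The duplicate check above relies on putIfAbsent being atomic: of several concurrent submissions with the same ID, exactly one sees null and wins. A minimal sketch in plain Java follows; AppRegistrySketch is a hypothetical class, and IllegalStateException stands in for the remote exception used in the listing.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class AppRegistrySketch {
    // Stand-in for rmContext.getRMApps(): application id -> application object.
    private final ConcurrentMap<String, Object> apps = new ConcurrentHashMap<>();

    // Registers the app atomically; throws if the id is already taken.
    void register(String appId, Object app) {
        // putIfAbsent returns the previous value, or null if the key was free.
        if (apps.putIfAbsent(appId, app) != null) {
            throw new IllegalStateException("Application with id " + appId
                + " is already present! Cannot add a duplicate!");
        }
    }

    int size() {
        return apps.size();
    }

    public static void main(String[] args) {
        AppRegistrySketch registry = new AppRegistrySketch();
        registry.register("application_1413192946000_0001", new Object());
        try {
            registry.register("application_1413192946000_0001", new Object());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A separate "get, then put" would leave a window between the two calls in which a second submission could slip in; the single atomic call closes it.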
An app carries quite a lot of information, as shown below (RMAppImpl.java):

private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
private static final String UNAVAILABLE = "N/A";


// Immutable fields
private final ApplicationId applicationId;
private final RMContext rmContext;
private final Configuration conf;
private final String user;
private final String name;
private final ApplicationSubmissionContext submissionContext;
private final Dispatcher dispatcher;
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
private final StringBuilder diagnostics = new StringBuilder();
private final int maxAppAttempts;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
    = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
private final String applicationType;


// Mutable fields
private long startTime;
private long finishTime = 0;
private long storedFinishTime = 0;
private RMAppAttempt currentAttempt;
private String queue;
@SuppressWarnings("rawtypes")
private EventHandler handler;
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final AppFinishedTransition FINISHED_TRANSITION =
    new AppFinishedTransition();


// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
private RMAppState stateBeforeFinalSaving;
private RMAppEvent eventCausingFinalSaving;
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;


Object transitionTodo;
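
The readLock and writeLock fields in the list above suggest that the mutable fields are guarded by a read/write lock. How RMAppImpl actually uses them is not shown in this post, so the following is only a sketch of the general pattern in plain Java; LockedAppSketch is a hypothetical class with a single mutable field.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class LockedAppSketch {
    private final ReentrantReadWriteLock.ReadLock readLock;
    private final ReentrantReadWriteLock.WriteLock writeLock;

    // Mutable state guarded by the two locks.
    private long finishTime = 0;

    LockedAppSketch() {
        // Both locks come from one ReentrantReadWriteLock so that they coordinate.
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.readLock = lock.readLock();
        this.writeLock = lock.writeLock();
    }

    // Readers may run concurrently with each other.
    long getFinishTime() {
        readLock.lock();
        try {
            return finishTime;
        } finally {
            readLock.unlock();
        }
    }

    // A writer excludes both readers and other writers.
    void setFinishTime(long finishTime) {
        writeLock.lock();
        try {
            this.finishTime = finishTime;
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        LockedAppSketch app = new LockedAppSketch();
        app.setFinishTime(System.currentTimeMillis());
        System.out.println(app.getFinishTime());
    }
}
```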


