本帖最后由 howtodown 于 2014-10-13 18:33 编辑
RM(ResourceManager)接收到客户端的作业提交请求时,会通过RPC server做出回应;实际上,客户端正是通过ApplicationClientProtocol的RPC客户端来提交作业的。在提交阶段的代码中,客户端首先调用getNewApplication获得一个GetNewApplicationResponse,该返回对象中包含了应用的ApplicationId以及调度器的资源信息(最大可分配资源)。需要注意的是,RM服务端有多个RPC server,负责作业提交的是ClientRMService,默认监听8032端口(yarn-default.xml中的默认值为${yarn.resourcemanager.hostname}:8032),可以通过yarn.resourcemanager.address进行配置。下面是服务端的getNewApplication实现,位于ClientRMService.java中:
- @Override
- public GetNewApplicationResponse getNewApplication(
- GetNewApplicationRequest request) throws YarnException {
- //构建GetNewApplicationResponse对象
- GetNewApplicationResponse response = recordFactory
- .newRecordInstance(GetNewApplicationResponse.class);
- //设置作业ID
- response.setApplicationId(getNewApplicationId());
- // 设置调度器资源信息,作业ID设置完后,接下来设置调度器资源,目前包括CPU 内存两部分信息,相关函数有:yarn.scheduler.minimum-allocation-mb yarn.scheduler.minimum-allocation-vcores yarn.scheduler.maximum-allocation-mb yarn.scheduler.maximum-allocation-vcores,这些信息在调度器启动时指定。
- response.setMaximumResourceCapability(scheduler
- .getMaximumResourceCapability());
- return response;
- }
- ApplicationId getNewApplicationId() {
- ApplicationId applicationId = org.apache.hadoop.yarn.server.utils.BuilderUtils
- .newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(),
- applicationCounter.incrementAndGet());
- LOG.info("Allocated new applicationId: " + applicationId.getId());
- return applicationId;
- }
- public static ApplicationId newInstance(long clusterTimestamp, int id) {
- ApplicationId appId = Records.newRecord(ApplicationId.class);
- appId.setClusterTimestamp(clusterTimestamp);
- appId.setId(id);
- appId.build();
- return appId;
- }
- @Override
- public SubmitApplicationResponse submitApplication(
- SubmitApplicationRequest request) throws YarnException {
- //获得提交上下文
- ApplicationSubmissionContext submissionContext = request
- .getApplicationSubmissionContext();
- //获得作业ID
- ApplicationId applicationId = submissionContext.getApplicationId();
- // ApplicationSubmissionContext needs to be validated for safety - only
- // those fields that are independent of the RM's configuration will be
- // checked here, those that are dependent on RM configuration are validated
- // in RMAppManager.
- //进入一系列的安全校验
- String user = null;
- try {
- // 提交账户是否安全
- user = UserGroupInformation.getCurrentUser().getShortUserName();
- } catch (IOException ie) {
- LOG.warn("Unable to get the current user.", ie);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- ie.getMessage(), "ClientRMService",
- "Exception in submitting application", applicationId);
- throw RPCUtil.getRemoteException(ie);
- }
- // Though duplication will checked again when app is put into rmContext,
- // but it is good to fail the invalid submission as early as possible.
- //作业ID是否已经存在
- if (rmContext.getRMApps().get(applicationId) != null) {
- String message = "Application with id " + applicationId +
- " is already present! Cannot add a duplicate!";
- LOG.warn(message);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- message, "ClientRMService", "Exception in submitting application",
- applicationId);
- throw RPCUtil.getRemoteException(message);
- }
- //设置作业队列
- if (submissionContext.getQueue() == null) {
- submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
- }
- //设置作业缺省名称
- if (submissionContext.getApplicationName() == null) {
- submissionContext.setApplicationName(
- }
- //设置作业类型
- if (submissionContext.getApplicationType() == null) {
- submissionContext
- .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
- } else {
- //作业类型长度???有什么用?
- if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
- submissionContext.setApplicationType(submissionContext
- .getApplicationType().substring(0,
- YarnConfiguration.APPLICATION_TYPE_LENGTH));
- }
- }
- try {
- // call RMAppManager to submit application directly
- //开始提交作业
- rmAppManager.submitApplication(submissionContext,
- System.currentTimeMillis(), user, false, null);
- LOG.info("Application with id " + applicationId.getId() +
- " submitted by user " + user);
- RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
- "ClientRMService", applicationId);
- } catch (YarnException e) {
- LOG.info("Exception in submitting application with id " +
- applicationId.getId(), e);
- RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
- e.getMessage(), "ClientRMService",
- "Exception in submitting application", applicationId);
- throw e;
- }
- SubmitApplicationResponse response = recordFactory
- .newRecordInstance(SubmitApplicationResponse.class);
- return response;
- }
- @SuppressWarnings("unchecked")
- protected void submitApplication(
- ApplicationSubmissionContext submissionContext, long submitTime,
- String user, boolean isRecovered, RMState state) throws YarnException {
- //获得作业ID
- ApplicationId applicationId = submissionContext.getApplicationId();
- //构建一个app并放入applicationACLS
- RMAppImpl application =
- createAndPopulateNewRMApp(submissionContext, submitTime, user);
- //判断是否需要恢复
- if (isRecovered) {
- recoverApplication(state, application);
- RMAppState rmAppState =
- state.getApplicationState().get(applicationId).getState();
- if (isApplicationInFinalState(rmAppState)) {
- // We are synchronously moving the application into final state so that
- // momentarily client will not see this application in NEW state. Also
- // for finished applications we will avoid renewing tokens.
- application
- .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
- return;
- }
- }
- if (UserGroupInformation.isSecurityEnabled()) {
- Credentials credentials = null;
- try {
- credentials = parseCredentials(submissionContext);
- } catch (Exception e) {
- LOG.warn(
- "Unable to parse credentials.", e);
- // Sending APP_REJECTED is fine, since we assume that the
- // RMApp is in NEW state and thus we haven't yet informed the
- // scheduler about the existence of the application
- assert application.getState() == RMAppState.NEW;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId, e.getMessage()));
- throw RPCUtil.getRemoteException(e);
- }
- this.rmContext.getDelegationTokenRenewer().addApplication(
- applicationId, credentials,
- submissionContext.getCancelTokensWhenComplete(), isRecovered);
- } else {
- //触发app启动事件
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
- }
- }
- private RMAppImpl createAndPopulateNewRMApp(
- ApplicationSubmissionContext submissionContext,
- long submitTime, String user)
- throws YarnException {
- ApplicationId applicationId = submissionContext.getApplicationId();
- validateResourceRequest(submissionContext);
- //构建APP,submissionContext中包含了一个APP的绝大部分信息
- RMAppImpl application =
- new RMAppImpl(applicationId, rmContext, this.conf,
- submissionContext.getApplicationName(), user,
- submissionContext.getQueue(),
- submissionContext, this.scheduler, this.masterService,
- submitTime, submissionContext.getApplicationType());
- // Concurrent app submissions with same applicationId will fail here
- // Concurrent app submissions with different applicationIds will not
- // influence each other
- //再次判断作业是否存在,若不存在则放入hashMap中,一旦放入成功则表明作业提交成功
- if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
- null) {
- String message = "Application with id " + applicationId
- + " is already present! Cannot add a duplicate!";
- LOG.warn(message);
- throw RPCUtil.getRemoteException(message);
- }
- // Inform the ACLs Manager
- this.applicationACLsManager.addApplication(applicationId,
- submissionContext.getAMContainerSpec().getApplicationACLs());
- return application;
- }
// Excerpt: field declarations of RMAppImpl; the enclosing class header and constructor are not shown.
- private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
- private static final String UNAVAILABLE = "N/A"; // placeholder text; its uses lie outside this excerpt
- // Immutable fields: declared final, so each is assigned exactly once (presumably in the constructor, not shown).
- private final ApplicationId applicationId;
- private final RMContext rmContext;
- private final Configuration conf;
- private final String user; // presumably the submitting user (RMAppManager passes its `user` argument to the constructor)
- private final String name;
- private final ApplicationSubmissionContext submissionContext;
- private final Dispatcher dispatcher;
- private final YarnScheduler scheduler;
- private final ApplicationMasterService masterService;
- private final StringBuilder diagnostics = new StringBuilder(); // final reference, but the builder's contents can still grow
- private final int maxAppAttempts;
- private final ReadLock readLock; // NOTE(review): presumably the read half of one ReentrantReadWriteLock guarding the mutable fields below — confirm
- private final WriteLock writeLock; // NOTE(review): presumably the matching write half — confirm
- private final Map<ApplicationAttemptId, RMAppAttempt> attempts
- = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>(); // LinkedHashMap: iteration follows insertion order
- private final long submitTime;
- private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
- private final String applicationType;
- // Mutable fields (non-final; the two static final transition instances further down are shared constants, not per-app state).
- private long startTime;
- private long finishTime = 0;
- private long storedFinishTime = 0;
- private RMAppAttempt currentAttempt;
- private String queue; // not final, unlike the identity fields above — presumably reassignable after submission
- @SuppressWarnings("rawtypes")
- private EventHandler handler; // raw EventHandler type, hence the rawtypes suppression on this field
- private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); // single shared instance, presumably reused by the state-machine table (not shown)
- private static final AppFinishedTransition FINISHED_TRANSITION =
- new AppFinishedTransition();
- // These states stored are only valid when app is at killing or final_saving.
- private RMAppState stateBeforeKilling;
- private RMAppState stateBeforeFinalSaving;
- private RMAppEvent eventCausingFinalSaving;
- private RMAppState targetedFinalState;
- private RMAppState recoveredFinalState;
- Object transitionTodo; // NOTE(review): package-private and untyped; its purpose is not visible in this excerpt
相关文章:Hadoop2.x Yarn作业提交(服务端)