分享

Hadoop2.x Yarn作业提交(服务端)

howtodown 2014-10-13 17:35:46 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 28506
本帖最后由 howtodown 于 2014-10-13 18:33 编辑
问题导读:
1.服务于作业提交的server为ClientRMService,默认监听哪个端口?
2.如何获取作业id?








RM接收到客户端作业提交请求时会通过RPC server做回应,其实客户端就是通过ApplicationClientProtocol的RPC客户端提交作业的,在提交阶段的代码中,首先会调用getNewApplication来获得一个GetNewApplicationResponse,该返回类中包含了APP的ApplicationId,调度器资源信息。需要注意的是在RM的服务端有多个RPCserver,服务于作业提交的server为ClientRMService,默认监听18032端口,可以通过yarn.resourcemanager.address配置,下面是服务端的getNewApplication,包含在ClientRMService.java中

  1. @Override  
  2. public GetNewApplicationResponse getNewApplication(  
  3.     GetNewApplicationRequest request) throws YarnException {  
  4.   //构建GetNewApplicationResponse对象  
  5.   GetNewApplicationResponse response = recordFactory  
  6.       .newRecordInstance(GetNewApplicationResponse.class);  
  7.   //设置作业ID  
  8.   response.setApplicationId(getNewApplicationId());  
  9.   // 设置调度器资源信息,作业ID设置完后,接下来设置调度器资源,目前包括CPU 内存两部分信息,相关函数有:yarn.scheduler.minimum-allocation-mb yarn.scheduler.minimum-allocation-vcores yarn.scheduler.maximum-allocation-mb yarn.scheduler.maximum-allocation-vcores,这些信息在调度器启动时指定。  
  10.   
  11.   
  12.   response.setMaximumResourceCapability(scheduler  
  13.       .getMaximumResourceCapability());         
  14.    
  15.   return response;  
  16. }  
复制代码

作业ID的获得通过getNewApplicationId,是由集群启动时间戳和计数器计算得来
  1. ApplicationId getNewApplicationId() {  
  2.   ApplicationId applicationId = org.apache.hadoop.yarn.server.utils.BuilderUtils  
  3.       .newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(),  
  4.           applicationCounter.incrementAndGet());  
  5.   LOG.info("Allocated new applicationId: " + applicationId.getId());  
  6.   return applicationId;  
  7. }  
复制代码

作业ID的构建函数
  1. public static ApplicationId newInstance(long clusterTimestamp, int id) {  
  2.   ApplicationId appId = Records.newRecord(ApplicationId.class);  
  3.   appId.setClusterTimestamp(clusterTimestamp);  
  4.   appId.setId(id);  
  5.   appId.build();  
  6.   return appId;  
  7. }  
复制代码

在客户端接收到返回信息后,便知道了自己的作业ID、资源分配的最大值,下面进入提交阶段,依然在ClientRMService中
  1. @Override  
  2. public SubmitApplicationResponse submitApplication(  
  3.     SubmitApplicationRequest request) throws YarnException {  
  4.   //获得提交上下文  
  5.   ApplicationSubmissionContext submissionContext = request  
  6.       .getApplicationSubmissionContext();  
  7.   //获得作业ID  
  8.   ApplicationId applicationId = submissionContext.getApplicationId();  
  9.   
  10.   
  11.   // ApplicationSubmissionContext needs to be validated for safety - only  
  12.   // those fields that are independent of the RM's configuration will be  
  13.   // checked here, those that are dependent on RM configuration are validated  
  14.   // in RMAppManager.  
  15.     //进入一系列的安全校验  
  16.   String user = null;  
  17.   try {  
  18.     // 提交账户是否安全  
  19.     user = UserGroupInformation.getCurrentUser().getShortUserName();  
  20.   } catch (IOException ie) {  
  21.     LOG.warn("Unable to get the current user.", ie);  
  22.     RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,  
  23.         ie.getMessage(), "ClientRMService",  
  24.         "Exception in submitting application", applicationId);  
  25.     throw RPCUtil.getRemoteException(ie);  
  26.   }  
  27.   
  28.   
  29.   // Though duplication will checked again when app is put into rmContext,  
  30.   // but it is good to fail the invalid submission as early as possible.  
  31.   //作业ID是否已经存在  
  32.   if (rmContext.getRMApps().get(applicationId) != null) {  
  33.     String message = "Application with id " + applicationId +  
  34.         " is already present! Cannot add a duplicate!";  
  35.     LOG.warn(message);  
  36.     RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,  
  37.         message, "ClientRMService", "Exception in submitting application",  
  38.         applicationId);  
  39.     throw RPCUtil.getRemoteException(message);  
  40.   }  
  41.     //设置作业队列  
  42.   if (submissionContext.getQueue() == null) {  
  43.     submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);  
  44.   }  
  45.   //设置作业缺省名称  
  46.   if (submissionContext.getApplicationName() == null) {  
  47.     submissionContext.setApplicationName(  
  48.         YarnConfiguration.DEFAULT_APPLICATION_NAME);  
  49.   }  
  50.   //设置作业类型  
  51.   if (submissionContext.getApplicationType() == null) {  
  52.     submissionContext  
  53.       .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);  
  54.   } else {  
  55.     //作业类型长度???有什么用?  
  56.     if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {  
  57.       submissionContext.setApplicationType(submissionContext  
  58.         .getApplicationType().substring(0,  
  59.           YarnConfiguration.APPLICATION_TYPE_LENGTH));  
  60.     }  
  61.   }  
  62.   
  63.   
  64.   try {  
  65.     // call RMAppManager to submit application directly  
  66.     //开始提交作业  
  67.     rmAppManager.submitApplication(submissionContext,  
  68.         System.currentTimeMillis(), user, false, null);  
  69.   
  70.   
  71.     LOG.info("Application with id " + applicationId.getId() +   
  72.         " submitted by user " + user);  
  73.     RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,  
  74.         "ClientRMService", applicationId);  
  75.   } catch (YarnException e) {  
  76.     LOG.info("Exception in submitting application with id " +  
  77.         applicationId.getId(), e);  
  78.     RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,  
  79.         e.getMessage(), "ClientRMService",  
  80.         "Exception in submitting application", applicationId);  
  81.     throw e;  
  82.   }  
  83.   
  84.   
  85.   SubmitApplicationResponse response = recordFactory  
  86.       .newRecordInstance(SubmitApplicationResponse.class);  
  87.   return response;  
  88. }  
复制代码



作业提交阶段
  1. @SuppressWarnings("unchecked")  
  2. protected void submitApplication(  
  3.     ApplicationSubmissionContext submissionContext, long submitTime,  
  4.     String user, boolean isRecovered, RMState state) throws YarnException {  
  5.   //获得作业ID  
  6.   ApplicationId applicationId = submissionContext.getApplicationId();  
  7.   //构建一个app并放入applicationACLS  
  8.   RMAppImpl application =  
  9.       createAndPopulateNewRMApp(submissionContext, submitTime, user);  
  10.   //判断是否需要恢复  
  11.   if (isRecovered) {  
  12.     recoverApplication(state, application);  
  13.     RMAppState rmAppState =  
  14.         state.getApplicationState().get(applicationId).getState();  
  15.     if (isApplicationInFinalState(rmAppState)) {  
  16.       // We are synchronously moving the application into final state so that  
  17.       // momentarily client will not see this application in NEW state. Also  
  18.       // for finished applications we will avoid renewing tokens.  
  19.       application  
  20.           .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));  
  21.       return;  
  22.     }  
  23.   }  
  24.    
  25.   if (UserGroupInformation.isSecurityEnabled()) {  
  26.     Credentials credentials = null;  
  27.     try {  
  28.       credentials = parseCredentials(submissionContext);  
  29.     } catch (Exception e) {  
  30.       LOG.warn(  
  31.           "Unable to parse credentials.", e);  
  32.       // Sending APP_REJECTED is fine, since we assume that the  
  33.       // RMApp is in NEW state and thus we haven't yet informed the  
  34.       // scheduler about the existence of the application  
  35.       assert application.getState() == RMAppState.NEW;  
  36.       this.rmContext.getDispatcher().getEventHandler().handle(  
  37.           new RMAppRejectedEvent(applicationId, e.getMessage()));  
  38.       throw RPCUtil.getRemoteException(e);  
  39.     }  
  40.     this.rmContext.getDelegationTokenRenewer().addApplication(  
  41.         applicationId, credentials,  
  42.         submissionContext.getCancelTokensWhenComplete(), isRecovered);  
  43.   } else {  
  44.     //触发app启动事件  
  45.     this.rmContext.getDispatcher().getEventHandler()  
  46.         .handle(new RMAppEvent(applicationId,  
  47.             isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));  
  48.   }  
  49. }  
复制代码

application在下面函数中创建并加入相应集合,如果加入集合成功则代表作业提交成功
  1. private RMAppImpl createAndPopulateNewRMApp(  
  2.     ApplicationSubmissionContext submissionContext,  
  3.     long submitTime, String user)  
  4.     throws YarnException {  
  5.   ApplicationId applicationId = submissionContext.getApplicationId();  
  6.   validateResourceRequest(submissionContext);  
  7.   //构建APP,submissionContext中包含了一个APP的绝大部分信息  
  8.   RMAppImpl application =  
  9.       new RMAppImpl(applicationId, rmContext, this.conf,  
  10.           submissionContext.getApplicationName(), user,  
  11.           submissionContext.getQueue(),  
  12.           submissionContext, this.scheduler, this.masterService,  
  13.           submitTime, submissionContext.getApplicationType());  
  14.   
  15.   
  16.   // Concurrent app submissions with same applicationId will fail here  
  17.   // Concurrent app submissions with different applicationIds will not  
  18.   // influence each other  
  19.   //再次判断作业是否存在,若不存在则放入hashMap中,一旦放入成功则表明作业提交成功  
  20.   if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=  
  21.       null) {  
  22.     String message = "Application with id " + applicationId  
  23.         + " is already present! Cannot add a duplicate!";  
  24.     LOG.warn(message);  
  25.     throw RPCUtil.getRemoteException(message);  
  26.   }  
  27.   // Inform the ACLs Manager  
  28.   this.applicationACLsManager.addApplication(applicationId,  
  29.       submissionContext.getAMContainerSpec().getApplicationACLs());  
  30.   return application;  
  31. }  
复制代码

一个app包含的信息比较多,如下(RMAppImpl.java)

  1. private static final Log LOG = LogFactory.getLog(RMAppImpl.class);  
  2. private static final String UNAVAILABLE = "N/A";  
  3.   
  4.   
  5. // Immutable fields  
  6. private final ApplicationId applicationId;  
  7. private final RMContext rmContext;  
  8. private final Configuration conf;  
  9. private final String user;  
  10. private final String name;  
  11. private final ApplicationSubmissionContext submissionContext;  
  12. private final Dispatcher dispatcher;  
  13. private final YarnScheduler scheduler;  
  14. private final ApplicationMasterService masterService;  
  15. private final StringBuilder diagnostics = new StringBuilder();  
  16. private final int maxAppAttempts;  
  17. private final ReadLock readLock;  
  18. private final WriteLock writeLock;  
  19. private final Map<ApplicationAttemptId, RMAppAttempt> attempts  
  20.     = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();  
  21. private final long submitTime;  
  22. private final Set<RMNode> updatedNodes = new HashSet<RMNode>();  
  23. private final String applicationType;  
  24.   
  25.   
  26. // Mutable fields  
  27. private long startTime;  
  28. private long finishTime = 0;  
  29. private long storedFinishTime = 0;  
  30. private RMAppAttempt currentAttempt;  
  31. private String queue;  
  32. @SuppressWarnings("rawtypes")  
  33. private EventHandler handler;  
  34. private static final FinalTransition FINAL_TRANSITION = new FinalTransition();  
  35. private static final AppFinishedTransition FINISHED_TRANSITION =  
  36.     new AppFinishedTransition();  
  37.   
  38.   
  39. // These states stored are only valid when app is at killing or final_saving.  
  40. private RMAppState stateBeforeKilling;  
  41. private RMAppState stateBeforeFinalSaving;  
  42. private RMAppEvent eventCausingFinalSaving;  
  43. private RMAppState targetedFinalState;  
  44. private RMAppState recoveredFinalState;  
  45.   
  46.   
  47. Object transitionTodo;  
复制代码




相关文章;Hadoop2.x Yarn作业提交(服务端)



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条