分享

跟着实例学Oozie

阅读导读:
1.如何应用Oozie?

2.提醒邮件的程序如何编写?
3.Oozie各个配置参数的含义?







我们在此描述的工作流会实现汽车GPS探测数据的获取过程。我们每个小时都会以文件的形式把探测数据传递到指定的HDFS目录中,其中包含有这个小时之内的所有探测数据。探测数据的获取是每天针对一天内所有的24个文件完成的。如果文件的数量是24,那么获取过程就会启动。否则:
当天什么都不做对前一天——最多到7天,发送剩下的内容到探测数据提供程序如果目录的存在时间已达到7天,那么就获取所有可用的探测数据文件。
过程的总体实现请见图1

Oozie-exam1.jpg

图1: 过程图
在此,主流程(数据获取流程)首先会为今天以及之前的六天计算出目录的名称,然后启动(fork)七个目录的子过程(子流程)。待所有子过程的状态都变成终止之后,join步骤就会把控制权交给end状态。子过程启动时,首先会获得关于目录的信息——它的日期以及文件数量。基于这条信息,它会决定是获取数据还是把数据归档,或者发送剩下的邮件,或者不做任何工作。Directory子过程实现以下代码负责实现的是directory子过程(代码1)。

  1. <workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'>
  2.     <start to='getDirInfo' />
  3.     <!-- STEP ONE -->
  4.     <action name='getDirInfo'>
  5.          <!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
  6.              otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
  7.              otherwise returns age of dir in days -->
  8.          <java>
  9.              <job-tracker>${jobTracker}</job-tracker>
  10.              <name-node>${nameNode}</name-node>
  11.              <main-class>com.navteq.oozie.GetDirInfo</main-class>
  12.              <arg>${inputDir}</arg>
  13.              <capture-output />
  14.          </java>
  15.          <ok to="makeIngestDecision" />
  16.          <error to="fail" />
  17.      </action>
  18.      <!-- STEP TWO -->
  19.      <decision name="makeIngestDecision">
  20.          <switch>
  21.              <!-- empty or doesn't exist -->
  22.              <case to="end">
  23.                 ${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
  24.                 (wf:actionData('getDirInfo')['dir.age'] lt 1 and
  25.                 wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
  26.              </case>
  27.              <!-- # of files >= 24 -->
  28.              <case to="ingest">
  29.                 ${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
  30.                 wf:actionData('getDirInfo')['dir.age'] gt 6}
  31.              </case>
  32.              <default to="sendEmail"/>
  33.          </switch>
  34.      </decision>
  35.      <!--EMAIL-->
  36.      <action name="sendEmail">
  37.           <java>
  38.              <job-tracker>${jobTracker}</job-tracker>
  39.              <name-node>${nameNode}</name-node>
  40.              <main-class>com.navteq.oozie.StandaloneMailer</main-class>
  41.              <arg>probedata2@navteq.com</arg>
  42.              <arg>gregory.titievsky@navteq.com</arg>
  43.              <arg>${inputDir}</arg>
  44.              <arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
  45.              <arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
  46.           </java>
  47.           <ok to="end" />
  48.           <error to="fail" />
  49.      </action>
  50.      <!--INGESTION -->
  51.      <action name="ingest">
  52.           <java>
  53.              <job-tracker>${jobTracker}</job-tracker>
  54.              <name-node>${nameNode}</name-node>
  55.              <prepare>
  56.                  <delete path="${outputDir}" />
  57.              </prepare>
  58.              <configuration>
  59.                  <property>
  60.                     <name>mapred.reduce.tasks</name>
  61.                     <value>300</value>
  62.                  </property>
  63.              </configuration>
  64.              <main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
  65.              <arg>-conf</arg>
  66.              <arg>action.xml</arg>
  67.              <arg>${inputDir}</arg>
  68.              <arg>${outputDir}</arg>
  69.           </java>
  70.           <ok to=" archive-data" />
  71.           <error to="ingest-fail" />
  72.      </action>
  73.      <!—Archive Data -->
  74.      <action name="archive-data">
  75.           <fs>
  76.              <move source='${inputDir}' target='/probe/backup/${dirName}' />
  77.              <delete path = '${inputDir}' />
  78.           </fs>
  79.           <ok to="end" />
  80.           <error to="ingest-fail" />
  81.      </action>
  82.      <kill name="ingest-fail">
  83.           <message>Ingestion failed, error
  84.             message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  85.      </kill>
  86.      <kill name="fail">
  87.           <message>Java failed, error
  88.             message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  89.      </kill>
  90.      <end name='end' />
  91. </workflow-app>
复制代码
代码1: Directory子过程
这个子过程的start节点会触发自定义的java节点,这个节点会获得目录信息(代码2)。
  1. package com.navteq.oozie;
  2. import java.io.File;
  3. import java.io.FileOutputStream;
  4. import java.io.OutputStream;
  5. import java.util.GregorianCalendar;
  6. import java.util.Properties;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.fs.FileStatus;
  9. import org.apache.hadoop.fs.FileSystem;
  10. import org.apache.hadoop.fs.Path;
  11. public class GetDirInfo {
  12.     private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
  13.     public static void main(String[] args) throws Exception {
  14.         String dirPath = args[0];
  15.         String propKey0 = "dir.num-files";
  16.         String propVal0 = "-1";
  17.         String propKey1 = "dir.age";
  18.         String propVal1 = "-1";
  19.         System.out.println("Directory path: '"+dirPath+"'");
  20.         Configuration conf = new Configuration();
  21.         FileSystem fs = FileSystem.get(conf);
  22.         Path hadoopDir = new Path(dirPath);
  23.         if (fs.exists(hadoopDir)){
  24.               FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);
  25.               int numFilesInDir = files.length;
  26.               propVal0 = Integer.toString(numFilesInDir);
  27.               long timePassed, daysPassedLong;
  28.               int daysPassed;
  29.               String dirName = hadoopDir.getName();
  30.               String[] dirNameArray = dirName.split("-");
  31.               if (dirNameArray.length == 3) {
  32.                   int year = Integer.valueOf(dirNameArray[0]);
  33.                   int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based
  34.                   int date = Integer.valueOf(dirNameArray[2]);
  35.                   GregorianCalendar dirCreationDate = new GregorianCalendar(year,
  36.                         month, date);
  37.                   timePassed = (new GregorianCalendar()).getTimeInMillis()
  38.                         - dirCreationDate.getTimeInMillis();
  39.                   daysPassed = (int) = timePassed / 1000 / 60 / 60 / 24;;
  40.                   propVal1 = Integer.toString(daysPassed);
  41.               }
  42.         }
  43.         String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
  44.         if (oozieProp != null) {
  45.               File propFile = new File(oozieProp);
  46.               Properties props = new Properties();
  47.               props.setProperty(propKey0, propVal0);
  48.               props.setProperty(propKey1, propVal1);
  49.               OutputStream os = new FileOutputStream(propFile);
  50.               props.store(os, "");
  51.               os.close();
  52.         } else
  53.               throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
  54.                         + " System property not defined");
  55.     }
  56. }
复制代码
代码2: 获得目录信息的节点
这个类会获得目录名作为输入的参数,并首先检查该目录是否存在。如果目录不存在,那么存在时间(age)和文件数量都会返回-1,否则,这两个值就会返回给子过程。子过程的下一步是一个switch(决定)声明,它会决定如何处理目录。如果目录不存在(文件数 < 0),或者是当前日期(存在时间 < 1)并且文件数量少于24(文件数 < 24),那么子过程就会直接转换到终止状态。如果所有文件都位于子目录中(文件数 > 23)或者目录是在至少七天前创建的(存在时间 > 6),那么就会有如下操作:
  • 使用现存的Map/reduce程获取数据
  • 目录会备份在数据归档中,然后删除

对action节点的其它配置
获取动作向你展示了另外一些Oozie配置参数,包括:
  • Prepare——如果出现了prepare参数,就意味着在启动作业(job)之前会删除路径列表。这应该专门用于清理目录。删除操作会在fs.default.name文件系统中执行。
  • Configuration——如果出现了configuration元素,它其中就会包含针对Map/Reduce 作业的JobConf属性。它不仅可以用于map/reduce动作, 而且还可以用于启动map/reduce作业的java动作。

如果不是以上两种情况,那么子过程就会发送剩余的邮件,然后退出。邮件是作为另一个java主类实现的(代码3)。
  1. package com.navteq.oozie;
  2. import java.util.Properties;
  3. import javax.mail.Message;
  4. import javax.mail.Session;
  5. import javax.mail.Transport;
  6. import javax.mail.internet.InternetAddress;
  7. import javax.mail.internet.MimeMessage;
  8. public class StandaloneMailer {
  9.     private static String _mServer = "imailchi.navtech.com";
  10.     private static Properties _props = null;
  11.     private StandaloneMailer(){}
  12.     public static void init(String mServer){
  13.         _mServer = mServer;
  14.         _props = new Properties();
  15.         _props.setProperty("mail.smtp.host", _mServer);
  16.     }
  17.     public static void SendMail(String subject, String message, String from, String to) throws Exception {
  18.      // create some properties and get the default Session
  19.      Session session = Session.getDefaultInstance(_props, null);
  20.      // create a message
  21.      Message msg = new MimeMessage(session);
  22.      // set the from and to address
  23.      InternetAddress addressFrom = new InternetAddress(from);
  24.      msg.setFrom(addressFrom);
  25.      String [] recipients = new String[] {to};
  26.      InternetAddress[] addressTo = new InternetAddress[recipients.length];
  27.      for (int i = 0; i < recipients.length; i++){
  28.        addressTo[i] = new InternetAddress(recipients[i]);
  29.      }
  30.      msg.setRecipients(Message.RecipientType.TO, addressTo);
  31.      // Setting the Subject and Content Type
  32.      msg.setSubject(subject);
  33.      msg.setContent(message, "text/plain");
  34.      Transport.send(msg);
  35.     }
  36.     public static void main (String[] args) throws Exception {
  37.         if (args.length ==5){
  38.              init(_mServer);
  39.              StringBuilder subject = new StringBuilder();
  40.              StringBuilder body = new StringBuilder();
  41.              subject.append("Directory ").append(args[2]).append(" contains").append(args[3]).append("
  42. files.");
  43.              body.append("Directory ").append(args[2]).append(" is ").append(args[4]).
  44.              append(" days old and contains only ").append(args[3]).append(" files instead of 24.");
  45.              SendMail(subject.toString(), body.toString(), args[0], args[1]);
  46.         }
  47.         else throw new Exception("Invalid number of parameters provided for email");
  48.     }
  49. }
复制代码
列表3: 发送提醒邮件
这是使用了javax.mail API的简单实现,用于发送邮件。主过程的实现我们已经实现了子过程,然后,对主过程的实现就变得非常简单了(列表4)
  1. <workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'>
  2.     <start to='getDirs2Process' />
  3.     <!-- STEP ONE -->
  4.     <action name='getDirs2Process'>
  5.          <!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
  6.               otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
  7.               otherwise returns age of dir in days -->
  8.          <java>
  9.               <job-tracker>${jobTracker}</job-tracker>
  10.               <name-node>${nameNode}</name-node>
  11.               <main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
  12.               <capture-output />
  13.          </java>
  14.          <ok to="forkSubWorkflows" />
  15.          <error to="fail" />
  16.     </action>
  17.     <fork name="forkSubWorkflows">
  18.        <path start="processDir0"/>
  19.        <path start="processDir1"/>
  20.        <path start="processDir2"/>
  21.        <path start="processDir3"/>
  22.        <path start="processDir4"/>
  23.        <path start="processDir5"/>
  24.        <path start="processDir6"/>
  25.        <path start="processDir7"/>
  26.     </fork>
  27.     <action name="processDir0">
  28.          <sub-workflow>
  29.              <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
  30.              <configuration>
  31.                 <property>
  32.                     <name>inputDir</name>
  33.                     <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData
  34. ('getDirs2Process')['dir0']}</value>
  35.                 </property>
  36.                 <property>
  37.                     <name>outputDir</name>
  38.                     <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData
  39. ('getDirs2Process')['dir0']}</value>
  40.                 </property>
  41.                 <property>
  42.                     <name>jobTracker</name>
  43.                     <value>${jobTracker}</value>
  44.                 </property>
  45.                 <property>
  46.                     <name>nameNode</name>
  47.                     <value>${nameNode}</value>
  48.                 </property>
  49.                 <property>
  50.                     <name>activeDir</name>
  51.                     <value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
  52.                 </property>
  53.                 <property>
  54.                     <name>dirName</name>
  55.                     <value>${wf:actionData('getDirs2Process')['dir0']}</value>
  56.                 </property>
  57.             </configuration>
  58.         </sub-workflow>
  59.         <ok to="joining"/>
  60.         <error to="fail"/>
  61.     </action>
  62. ….
  63.      <action name="processDir7">
  64.           <sub-workflow>
  65.              <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
  66.                  <configuration>
  67.                    <property>
  68.                      <name>inputDir</name>
  69.                      <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData
  70. ('getDirs2Process')['dir7']}</value>
  71.                    </property>
  72.                    <property>
  73.                      <name>outputDir</name>
  74.                      <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData
  75. ('getDirs2Process')['dir7']}</value>
  76.                    </property>
  77.                    <property>
  78.                      <name>dirName</name>
  79.                      <value>${wf:actionData('getDirs2Process')['dir7']}</value>
  80.                    </property>
  81.                </configuration>
  82.           </sub-workflow>
  83.           <ok to="joining"/>
  84.           <error to="fail"/>
  85.      </action>
  86.      <join name="joining" to="end"/>
  87.      <kill name="fail">
  88.           <message>Java failed, error
  89.              message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  90.      </kill>
  91.      <end name='end' />
  92. </workflow-app>
复制代码
代码4: 数据获取主过程
这个过程首先会触发java节点,计算需要处理的目录列表(列表5),然后对每个目录执行子过程,从而处理给定的目录。
  1. package com.navteq.oozie;
  2. import java.io.File;
  3. import java.io.FileOutputStream;
  4. import java.io.OutputStream;
  5. import java.util.Calendar;
  6. import java.util.GregorianCalendar;
  7. import java.util.Properties;
  8. public class GenerateLookupDirs {
  9.     public static final long dayMillis = 1000 * 60 * 60 * 24;
  10.     private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
  11.     public static void main(String[] args) throws Exception {
  12.         Calendar curDate = new GregorianCalendar();
  13.         int year, month, date;
  14.         String propKey, propVal;
  15.         String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
  16.         if (oozieProp != null) {
  17.             File propFile = new File(oozieProp);
  18.             Properties props = new Properties();
  19.             for (int i = 0; i<8; ++i)
  20.             {
  21.                  year = curDate.get(Calendar.YEAR);
  22.                  month = curDate.get(Calendar.MONTH) + 1;
  23.                  date = curDate.get(Calendar.DATE);
  24.                  propKey = "dir"+i;
  25.                  propVal = year + "-" +
  26.                          (month < 10 ? "0" + month : month) + "-" +
  27.                          (date < 10 ? "0" + date : date);
  28.                  props.setProperty(propKey, propVal);
  29.                  curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
  30.             }
  31.             OutputStream os = new FileOutputStream(propFile);
  32.             props.store(os, "");
  33.             os.close();
  34.         } else
  35.             throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
  36.                      + " System property not defined");
  37.     }
  38. }
复制代码
代码5: 目录计算程序

相关文章:
Oozie简介

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条