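Reading guide:
1. How is Oozie applied in practice?
2. How is the reminder email program written?
3. What do the individual Oozie configuration parameters mean?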
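The workflow described here implements the ingestion of vehicle GPS probe data. Every hour, probe data is delivered to a designated HDFS directory as a file containing all probes for that hour. Ingestion runs once a day over all 24 files for a given day. If the directory contains 24 files, ingestion starts. Otherwise:
- For the current day, do nothing.
- For previous days, up to 7 days back, send a reminder to the probe data provider.
- If the directory is 7 days old, ingest all available probe data files.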
The overall implementation of the process is shown in Figure 1.
Figure 1: Process diagram
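Here the main process (the data ingestion process) first computes the directory names for today and the previous six days, then forks seven directory subprocesses (subflows). Once all subprocesses have terminated, the join step passes control to the end state. When a subprocess starts, it first obtains information about its directory: its date and the number of files it contains. Based on this information, it decides whether to ingest the data and archive it, send a reminder email, or do nothing.
Directory subprocess implementation
The following code implements the directory subprocess (Listing 1).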
- <workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'>
-
- <start to='getDirInfo' />
-
- <!-- STEP ONE -->
- <action name='getDirInfo'>
- <!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
- otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
- otherwise returns age of dir in days -->
- <java>
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <main-class>com.navteq.oozie.GetDirInfo</main-class>
- <arg>${inputDir}</arg>
- <capture-output />
- </java>
- <ok to="makeIngestDecision" />
- <error to="fail" />
- </action>
-
- <!-- STEP TWO -->
- <decision name="makeIngestDecision">
- <switch>
- <!-- empty or doesn't exist -->
- <case to="end">
- ${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
- (wf:actionData('getDirInfo')['dir.age'] lt 1 and
- wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
- </case>
- <!-- # of files >= 24 -->
- <case to="ingest">
- ${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
- wf:actionData('getDirInfo')['dir.age'] gt 6}
- </case>
- <default to="sendEmail"/>
- </switch>
- </decision>
-
- <!--EMAIL-->
- <action name="sendEmail">
- <java>
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <main-class>com.navteq.oozie.StandaloneMailer</main-class>
- <arg>probedata2@navteq.com</arg>
- <arg>gregory.titievsky@navteq.com</arg>
- <arg>${inputDir}</arg>
- <arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
- <arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
- </java>
- <ok to="end" />
- <error to="fail" />
- </action>
-
- <!--INGESTION -->
- <action name="ingest">
- <java>
- <job-tracker>${jobTracker}</job-tracker>
- <name-node>${nameNode}</name-node>
- <prepare>
- <delete path="${outputDir}" />
- </prepare>
- <configuration>
- <property>
- <name>mapred.reduce.tasks</name>
- <value>300</value>
- </property>
- </configuration>
- <main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
- <arg>-conf</arg>
- <arg>action.xml</arg>
- <arg>${inputDir}</arg>
- <arg>${outputDir}</arg>
- </java>
- <ok to="archive-data" />
- <error to="ingest-fail" />
- </action>
-
- <!-- Archive Data -->
- <action name="archive-data">
- <fs>
- <move source='${inputDir}' target='/probe/backup/${dirName}' />
- <delete path = '${inputDir}' />
- </fs>
- <ok to="end" />
- <error to="ingest-fail" />
- </action>
-
- <kill name="ingest-fail">
- <message>Ingestion failed, error
- message[${wf:errorMessage(wf:lastErrorNode())}]</message>
- </kill>
-
- <kill name="fail">
- <message>Java failed, error
- message[${wf:errorMessage(wf:lastErrorNode())}]</message>
- </kill>
- <end name='end' />
- </workflow-app>
Listing 1: Directory subprocess
The start node of this subprocess triggers a custom java node that obtains the directory information (Listing 2).
- package com.navteq.oozie;
-
- import java.io.File;
- import java.io.FileOutputStream;
- import java.io.OutputStream;
- import java.util.GregorianCalendar;
- import java.util.Properties;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
- public class GetDirInfo {
- private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
-
- public static void main(String[] args) throws Exception {
- String dirPath = args[0];
- String propKey0 = "dir.num-files";
- String propVal0 = "-1";
- String propKey1 = "dir.age";
- String propVal1 = "-1";
- System.out.println("Directory path: '"+dirPath+"'");
-
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- Path hadoopDir = new Path(dirPath);
- if (fs.exists(hadoopDir)){
- FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);
- int numFilesInDir = files.length;
- propVal0 = Integer.toString(numFilesInDir);
- long timePassed, daysPassedLong;
- int daysPassed;
- String dirName = hadoopDir.getName();
- String[] dirNameArray = dirName.split("-");
- if (dirNameArray.length == 3) {
- int year = Integer.valueOf(dirNameArray[0]);
- int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based
- int date = Integer.valueOf(dirNameArray[2]);
- GregorianCalendar dirCreationDate = new GregorianCalendar(year,
- month, date);
- timePassed = (new GregorianCalendar()).getTimeInMillis()
- - dirCreationDate.getTimeInMillis();
- daysPassed = (int) (timePassed / 1000 / 60 / 60 / 24); // milliseconds to whole days
- propVal1 = Integer.toString(daysPassed);
- }
- }
- String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
- if (oozieProp != null) {
- File propFile = new File(oozieProp);
- Properties props = new Properties();
- props.setProperty(propKey0, propVal0);
- props.setProperty(propKey1, propVal1);
- OutputStream os = new FileOutputStream(propFile);
- props.store(os, "");
- os.close();
- } else
- throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
- + " System property not defined");
- }
- }
Listing 2: Node that obtains the directory information
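The age calculation in Listing 2 divides a millisecond difference by the length of a day. As a minimal sketch of the same idea, the snippet below uses `java.time` calendar-day arithmetic instead. The class name `DirectoryAge` and the method `ageInDays` are hypothetical, and the sketch assumes directory names are zero-padded `yyyy-MM-dd` strings, which is what Listing 5 produces; an unpadded name such as `2013-3-8` would be reported as -1 here, unlike in Listing 2.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of the original workflow code.
final class DirectoryAge {

    /**
     * Returns the age of a directory in whole days, derived from its
     * yyyy-MM-dd name, or -1 if the name is not a valid ISO date.
     */
    static long ageInDays(String dirName, LocalDate today) {
        try {
            LocalDate created = LocalDate.parse(dirName); // expects ISO yyyy-MM-dd
            return ChronoUnit.DAYS.between(created, today);
        } catch (DateTimeParseException e) {
            return -1; // same "unknown" marker that Listing 2 uses
        }
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2013, 3, 15); // fixed date so the demo is reproducible
        System.out.println(ageInDays("2013-03-08", today)); // prints 7
        System.out.println(ageInDays("not-a-date", today)); // prints -1
    }
}
```

Passing `today` as a parameter rather than reading the clock inside the method keeps the calculation easy to test.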
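This class takes the directory name as its input argument and first checks whether the directory exists. If it does not, both the age and the number of files are returned as -1; otherwise, the actual values are returned to the subprocess. The next step of the subprocess is a switch (decision) statement that determines how to handle the directory. If the directory does not exist (number of files < 0), or if it is the current day's directory (age < 1) and contains fewer than 24 files (number of files < 24), the subprocess transitions directly to the end state. If all files are present in the directory (number of files > 23) or the directory was created at least seven days ago (age > 6), the following happens:
- The data is ingested using an existing Map/Reduce program.
- The directory is backed up to the data archive and then deleted.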
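To make the branching easier to follow, the decision node of Listing 1 can be restated as a plain Java function. This is only an illustrative sketch: the class `IngestDecision` and the enum `Outcome` are hypothetical names, and in the real workflow the comparison is evaluated by Oozie's expression language, not by Java code.

```java
// Hypothetical restatement of the <decision> node in Listing 1.
final class IngestDecision {

    enum Outcome { END, INGEST, SEND_EMAIL }

    /**
     * @param numFiles number of files in the directory, -1 if it does not exist
     * @param ageDays  age of the directory in days, -1 if it does not exist
     */
    static Outcome decide(int numFiles, int ageDays) {
        // First case: directory is missing, or it is today's and still incomplete.
        if (numFiles < 0 || (ageDays < 1 && numFiles < 24)) {
            return Outcome.END;
        }
        // Second case: all 24 files arrived, or the directory is at least 7 days old.
        if (numFiles > 23 || ageDays > 6) {
            return Outcome.INGEST;
        }
        // Default: an incomplete directory from a previous day triggers a reminder.
        return Outcome.SEND_EMAIL;
    }

    public static void main(String[] args) {
        System.out.println(decide(-1, -1)); // END
        System.out.println(decide(24, 0));  // INGEST
        System.out.println(decide(10, 3));  // SEND_EMAIL
    }
}
```

As in the workflow, the cases are checked in order, so an incomplete directory for the current day ends the subprocess before the ingest condition is ever considered.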
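Additional configuration for action nodes
The ingest action demonstrates some additional Oozie configuration parameters, including:
- Prepare: if present, this element lists paths to delete before the job starts. It should be used exclusively for directory cleanup. The delete operation is performed in the fs.default.name filesystem.
- Configuration: if present, this element contains JobConf properties for the Map/Reduce job. It can be used not only for map/reduce actions but also for java actions that launch map/reduce jobs.
If neither of the two cases above applies, the subprocess sends a reminder email and exits. The mailer is implemented as another java main class (Listing 3).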
- package com.navteq.oozie;
-
- import java.util.Properties;
- import javax.mail.Message;
- import javax.mail.Session;
- import javax.mail.Transport;
- import javax.mail.internet.InternetAddress;
- import javax.mail.internet.MimeMessage;
-
- public class StandaloneMailer {
-
- private static String _mServer = "imailchi.navtech.com";
- private static Properties _props = null;
-
- private StandaloneMailer(){}
-
- public static void init(String mServer){
-
- _mServer = mServer;
- _props = new Properties();
- _props.setProperty("mail.smtp.host", _mServer);
- }
-
- public static void SendMail(String subject, String message, String from, String to) throws Exception {
-
- // create some properties and get the default Session
- Session session = Session.getDefaultInstance(_props, null);
-
- // create a message
- Message msg = new MimeMessage(session);
-
- // set the from and to address
- InternetAddress addressFrom = new InternetAddress(from);
- msg.setFrom(addressFrom);
-
- String [] recipients = new String[] {to};
- InternetAddress[] addressTo = new InternetAddress[recipients.length];
- for (int i = 0; i < recipients.length; i++){
- addressTo[i] = new InternetAddress(recipients[i]);
- }
- msg.setRecipients(Message.RecipientType.TO, addressTo);
-
- // Setting the Subject and Content Type
- msg.setSubject(subject);
- msg.setContent(message, "text/plain");
- Transport.send(msg);
- }
-
- public static void main (String[] args) throws Exception {
- if (args.length ==5){
- init(_mServer);
- StringBuilder subject = new StringBuilder();
- StringBuilder body = new StringBuilder();
- subject.append("Directory ").append(args[2]).append(" contains ").append(args[3]).append(" files.");
- body.append("Directory ").append(args[2]).append(" is ").append(args[4]).
- append(" days old and contains only ").append(args[3]).append(" files instead of 24.");
- SendMail(subject.toString(), body.toString(), args[0], args[1]);
- }
- else throw new Exception("Invalid number of parameters provided for email");
- }
- }
Listing 3: Sending the reminder email
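The message text in Listing 3 is assembled inline in `main`. As a small sketch, the same wording can be pulled into two pure functions so that it can be checked without a mail server. The class `ReminderText` and its method names are hypothetical and do not appear in the original code.

```java
// Hypothetical helper that builds the reminder text used by the mailer.
final class ReminderText {

    /** Subject line, e.g. "Directory /probe/2013-03-08 contains 10 files." */
    static String subject(String dir, int numFiles) {
        return "Directory " + dir + " contains " + numFiles + " files.";
    }

    /** Body stating the directory age and how many of the 24 files arrived. */
    static String body(String dir, int numFiles, int ageDays) {
        return "Directory " + dir + " is " + ageDays
                + " days old and contains only " + numFiles + " files instead of 24.";
    }

    public static void main(String[] args) {
        // The path below is an example value, not taken from the original article.
        System.out.println(subject("/probe/2013-03-08", 10));
        System.out.println(body("/probe/2013-03-08", 10, 3));
    }
}
```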
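Listing 3 is a simple implementation that uses the javax.mail API to send email.
Main process implementation
With the subprocess in place, implementing the main process becomes straightforward (Listing 4).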
Listing 4: Data ingestion main process
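This process first triggers a java node that computes the list of directories to process (Listing 5), and then runs the subprocess for each directory to process it.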
- package com.navteq.oozie;
-
- import java.io.File;
- import java.io.FileOutputStream;
- import java.io.OutputStream;
- import java.util.Calendar;
- import java.util.GregorianCalendar;
- import java.util.Properties;
-
- public class GenerateLookupDirs {
-
- public static final long dayMillis = 1000 * 60 * 60 * 24;
- private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
-
- public static void main(String[] args) throws Exception {
- Calendar curDate = new GregorianCalendar();
- int year, month, date;
- String propKey, propVal;
-
- String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
- if (oozieProp != null) {
- File propFile = new File(oozieProp);
- Properties props = new Properties();
-
- for (int i = 0; i<8; ++i)
- {
- year = curDate.get(Calendar.YEAR);
- month = curDate.get(Calendar.MONTH) + 1;
- date = curDate.get(Calendar.DATE);
- propKey = "dir"+i;
- propVal = year + "-" +
- (month < 10 ? "0" + month : month) + "-" +
- (date < 10 ? "0" + date : date);
- props.setProperty(propKey, propVal);
- curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
- }
- OutputStream os = new FileOutputStream(propFile);
- props.store(os, "");
- os.close();
- } else
- throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
- + " System property not defined");
- }
- }
Listing 5: Directory computation program
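Listing 5 steps backwards by subtracting a fixed 24 hours in milliseconds, which can skip or repeat a date around a daylight-saving transition. The sketch below shows one possible alternative that uses `java.time` calendar days. The class `LookupDirs` and its method are hypothetical; the property keys `dir0`, `dir1`, ... and the `yyyy-MM-dd` value format follow Listing 5.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical variant of the directory-name computation in Listing 5.
final class LookupDirs {

    /**
     * Builds dir0..dir(count-1), where dir0 is today's directory name and
     * each following entry is one calendar day earlier.
     */
    static Map<String, String> lookupDirs(LocalDate today, int count) {
        Map<String, String> dirs = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            // LocalDate.toString() yields zero-padded ISO dates such as 2013-03-02.
            dirs.put("dir" + i, today.minusDays(i).toString());
        }
        return dirs;
    }

    public static void main(String[] args) {
        // Fixed date so the output is reproducible; Listing 5 uses the current date.
        lookupDirs(LocalDate.of(2013, 3, 2), 8)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In an actual java action, the resulting entries would still be written to the file named by the `oozie.action.output.properties` system property, as Listing 5 does.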