Questions to guide your reading:
1. What is Oozie?
2. How do you define a workflow?
3. How do you name a workflow?
4. How do you kill a workflow?
5. How is a workflow path defined?
(A diagram of how Oozie runs appeared here in the original post.)
Below is an example of a Workflow:
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.1">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>Something went wrong: ${wf:errorCode('wordcount')}</message>
    </kill>
    <end name='end'/>
</workflow-app>
—— Just skim this for now; every piece of it is explained in detail below.
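The `${jobTracker}`, `${nameNode}`, `${inputDir}` and `${outputDir}` placeholders above are resolved from the job properties submitted with the workflow. A minimal Python sketch of that substitution, assuming a hypothetical property value (it handles only plain `${name}` references, not EL functions such as `wf:id()`):

```python
import re

def resolve_params(template, params):
    # Replace ${name} placeholders with values from the job properties,
    # the way Oozie resolves parameters when materializing a workflow.
    return re.sub(r"\$\{(\w+)\}", lambda m: params[m.group(1)], template)

fragment = "<name-node>${nameNode}</name-node>"
props = {"nameNode": "hdfs://bar:8020"}  # hypothetical job.properties value
print(resolve_params(fragment, props))  # -> <name-node>hdfs://bar:8020</name-node>
```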
First, how to define a workflow.
Every workflow definition begins with a start control node. The start node is the entry point of the workflow: when the workflow job starts, control automatically transfers to the node it names.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <start to="[NODE-NAME]"/>  <!-- 'to' names the first node to execute -->
    ...
</workflow-app>
Here is an example:
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <start to="firstHadoopJob"/>
    ...
</workflow-app>
Having covered the start control node, we should also cover the end control node. When the workflow reaches the end node, the workflow job is complete and finishes with status SUCCEEDED.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <end name="[NODE-NAME]"/>
    ...
</workflow-app>
Here name is the name of the end node. An example:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <start to="[NODE-NAME]"/>
    <end name="[NODE-NAME]"/>
    ...
</workflow-app>
Of course, a workflow can also go wrong, so Oozie provides a kill control node: a kill node lets a workflow job kill itself.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <kill name="[NODE-NAME]">
        <message>[MESSAGE-TO-LOG]</message>
    </kill>
    ...
</workflow-app>
—— The <message> element holds the message logged when the workflow job is killed.
Here is an example:
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <kill name="killBecauseNoInput">
        <message>Input unavailable</message>
    </kill>
    ...
</workflow-app>
These nodes alone are not enough, though. Next up is the decision node, which selects which execution path the workflow follows.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="[NODE-NAME]">
        <switch>
            <case to="[NODE_NAME]">[PREDICATE]</case>
            ...
            <case to="[NODE_NAME]">[PREDICATE]</case>
            <default to="[NODE_NAME]"/>
        </switch>
    </decision>
    ...
</workflow-app>
Here is an example:
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="mydecision">
        <switch>
            <case to="reconsolidatejob">
                ${fs:fileSize(secondjobOutputDir) gt 10 * GB}
            </case>
            <case to="rexpandjob">
                ${fs:fileSize(secondjobOutputDir) lt 100 * MB}
            </case>
            <case to="recomputejob">
                ${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
            </case>
            <default to="end"/>
        </switch>
    </decision>
    ...
</workflow-app>
—— If you know Java this should look familiar: it works much like Java's switch statement, with cases checked in order and a default as the fallback.
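The decision semantics can be sketched in a few lines of Python: predicates are evaluated in document order, the first true case wins, and otherwise control goes to the default node. The file size used below is a made-up stand-in for `fs:fileSize(secondjobOutputDir)`:

```python
def decide(cases, default):
    # cases: (predicate, target-node) pairs in document order, like the
    # <case> elements of a <decision> node; 'default' mirrors <default>.
    for predicate, target in cases:
        if predicate():
            return target
    return default

GB, MB = 1024 ** 3, 1024 ** 2
output_size = 500 * MB  # hypothetical output size of the second job
target = decide(
    [(lambda: output_size > 10 * GB, "reconsolidatejob"),
     (lambda: output_size < 100 * MB, "rexpandjob")],
    "end",
)
print(target)  # -> end (neither predicate holds for 500 MB)
```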
Next are the fork and join control nodes. A fork splits one execution path into multiple concurrent execution paths; note that fork and join must always be used as a pair.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="[FORK-NODE-NAME]">
        <path start="[NODE-NAME]"/>
        ...
        <path start="[NODE-NAME]"/>
    </fork>
    ...
    <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]"/>
    ...
</workflow-app>
Here is an example:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="forking">
        <path start="firstparalleljob"/>
        <path start="secondparalleljob"/>
    </fork>
    <action name="firstparalleljob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job1.xml</job-xml>
        </map-reduce>
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <action name="secondparalleljob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job2.xml</job-xml>
        </map-reduce>
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <join name="joining" to="nextaction"/>
    ...
</workflow-app>
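The fork/join behavior above can be modeled with a thread pool: the fork starts every path concurrently, and the join blocks until all of them finish before handing control to its 'to' node. A small sketch (the path functions and node names are stand-ins for the two map-reduce actions):

```python
from concurrent.futures import ThreadPoolExecutor

def fork_join(paths, to):
    # A <fork> starts all of its paths concurrently; the matching <join>
    # waits until every path has finished before following its 'to' node.
    with ThreadPoolExecutor(max_workers=len(paths)) as pool:
        futures = [pool.submit(p) for p in paths]
        results = [f.result() for f in futures]  # join: block on all paths
    return results, to

results, next_node = fork_join(
    [lambda: "firstparalleljob done", lambda: "secondparalleljob done"],
    "nextaction",
)
print(next_node)  # -> nextaction
```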
So much for control nodes; next are Oozie workflow action nodes. Some general properties of action nodes:
The computation/processing an action triggers always runs remotely.
Actions are asynchronous.
Actions have two transitions, ok and error.
Failed actions can be recovered (retried).
First, how Oozie schedules a MapReduce job. Note: the mapred.job.tracker and fs.default.name properties must not appear in the workflow's job-xml file or inline configuration.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <map-reduce>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <streaming>
                <mapper>[MAPPER-PROCESS]</mapper>
                <reducer>[REDUCER-PROCESS]</reducer>
                <record-reader>[RECORD-READER-CLASS]</record-reader>
                <record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
                ...
                <env>[NAME=VALUE]</env>
                ...
            </streaming>
            <!-- Either streaming or pipes can be specified for an action, not both -->
            <pipes>
                <map>[MAPPER]</map>
                <reduce>[REDUCER]</reduce>
                <inputformat>[INPUTFORMAT]</inputformat>
                <partitioner>[PARTITIONER]</partitioner>
                <writer>[OUTPUTFORMAT]</writer>
                <program>[EXECUTABLE]</program>
            </pipes>
            <job-xml>[JOB-XML-FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
        </map-reduce>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
Here is a map-reduce action whose prepare step cleans up HDFS output left over from a previous run:
<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstHadoopJob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="hdfs://foo:8020/usr/tucu/output-data"/>
            </prepare>
            <job-xml>/myfirstjob.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/tucu/input-data</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/tucu/output-data</value>
                </property>
                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>${firstJobReducers}</value>
                </property>
                <property>
                    <name>oozie.action.external.stats.write</name>
                    <value>true</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="myNextAction"/>
        <error to="errorCleanup"/>
    </action>
    ...
</workflow-app>
Here is a streaming example:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="firstjob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${output}"/>
            </prepare>
            <streaming>
                <mapper>/bin/bash testarchive/bin/mapper.sh testfile</mapper>
                <reducer>/bin/bash testarchive/bin/reducer.sh</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${input}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${output}</value>
                </property>
                <property>
                    <name>stream.num.map.output.key.fields</name>
                    <value>3</value>
                </property>
            </configuration>
            <file>/users/blabla/testfile.sh#testfile</file>
            <archive>/users/blabla/testarchive.jar#testarchive</archive>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    ...
</workflow-app>
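The mapper.sh and reducer.sh scripts above are ordinary programs that read lines on stdin and write tab-separated key/value records on stdout. A hypothetical Python stand-in for a wordcount streaming mapper (the real scripts would read sys.stdin):

```python
def map_lines(lines):
    # Emit one "word<TAB>1" record per word -- the contract a Hadoop
    # Streaming mapper fulfils on stdout.
    records = []
    for line in lines:
        for word in line.split():
            records.append(f"{word}\t1")
    return records

for record in map_lines(["hello world hello"]):
    print(record)
```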
Here is a pipes example:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="firstjob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${output}"/>
            </prepare>
            <pipes>
                <program>bin/wordcount-simple#wordcount-simple</program>
            </pipes>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${input}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${output}</value>
                </property>
            </configuration>
            <archive>/users/blabla/testarchive.jar#testarchive</archive>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    ...
</workflow-app>
The examples so far should give you a feel for Oozie; next is how Oozie invokes Pig. Note again: the mapred.job.tracker and fs.default.name properties must not appear in the workflow XML or inline configuration.
Syntax (pig action, workflow schema 0.2):
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
    ...
    <action name="[NODE-NAME]">
        <pig>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <job-xml>[JOB-XML-FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <script>[PIG-SCRIPT]</script>
            <param>[PARAM-VALUE]</param>
            ...
            <param>[PARAM-VALUE]</param>
            <argument>[ARGUMENT-VALUE]</argument>
            ...
            <argument>[ARGUMENT-VALUE]</argument>
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
        </pig>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
Syntax (pig action, workflow schema 0.1):
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <pig>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <job-xml>[JOB-XML-FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <script>[PIG-SCRIPT]</script>
            <param>[PARAM-VALUE]</param>
            ...
            <param>[PARAM-VALUE]</param>
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
        </pig>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
Here is what a pig action looks like in practice.
Oozie schema 0.2:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.2">
    ...
    <action name="myfirstpigjob">
        <pig>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
                <property>
                    <name>oozie.action.external.stats.write</name>
                    <value>true</value>
                </property>
            </configuration>
            <script>/mypigscript.pig</script>
            <argument>-param</argument>
            <argument>INPUT=${inputDir}</argument>
            <argument>-param</argument>
            <argument>OUTPUT=${outputDir}/pig-output3</argument>
        </pig>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>
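Each <argument> element is passed verbatim, in document order, to the Pig launcher, which is why the -param flags must alternate with NAME=VALUE pairs. A sketch of how the resulting command line might be assembled (the `pig -f` invocation shown is an assumption for illustration, not Oozie's actual launcher code):

```python
def build_pig_command(script, arguments):
    # Append each <argument> value, in document order, after the script.
    return ["pig", "-f", script] + list(arguments)

cmd = build_pig_command(
    "/mypigscript.pig",
    ["-param", "INPUT=/data/in", "-param", "OUTPUT=/data/out/pig-output3"],
)
print(cmd)
```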
Oozie schema 0.1:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstpigjob">
        <pig>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <script>/mypigscript.pig</script>
            <param>InputDir=/home/tucu/input-data</param>
            <param>OutputDir=${jobOutput}</param>
        </pig>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>
Next is the fs action, which Oozie uses to run HDFS filesystem operations.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
    ...
    <action name="[NODE-NAME]">
        <fs>
            <delete path='[PATH]'/>
            ...
            <mkdir path='[PATH]'/>
            ...
            <move source='[SOURCE-PATH]' target='[TARGET-PATH]'/>
            ...
            <chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false'/>
            ...
            <touchz path='[PATH]'/>
            ...
            <chgrp path='[PATH]' group='[GROUP]' dir-files='false'/>
        </fs>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
Here is an example:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.5">
    ...
    <action name="hdfscommands">
        <fs>
            <delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
            <mkdir path='archives/${wf:id()}'/>
            <move source='${jobInput}' target='archives/${wf:id()}/processed-input'/>
            <chmod path='${jobOutput}' permissions='-rwxrw-rw-' dir-files='true'><recursive/></chmod>
            <chgrp path='${jobOutput}' group='testgroup' dir-files='true'><recursive/></chgrp>
        </fs>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>
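The commands inside <fs> run synchronously, in document order, so later commands see the results of earlier ones. An in-memory Python simulation of that ordering, using the paths from the example above (a toy model of a path set, not an HDFS client):

```python
def run_fs_action(commands, existing):
    # Apply mkdir/touchz/delete/move in document order against a set of
    # paths standing in for HDFS; later commands see earlier results.
    paths = set(existing)
    for op, *args in commands:
        if op in ("mkdir", "touchz"):
            paths.add(args[0])
        elif op == "delete":
            paths.discard(args[0])
        elif op == "move":
            src, dst = args
            paths.discard(src)
            paths.add(dst)
    return paths

state = run_fs_action(
    [("delete", "/usr/tucu/temp-data"),
     ("mkdir", "archives/wf-1"),
     ("move", "/jobInput", "archives/wf-1/processed-input")],
    {"/usr/tucu/temp-data", "/jobInput"},
)
print(sorted(state))  # -> ['archives/wf-1', 'archives/wf-1/processed-input']
```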
Under schema 0.4 the fs action can additionally take a name-node, a job-xml file, and an inline configuration:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.4">
    ...
    <action name="hdfscommands">
        <fs>
            <name-node>hdfs://foo:8020</name-node>
            <job-xml>fs-info.xml</job-xml>
            <configuration>
                <property>
                    <name>some.property</name>
                    <value>some.value</value>
                </property>
            </configuration>
            <delete path='/usr/tucu/temp-data'/>
        </fs>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>
Next is the ssh action, which runs a shell command on a remote host.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <ssh>
            <host>[USER]@[HOST]</host>
            <command>[SHELL]</command>
            <args>[ARGUMENTS]</args>
            ...
            <capture-output/>
        </ssh>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
Here is an example:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myssjob">
        <ssh>
            <host>foo@bar.com</host>
            <command>uploaddata</command>
            <args>jdbc:derby://bar.com:1527/myDB</args>
            <args>hdfs://foobar.com:8020/usr/tucu/myData</args>
        </ssh>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>
Finally, the sub-workflow action, which runs a child workflow job.
Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <sub-workflow>
            <app-path>[WF-APPLICATION-PATH]</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
        </sub-workflow>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
Here is an example:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="a">
        <sub-workflow>
            <app-path>child-wf</app-path>
            <configuration>
                <property>
                    <name>input.dir</name>
                    <value>${wf:id()}/second-mr-output</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    ...
</workflow-app>
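With <propagate-configuration/> present, the parent workflow's job configuration is handed down to the child; inline <property> values are applied on top either way. A small sketch of that merge (the property names below are illustrative):

```python
def child_configuration(parent_conf, inline_conf, propagate):
    # Start from the parent's configuration only when
    # <propagate-configuration/> is present, then let the inline
    # <configuration> properties override any inherited values.
    conf = dict(parent_conf) if propagate else {}
    conf.update(inline_conf)
    return conf

conf = child_configuration(
    {"mapred.compress.map.output": "true"},   # parent job configuration
    {"input.dir": "wf-1/second-mr-output"},   # inline <property> values
    propagate=True,
)
print(conf["input.dir"])  # -> wf-1/second-mr-output
```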