问题导读:
1、如何理解Oozie的工作流程 ?
2、它是如何与Hive、Sqoop等Hadoop协同工作的 ?
Oozie工作流程定义是一个DAG(Directed Acyclical Graphs)图,它由控制流节点(Control Flow Nodes)或动作节点(Action Nodes)组成,各个节点又是通过表征转移的箭线(transitions
arrows)互相连通。对于工作流一般对应存在流程定义语言,例如jBPM是jPDL,大多数都是基于XML定义的,Oozie流程定义语言也是基于XML定义的,称为hPDL(Hadoop Process Definition Language)。
下面,我们详细说明工作流定义相关的内容:
工作流生命周期 在Oozie中,工作流的状态可能存在如下几种: 状态 | 含义说明 | PREP | 一个工作流Job第一次创建将处于PREP状态,表示工作流Job已经定义,但是没有运行。 | RUNNING | 当一个已经被创建的工作流Job开始执行的时候,就处于RUNNING状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。 | SUSPENDED | 一个RUNNING状态的工作流Job会变成SUSPENDED状态,而且它会一直处于该状态,除非这个工作流Job被重新开始执行或者被杀死。 | SUCCEEDED | 当一个RUNNING状态的工作流Job到达了end节点,它就变成了SUCCEEDED最终完成状态。 | KILLED | 当一个工作流Job处于被创建后的状态,或者处于RUNNING、SUSPENDED状态时,被杀死,则工作流Job的状态变为KILLED状态。 | FAILED | 当一个工作流Job不可预期的错误失败而终止,就会变成FAILED状态。 |
上述各种状态存在相应的转移(工作流程因为某些事件,可能从一个状态跳转到另一个状态),其中合法的状态转移有如下几种,如下表所示: 转移前状态 | 转移后状态集合 | 未启动 | PREP | PREP | RUNNING、KILLED | RUNNING | SUSPENDED、SUCCEEDED、KILLED、FAILED | SUSPENDED | RUNNING、KILLED |
明确上述给出的状态转移空间以后,可以根据实际需要更加灵活地来控制工作流Job的运行。
控制流节点(Control Flow Nodes) 工作流程定义中,控制工作流的开始和结束,以及工作流Job的执行路径的节点,它定义了流程的开始(start节点)和结束(end节点或kill节点),同时提供了一种控制流程执行路径的机制(decision决策节点、fork分支节点、join会签节点)。通过上面提到的各种节点,我们大概应该能够知道它们在工作流中起着怎样的作用。下面,我们看一下不同节点的语法格式: start节点
- <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
-
- ...
-
- <start to="[NODE-NAME]" />
-
- ...
-
- </workflow-app>
-
复制代码
上面start元素的to属性,指向第一个将要执行的工作流节点。 end节点 - <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
-
- ...
-
- <end name="[NODE-NAME]" />
-
- ...
-
- </workflow-app>
复制代码
达到该节点,工作流Job会变成SUCCEEDED状态,表示成功完成。需要注意的是,一个工作流定义必须只能有一个end节点。 kill节点 - <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
- ...
- <kill name="[NODE-NAME]">
- <message>[MESSAGE-TO-LOG]</message>
- </kill>
复制代码
kill元素的name属性,是要杀死的工作流节点的名称,message元素指定了工作流节点被杀死的备注信息。达到该节点,工作流Job会变成状态KILLED。 decision节点 - <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
- ...
- <decision name="[NODE-NAME]">
- <switch>
- <case to="[NODE_NAME]">[PREDICATE]</case>
- ...
- <case to="[NODE_NAME]">[PREDICATE]</case>
- <default to="[NODE_NAME]" />
- </switch>
- </decision>
- ...
- </workflow-app>
复制代码
decision节点通过预定义一组条件,当工作流Job执行到该节点时,会根据其中的条件进行判断选择,满足条件的路径将被执行。decision节点通过switch…case语法来进行路径选择,只要有满足条件的判断,就会执行对应的路径,如果没有可以配置default元素指向的节点。 fork节点和join节点 - <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
- ...
- <fork name="[FORK-NODE-NAME]">
- <path start="[NODE-NAME]" />
- ...
- <path start="[NODE-NAME]" />
- </fork>
- ...
- <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
- ...
- </workflow-app>
复制代码
for元素下面会有多个path元素,指定了可以并发执行的多个执行路径。fork中多个并发执行路径会在join节点的位置会合,只有所有的路径都到达后,才会继续执行join节点。
动作节点(Action Nodes) 工作流程定义中,能够触发一个计算任务(Computation Task)或者处理任务(Processing Task)执行的节点。所有的动作(
Action)都有一些基本的特性,我先首先来看一下: - 可恢复性
如果一个动作节点执行失败,Oozie提供了一些恢复执行的策略,这个要根据失败的特点来进行:如果是状态转移过程中失败,Oozie会根据指定的重试时间间隔去重新执行;如果不是转移性质的失败,则只能通过手工干预来进行恢复;如果重试恢复执行都没有解决问题,则最终会跳转到error节点。 下面详细介绍Oozie内置支持的动作节点类型,如下所示:
map-reduce动作会在工作流Job中启动一个MapReduce Job任务运行,我们可以详细配置这个MapReduce Job。另外,可以通过map-reduce元素的子元素来配置一些其他的任务,如streaming、pipes、file、archive等等。
下面给出包含这些内容的语法格式说明: - <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
- ...
- <action name="[NODE-NAME]">
- <map-reduce>
- <job-tracker>[JOB-TRACKER]</job-tracker>
- <name-node>[NAME-NODE]</name-node>
- <prepare>
- <delete path="[PATH]" />
- ...
- <mkdir path="[PATH]" />
- ...
- </prepare>
- <streaming>
- <mapper>[MAPPER-PROCESS]</mapper>
- <reducer>[REDUCER-PROCESS]</reducer>
- <record-reader>[RECORD-READER-CLASS]</record-reader>
- <record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
- ...
- <env>[NAME=VALUE]</env>
- ..
- </streaming>
- <!-- Either streaming or pipes can be specified for an action, not both -->
- <pipes>
- <map>[MAPPER]</map>
- <reduce
- [REUCER]
- </reducer>
- <inputformat>[INPUTFORMAT]</inputformat>
- <partitioner>[PARTITIONER]</partitioner>
- <writer>[OUTPUTFORMAT]</writer>
- <program>[EXECUTABLE]</program>
- </pipes>
- <job-xml>[JOB-XML-FILE]</job-xml>
- <configuration>
- <property>
- <name>[PROPERTY-NAME]</name>
- <value>[PROPERTY-VALUE]</value>
- </property>
- ...
- </configuration>
- <file>[FILE-PATH]</file>
- ...
- <archive>[FILE-PATH]</archive>
- ...
- </map-reduce>
- <ok to="[NODE-NAME]" />
- <error to="[NODE-NAME]" />
- </action>
- ..
- </workflow-app>
复制代码
- Hive动作
Hive主要是基于类似SQL的HQL语言的,它能够方便地操作HDFS中数据,实现对海量数据的分析工作。HIve动作的语法格式如下所示: - <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
- ...
- <action name="[NODE-NAME]">
- <hive xmlns="uri:oozie:hive-action:0.2">
- <job-tracker>[JOB-TRACKER]</job-tracker>
- <name-node>[NAME-NODE]</name-node>
- <prepare>
- <delete path="[PATH]" />
- ...
- <mkdir path="[PATH]" />
- ...
- </prepare>
- <configuration>
- <property>
- <name>[PROPERTY-NAME]</name>
- <value>[PROPERTY-VALUE]</value>
- </property>
- ...
- </configuration>
- <script>[HIVE-SCRIPT]</script>
- <param>[PARAM-VALUE]</param>
- ...
- </hive>
- <ok to="[NODE-NAME]" />
- <error to="[NODE-NAME]" />
- </action>
- ...
- </workflow-app>
复制代码
Sqoop动作 Sqoop是一个能够在Hadoop和结构化存储系统之间进行数据的导入导出的工具,Sqoop动作的语法格式如下: - <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
- ...
- <action name="[NODE-NAME]">
- <sqoop xmlns="uri:oozie:sqoop-action:0.2">
- <job-tracker>[JOB-TRACKER]</job-tracker>
- <name-node>[NAME-NODE]</name-node>
- <prepare>
- <delete path="[PATH]" />
- ...
- <mkdir path="[PATH]" />
- ...
- </prepare>
- <configuration>
- <property>
- <name>[PROPERTY-NAME]</name>
- <value>[PROPERTY-VALUE]</value>
- </property>
- ...
- </configuration>
- <command>[SQOOP-COMMAND]</command>
- <file>[FILE-PATH]</file>
- ...
- </sqoop>
- <ok to="[NODE-NAME]" />
- <error to="[NODE-NAME]" />
- </action>
- ...
- </workflow-app>
复制代码
Pig动作 pig动作可以启动运行pig脚本实现的Job,在工作流定义中配置的语法格式说明如下:
- <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
- ...
- <action name="[NODE-NAME]">
- <pig>
- <job-tracker>[JOB-TRACKER]</job-tracker>
- <name-node>[NAME-NODE]</name-node>
- <prepare>
- <delete path="[PATH]" />
- ...
- <mkdir path="[PATH]" />
- ...
- </prepare>
- <job-xml>[JOB-XML-FILE]</job-xml>
- <configuration>
- <property>
- <name>[PROPERTY-NAME]</name>
- <value>[PROPERTY-VALUE]</value>
- </prperty>
- ...
- </configuration>
- <script>[PIG-SCRIPT]</script>
- <param>[PARAM-VALUE]</param>
- ...
- <param>[PARAM-VALUE]</param>
- <argument>[ARGUMENT-VALUE]</argument>
- ...
- <argument>[ARGUMENT-VALUE]</argument>
- <file>[FILE-PATH]</file>
- ...
- <archive>[FILE-PATH]</archive>
- ...
- </pig>
- <ok to="[NODE-NAME]" />
- <error to="[NODE-NAME]" />
- </action>
- ...
- </workflow-app>
复制代码
Fs动作 Fs动作主要是基于HDFS的一些基本操作,如删除路径、创建路径、移动文件、设置文件全乡等等。 语法格式:
- <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
- ...
- <action name="[NODE-NAME]">
- <fs>
- <delete path='[PATH]' />
- ...
- <mkdir path='[PATH]' />
- ...
- <move source='[SOURCE-PATH]' target='[TARGET-PATH]' />
- ...
- <chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
- ...
- <touchz path='[PATH]' />
- </fs>
- <ok to="[NODE-NAME]" />
- <error to="[NODE-NAME]" />
- </action>
- </workflow-app>
复制代码
Sub-workflow动作 Sub-workflow动作是一个子流程的动作,主流程执行过程中,遇到子流程节点执行时,会一直等待子流程节点执行完成后,才能继续跳转到下一个要执行的节点。 语法格式:
- <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
- ...
- <action name="[NODE-NAME]">
- <sub-workflow>
- <app-path>[WF-APPLICATION-PATH]</app-path>
- <propagate-configuration />
- <configuration>
- <property>
- <name>[PROPERTY-NAME]</name>
- <value>[PROPERTY-VALUE]</value>
- </property>
- ...
- </configuration>
- </sub-workflow>
- <ok to="[NODE-NAME]" />
- <error to="[NODE-NAME]" />
- </action>
- ...
- </workflow-app>
复制代码
Shell动作Shell动作可以执行Shell命令,并通过配置命令所需要的参数。它的语法格式:
- <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.4">
- ...
- <action name="[NODE-NAME]">
- <shell xmlns="uri:oozie:shell-action:0.2">
- <job-tracker>[JOB-TRACKER]</job-tracker>
- <name-node>[NAME-NODE]</name-node>
- <prepare>
- <delete path="[PATH]" />
- ...
- <mkdir path="[PATH]" />
- ...
- </prepare>
- <configuration>
- <property>
- <name>[PROPERTY-NAME]</name>
- <value>[PROPERTY-VALUE]</value>
- </property>
- ...
- </configuration>
- <exec>[SHELL-COMMAND]</exec>
- <argument>[ARGUMENT-VALUE]</argument>
- <capture-output />
- </shell>
- <ok to="[NODE-NAME]" />
- <error to="[NODE-NAME]" />
- </action>
- ...
- </wrkflow-app>
复制代码
表达式语言函数(Expression Language Functions) Oozie除了可以使用Properties文件定义一些属性之外,还提供了一些内置的EL函数,能够方便地实现流程的定义和控制,下面我们分组列表说明:
常量名称 | 含义说明 | KB | 1KB,类型为long。 | MB | 1MB,类型为long。 | GB | 1GB,类型为long。 | TB | 1TB,类型为long。 | PB | 1PB,类型为long。 |
- 基本EL函数
函数声明 | 含义说明 | String firstNotNull(String value1, String value2) | 返回value1和value2中不为null的值,若都为null则返回null | String concat(String s1, String s2) | 连接字符串s1和s2,如果s1或s2为null值,则使用空字符串替换null值 | String replaceAll(String src, String regex, String replacement) | 满足正则表达式regex,则使用replace替换src字符串中匹配上的部分 | String appendAll(String src, String append, String delimeter) | 将src中的分隔符delimeter替换为append | String trim(String s) | 去掉字符串两边的空格,如果s为null则返回空字符串 | String urlEncode(String s) | 对字符串s使用URL UTF-8进行编码 | String timestamp() | 返回UTC当前时间字符串,格式为YYYY-MM-DDThh:mm:ss.sZ | String toJsonStr(Map) | Oozie 3.3支持,将Map转转成一个XML编码的JSON表示形式 | String toPropertiesStr(Map) | Oozie 3.3支持,将Map转转成一个XML编码的Properties表示形式 | String toConfigurationStr(Map) | Oozie 3.3支持,将Map转转成一个XML编码的Configuration表示形式
|
函数声明 | 含义说明 | String wf:id() | 返回当前的工作流Job的ID | String wf:name() | 返回当前的工作流Job的名称 | String wf:appPath() | 返回当前的工作流Job的应用路径 | String wf:conf(String name) | 返回当前的工作流Job的配置属性 | String wf:user() | 返回启动当前的工作流Job的用户名称 | String wf:group() | 返回当前的工作流Job的的用户组名称 | String wf:callback(String stateVar) | 返回当前的工作流Job的当前动作节点的回调URL | String wf:transition(String node) | 返回转移节点,该节点是一个工作流动作节点触发的 | String wf:lastErrorNode() | 返回最后一个以ERROR状态退出的节点名称 | String wf:errorCode(String node) | 返回指定动作节点执行的错误码,如果没有则返回空 | String wf:errorMessage(String message) | 返回指定动作节点执行的错误信息,如果没有则返回空 | int wf:run() | 返回当前工作流Job的运行编号,正常的话返回0,如果执行过re-run则返回非0 | Map wf:actionData(String node) | 返回当前动作节点完成时输出的信息 | int wf:actionExternalId(String node) | 返回动作节点的外部ID | int wf:actionTrackerUri(String node) | 返回跟踪一个动作节点的URI | int wf:actionExternalStatus(String node) | 返回一个动作节点的状态
|
常量名称 | 含义说明 | RECORDS | Hadoop Record计数器组名称 | MAP_IN | Hadoop Mapper输入Record计数器名称 | MAP_OUT | Hadoop Mapper输出Record计数器名称 | REDUCE_IN | Hadoop Reducer输入Record计数器名称 | REDUCE_OUT | HadoopReducer输出Record计数器名称 | GROUPS | 1024 * Hadoop Mapper/Reducer输入Record组计数器名称 |
函数声明 | 含义说明 | Map < String, Map > hadoop:counters(String node) | 返回工作流Job某个动作节点的统计计数器信息,例如,MR的动作统计集合内容:
{
“ACTION_TYPE”: “MAP_REDUCE”,
“org.apache.hadoop.mapred.JobInProgress$Counter”: {
“TOTAL_LAUNCHED_REDUCES”: 1,
“TOTAL_LAUNCHED_MAPS”: 1,
“DATA_LOCAL_MAPS”: 1
},
“FileSystemCounters”: {
“FILE_BYTES_READ”: 1746,
“HDFS_BYTES_READ”: 1409,
“FILE_BYTES_WRITTEN”: 3524,
“HDFS_BYTES_WRITTEN”: 1547
},
“org.apache.hadoop.mapred.Task$Counter”: {
“REDUCE_INPUT_GROUPS”: 33,
“COMBINE_OUTPUT_RECORDS”: 0,
“MAP_INPUT_RECORDS”: 33,
“REDUCE_SHUFFLE_BYTES”: 0,
“REDUCE_OUTPUT_RECORDS”: 33,
“SPILLED_RECORDS”: 66,
“MAP_OUTPUT_BYTES”: 1674,
“MAP_INPUT_BYTES”: 1409,
“MAP_OUTPUT_RECORDS”: 33,
“COMBINE_INPUT_RECORDS”: 0,
“REDUCE_INPUT_RECORDS”: 33
}
}
则${hadoop:counters(“mr-node”)["FileSystemCounters"]["FILE_BYTES_READ"]},得到名称为mr-node的动作节点组的FILE_BYTES_READ计数器的值 |
选项 | 含义说明 | boolean fs:exists(String path) | path是否存在 | boolean fs:isDir(String path) | path是否是目录 | long fs:dirSize(String path) | 如果path不是目录或者path是一个文件,则返回-1,否则返回该path下所有文件的字节数 | long fs:fileSize(String path) | 如果path是目录,则返回-1,否则返回该path下所有文件的字节数 | long fs:blockSize(String path) | 如果path不是文件或者不存在则返回-1,否则返回文件的块大小字节数 |
|