问题导读
1.本文是如何用spring-hadoop和spring-batch对MR整合的?
2.mvn install:install-file -DgroupId=com.oracle -DartifactID=xx -Dversion=2.1.0 -Dpackaging=jar -Dfile=xx.jar的作用是什么?
参加大会回来,讲师们的一个观点让我很受启发:就是淘宝和百度的Hadoop工程师们,会写10几个步骤的连续的MR作业来做运算,也不会去写低效 hive。在我的实际工作中,最多有四步的MR运算,代码已经比较混乱了,并不易于维护。这两天用spring-hadoop和spring-batch对MR整合,形成工作流,代码逻辑清晰很多,以后维护也方便了
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:hdp="http://www.springframework.org/schema/hadoop"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:batch="http://www.springframework.org/schema/batch"
- xmlns:p="http://www.springframework.org/schema/p"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context.xsd
- http://www.springframework.org/schema/hadoop
- http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
- http://www.springframework.org/schema/batch
- http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">
-
- <hdp:configuration>
- fs.default.name=${fs.default.name}
- </hdp:configuration>
-
- <context:property-placeholder location="classpath:etlHadoop.properties" />
-
- <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
- <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
- <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher" p:jobRepository-ref="jobRepository" />
-
- <bean id="etlDriver" class="com.etl.EtlChainDriver">
- </bean>
-
- <bean id="loadJob" class="oracle.hadoop.loader.OraLoader">
- </bean>
-
- <bean id="etlInitStepListener" class="com.etl.listener.EtlInitStepListener"/>
- <bean id="etlJobListener" class="com.etl.listener.EtlJobListener"/>
-
- <batch:job id="etlJob">
- <batch:step id="initjs">
- <hdp:script-tasklet id="script-tasklet">
- <hdp:script language="javascript">
- importPackage(java.util);
- importPackage(java.lang);
-
- name = UUID.randomUUID().toString()
- System.out.println(name)
- </hdp:script>
- </hdp:script-tasklet>
- <batch:next on="COMPLETED" to="etl" />
- <batch:listeners>
- <batch:listener ref="etlInitStepListener"></batch:listener>
- </batch:listeners>
- </batch:step>
-
- <batch:step id="etl">
- <!-- spring batch设置scope为step,表示生成step的时候才创建bean,因为这个时候jobParameters才传过来 -->
- <hdp:tool-tasklet id="tool-tasklet" tool-ref="etlDriver" scope="step">
- <hdp:arg value="#{jobParameters['etlinput']}" />
- <hdp:arg value="#{jobParameters['etloutput']}" />
- </hdp:tool-tasklet>
- </batch:step>
- <batch:listeners>
- <batch:listener ref="etlJobListener"/>
- </batch:listeners>
- </batch:job>
-
- <batch:job id="loadToOracle">
- <batch:step id="loadStep">
- <hdp:tool-tasklet id="jar-tasklet" tool-ref="loadJob" scope="step">
- <!-- properties -->
- mapred.input.dir=#{jobParameters['mapred.input.dir']}
- mapred.output.dir=#{jobParameters['mapred.output.dir']}
- mapreduce.outputformat.class=#{jobParameters['mapreduce.outputformat.class']}
- oracle.hadoop.loader.loaderMapFile=#{jobParameters['oracle.hadoop.loader.loaderMapFile']}
- mapreduce.inputformat.class=#{jobParameters['mapreduce.inputformat.class']}
- mapreduce.outputformat.class=#{jobParameters['mapreduce.outputformat.class']}
- <!-- 这里必须硬编码\u0009(表示制表符),否则运行作业报没有fieldTerminator -->
- oracle.hadoop.loader.input.fieldTerminator=\u0009
- oracle.hadoop.loader.olhcachePath=#{jobParameters['oracle.hadoop.loader.olhcachePath']}
- oracle.hadoop.loader.loadByPartition=#{jobParameters['oracle.hadoop.loader.loadByPartition']}
- oracle.hadoop.loader.connection.url=#{jobParameters['oracle.hadoop.loader.connection.url']}
- oracle.hadoop.loader.connection.user=#{jobParameters['oracle.hadoop.loader.connection.user']}
- oracle.hadoop.loader.connection.password=#{jobParameters['oracle.hadoop.loader.connection.password']}
- </hdp:tool-tasklet>
- </batch:step>
- </batch:job>
- </beans>
复制代码
总的来说是两大步骤,先对原始日志进行ETL,然后用OLH将洗过的数据导入到oracle中,注意ETL中分为两步:
先用脚本做一些初始化工作,我这里随便写了点代码,实际中可以拷贝数据,删除已存在的output目录等等;
第二步是真正的ETL执行。通过定义StepExecutionListener和JobExecutionListener,我们可以在任务完成时进行回调操作,完成一些逻辑处理,在我的例子中就是com.etl.listener.EtlInitStepListener和com.etl.listener.EtlJobListener。所有的JobParameters都在java代码中赋值,除了fieldTerminator,这在实际应用中具有很大的灵活性。
项目采用maven管理,pom如下,
复制代码
注意oracle的东西都不是开源的,所以都不在maven中央仓库中,执行mvn install:install-file -DgroupId=com.oracle -DartifactID=xx -Dversion=2.1.0 -Dpackaging=jar -Dfile=xx.jar装载到本地仓库中,需要装载的jar包有ojdbc6.jar(注意不能是classes.jar),oraloader.jar,orai18n.jar等(即在$OLH_HOME/jlib目录下的jar文件)。
打包及运行:mvn clean package appassembler:assemble
chmod u+x target/appassembler/bin/etl
./target/appassembler/bin/etl
|