分享

HDAOOP SIMPLIZE TOOLKIT hadoop mapreduce简化开发包

jonenine1976 发表于 2017-1-29 11:23:00 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 6610
https://github.com/jonenine/HST

虽然大数据的发展已经将近10个年头了,hadoop技术仍然没有过时,特别是一些低成本,入门级的小项目,使用hadoop还是蛮不错的。而且,也不是每一个公司都有能力招聘和培养自己的spark人才。

         我本人对于hadoop mapreduce是有一些意见的。hadoop mapreduce技术对于开发人员的友好度不高,程序难写,调试困难,对于复杂的业务逻辑远没有spark得心应手。

2016年的春节前接到一个任务,要在一个没有spark的平台实现电力系统的一些统计分析算法,可选的技术只有hadoop mapreduce。受了这个刺激之后产生了一些奇思妙想,然后做了一些试验,并最终形成HST---hadoop simplize toolkit,还真是无心载柳柳成荫啊。



HST基本优点如下:

屏蔽了hadoop数据类型,取消了driver,将mapper和reducer转化为transformer和joiner,业务逻辑更接近sql。相当程度的减少了代码量,极大的降低了大数据编程的门槛,让基层程序员通过简单的学习即可掌握大数据的开发。

克服了hadoop mapreduce数据源单一的情况,比如在一个job内,input可以同时读文件和来自不同集群的hbase。

远程日志系统,让mapper和reducer的日志集中到driver的控制台,极大减轻了并行多进程程序的调试难度。

克服了hadoop mapreduce编写业务逻辑时,不容易区分数据来自哪个数据源的困难。接近了spark(或者sql)的水平。

天生的多线程执行,即在mapper和reducer端都默认使用多线程来执行业务逻辑。

对于多次迭代的任务,相连的两个任务可以建立关联,下一个任务直接引用上一个任务的结果,使多次迭代任务的代码结构变得清晰优美。



以下会逐条说明



基本概念的小变化:

   Source类代替了hadoop Input体系(format,split和reader)

   Transformer代替了mapper

   Joiner代替了Reducer

   去掉了饱受诟病的Driver,改为内置的实现,现在完全不用操心了。



1.      基本上,屏蔽了hadoop的数据类型,使用纯java类型

在原生的hadoop mapreduce开发中,使用org.apache.hadoop.io包下的各种hadoop数据类型,比如hadoop的Text类型,算法的编写中一些转换非常不方便。而在HST中一律使用java基本类型,完全屏蔽了hadoop类型体系。

比如在hbase作为source(Input)的时候,再也不用直接使用ImmutableBytesWritable和Result了,HST为你做了自动的转换。

现在的mapper(改名叫Transformer了)风格是这样的

public static class TransformerForHBase0 extends HBaseTransformer<Long>


现在map方法叫flatmap,看到没,已经帮你自动转成了string和map

public void flatMap(String key, Map<String, String> row,

           Collector<Long> collector)



可阅读xs.hadoop.iterated.IteratedUtil类中关于类型自动转换的部分

2.      克服了hadoop mapreduce数据源单一的情况。
比如在一个job内,数据源同时读文件和hbase,这在原生的hadoop mapreduce是不可能做到的



以前访问hbase,需要使用org.apache.hadoop.hbase.client.Scan和TableMapReduceUtil,现在完全改为与spark相似的方式。

现在的风格是这样的:

Configuration conf0 = HBaseConfiguration.create();

        conf0.set("hbase.zookeeper.property.clientPort", "2181");

        conf0.set("hbase.zookeeper.quorum", "172.16.144.132,172.16.144.134,172.16.144.136");

        conf0.set(TableInputFormat.INPUT_TABLE,"APPLICATION_JOBS");

        conf0.set(TableInputFormat.SCAN_COLUMN_FAMILY,"cf");

        conf0.set(TableInputFormat.SCAN_CACHEBLOCKS,"false");

conf0.set(TableInputFormat.SCAN_BATCHSIZE,"20000");

...其他hbase的Configuration,可以来自不同集群。



IteratedJob<Long> iJob = scheduler.createJob("testJob")

                .from(Source.hBase(conf0), TransformerForHBase0.class)

              .from(Source.hBase(conf1), TransformerForHBase1.class)

               .from(Source.textFile("file:///home/cdh/0.txt"),Transformer0.class)

        .join(JoinerHBase.class)



Hadoop中的input,现在完全由source类来代替。通过内置的机制转化为inputformat,inputsplit和reader。在HST的框架下,其实可以很容易的写出诸如Source.dbms(),Source.kafka()以及Source.redis()方法。想想吧,在一个hadoop job中,你终于可以将任意数据源,例如来自不同集群的HBASE和来自数据库的source进行join了,这是多么happy的事情啊!



3.      远程日志系统。让mapper和reducer的日志集中在driver进行显示,极大减轻了了并行多进程程序的调试难度



各位都体验过,job fail后到控制台页面,甚至ssh到计算节点去查看日志的痛苦了吧。对,hadoop原生的开发,调试很痛苦的呢!

现在好了,有远程日志系统,可以在调试时将mapper和reducer的日志集中在driver上,错误和各种counter也会自动发送到driver上,并实时显示在你的控制台上。如果在eclipse中调试程序,就可以实现点击console中的错误,直接跳到错误代码行的功能喽!

Ps:有人可能会问,如何在集群外使用eclipse调试一个job,却可以以集群方式运行呢?这里不再赘述了,网上有很多答案的哦



4.      克服了hadoop mapreduce在join上,区分数据来自哪个数据源的困难,接近spark(或者sql)的水平

在上面给出示例中,大家都看到了,现在的mapper可以绑定input喽!,也就是每个input都有自己独立的mapper。正因为此,现在的input和mapper改名叫Source和Transformer。

那么,大家又要问了,在mapper中,我已经可以轻松根据不同的数据输入写出不同的mapper了,那reducer中怎么办,spark和sql都是很容易实现的哦?比如看人家sql

Select a.id,b.name from A  a,B  b where a.id = b.id

多么轻松愉悦啊!

在原生hadoop mapreduce中,在reducer中找出哪个数据对应来自哪个input可是一个令人抓狂的问题呢!

现在这个问题已经被轻松解决喽!看下面这个joiner,对应原生的reducer

public static class Joiner0 extends Joiner<Long, String, String>


Reduce方法改名叫join方法,是不是更贴近sql的概念呢?

        public void join(Long key,RowHandler handler,Collector collector) throws Exception{

                List<Object> row  = handler.getSingleFieldRows(0);//对应索引为0的source

         List<Object> row2 = handler.getSingleFieldRows(1);//对应第二个定义的source

注意上面两句,可以按照数据源定义的索引来取出来自不同数据源join后的数据了,以后有时间可能会改成按照别名来取出,大家看源码的时候,会发现别名这个部分的接口都写好了,要不你来帮助实现了吧。



5.      天生的多线程执行,即在mapper和reducer端都默认使用多线程来执行业务逻辑。

    看看源码吧,HST框架是并发调用flatMap和join方法的,同时又不能改变系统调用reduce方法的顺序(否则hadoop的辛苦排序可就白瞎了),这可不是一件容易的事呢!

看到这里,有的同学说了。你这个HST好是好,但你搞的自动转换类型这个机制可能会把性能拉下来的。这个吗,不得不承认,可能是会有一点影响。但在生产环境做的比对可以证明,影响太小了,基本忽略不计。

笔者在生产环境做了做了多次试验,mapper改成多线程后性能并未有提高,特别是对一些业务简单的job,增加Transformer中的并发级别效率可能还会下降。

很多同学喜欢在mapper中做所谓“mapper端的join”。这种方式,相信在HST中通过提高mapper的并发级别后会有更好的表现。

Reducer中的性能相对原生提升的空间还是蛮大的。大部分的mapreduce项目,都是mapper简单而reducer复杂,HST采用并发执行join的方式对提升reducer性能是超好的。



6.      对于多次迭代的任务,相连的两个任务可以建立关联,在流程上的下一个job直接引用上一个job的结果,使多次迭代任务的代码结构变得清晰优美

虽然在最后才提到这一点,但这却是我一开始想要写HST原因。多次迭代的任务太麻烦了,上一个任务要写在hdfs做存储,下一个任务再取出使用,麻烦不麻烦。如果都由程序自动完成,岂不美哉!

在上一个任务里format一下

IteratedJob<Long> iJob = scheduler.createJob("testJob")

...//各种source定义

.format("f1","f2")

在第二个任务中,直接引用

IteratedJob<Long> stage2Job = scheduler.createJob("stage2Job")

.fromPrevious(iJob, Transformer2_0.class);

                  //Transformer2_0.class



       public static class Transformer2_0 extends PreviousResultTransformer<Long>

       ...

           public void flatMap(Long inputKey, String[] inputValues,Collector<Long> collector) {

           String f1 = getFiledValue(inputValues, "f1");

           String f2 = getFiledValue(inputValues, "f2");

    看到没,就是这么简单。

在最开始的计划中,我还设计了使用redis队列来缓冲前面job的结果,供后面的job作为输入。这样本来必须严格串行的job可以在一定程度上并发。另外还设计了子任务的并发调度,这都留给以后去实现吧。



7. 便捷的自定义参数传递。

有时候,在业务中需要作一些“开关变量”,在运行时动态传入不同的值以实现不同的业务逻辑。这个问题HST框架其实也为你考虑到了。

Driver中的自定义参数,source中的自定义参数都会以内置的方式传到transformer或joiner中去,方便程序员书写业务。

   查看transformer或joiner的源码就会发现:

getSourceParam(name)和getDriverParam(pIndex)方法,在计算节点轻松的得到在driver和source中设置的各层次级别的自定义参数,爽吧!



8. 其他工具

HST提供的方便还不止以上这些,比如在工具类中还提供了两行数据(map类型)直接join的方法。这些都留给你自己去发现并实践吧!
https://github.com/jonenine/HST

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条