pig2 发表于 2014-2-20 00:33:07

MapReduce在压力测试中的应用

本帖最后由 pig2 于 2014-2-20 01:02 编辑

我们可以带着下面问题来阅读:
1.下面是如何利用MapReduce来做压力测试的?
2.MapReduce所起到的作用是什么?
思考:
你能否想出MapReduce 的其他应用
阅读本文首先明白一些工具是干什么的:
JMeter是Apache组织开发的基于Java的压力测试工具。用于对软件做压力测试。
LoadRunner,是一种预测系统行为和性能的负载测试工具。通过以模拟上千万用户实施并发负载及实时性能监测的方式来确认和查找问题,LoadRunner能够对整个企业架构进行测试。通过使用 LoadRunner,企业能最大限度地缩短测试时间,优化性能和加速应用系统的发布周期。 LoadRunner是一种适用于各种体系架构的自动负载测试工具,它能预测系统行为并评估系统性能。


-------------------------------------------------------------------------------------------------------------------------------------------

众所周知,MapReduce编程框架(以下简称MR)一直是大并发运算以及海量数据读写应用设计的利器。在MR编程体系下,一个job通常会把输入的数据集切分为若干块,由map task以完全并行的方式处理消化这些数据块。框架会对map的输出先进行排序,然后把结果作为输入提交给reduce任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。典型的MR程序有如下重要模块结构构成:


No.模块描述
1InputFormat定义map输入数据的格式
2OutputFormat定义reduce输出数据的格式
3OutputKey定义map输出数据中key的类型
4OutputValue定义map输出数据中value的类型
5InputSplit定义对输入数据进行切分的方式,以分配给Map task
6configure传入参数信息给map和reduce task
7Mapper定义map task
8Reducer定义reduce task
9Collector收集map/reduce task的输出数据
10Reporter计数器,用于汇报job执行过程一些特定事件的发生次数


类似hive这样的应用,拿到用户一句简单sql查询(假设无需二次执行的简单sql)后,将hdfs上的海量数据进行切分,然后每个map task分别对自己负责的那部分数据执行相同的sql查询,最后将各自获得的结果汇总输出给用户,这便可以保证在海量数据中以较快的速度获得查询结果。
简单介绍完MR编程框架后,我们再来谈谈常规压力测试的特点和需求。

------------------------------------------------------------------------------------------------------------------------------------------------

以LoadRunner和JMeter为例,这两种工具都可以对web应用进行大并发访问,模拟线上的高并发压力测试,并且也都相应的提供了多机联合产生负载这样的方式进一步模拟现实情况增大被测对象的压力。这是为了解决“如果一台测试机器模拟的虚拟用户数过多,他本身性能的下降也会直接影响到测试效果”这个问题。分析LR和JMeter的多机联合产生负载这种测试方式,我们不难发现类似MR框架的一些特点,即测试分作如下几步(以LR为例):

1.      设置测试机,即在多台用于测试的机器上安装Load Generator
2.      设置测试任务,即各种configure
3.      同时调度测试任务,通过agent执行对web应用的访问
4.      Controller负责统一调度运行场景并收集测试信息和执行结果

无论是LR还是JMeter都是优秀的压测工具,但是总有一些非常规的压力测试场景无法通过LR或JMeter方便的实现,例如对分布式系统做数据读写压力测试,被测目标并非一个单独的节点,而是由很多节点组成的,这样的压力测试场景意味着多机联合对单一节点产生的负载被分担到了很多个节点上。LR和JMeter针对这样的场景往往在设置上就很复杂。
此外,对很多特定的压测目标,测试人员在设计了专属测试工具之后,往往也需要有一个类似上述LR测试步骤的过程,即工具分发、调度执行、收集结果和过程信息这样一个测试执行框架,如果自己去实现这一套框架,耗费的人月数都是相当可观的,且复用程度有限。于是在云梯项目中我通过自己的实践,想到了将MR编程框架体系与压力测试需求相结合。
从事例说起

先从简单实现类似LR多机联合负载这样一个压测场景展开。被测目标是这样的:一个web应用服务,用于收集分布式系统的跨机房流量信息,后端采用hbase作为存储数据库,接口为单一节点的http listen端口,需要模拟真实跨机房场景,利用较少的机器数量(约真实系统的50分之一)模拟线上系统的并发度。
测试工具代码是发送http request部分,出于安全考虑,重要部分略过:while(System.currentTimeMillis() - start <= runtime){
          StringBuffer sb = new StringBuffer();
          List<String> data = new ArrayList<String>();
          HttpURLConnection httpurlconnection = null;
          try{
            URL url = new URL(this.reportAd);
            httpurlconnection = (HttpURLConnection) url.openConnection();
            httpurlconnection.setConnectTimeout(5000);
            httpurlconnection.setReadTimeout(5000);
            httpurlconnection.setDoOutput(true);
            httpurlconnection.setRequestMethod("POST");
            httpurlconnection.setRequestProperty("Content-type", "text/plain");
         
            for(long i=0; i<this.recordnum; i++){
                。。。。。。
                s = Math.abs(R.nextLong())%102400000+1024;
                staticWriteSize += s;
                reporter.incrCounter("TestTool", "Write Size", s);
                staticWriteTime += (endTime - startTime);
                reporter.incrCounter("TestTool", "Write Time", endTime - startTime);
                。。。。。。
            }else{
                。。。。。。
                reporter.incrCounter("TestTool", "Read Size", s);
                staticReadTime += (endTime - startTime);
                reporter.incrCounter("TestTool", "Read Time", endTime - startTime);
                。。。。。。
            }
            Pair p = value.get(R.nextInt(value.size()));
            。。。。。。
            staticCount++;
            }
            reporter.incrCounter("TestTool", "Record num", this.recordnum);
            reporter.setStatus("Record: "+staticCount+"("+staticWrite+"w, "+staticRead+"r), Write Size: "
                +staticWriteSize+", Write Time: "+staticWriteTime
                +", Read Size: "+staticReadSize+", Read Time: "+staticReadTime);
            httpurlconnection.getOutputStream().write(sb.toString().getBytes());
            httpurlconnection.getOutputStream().flush();
            httpurlconnection.getOutputStream().close();
            int code = httpurlconnection.getResponseCode();
            if(code != 200) {
            LOG.warn("send data to master server failed, code=" + code);
            }
            reporter.incrCounter("TestTool", "Http Post num", 1);
            map.staticPost.addAndGet(1);
            Thread.sleep(interval);
          } catch (Exception e) {
            map.staticPost.addAndGet(1);
            reporter.incrCounter("TestTool", e.getClass().toString(), 1);
            LOG.warn(e.getMessage(), e);
          } finally {
            if (httpurlconnection != null) {
            httpurlconnection.disconnect();
            }
          }有了工具代码之后,我们通过实现Mapper来封装该工具,因此上述代码中我使用了“org.apache.hadoop.mapred.Reporter”的方法“incrCounter(Stringarg0, String arg1, long arg2)”来对测试中的重要过程数据进行计数,该方法会将所有map/reduce task中汇报的arg0|arg1定义的值arg2进行相加,输出到MR的jobtracker页面上,通过观察作业执行页面可以实时获取这些测试执行过程信息。此外,我还调度了“setStatus(String arg0)”方法,该方法可以实时更新当前所处task的页面信息,提供更详细的单个task执行情况信息。jobdetails.jsp页面观察结果如下所示:



CounterMapReduceTotal
TestToolHttp Post numxxxx0xxxx
Record numxxxxyyzzzz
Read Sizexx0xx


jobtasks.jsp页面观察结果如下所示:


TaskCompleteStatusStart TimeFinish TimeErrorsCounters
task_xx_m_1自定义信息126-Oct-2013 22:41:41

10
task_xx_m_2自定义信息226-Oct-2013 22:41:41

10


更多的执行过程信息,我们则通过“org.apache.commons.logging.Log”来收集,通过tasklog页面可以查阅到这些详细的日志信息。作为map task的输入,我通过在hdfs上生成的一堆随机数据来实现,InputSplit类读取了hdfs上作为模拟真实数据的输入后,将其根据map数切分成n份(n=自定义的map数量),并将其分发给对应的map task,map task拿到自己那份数据后,立即启动多个线程执行上述测试工具代码:
public void map(LongWritable key, List<Pair> value,
      OutputCollector<Text, LongWritable> context, Reporter reporter)
      throws IOException {
      。。。。。。
      for(int i=0; i < this.threadnum; i++){
      SCNThread t = new SCNThread(value, reporter, start, this);
      t.start();
      this.alivethread.addAndGet(1);
      }
      。。。。。。各task自行同步线程启动数量,当所有线程都启动之后,输出收集器开始运作:long now = System.currentTimeMillis();
      if (now > this.start + this.interval) {
      this.start += this.interval;
      context.collect(new LongWritable(this.start), new LongWritable(
            this.writeBlocks));
      }
。。。。。。可以看到key是时间戳,value是我们想收集的数值,收集器收集到的数据将进一步提供给Reducer来分析,这里有一个压力测试的关键点,即最大并发开始时间点和结束时间点的判断。观察Reducer类的reduce方法:public void reduce(Text key, Iterator<LongWritable> value,
      OutputCollector<Text, Text> context, Reporter reporter)由于所有map都以相同的时间戳作为key,因此同一时刻迭代器value的size代表了有多少个map已经达到了最大并发度,我们判断这个size,当其与我们预期的map总数一致时,则可以将该时间戳作为最大并发压力的开始时间点,当size开始小于预期map总数时,则代表最大并发压力的结束时间点,测试结果分析时可以掐取这一段数据作为测试结果,免去开始准备阶段和快结束阶段压力变小对测试结果的干扰。
更进一步我们可以在hdfs上设计一个标志位,当一个maptask执行完毕之后,通过该标志位通知到其他所有map task,以便快速结束当前的测试。
测试结果被reducer分析汇总后输出到hdfs上,最终我们只需要查看一下这个输出文件的内容就可以得到我们需要的测试结果了。
其实我们不难发现,这种测试框架与DDoS攻击很类似,当手握数千台机器之后,基本上就具备了指哪毁哪的能力,MR框架体系蕴藏的能量的确是非常巨大的。

流程图
没有流程图,上述文字描述终归不够直观,因此详细流程请看下图所示:





多语言测试工具的支持
对于java类测试工具,我们可以应用该流程图所示方案进行大并发度的压力测试,对于非java语言类的测试工具,我们一方面可以自行撰写其他编程语言的进程调度和收集器,另一方面也可以使用hadoop streaming这个编程工具来实现。Hadoop Streaming是 Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或脚本文件作为 Mapper和 Reducer。这样一来我们用python或shell编写的测试工具也可以通过streaming简单的调度起来执行。













Hadoop技术组

qq543539043 发表于 2015-6-6 09:38:03

博主讲的真的很好 还会关注你的

hery 发表于 2015-7-2 18:59:54

不错。。。。。

about-hadoop 发表于 2016-9-6 14:24:58

不错不错!

电猿 发表于 2017-7-18 15:08:16

将的很不错,谢谢分享
页: [1]
查看完整版本: MapReduce在压力测试中的应用