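In a multi-threaded topology, PV can no longer be counted by simply incrementing one counter, because each executor only sees part of the stream. There are two ways to handle this:
(1) Use shuffle grouping and multiply the PV counted by one executor by the number of executors. Shuffle grouping spreads tuples roughly evenly, so this gives an approximation rather than an exact count.
An executor runs one task by default. If the task count is set higher than 1, the formula becomes: pv (single-thread result) * number of tasks.
Tasks in the same executor share the same thread ID but have different task IDs.
(2) Let bolt1 compute partial sums in parallel, and let a single-threaded bolt2 compute the global sum.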
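To make the difference between the two approaches concrete, here is a small Storm-free Java sketch. It assumes uniform random assignment as a stand-in for shuffle grouping (Storm's own shuffle grouping aims for an even split, so this is only a model), and the names PvEstimateDemo, shuffle, estimateFromOneTask and exactTotal are hypothetical. Approach (1) scales one task's count by the number of tasks and only approximates the total; approach (2) sums every partial count and is exact.

```java
import java.util.Random;

// Hypothetical, Storm-free model comparing approach (1) and approach (2).
class PvEstimateDemo {
    // Assign each of `tuples` events to one of `tasks` slots uniformly at random,
    // as a stand-in for shuffle grouping. Returns the per-task PV counts.
    static long[] shuffle(int tuples, int tasks, long seed) {
        long[] counts = new long[tasks];
        Random rnd = new Random(seed);
        for (int i = 0; i < tuples; i++) {
            counts[rnd.nextInt(tasks)]++;
        }
        return counts;
    }

    // Approach (1): one task's local PV times the number of tasks (an estimate).
    static long estimateFromOneTask(long[] counts, int taskIndex) {
        return counts[taskIndex] * counts.length;
    }

    // Approach (2): the global PV is the sum of every task's partial count (exact).
    static long exactTotal(long[] counts) {
        long sum = 0;
        for (long c : counts) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] counts = shuffle(10_000, 4, 42L);
        System.out.println("exact    = " + exactTotal(counts));
        System.out.println("estimate = " + estimateFromOneTask(counts, 0));
    }
}
```

With 10,000 events over 4 tasks, exactTotal always returns 10,000, while estimateFromOneTask lands near that value but usually not exactly on it.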
Reference code:
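import org.apache.storm.topology.TopologyBuilder; // Storm 1.x+ package; older releases use backtype.storm.*

public class Main {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MySpout(), 1);
        // PVBolt with 4 executors, each counting its share of the stream
        builder.setBolt("bolt", new PVBolt(), 4).shuffleGrouping("spout");
        // PVSumBolt with a single executor computes the global sum
        builder.setBolt("sumBolt", new PVSumBolt(), 1).shuffleGrouping("bolt");
        ...
    }
}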
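// Storm 1.x+ packages (backtype.storm.* before 1.0)
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class PVBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    OutputCollector collector = null; // assigned in prepare(), omitted below
    String logString = null;
    String session_id = null;
    long Pv = 0; // running PV count of this executor

    @Override
    public void execute(Tuple input) {
        logString = input.getString(0);
        String[] fields = logString.split("\t");
        // split() never yields null elements, so check the field count instead
        session_id = fields.length > 1 ? fields[1] : null;
        if (session_id != null) {
            Pv++;
        }
        // Emit this executor's running PV, keyed by thread ID.
        // If an executor runs several tasks, they share this thread ID.
        collector.emit(new Values(Thread.currentThread().getId(), Pv));
        collector.ack(input); // IRichBolt does not ack automatically
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("threadId", "pv"));
    }
    ...
}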
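import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Storm 1.x+ packages (backtype.storm.* before 1.0)
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.tuple.Tuple;

public class PVSumBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    // Latest running PV reported by each upstream PVBolt thread
    Map<Long, Long> counts = new HashMap<Long, Long>();

    @Override
    public void execute(Tuple input) {
        long threadID = input.getLong(0);
        long pv = input.getLong(1);
        // Each upstream value is a running total, so overwrite rather than add
        counts.put(threadID, pv);
        long pvSum = 0;
        // Global PV: iterate over the values in counts and sum them
        Iterator<Long> i = counts.values().iterator();
        while (i.hasNext()) {
            pvSum += i.next();
        }
        System.out.println("global pv = " + pvSum);
    }
    // ... (prepare, cleanup, declareOutputFields, getComponentConfiguration omitted)
}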
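PVSumBolt works because each PVBolt emits a running total, so the sum bolt keeps only the latest value per source and adds those up. The sketch below isolates that bookkeeping in plain Java; PvSumModel and update are hypothetical names, not part of Storm. It also shows why the thread-ID key matters: when two tasks share an executor they share a thread ID, so one task's total overwrites the other's. Storm's Tuple.getSourceTask() returns the emitting task's ID and would be a safer key in that setup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of PVSumBolt's bookkeeping.
class PvSumModel {
    // Latest cumulative PV reported by each upstream source (task or thread).
    private final Map<Long, Long> latest = new HashMap<>();

    // Each upstream update carries a running total, so it replaces the old value.
    // Returns the current global PV.
    long update(long sourceId, long cumulativePv) {
        latest.put(sourceId, cumulativePv);
        long sum = 0;
        for (long v : latest.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Two tasks with distinct keys: 2 + 3 = 5.
        PvSumModel byTask = new PvSumModel();
        byTask.update(1L, 2);
        System.out.println(byTask.update(2L, 3)); // prints 5

        // The same two tasks keyed by a shared thread ID: the second overwrites the first.
        PvSumModel byThread = new PvSumModel();
        byThread.update(7L, 2);
        System.out.println(byThread.update(7L, 3)); // prints 3
    }
}
```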
Reference: materials from cloudy