本帖最后由 k9009147217 于 2016-10-27 11:27 编辑
这个是多线程创建部分:
// Split rsList into batches of at most maxRowKeySize entries and hand each
// batch to its own GetHbaseData task, then block until every task has
// counted the latch down.
final int maxRowKeySize = 100;
System.out.println(i+"i"+"="+thread.getId());
final int totalKeys = rsList.size();
// Ceiling division: one extra batch for a trailing partial chunk.
int loopSize = totalKeys / maxRowKeySize
        + (totalKeys % maxRowKeySize == 0 ? 0 : 1);
System.out.println(i+"i2"+"="+thread.getId()+"="+loopSize);
if (loopSize > 0) {
    // NOTE(review): one pool thread is created per batch, so the thread
    // count grows with rsList.size(); consider bounding the pool size.
    ExecutorService executors = Executors.newFixedThreadPool(loopSize);
    try {
        CountDownLatch countDownLatch = new CountDownLatch(loopSize);
        for (int loop = 0; loop < loopSize; loop++) {
            int start = loop * maxRowKeySize;
            int end = Math.min(start + maxRowKeySize, totalKeys);
            executors.execute(new GetHbaseData(rsList.subList(start, end),
                    countDownLatch, col));
        }
        System.out.println("loopSize="+loopSize);
        countDownLatch.await();
    } finally {
        // Always release the pool threads, even if await() is interrupted.
        executors.shutdown();
    }
} else {
    // Empty input: newFixedThreadPool(0) would throw IllegalArgumentException,
    // and there is nothing to fetch anyway.
    System.out.println("loopSize="+loopSize);
}
}
这个是每个线程获取对应的value:
static class GetHbaseData implements Runnable {
private CountDownLatch countDownLatch;
private List<String> list;
private String name;
/**
 * Creates a task bound to one batch of strings.
 *
 * @param strList        the batch to process; {@code run()} turns each
 *                       non-empty element into the row key of a {@code Get}
 * @param countDownLatch latch stored on the task (presumably counted down
 *                       when the task finishes; that part of {@code run()}
 *                       is not shown here, so confirm)
 * @param Name           column qualifier that {@code run()} requests from
 *                       column family {@code "cf1"}
 */
public GetHbaseData(List<String> strList,
        CountDownLatch countDownLatch,
        String Name) {
    this.name = Name;
    this.countDownLatch = countDownLatch;
    this.list = strList;
}
@Override
public void run() {
try {
ArrayList<Get> gets = new ArrayList<Get>();
for(int i=0;i<list.size();i++)
{
if(!list.get(i).isEmpty())
{
Get get = new Get(Bytes.toBytes(list.get(i)));
get.addFamily(Bytes.toBytes("cf1"));
get.addColumn(Bytes.toBytes("cf1"),
Bytes.toBytes(name));
gets.add(get);
}
}
String rs = " ";
Result[] results = connection.getTable(TableName.valueOf("HData"))
.get(gets);
System.out.println(i+"i2");
for(int i=0;i<results.length;i++)
{
String str ="NULL";
String str1 ="NULL";
Result res = results;
Cell[] cells = res.rawCells();
if(cells.length > 0)
str = new String(CellUtil.cloneValue(cells[0]));
str1 = new String(str.getBytes("gbk"), "utf-8");
//System.out.println(str1);
rs += list.get(i)+":"+str1+"&";
// rsList.add(str1);
}
使用打印定位到是在 get(gets) 这个地方读取数据很慢。
|