如图:在SpoutEmitter报了类不能转换的错误2015-06-02T09:31:35.083+0800 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to jazywoo.storm.TransactionMetadata
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassCastException: [B cannot be cast to jazywoo.storm.TransactionMetadata
at jazywoo.storm.MyTransactionalSpout2$TransactionalSpoutEmitter.emitBatch(MyTransactionalSpout2.java:156) ~[stormjar.jar:na]
at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:65) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:325) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
... 6 common frames omitted
2015-06-02T09:31:35.120+0800 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
spout代码如下:
[mw_shl_code=java,true]package jazywoo.storm;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import redis.clients.jedis.*;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalSpout;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MyTransactionalSpout2 extends BaseTransactionalSpout<TransactionMetadata>{
@Override
public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
Map conf, TopologyContext context) {
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("========创建协调者========");
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new TransactionalSpoutCoordinator();
}
@Override
public backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(
Map conf, TopologyContext context) {
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("========创建发射器========");
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new TransactionalSpoutEmitter();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("========定义数据流========");
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
declarer.declare(new Fields("txid", "iVehicleID", "gps_list"));;
}
public static class TransactionalSpoutCoordinator implements ITransactionalSpout.Coordinator<TransactionMetadata>{
Jedis jedis = new Jedis("node01",6379);
TransactionMetadata lastTransactionMetadata;
public TransactionalSpoutCoordinator() {
}
@Override
public void close() {
jedis.disconnect();
}
@Override
public boolean isReady() {
byte[] tmp_data = jedis.lindex("gps".getBytes(), 0);
if(tmp_data == null){
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
return false;
}else{
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("========数据准备完毕========");
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
}
@Override
public TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {
TransactionMetadata ret;
if(jedis.lindex("gps".getBytes(), 0)!=null){
ret = new TransactionMetadata(false);
}else ret = new TransactionMetadata(true);
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("========数据初始化完毕========");
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return ret;
}
}
public static class TransactionalSpoutEmitter implements ITransactionalSpout.Emitter<TransactionMetadata>{
Jedis jedis = new Jedis("node01",6379);
public TransactionalSpoutEmitter() {
}
@Override
public void cleanupBefore(BigInteger txid) {
}
@Override
public void close() {
jedis.disconnect();
}
@Override
public void emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta,
BatchOutputCollector collector) {
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("========数据是否存在:"+coordinatorMeta.isHaveData()+"========");
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(coordinatorMeta.isHaveData()){
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("emitBatch "+tx.getAttemptId());
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long time1 = System.currentTimeMillis();
byte[] tmp_data = jedis.lpop("gps".getBytes());
String[] dataSources = (String[])byte2Object(tmp_data);
String line = null;
Map<Integer, List<Object[]>> map = new HashMap<Integer, List<Object[]>>();
try {
for (int i=0;i<dataSources.length;i++){
// System.out.println("line -- "+line);
if(i == 0){
continue;
}
line = dataSources;
String[] strArray = line.toString().split(",");
int iVehicleID = Integer.parseInt(strArray[0]);
double iTime = Double.parseDouble(strArray[1]);
double iX = Double.parseDouble(strArray[2]);
double iY = Double.parseDouble(strArray[3]);
double iSpeed = Double.parseDouble(strArray[4]);
double iAngle = Double.parseDouble(strArray[5]);
int iStatus = (int) Double.parseDouble(strArray[6]);
if(!map.containsKey(iVehicleID)){
map.put(iVehicleID, new ArrayList<Object[]>());
}
List<Object[]> list = map.get(iVehicleID);
list.add(new Object[]{iVehicleID, iTime, iX, iY, iSpeed, iAngle, iStatus});
}
} catch (NumberFormatException e) {
e.printStackTrace();
}
long time2 = System.currentTimeMillis();
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("read -- all "+(time2-time1)/1000+"毫秒");
writer.write("\n");
writer.write("vehicle "+map.size());
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Iterator vehicle_iter = map.entrySet().iterator();
int index2 =1;
while (vehicle_iter.hasNext()) {
index2++;
//处理每辆车轨迹
Map.Entry entry_vehicle = (Entry) vehicle_iter.next();
int iVehicleID = (int) entry_vehicle.getKey();
List<Object[]> list = (List<Object[]>) entry_vehicle.getValue();
collector.emit(new Values(tx, iVehicleID, list));
//System.out.println("emit -- "+iVehicleID);
}
long time3 = System.currentTimeMillis();
try {
String path = "/home/allen/workspace/road.txt";
FileOutputStream stream = new FileOutputStream(path, true);
OutputStreamWriter writer = new OutputStreamWriter(stream);
writer.write("emit -- all "+(time3-time2)/1000+"毫秒");
writer.write("\n");
writer.close();
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//反序列化方法
private Object byte2Object(byte[] bytes) {
if (bytes == null || bytes.length == 0)
return null;
try {
ObjectInputStream inputStream;
inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
Object obj = inputStream.readObject();
return obj;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
}
}
[/mw_shl_code]
|
|