分享

Storm:事务性Topology运行时Spout报错

如图:在SpoutEmitter报了类不能转换的错误2015-06-02T09:31:35.083+0800 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to jazywoo.storm.TransactionMetadata
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassCastException: [B cannot be cast to jazywoo.storm.TransactionMetadata
        at jazywoo.storm.MyTransactionalSpout2$TransactionalSpoutEmitter.emitBatch(MyTransactionalSpout2.java:156) ~[stormjar.jar:na]
        at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:65) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:325) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
2015-06-02T09:31:35.120+0800 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__3812$fn__3813.invoke(worker.clj:456) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
)$[U]W110LXRQ1]N0{3(8(S.png



spout代码如下:
[mw_shl_code=java,true]package jazywoo.storm;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import redis.clients.jedis.*;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseTransactionalSpout;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MyTransactionalSpout2 extends BaseTransactionalSpout<TransactionMetadata>{
        @Override
        public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
                        Map conf, TopologyContext context) {
                try {
                        String path = "/home/allen/workspace/road.txt";
                        FileOutputStream stream = new FileOutputStream(path, true);
                        OutputStreamWriter writer = new OutputStreamWriter(stream);
                        writer.write("========创建协调者========");
                        writer.write("\n");
                        writer.close();
                        stream.close();
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
                return new TransactionalSpoutCoordinator();
        }

        @Override
        public backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(
                        Map conf, TopologyContext context) {
                try {
                        String path = "/home/allen/workspace/road.txt";
                        FileOutputStream stream = new FileOutputStream(path, true);
                        OutputStreamWriter writer = new OutputStreamWriter(stream);
                        writer.write("========创建发射器========");
                        writer.write("\n");
                        writer.close();
                        stream.close();
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
                return new TransactionalSpoutEmitter();
        }
       
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                try {
                        String path = "/home/allen/workspace/road.txt";
                        FileOutputStream stream = new FileOutputStream(path, true);
                        OutputStreamWriter writer = new OutputStreamWriter(stream);
                        writer.write("========定义数据流========");
                        writer.write("\n");
                        writer.close();
                        stream.close();
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
                declarer.declare(new Fields("txid", "iVehicleID", "gps_list"));;
        }
       
       
        public static class TransactionalSpoutCoordinator implements ITransactionalSpout.Coordinator<TransactionMetadata>{
                Jedis jedis = new Jedis("node01",6379);
                TransactionMetadata lastTransactionMetadata;
                public TransactionalSpoutCoordinator() {
                }


                @Override
                public void close() {
                        jedis.disconnect();
                }
               

                @Override
                public boolean isReady() {
                        byte[] tmp_data = jedis.lindex("gps".getBytes(), 0);
                        if(tmp_data == null){
//                                try {
//                                        Thread.sleep(1000);
//                                } catch (InterruptedException e) {
//                                        e.printStackTrace();
//                                }
                                return false;
                        }else{
                                try {
                                        String path = "/home/allen/workspace/road.txt";
                                        FileOutputStream stream = new FileOutputStream(path, true);
                                        OutputStreamWriter writer = new OutputStreamWriter(stream);
                                        writer.write("========数据准备完毕========");
                                        writer.write("\n");
                                        writer.close();
                                        stream.close();
                                } catch (IOException e) {
                                        // TODO Auto-generated catch block
                                        e.printStackTrace();
                                }
                                return true;
                        }
                       
                }


                @Override
                public TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {
                        TransactionMetadata ret;
                        if(jedis.lindex("gps".getBytes(), 0)!=null){
                                ret = new TransactionMetadata(false);
                        }else ret = new TransactionMetadata(true);
                        try {
                                String path = "/home/allen/workspace/road.txt";
                                FileOutputStream stream = new FileOutputStream(path, true);
                                OutputStreamWriter writer = new OutputStreamWriter(stream);
                                writer.write("========数据初始化完毕========");
                                writer.write("\n");
                                writer.close();
                                stream.close();
                               
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                        return ret;
                }
               
               
               
        }
       
        public static class TransactionalSpoutEmitter  implements ITransactionalSpout.Emitter<TransactionMetadata>{
                Jedis jedis = new Jedis("node01",6379);
               
                public TransactionalSpoutEmitter() {
                }

                @Override
                public void cleanupBefore(BigInteger txid) {
                       
                }

                @Override
                public void close() {
                        jedis.disconnect();
                }

                @Override
                public void emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta,
                                BatchOutputCollector collector) {
                        try {
                                String path = "/home/allen/workspace/road.txt";
                                FileOutputStream stream = new FileOutputStream(path, true);
                                OutputStreamWriter writer = new OutputStreamWriter(stream);
                                writer.write("========数据是否存在:"+coordinatorMeta.isHaveData()+"========");
                                writer.write("\n");
                                writer.close();
                                stream.close();
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                        if(coordinatorMeta.isHaveData()){
                        try {
                                String path = "/home/allen/workspace/road.txt";
                                FileOutputStream stream = new FileOutputStream(path, true);
                                OutputStreamWriter writer = new OutputStreamWriter(stream);
                                writer.write("emitBatch "+tx.getAttemptId());
                                writer.write("\n");
                                writer.close();
                                stream.close();
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                        long time1 = System.currentTimeMillis();
                        byte[] tmp_data = jedis.lpop("gps".getBytes());
                        String[] dataSources = (String[])byte2Object(tmp_data);
                        String line = null;
                        Map<Integer, List<Object[]>> map = new HashMap<Integer, List<Object[]>>();
                        try {
                                for (int i=0;i<dataSources.length;i++){
//                                        System.out.println("line -- "+line);
                                        if(i == 0){
                                                continue;
                                        }
                                        line = dataSources;
                                        String[] strArray = line.toString().split(",");
                                        int iVehicleID = Integer.parseInt(strArray[0]);
                                        double iTime = Double.parseDouble(strArray[1]);
                                        double iX = Double.parseDouble(strArray[2]);
                                        double iY = Double.parseDouble(strArray[3]);
                                        double iSpeed = Double.parseDouble(strArray[4]);
                                        double iAngle = Double.parseDouble(strArray[5]);
                                        int iStatus = (int) Double.parseDouble(strArray[6]);
                                        if(!map.containsKey(iVehicleID)){
                                                map.put(iVehicleID, new ArrayList<Object[]>());
                                        }
                                        List<Object[]> list = map.get(iVehicleID);
                                        list.add(new Object[]{iVehicleID, iTime, iX, iY, iSpeed, iAngle, iStatus});
                                }
                        } catch (NumberFormatException e) {
                                e.printStackTrace();
                        }
                        long time2 = System.currentTimeMillis();
                        try {
                                String path = "/home/allen/workspace/road.txt";
                                FileOutputStream stream = new FileOutputStream(path, true);
                                OutputStreamWriter writer = new OutputStreamWriter(stream);
                                writer.write("read -- all "+(time2-time1)/1000+"毫秒");
                                writer.write("\n");
                                writer.write("vehicle "+map.size());
                                writer.write("\n");
                                writer.close();
                                stream.close();
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                        Iterator vehicle_iter = map.entrySet().iterator();
                        int index2 =1;
            while (vehicle_iter.hasNext()) {
                    index2++;
                    //处理每辆车轨迹
                    Map.Entry entry_vehicle = (Entry) vehicle_iter.next();
                    int iVehicleID = (int) entry_vehicle.getKey();
                    List<Object[]> list = (List<Object[]>) entry_vehicle.getValue();
                    collector.emit(new Values(tx, iVehicleID, list));
                    //System.out.println("emit -- "+iVehicleID);
            }        
            long time3 = System.currentTimeMillis();
            try {
                                String path = "/home/allen/workspace/road.txt";
                                FileOutputStream stream = new FileOutputStream(path, true);
                                OutputStreamWriter writer = new OutputStreamWriter(stream);
                                writer.write("emit -- all "+(time3-time2)/1000+"毫秒");
                                writer.write("\n");
                                writer.close();
                                stream.close();
                        } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
                        }
                }
               
               
                //反序列化方法
            private Object byte2Object(byte[] bytes) {
                if (bytes == null || bytes.length == 0)
                    return null;
                try {
                    ObjectInputStream inputStream;
                    inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    Object obj = inputStream.readObject();
                    return obj;
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
                return null;
            }


               
               
        }

       



       
       


}
[/mw_shl_code]

已有(1)人评论

跳转到指定楼层
NEOGX 发表于 2015-6-2 10:43:43
放到 String path = "/home/allen/workspace/road.txt";下面容易产生权限问题。
尝试放到其他路径下面,比如/usr
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条