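I found an example online in which Storm picks random strings from an array and writes them to a text file on the operating system. I packaged it and ran it on the cluster, and everything worked.
Building on that, I wanted to keep picking random strings from the array but write them to an Oracle database instead, so I rewrote the bolt class. The code is as follows: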
package cn.itcast.storm.bolt;

import java.io.FileWriter;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WriteOracleBolt extends BaseBasicBolt {
    private static final long serialVersionUID = -6586283337287975719L;
    private static final Log log = LogFactory.getLog(WriteOracleBolt.class);

    Connection con = null;
    PreparedStatement psta = null;
    //boolean flag = false;

    public WriteOracleBolt() {
        log.warn("&&&&&&&&&&&&&&&&& WriteOracleBolt constructor method invoked");
    }

    public void prepare(Map stormConf, TopologyContext context) {
        log.warn("################# WriteOracleBolt prepare() method invoked");
        //writer = new FileWriter("/home/" + this);
        try {
            String sql = "insert into test(name) values(?)";
            con = BaseDao.getConnection();
            con.setAutoCommit(false);
            // Pass in the SQL statement so that it is precompiled
            psta = con.prepareStatement(sql);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        log.warn("################# WriteOracleBolt declareOutputFields() method invoked");
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        log.warn("################# WriteOracleBolt execute() method invoked");
        String s = input.getString(0);
        //writer.write(s);
        //writer.write("\n");
        //writer.flush();
        try {
            psta.setString(1, s);
            psta.executeUpdate();
            con.commit();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
After repackaging it and running it on the Storm cluster, the UI reported this error:
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhen
What is causing this?
Is it a problem with the code or with the configuration?
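
One possible explanation, which the truncated trace cannot confirm on its own: prepare() catches every exception and only prints it, so if the connection could not be opened on the worker (for example because the Oracle JDBC driver is not on the worker's classpath), psta stays null and the first call to execute() throws a NullPointerException. The sketch below reproduces that pattern with plain JDBC and no Storm dependency. BaseDao is not shown in the post, so DriverManager stands in for it here, and the URL, user name, password, and all class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class SwallowedPrepareDemo {
    // Hypothetical URL, for illustration only. The demo assumes the
    // connection attempt fails, e.g. because no Oracle driver is available.
    static final String URL = "jdbc:oracle:thin:@//localhost:1521/XE";

    /** Stand-in for the bolt: same prepare()/execute() split, no Storm types. */
    static class Writer {
        private final boolean failFast;
        private Connection con;
        private PreparedStatement psta;

        Writer(boolean failFast) {
            this.failFast = failFast;
        }

        void prepare() {
            try {
                con = DriverManager.getConnection(URL, "user", "password");
                con.setAutoCommit(false);
                psta = con.prepareStatement("insert into test(name) values(?)");
            } catch (SQLException e) {
                if (failFast) {
                    // Surface the root cause instead of leaving psta null.
                    throw new IllegalStateException("could not prepare insert statement", e);
                }
                // Pattern from the post: report the error and carry on.
                System.err.println("prepare() failed: " + e.getMessage());
            }
        }

        void execute(String s) throws SQLException {
            psta.setString(1, s); // NullPointerException if prepare() failed silently
            psta.executeUpdate();
            con.commit();
        }
    }

    /** Runs prepare() then execute() and describes where the failure surfaces. */
    static String outcome(boolean failFast) {
        Writer w = new Writer(failFast);
        try {
            w.prepare();
            w.execute("hello");
            return "ok";
        } catch (NullPointerException e) {
            return "NullPointerException in execute()";
        } catch (IllegalStateException e) {
            return "IllegalStateException in prepare(): " + e.getCause().getMessage();
        } catch (SQLException e) {
            return "SQLException in execute(): " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("swallowing: " + outcome(false));
        System.out.println("fail-fast:  " + outcome(true));
    }
}
```

If this is what is happening, the original exception printed by e.printStackTrace() would most likely be in the worker's log or standard output rather than in the UI, so that is the first place to look. Rethrowing from prepare(), as the fail-fast variant does, should make the real cause show up in place of the later NullPointerException.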