原因:
zk挂掉的这几天, kafka中之前的数据已经被清掉了, 但是zk中保存的offset还是几天之前的, 导致KafkaSpout要获取的offset超过了当前kafka的offset, 就像ArrayIndexOutOfRangeException一样
解决方案:
KafkaSpout 配置项中可以选择读取的方式, 共有三种, 如果Topology启动的时候未进行配置, 则默认是从Zk中读取, 所以导致了异常
-2: 从最老的开始读
-1: 从最近的开始读
0: 从Zk中读
相关代码如下, storm.kafka.PartitionManager,
[mw_shl_code=java,true]public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, GlobalPartitionId id) {
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition);
_state = state;
_stormConf = stormConf;
String jsonTopologyId = null;
Long jsonOffset = null;
try {
Map<Object, Object> json = _state.readJSON(committedPath());
if(json != null) {
jsonTopologyId = (String)((Map<Object,Object>)json.get("topology")).get("id");
jsonOffset = (Long)json.get("offset");
}
}
catch(Throwable e) {
LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath(), e);
}
if(!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = _consumer.getOffsetsBefore(spoutConfig.topic, id.partition, spoutConfig.startOffsetTime, 1)[0];
LOG.info("Using startOffsetTime to choose last commit offset.");
} else if(jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = _consumer.getOffsetsBefore(spoutConfig.topic, id.partition, -1, 1)[0];
LOG.info("Setting last commit offset to HEAD.");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo);
}
LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
_emittedToOffset = _committedTo;[/mw_shl_code]
重点关注红色代码, spoutConfig.forceFromStart 为true的时候, 才会真正去读取自己设置的offset, 否则将会使用Zk中的offset
那么问题来了, 如何设置呢, SpoutConfig很贴心的给我们提供了一个方法
[mw_shl_code=java,true]public void forceStartOffsetTime(long millis) {
startOffsetTime = millis;
forceFromStart = true;
} [/mw_shl_code]
所以我们只需要在我们的Topology中添加如下代码即可
[mw_shl_code=java,true]/* -2=最老 -1=最新, 0=zk offset*/
if (args != null && args[1] != null && Integer.valueOf(args[1]) != 0) {
if (Integer.valueOf(args[1]) == -2) {
spoutConfig.forceStartOffsetTime(-2); //从kafka最老的记录读取
} else if (Integer.valueOf(args[1]) == -1) {
spoutConfig.forceStartOffsetTime(-1); //从kafka最新的记录读取
}//其他情况则默认从zk的offset读取}[/mw_shl_code]
发布Topology的时候, 如果需要从最新记录读取, 则像这样 storm jar com.abc.StormTopology stormTopology -1
|