Reading guide
1. How is the simple StringScheme implemented?
2. What kind of value can be emitted to make things a bit more efficient?
3. At which point does storm-kafka call the scheme when parsing a message?
Using KafkaSpout requires an implementation of the Scheme interface; storm-kafka already ships StringScheme, KeyValueStringScheme and so on, which you can use directly.
These schemes are responsible for parsing the needed data out of the message stream.
public interface Scheme extends Serializable {
    public List<Object> deserialize(byte[] ser);
    public Fields getOutputFields();
}
A Scheme implements the deserialization method and declares the output field names. Here is the simple StringScheme implementation:
public class StringScheme implements Scheme {

    public static final String STRING_SCHEME_KEY = "str";

    public List<Object> deserialize(byte[] bytes) {
        return new Values(deserializeString(bytes));
    }

    public static String deserializeString(byte[] string) {
        try {
            return new String(string, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}
It really just returns the String as-is, so when the Spout emits downstream there is a single field, named "str". If you use StringScheme, in a Bolt you can call

tuple.getStringByField("str")

to get its value; a minimal Bolt sketch is shown below.
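For illustration only, a Bolt that consumes this field might look like this (the class name PrintBolt is made up here, not part of storm-kafka):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrintBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // "str" is the field name declared by StringScheme.getOutputFields()
        String msg = tuple.getStringByField("str");
        System.out.println(msg);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing is emitted further downstream
    }
}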
Some may wonder why new SchemeAsMultiScheme(new StringScheme()) was used earlier. Look at the SchemeAsMultiScheme code:
public class SchemeAsMultiScheme implements MultiScheme {
    public final Scheme scheme;

    public SchemeAsMultiScheme(Scheme scheme) {
        this.scheme = scheme;
    }

    @Override public Iterable<List<Object>> deserialize(final byte[] ser) {
        List<Object> o = scheme.deserialize(ser);
        if (o == null) return null;
        else return Arrays.asList(o);
    }

    @Override public Fields getOutputFields() {
        return scheme.getOutputFields();
    }
}

public interface MultiScheme extends Serializable {
    public Iterable<List<Object>> deserialize(byte[] ser);
    public Fields getOutputFields();
}
It still just calls the wrapped scheme; the only difference is that the result is packed into a list. In my view you could do without it, but storm-kafka expects a MultiScheme by default, and KafkaUtils consults the scheme when it parses a message:
public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
    Iterable<List<Object>> tups;
    ByteBuffer payload = msg.payload();
    if (payload == null) {
        return null;
    }
    ByteBuffer key = msg.key();
    if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
        tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
    } else {
        tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
    }
    return tups;
}
So unless you have special requirements, just stick with the storm-kafka defaults; a typical wiring is sketched below.
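For reference, wiring the default StringScheme into a SpoutConfig usually looks roughly like this (the ZooKeeper address, topic name, zkRoot and spout id below are placeholders):

BrokerHosts hosts = new ZkHosts("zkhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "string-scheme-demo");
// KafkaConfig.scheme is declared as a MultiScheme, hence the SchemeAsMultiScheme wrapper
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);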
Examples
The messages coming out of Kafka take many forms, and so does the information we want to emit downstream, so in many cases we have to write our own scheme. Two examples follow.
example 1
First: by default only one field is emitted. What do we do if we need to emit a couple more, say two? Others have already worked this out by adding the Kafka offset to the emitted data; the analysis goes as follows:
// returns false if it's reached the end of current batch
public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}
As the code shows, the offset is already attached as the messageId when the tuple is emitted, so one might assume the downstream Bolt can obtain the offset through the messageId. But now look at the code of backtype.storm.daemon.executor:
(log-message "Opening spout " component-id ":" (keys task-datas))
(doseq [[task-id task-data] task-datas
        :let [^ISpout spout-obj (:object task-data)
              tasks-fn (:tasks-fn task-data)
              send-spout-msg (fn [out-stream-id values message-id out-task-id]
                               (.increment emitted-count)
                               (let [out-tasks (if out-task-id
                                                 (tasks-fn out-task-id out-stream-id values)
                                                 (tasks-fn out-stream-id values))
                                     rooted? (and message-id has-ackers?)
                                     root-id (if rooted? (MessageId/generateId rand))
                                     out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
This code shows that the ids attached to the outgoing tuple (root-id and out-ids) are generated randomly and have nothing to do with the new KafkaMessageId(_partition, toEmit.offset) that KafkaSpout anchored with, so the offset cannot be recovered downstream this way. We have to add the offset to the emitted tuple ourselves, which means implementing our own Scheme:
public class KafkaOffsetWrapperScheme implements Scheme {

    public static final String SCHEME_OFFSET_KEY = "offset";

    private String _offsetTupleKeyName;
    private Scheme _localScheme;

    public KafkaOffsetWrapperScheme() {
        _localScheme = new StringScheme();
        _offsetTupleKeyName = SCHEME_OFFSET_KEY;
    }

    public KafkaOffsetWrapperScheme(Scheme localScheme,
                                    String offsetTupleKeyName) {
        _localScheme = localScheme;
        _offsetTupleKeyName = offsetTupleKeyName;
    }

    public KafkaOffsetWrapperScheme(Scheme localScheme) {
        this(localScheme, SCHEME_OFFSET_KEY);
    }

    public List<Object> deserialize(byte[] bytes) {
        return _localScheme.deserialize(bytes);
    }

    public Fields getOutputFields() {
        List<String> outputFields = _localScheme
                .getOutputFields()
                .toList();
        outputFields.add(_offsetTupleKeyName);
        return new Fields(outputFields);
    }
}
This scheme declares two output fields: one is "str", deserialized by StringScheme (or by whatever other scheme you implement); the other is "offset". But how does the offset actually get into the emitted tuple? We find the emitted tuple in PartitionManager:
public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                // the only change: append the offset as the last value of the tuple
                tup.add(toEmit.offset);
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}
while KafkaUtils.generateTuples(...) stays unchanged:
public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
    Iterable<List<Object>> tups;
    ByteBuffer payload = msg.payload();
    if (payload == null) {
        return null;
    }
    ByteBuffer key = msg.key();
    if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
        tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
    } else {
        tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
    }
    return tups;
}
Now the offset has been added to the emitted tuple, and in a Bolt it can be read with tuple.getValue(1) or tuple.getLongByField("offset") (the offset is a long, so getStringByField would not work here).
The only remaining step is to point the scheme at KafkaOffsetWrapperScheme when building the SpoutConfig, for example:
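A one-line sketch, assuming the same spoutConfig as in the earlier wiring example:

spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaOffsetWrapperScheme(new StringScheme()));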
example 2
Second: the messages stored in Kafka may be in some other format, such as Thrift, Avro or Protobuf, in which case the deserialization has to be implemented by ourselves.
Here an Avro scheme is used as the example (no Avro primer here; look it up if needed).
Suppose Kafka holds Avro-encoded messages with the following Avro schema:
- {"namespace": "example.avro",
- "type": "record",
- "name": "User",
- "fields": [
- {"name": "name", "type": "string"},
- {"name": "favorite_number", "type": ["int", "null"]},
- {"name": "favorite_color", "type": ["string", "null"]}
- ]
- }
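For testing, such a message could be produced with the standard Avro API roughly as follows (the resource name examples.avsc and the field values are illustrative):

Schema schema = new Schema.Parser().parse(
        Thread.currentThread().getContextClassLoader().getResourceAsStream("examples.avsc"));
GenericRecord user = new GenericData.Record(schema);
user.put("name", "Alyssa");
user.put("favorite_number", 256);
user.put("favorite_color", "blue");

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(user, encoder);
encoder.flush();
// these bytes are what would be sent to Kafka as the message value
byte[] payload = out.toByteArray();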
On the consuming side we then implement the Scheme interface:
public class AvroMessageScheme implements Scheme {

    private final static Logger logger = LoggerFactory.getLogger(AvroMessageScheme.class);

    private GenericRecord e2;
    private AvroRecord avroRecord;

    public AvroMessageScheme() {
    }

    @Override
    public List<Object> deserialize(byte[] bytes) {
        e2 = null;
        avroRecord = null;

        try {
            // the schema file is re-read and re-parsed on every call; caching it would be cheaper
            InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("examples.avsc");
            Schema schema = new Schema.Parser().parse(is);
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            e2 = datumReader.read(null, decoder);
            avroRecord = new AvroRecord(e2);
        } catch (Exception e) {
            // on failure avroRecord is still null, so a tuple containing null is emitted
            e.printStackTrace();
            return new Values(avroRecord);
        }

        return new Values(avroRecord);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
Here a POJO is emitted downstream; you could just as well emit a String, which would be a bit more efficient. A sketch of that variant follows.
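For instance, the deserialize method above could return the record's JSON text instead of a POJO (an untested sketch, same caveat as the original example):

@Override
public List<Object> deserialize(byte[] bytes) {
    try {
        InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("examples.avsc");
        Schema schema = new Schema.Parser().parse(is);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericRecord record = datumReader.read(null, decoder);
        // GenericRecord.toString() renders the record as JSON text
        return new Values(record.toString());
    } catch (Exception e) {
        logger.error("failed to decode avro message", e);
        // returning null makes the spout ack and skip this message
        return null;
    }
}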
If you stay with the POJO approach, the AvroRecord class looks like this:
public class AvroRecord implements Serializable {

    private static final Logger logger = LoggerFactory.getLogger(AvroRecord.class);

    private String name;
    private int favorite_number;
    private String favorite_color;

    public AvroRecord(GenericRecord gr) {
        try {
            this.name = String.valueOf(gr.get("name"));
            this.favorite_number = Integer.parseInt(String.valueOf(gr.get("favorite_number")));
            this.favorite_color = gr.get("favorite_color").toString();
        } catch (Exception e) {
            logger.error("read AvroRecord error!", e);
        }
    }

    @Override
    public String toString() {
        return "AvroRecord{" +
                "name='" + name + '\'' +
                ", favorite_number=" + favorite_number +
                ", favorite_color='" + favorite_color + '\'' +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getFavorite_color() {
        return favorite_color;
    }

    public void setFavorite_color(String favorite_color) {
        this.favorite_color = favorite_color;
    }

    public int getFavorite_number() {
        return favorite_number;
    }

    public void setFavorite_number(int favorite_number) {
        this.favorite_number = favorite_number;
    }
}
The author has not tested this example himself, so use it with caution.
Related reading:
storm-kafka source code walkthrough: KafkaSpout (2)
storm-kafka source code walkthrough: PartitionManager (3)