分享

storm-kafka源码走读之自定义Scheme(1)

xioaxu790 发表于 2014-11-28 20:09:05 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 26117
问题导读
1、如何学习简单的StringScheme实现?
2、通过什么发射String,效率会更高一点?
3、在哪个环节解析message时调用scheme信息?





使用KafkaSpout需要子类实现Scheme,storm-kafka实现了StringScheme,KeyValueStringScheme等等,大家可以用。
这些Scheme主要负责从消息流中解析出所需要的数据。
  1. public interface Scheme extends Serializable {  
  2.     public List<Object> deserialize(byte[] ser);  
  3.     public Fields getOutputFields();  
  4. }
复制代码


需要实现反序列化方法和输出fields名称,来看简单StringScheme实现:
  1. public class StringScheme implements Scheme {  
  2.   
  3.     public static final String STRING_SCHEME_KEY = "str";  
  4.   
  5.     public List<Object> deserialize(byte[] bytes) {  
  6.         return new Values(deserializeString(bytes));  
  7.     }  
  8.   
  9.     public static String deserializeString(byte[] string) {  
  10.         try {  
  11.             return new String(string, "UTF-8");  
  12.         } catch (UnsupportedEncodingException e) {  
  13.             throw new RuntimeException(e);  
  14.         }  
  15.     }  
  16.   
  17.     public Fields getOutputFields() {  
  18.         return new Fields(STRING_SCHEME_KEY);  
  19.     }  
  20. }  
复制代码

其实就是直接返回了一个String,在Spout往后发射时就一个字段,其名为“str”,如果采用StringScheme时,大家在Bolt中可以用
  1. tuple.getStringByField("str")
复制代码

来获取其值。有人有疑问前面为什么用new SchemeAsMultiScheme(new StringScheme())呐?来看SchemeAsMultiScheme代码
  1. public class SchemeAsMultiScheme implements MultiScheme {  
  2.   public final Scheme scheme;  
  3.   
  4.   public SchemeAsMultiScheme(Scheme scheme) {  
  5.     this.scheme = scheme;  
  6.   }  
  7.   
  8.   @Override public Iterable<List<Object>> deserialize(final byte[] ser) {  
  9.     List<Object> o = scheme.deserialize(ser);  
  10.     if(o == null) return null;  
  11.     else return Arrays.asList(o);  
  12.   }  
  13.   
  14.   @Override public Fields getOutputFields() {  
  15.     return scheme.getOutputFields();  
  16.   }  
  17. }  
  18.   
  19. public interface MultiScheme extends Serializable {  
  20.   public Iterable<List<Object>> deserialize(byte[] ser);  
  21.   public Fields getOutputFields();  
  22. }  
复制代码


其实本身还是调用了传入的scheme方法,只不过返回结果组合成一个list而已,小弟觉得不用也可以。但是storm-kafka里面默认是需要的,在KafkaUtils解析message时调用scheme信息:
  1. public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {  
  2.         Iterable<List<Object>> tups;  
  3.         ByteBuffer payload = msg.payload();  
  4.         if (payload == null) {  
  5.             return null;  
  6.         }  
  7.         ByteBuffer key = msg.key();  
  8.         if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {  
  9.             tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));  
  10.         } else {  
  11.             tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));  
  12.         }  
  13.         return tups;  
  14.     }  
复制代码

所以没什么大的需求还是用storm-kafka默认的吧。

例子
kafka收到的message多种多样,而且往下发射的信息页多种多样,所以很多时候我们需要自己写scheme,下面举2个例子

example 1
第一:一般默认发射一个field,但是如果我需要多发几个fields的话,该怎么办呐,现在发射2个,其实网上已有大牛,把kafka的offset加到了发射的信息中去,分析的过程如下:
  1. //returns false if it's reached the end of current batch  
  2.     public EmitState next(SpoutOutputCollector collector) {  
  3.         if (_waitingToEmit.isEmpty()) {  
  4.             fill();  
  5.         }  
  6.         while (true) {  
  7.             MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();  
  8.             if (toEmit == null) {  
  9.                 return EmitState.NO_EMITTED;  
  10.             }  
  11.             Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);  
  12.             if (tups != null) {  
  13.                 for (List<Object> tup : tups) {  
  14.                     collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));  
  15.                 }  
  16.                 break;  
  17.             } else {  
  18.                 ack(toEmit.offset);  
  19.             }  
  20.         }  
  21.         if (!_waitingToEmit.isEmpty()) {  
  22.             return EmitState.EMITTED_MORE_LEFT;  
  23.         } else {  
  24.             return EmitState.EMITTED_END;  
  25.         }  
  26.     }  
复制代码


从上面看出,发射tuple时已经把offset作为messageId往下发射了,所以我们认为在下面接收tuple的Bolt中可以通过messageId获取offset,但是我们再来看看backtype.storm.daemon.executor 的代码:
  1. (log-message"Opening spout " component-id ":" (keys task-datas))  
  2.         (doseq[[task-id task-data]task-datas  
  3.                 :let[^ISpout spout-obj (:objecttask-data)  
  4.                       tasks-fn(:tasks-fntask-data)  
  5.                       send-spout-msg (fn[out-stream-id values message-id out-task-id]  
  6.                                        (.increment emitted-count)  
  7.                                        (let[out-tasks (ifout-task-id  
  8.                                                          (tasks-fnout-task-id out-stream-id values)  
  9.                                                          (tasks-fnout-stream-id values))  
  10.                                              rooted? (andmessage-id has-ackers?)  
  11.                                              root-id (ifrooted? (MessageId/generateId rand))  
  12.                                              out-ids (fast-list-for[t out-tasks](ifrooted? (MessageId/generateId rand)))]  
复制代码

从这段代码可以看出,messageId是随机生成的,跟之前kafkaSpout 锚定的new KafkaMessageId(_partition, toEmit.offset)一点关系都没有,所以需要自己手动把offset加到发射的tuple中去,这就需要我们自己实现Scheme了,代码如下:
  1. publicclass KafkaOffsetWrapperScheme implements Scheme {  
  2.    
  3.     public static final String SCHEME_OFFSET_KEY = "offset";  
  4.    
  5.     private String _offsetTupleKeyName;  
  6.     private Scheme _localScheme;  
  7.    
  8.     public KafkaOffsetWrapperScheme() {  
  9.         _localScheme = new StringScheme();  
  10.         _offsetTupleKeyName = SCHEME_OFFSET_KEY;  
  11.     }  
  12.    
  13.    
  14.     public KafkaOffsetWrapperScheme(Scheme localScheme,  
  15.                                     String offsetTupleKeyName) {  
  16.         _localScheme = localScheme;  
  17.         _offsetTupleKeyName = offsetTupleKeyName;  
  18.     }  
  19.    
  20.     public KafkaOffsetWrapperScheme(Scheme localScheme) {  
  21.         this(localScheme, SCHEME_OFFSET_KEY);  
  22.     }  
  23.    
  24.     public List<Object> deserialize(byte[] bytes) {  
  25.         return_localScheme.deserialize(bytes);  
  26.     }  
  27.    
  28.     publicFields getOutputFields() {  
  29.         List<String> outputFields = _localScheme  
  30.                         .getOutputFields()  
  31.                         .toList();  
  32.         outputFields.add(_offsetTupleKeyName);  
  33.         returnnew Fields(outputFields);  
  34.     }  
  35. }
复制代码


这里的scheme输出是两个fields,一个是str,由StringScheme负责反序列化,或者自己实现其他的scheme;一个是offset,但是offset如何加到发射的tuple中呐??我们从PartitionManager中找到被发射的tuple
  1. public EmitState next(SpoutOutputCollector collector) {  
  2.     if (_waitingToEmit.isEmpty()) {  
  3.         fill();  
  4.     }  
  5.     while (true) {  
  6.         MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();  
  7.         if (toEmit == null) {  
  8.             return EmitState.NO_EMITTED;  
  9.         }  
  10.         Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);  
  11.         if (tups != null) {  
  12.             for (List<Object> tup : tups) {  
  13.                 tup.add(toEmit.offset);  
  14.                 collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));  
  15.             }  
  16.             break;  
  17.         } else {  
  18.             ack(toEmit.offset);  
  19.         }  
  20.     }  
  21.     if (!_waitingToEmit.isEmpty()) {  
  22.         return EmitState.EMITTED_MORE_LEFT;  
  23.     } else {  
  24.         return EmitState.EMITTED_END;  
  25.     }  
  26. }  
复制代码


KafkaUtils.generateTuples(xxx,xxx)
  1. public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {  
  2.         Iterable<List<Object>> tups;  
  3.         ByteBuffer payload = msg.payload();  
  4.         if (payload == null) {  
  5.             return null;  
  6.         }  
  7.         ByteBuffer key = msg.key();  
  8.         if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {  
  9.             tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));  
  10.         } else {  
  11.             tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));  
  12.         }  
  13.         return tups;  
  14.     }  
复制代码

目前我们已经成功把offset加到了发射的tuple中,在bolt中,可以通过tuple.getValue(1),或tuple.getStringByField("offset");来或者
唯一要做的就是在构建SpoutConfig时,指定scheme为KafkaOffsetWrapperScheme

example 2
第二,kafka里面的存的message是其他格式的,如thrift,avro,protobuf格式,那这样就需要自己实现反序列化的过程
这里以avro scheme格式为例(这里就不对avro扫盲了,自己google一下吧)
这时kafka中存放的是avro格式的message,如果avro schema如下
  1. {"namespace": "example.avro",  
  2. "type": "record",  
  3. "name": "User",  
  4. "fields": [  
  5.      {"name": "name", "type": "string"},  
  6.      {"name": "favorite_number",  "type": ["int", "null"]},  
  7.      {"name": "favorite_color", "type": ["string", "null"]}  
  8. ]  
  9. }  
复制代码

那我们需要实现Scheme接口
  1. public class AvroMessageScheme implements Scheme{  
  2.   
  3.     private final static Logger logger = LoggerFactory.getLogger(AvroMessageScheme.class);  
  4.   
  5.     private GenericRecord e2;  
  6.     private AvroRecord avroRecord;  
  7.   
  8.     public AvroMessageScheme() {  
  9.   
  10.         }  
  11.   
  12.         @Override  
  13.         public List<Object> deserialize(byte[] bytes) {  
  14.                 e2 = null;  
  15.                 avroRecord = null;  
  16.   
  17.         try {  
  18.             InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("examples.avsc");  
  19.             Schema schema = new Schema.Parser().parse(is);  
  20.             DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);  
  21.             Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);  
  22.             e2 = datumReader.read(null, decoder);  
  23.             avroRecord = new AvroRecord(e2);  
  24.         } catch (Exception e) {  
  25.             e.printStackTrace();  
  26.             return new Values(avroRecord);  
  27.         }  
  28.   
  29.         return new Values(avroRecord);  
  30.     }  
  31.   
  32.         @Override  
  33.         public Fields getOutputFields() {  
  34.                  return new Fields("msg");   
  35.         }  
  36.   
  37. }  
复制代码

这里往下面发射的是一个POJO类,其实完全可以发射String。这样效率会更高一点。
其AvroRecord POJO如下
  1. public class AvroRecord implements Serializable {  
  2.     private String name;  
  3.     private int favorite_number;  
  4.     private String favorite_color;  
  5.   
  6.     public AvroRecord(GenericRecord gr) {  
  7.         try {  
  8.             this.name = String.valueOf(gr.get("name"));  
  9.             this.favorite_number = Integer.parseInt(gr.get("favorite_number"));  
  10.             this.favorite_color = gr.get("favorite_color").toString();  
  11.         } catch (Exception e) {  
  12.             logger.error("read AvroRecord error!");  
  13.         }  
  14.     }  
  15.   
  16.     @Override  
  17.     public String toString() {  
  18.         return "AvroRecord{" +  
  19.                 "name='" + name + '\'' +  
  20.                 ", favorite_number=" + favorite_number +  
  21.                 ", favorite_color='" + favorite_color + '\'' +  
  22.                 '}';  
  23.     }  
  24.   
  25.     public String getName() {  
  26.         return name;  
  27.     }  
  28.   
  29.     public void setName(String name) {  
  30.         this.name = name;  
  31.     }  
  32.   
  33.     public String getFavorite_color() {  
  34.         return favorite_color;  
  35.     }  
  36.   
  37.     public void setFavorite_color(String favorite_color) {  
  38.         this.favorite_color = favorite_color;  
  39.     }  
  40.   
  41.     public int getFavorite_number() {  
  42.         return favorite_number;  
  43.     }  
  44.   
  45.     public void setFavorite_number(int favorite_number) {  
  46.         this.favorite_number = favorite_number;  
  47.     }  
  48. }
复制代码
该例子笔者未经过测试,望慎重使用

相关内容:

storm-kafka源码走读之KafkaSpout(2)

storm-kafka源码走读之PartitionManager(3)






没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条