问题导读
1.storm中传递的是什么数据类型?
2.storm中自定义数据类型,该如何传递?
3.storm如何实现序列化?
1.对storm中传递类型的解释(大家如果不想看也可以直接跳到第二部分,不过看看是很有好处的)
大家知道在storm中传递的是tuple,而tuple是没有定义数据类型格式的,也就是tuple是dynamically(动态)类型的,下面解释一下tuple为什么 要定义成动态类型的:
一是因为tuple如果定义为静态类型,一定需要定义大量的stormAPI来支持,而且需要大量的注解来让用户明白,官方的说法就是静态类型(静态类型比较安全)的安全性比不上动态类型的简单易用。
二是因为一个Bolt可能会接收好多不同类型的数据,如果定义成静态类型,用户需要为每种类型声明不同的方法来解析数据,tuple用动态数据类型就简单直接易用了。
三是使用动态类型可以让storm在例如Clojure和JRuby动态类型语言中直接使用。
2.在storm中流动的数据流格式可以多种多样,数据流可以以各种格式的形式在task之间进行传递。
因为storm中可以对数据格式自动进行序列化,但是也只是对于一些常见格式能进行序列化,其中包括
int, short, long, float, double, bool, byte, string, byte arrays,也就是说用户可以直接在task之间传递这些类型而不需要做其它的操作(本人实际操作过确实没错)
3.但是对于一些其它类型,或是自己定义的一些类型(比如要在task之间传递一个对象格式),就需要自己进行序列化了,下面是如何进行序列化
你只要在配置文件(storm.yaml)中声明序列化,storm内部会自动实现序列化
storm用Kyro来进行序列化 (Kryo是一个灵活的快速的序列化框架)
通过 topology.kryo.register的属性在配置文件中进行配置,此属性包括两种形式:
1直接注册类名 。Storm默认使用Kryo的FieldsSerializer来序列化类(只需要注册类明即可,而且所注册类要有有一个无参的构造函数)
2通过注册类名来对com.esotericsoftware.kryo.Serializer接口进行实现。
例如:
topology.kryo.register:
- com.mycompany.CustomType1 //用FieldsSerializer来进行序列化
- com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer //用com.mycompany.serializer.CustomType2Serializer来进行序列化
在这里列举一个小的例子:比如你想在task之间传递一个名为Persion的对象,即传递类型为Persion
首先定义一个Persion类
public class Persion
{
public Persion(){} //构造函数必须有,否则会报缺少构造函数的错误
String name; //定义人的两个属性
int age;
public Persion(String xing,String ming)
{
this.xing=xing;
this.ming=ming;
}
public String getName()
{
return name;
}
public int getAge()
{
return age;
}
}
之后要在storm.yaml配置文件中注册这个类:
topology.kryo.register: //注意前面不能有空格
- Persion
之后提交这个拓扑,在拓扑中就可以运行成功了
这两个参数自己基本没有用到过(因为自己只是提交了一个拓扑),不过官方文档有这两个参数
Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS = true; //忽略所有已经写好序列化设置
Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION=false; //禁止使用java语言自己的序列化
|
|