admin 发表于 2014-11-27 19:46:59

Spring 集成 Kafka介绍



问题导读


1.Outbound Channel Adapter作用是什么?
2.配置中Spring对Kafka做了哪些处理?


static/image/hrline/4.gif



spring-integration-kafka是Spring官方提供的一个Spring集成框架的扩展,用来为使用Spring框架的应用程序提供Kafka框架的集成。
当前spring-integration-kafka仅提供Kafka 0.8的集成,低版本的Kafka并不支持。新的文章介绍了代码实践: Kafka和Spring集成实践spring-integration-kafka仅仅支持两个组件,分别对应Producer和 High Level Consumer。 它们分别是:
[*]Outbound Channel Adapter
[*]Inbound Channel Adapter based on the High level consumer API
其它的Kafka的特性比如Simple Consumer API。 所以使用spring-integration-kafka你并不能指定特定的offset来读取数据,或者进行更灵活的定制。总的来说,spring-integration-kafka还处于很低级的阶段,和整体的Spring framework/Spring Integration Framework还不是很完美的整合。如果你现在就使用它,可能在开发的过程中会遇到些问题。
Outbound Channel Adapter:用来向Kafka集群发送消息。消息读取于Spring Integration channel。当前的版本需要你指定Topic和MessageKey。final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
channel.send(
      MessageBuilder.withPayload(payload)
                .setHeader("messageKey", "key")
                .setHeader("topic", "test").build());

channel是这样配置的:<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                     kafka-producer-context-ref="kafkaProducerContext"
                                     auto-startup="false"
                                     channel="inputToKafka">
   <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>

上面一个很关键的属性是kafka-producer-context-ref, 它用来配置Producer。<int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
      <int-kafka:producer-configuration broker-list="localhost:9092"
                   key-class-type="java.lang.String"
                   value-class-type="java.lang.String"
                   topic="test1"
                   value-encoder="kafkaEncoder"
                   key-encoder="kafkaEncoder"
                   compression-codec="default"/>
      <int-kafka:producer-configuration broker-list="localhost:9092"
                   topic="test2"
                   compression-codec="default"
                   async="true"/>
      <int-kafka:producer-configuration broker-list="localhost:9092"
                  topic="regextopic.*"
                  compression-codec="default"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>


可以看到, Spring将很多Kafka native codes中的配置抽象成SPring bean,通过Spring配置的方式生成Spring beans。
每个producer-configuration最终转换成一个Kafka producer。每个Topic都对应一个Producer。上面的例子会产生两个Producer,一个对应 topic test1,另外一个对应topic test2。
每个Producer都可以配置下面的属性:broker-list             List of comma separated brokers that this producer connects to
topic                   Topic name or Java regex pattern of topic name
compression-codec       Compression method to be used. Default is no compression. Supported compression codec are gzip and snappy.
                        Anything else would result in no compression
value-encoder         Serializer to be used for encoding messages.
key-encoder             Serializer to be used for encoding the partition key
key-class-type          Type of the key class. This will be ignored if no key-encoder is provided
value-class-type      Type of the value class. This will be ignored if no value-encoder is provided.
partitioner             Custom implementation of a Kafka Partitioner interface.
async                   True/False - default is false. Setting this to true would make the Kafka producer to use
                        an async producer
batch-num-messages      Numbe

value-encoder 和 key-encoder 可以引用其它的Spring bean。partitioner 也可以是一个实现Kafka Partitioner 接口的Spring bean。
这里有一个encoder的例子:<bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder">
    <constructor-arg value="com.company.AvroGeneratedSpecificRecord" />
</bean>

如果没有配置,将采用Kafka默认的encoder。 默认的Encoder将数据视为byte数组。
如果key和消息都是字符串, Kafka提供Spring Encoder.它需要一个VerifiableProperties 作为构造函数参数。spring-integration-kafka提供可一个Properties对象封装。
所以你可以配置属性如:<bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
      <props>
            <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
            <prop key="message.send.max.retries">5</prop>
            <prop key="send.buffer.bytes">5242880</prop>
      </props>
    </property>
</bean>
<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
    <int-kafka:producer-configurations>
      <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
      <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
      ...
    <int-kafka:producer-configurations>
</int-kafka:producer-context>

Inbound Channel Adapter用来消费消息。 消息会被放入到channel中。 Kafka提供两种方式的consumer API: High Level Consumer 和 Simple Consumer. 对于client来说,如果不需要特别的控制,只是用来出来目前的消息,High Level consumer API更直接,更易用。 但是它不提供offset管理。 如果你想获取前面的消息,或者重新获取已获取的消息, 它做不到。 你需要使用Simple consumer API. Spring Integration Kafka inbound channel adapter 当前只支持High Level Consumer.<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
                                       kafka-consumer-context-ref="consumerContext"
                                       auto-startup="false"
                                       channel="inputFromKafka">
      <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5"/>
</int-kafka:inbound-channel-adapter>

必须定义kafka-consumer-context-ref用来产生consumer。<int-kafka:consumer-context id="consumerContext"
                              consumer-timeout="4000"
                              zookeeper-connect="zookeeperConnect">
      <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration group-id="default"
                  value-decoder="valueDecoder"
                  key-decoder="valueDecoder"
                  max-messages="5000">
                <int-kafka:topic id="test1" streams="4"/>
                <int-kafka:topic id="test2" streams="4"/>
            </int-kafka:consumer-configuration>
            <int-kafka:consumer-configuration group-id="default3"
                     value-decoder="kafkaSpecificDecoder"
                     key-decoder="kafkaReflectionDecoder"
                     max-messages="10">
                <int-kafka:topic-filter pattern="regextopic.*" streams="4" exclude="false"/>
            </int-kafka:consumer-configuration>
      </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

需要配置zookeeper connection:<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181" zk-connection-timeout="6000"
                  zk-session-timeout="6000"
                  zk-sync-time="2000" />



Joker 发表于 2014-11-29 18:35:50

管理员你好,请问你手边有没有javaweb整合Hadoop的小案例CRUD就行,不胜感激

s060403072 发表于 2014-11-29 19:05:16



参考这个试试:
hadoop web实例


dearboll 发表于 2016-10-26 21:54:21

感谢分享。这是转帖吗?想问一下,这个底层是调用KafkaProducer和KafkaConsumer实现的吗?具体的代码在哪里呢?

a530491093 发表于 2017-9-8 15:15:01

感谢分享!
页: [1]
查看完整版本: Spring 集成 Kafka介绍