分享

Spring 集成 Kafka介绍

admin 发表于 2014-11-27 19:46:59 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 128144


问题导读


1.Outbound Channel Adapter作用是什么?
2.配置中Spring对Kafka做了哪些处理?






spring-integration-kafka是Spring官方提供的一个Spring集成框架的扩展,用来为使用Spring框架的应用程序提供Kafka框架的集成。
当前spring-integration-kafka仅提供Kafka 0.8的集成,低版本的Kafka并不支持。
新的文章介绍了代码实践: Kafka和Spring集成实践
spring-integration-kafka仅仅支持两个组件,分别对应Producer和 High Level Consumer。 它们分别是:
  • Outbound Channel Adapter
  • Inbound Channel Adapter based on the High level consumer API
其它的Kafka的特性比如Simple Consumer API。 所以使用spring-integration-kafka你并不能指定特定的offset来读取数据,或者进行更灵活的定制。
总的来说,spring-integration-kafka还处于很低级的阶段,和整体的Spring framework/Spring Integration Framework还不是很完美的整合。如果你现在就使用它,可能在开发的过程中会遇到些问题。
Outbound Channel Adapter:
用来向Kafka集群发送消息。消息读取于Spring Integration channel。当前的版本需要你指定Topic和MessageKey。
  1. final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
  2. channel.send(
  3.         MessageBuilder.withPayload(payload)
  4.                 .setHeader("messageKey", "key")
  5.                 .setHeader("topic", "test").build());
复制代码


channel是这样配置的:
  1. <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
  2.                                      kafka-producer-context-ref="kafkaProducerContext"
  3.                                      auto-startup="false"
  4.                                      channel="inputToKafka">
  5.      <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
  6. </int-kafka:outbound-channel-adapter>
复制代码


上面一个很关键的属性是kafka-producer-context-ref, 它用来配置Producer。
  1. <int-kafka:producer-context id="kafkaProducerContext">
  2.     <int-kafka:producer-configurations>
  3.         <int-kafka:producer-configuration broker-list="localhost:9092"
  4.                    key-class-type="java.lang.String"
  5.                    value-class-type="java.lang.String"
  6.                    topic="test1"
  7.                    value-encoder="kafkaEncoder"
  8.                    key-encoder="kafkaEncoder"
  9.                    compression-codec="default"/>
  10.         <int-kafka:producer-configuration broker-list="localhost:9092"
  11.                    topic="test2"
  12.                    compression-codec="default"
  13.                    async="true"/>
  14.         <int-kafka:producer-configuration broker-list="localhost:9092"
  15.                     topic="regextopic.*"
  16.                     compression-codec="default"/>
  17.     </int-kafka:producer-configurations>
  18. </int-kafka:producer-context>
复制代码



可以看到, Spring将很多Kafka native codes中的配置抽象成SPring bean,通过Spring配置的方式生成Spring beans。
每个producer-configuration最终转换成一个Kafka producer。每个Topic都对应一个Producer。上面的例子会产生两个Producer,一个对应 topic test1,另外一个对应topic test2。
每个Producer都可以配置下面的属性:
  1. broker-list             List of comma separated brokers that this producer connects to
  2. topic                   Topic name or Java regex pattern of topic name
  3. compression-codec       Compression method to be used. Default is no compression. Supported compression codec are gzip and snappy.
  4.                         Anything else would result in no compression
  5. value-encoder           Serializer to be used for encoding messages.
  6. key-encoder             Serializer to be used for encoding the partition key
  7. key-class-type          Type of the key class. This will be ignored if no key-encoder is provided
  8. value-class-type        Type of the value class. This will be ignored if no value-encoder is provided.
  9. partitioner             Custom implementation of a Kafka Partitioner interface.
  10. async                   True/False - default is false. Setting this to true would make the Kafka producer to use
  11.                         an async producer
  12. batch-num-messages      Numbe
复制代码


value-encoder 和 key-encoder 可以引用其它的Spring bean。partitioner 也可以是一个实现Kafka Partitioner 接口的Spring bean。
这里有一个encoder的例子:
  1. <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder">
  2.     <constructor-arg value="com.company.AvroGeneratedSpecificRecord" />
  3. </bean>
复制代码


如果没有配置,将采用Kafka默认的encoder。 默认的Encoder将数据视为byte数组。
如果key和消息都是字符串, Kafka提供Spring Encoder.它需要一个VerifiableProperties 作为构造函数参数。spring-integration-kafka提供可一个Properties对象封装。
所以你可以配置属性如:
  1. <bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
  2.     <property name="properties">
  3.         <props>
  4.             <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
  5.             <prop key="message.send.max.retries">5</prop>
  6.             <prop key="send.buffer.bytes">5242880</prop>
  7.         </props>
  8.     </property>
  9. </bean>
  10. <int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
  11.     <int-kafka:producer-configurations>
  12.         <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
  13.         <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
  14.         ...
  15.     <int-kafka:producer-configurations>
  16. </int-kafka:producer-context>
复制代码


Inbound Channel Adapter
用来消费消息。 消息会被放入到channel中。 Kafka提供两种方式的consumer API: High Level Consumer 和 Simple Consumer. 对于client来说,如果不需要特别的控制,只是用来出来目前的消息,High Level consumer API更直接,更易用。 但是它不提供offset管理。 如果你想获取前面的消息,或者重新获取已获取的消息, 它做不到。 你需要使用Simple consumer API. Spring Integration Kafka inbound channel adapter 当前只支持High Level Consumer.
  1. <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
  2.                                        kafka-consumer-context-ref="consumerContext"
  3.                                        auto-startup="false"
  4.                                        channel="inputFromKafka">
  5.         <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5"/>
  6. </int-kafka:inbound-channel-adapter>
复制代码


必须定义kafka-consumer-context-ref用来产生consumer。
  1. <int-kafka:consumer-context id="consumerContext"
  2.                                 consumer-timeout="4000"
  3.                                 zookeeper-connect="zookeeperConnect">
  4.         <int-kafka:consumer-configurations>
  5.             <int-kafka:consumer-configuration group-id="default"
  6.                     value-decoder="valueDecoder"
  7.                     key-decoder="valueDecoder"
  8.                     max-messages="5000">
  9.                 <int-kafka:topic id="test1" streams="4"/>
  10.                 <int-kafka:topic id="test2" streams="4"/>
  11.             </int-kafka:consumer-configuration>
  12.             <int-kafka:consumer-configuration group-id="default3"
  13.                      value-decoder="kafkaSpecificDecoder"
  14.                      key-decoder="kafkaReflectionDecoder"
  15.                      max-messages="10">
  16.                 <int-kafka:topic-filter pattern="regextopic.*" streams="4" exclude="false"/>
  17.             </int-kafka:consumer-configuration>
  18.         </int-kafka:consumer-configurations>
  19. </int-kafka:consumer-context>
复制代码


需要配置zookeeper connection:
  1. <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181" zk-connection-timeout="6000"
  2.                     zk-session-timeout="6000"
  3.                     zk-sync-time="2000" />
复制代码




已有(4)人评论

跳转到指定楼层
Joker 发表于 2014-11-29 18:35:50
管理员你好,请问你手边有没有  javaweb整合Hadoop的小案例CRUD就行,不胜感激
回复

使用道具 举报

s060403072 发表于 2014-11-29 19:05:16


参考这个试试:
hadoop web实例


回复

使用道具 举报

dearboll 发表于 2016-10-26 21:54:21
感谢分享。这是转帖吗?想问一下,这个底层是调用KafkaProducer和KafkaConsumer实现的吗?具体的代码在哪里呢?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条