Kafka Java API 之Producer源码解析
本帖最后由 pig2 于 2014-12-27 23:33 编辑问题导读
1.Kafka提供了哪个类作为java producer的api?
2.Producer类哪两种发送方式,默认是那种方式?
static/image/hrline/4.gif
Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式:
1、 默认是sync方式
即producer的调用类在消息真正发送到队列中去以后才返回,其工作原理如下:
1)、new Producer
当我们new了一个kafka java api提供的Producer类时,其底层,实际会产生两个核心类的实例:Producer(Scala的,不是java api提供的那个)、DefaultEventHandler。在创建的同时,会默认new一个ProducerPool,即我们每new一个java的Producer类,就会有一个scala的Producer、EventHandler和ProducerPool!
2)、 producer.send()
当我们调用java的producer.send方法时,底层调用scala的Producer的send方法,其内部其实调的是eventhandler.handle(message)方法。
a)、 eventHandler会首先序列化该消息
eventHandler.serialize(events)
b)、 然后会根据传入的broker信息、topic信息,去取最新的该topic的metadata信息
BrokerPartitionInfo.updateInfo
| -> ClientUtils.fetchTopicMetadata //创建一个topicMetadataRequest,并随机的选取传入的broker信息中任何一个去取metadata,直到取到为止
| -> val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i)) //对随机选到的broker会创建一个SyncProducer
| -> SyncProducer.send//发送topicMetadataRequest到该broker去取metadata,获得该topic所对应的所有的broker信息
看到这里也就明白了为什么kafka的api里面并不要求你提供完整的整个kafka集群的broker信息,而是任选一个或几个。因为在这里它会去你提供的broker取该topic的最新的所有的broker信息。
这里要注意的是,用于发送topicMetadataRequest的SyncProducer虽然是用ProducerPool.createSyncProducer方法建出来的,但用完并不还回pool,而是直接Close,所以会发现有INFO log打出来
<main> Connected toxxx.xxx.xxx.xxx:9092 for producing
<main> Disconnecting fromxxx.xxx.xxx.xxx:9092
注意:
这个刷新metadata并不仅在第一次初始化时做。为了能适应kafka broker运行中因为各种原因挂掉、paritition改变等变化,eventHandler会定期的再去刷新一次该metadata,刷新的间隔用参数topic.metadata.refresh.interval.ms定义,默认值是10分钟。
这里有四点需要强调:
i)、不调用send, 不会建立socket,不会去定期刷新metadata
ii)、在每次取metadata时,kafka会单独开一个socket去取metadata,开完再关掉。
iii)、根据取得的最新的完整的metadata,刷新Pool中到broker的连接(第一次建立时,pool里面是空的)
iiii)、每10分钟的刷新会直接重新把到每个broker的socket连接重建,意味着在这之后的第一个请求会有几百毫秒的延迟。如果不想要该延迟,把topic.metadata.refresh.interval.ms值改为-1,这样只有在发送失败时,才会重新刷新。Kafka的集群中如果某个partition所在的broker挂了,可以检查错误后重启重新加入集群,手动做rebalance,producer的连接会再次断掉,直到rebalance完成,那么刷新后取到的连接着中就会有这个新加入的broker。
在ClientUtils.fetchTopicMetadata调用完成后,回到BrokerPartitionInfo.updateInfo继续执行,在其末尾,pool会根据上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)
注意:
在ProducerPool中,SyncProducer的数目是由该topic的partition数目控制的,即每一个SyncProducer对应一个broker,内部封了一个到该broker的socket连接。
每次刷新时,会把已存在SyncProducer给close掉,即关闭socket连接,然后新建SyncProducer,即新建socket连接,去覆盖老的。
如果不存在,则直接创建新的。
d)、然后,才是真正发送数据
dispatchSerializedData(outstandingProduceRequests)
e)、如果发送失败,会进行重试。重试时,又会刷新metadata,而kafka的leader选举需要一定的时间,所以这次刷新可能需要等待,最大等待时间由参数retry.backoff.ms(默认为100)定义。
重试最大次数由参数message.send.max.retries定义默认为3
2、async方式通过参数producer.type控制,例子:
Properties p = new Properties();
props.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, byte[]>(config);
async方式与sync方式的不同在于,在初始化scala的producer时,会创建一个ProducerSendThread对象。然后,在调用send时,它并不是直接调用eventHandler.handle方法,而是把消息放入一个长度由queue.buffering.max.messages参数定义的队列(默认10000),当队列满足以下两种条件时,会由ProducerSendThread触发eventHandler.handle方法,把队列中的消息作为一个batch发送
a)、时间超过queue.buffering.max.ms定义的值,默认5000ms
b)、队列中当前消息个数超过batch.num.messages定义的值,默认200
3、结论:
1). Kafka提供的java api中的Producer,底层只是维护该topic到每个broker的连接,并不是一个传统意义上的连接池。在使用sync方式时,我们应该自己实现一个连接池,里面包含若干Producer对象,以实现最大化写入效率。我自己写了一个简单的:https://github.com/EdisonXu/simple-kafka-producer-pool
2). 在写入的数据频率不高或要求获得写入结果时,应使用sync方式,否则会因async的等待时间引入额外的延迟
3). 在写入的数据频率很高时,应使用async方式,以batch的形式写入,获得最大效率
默认的不是调用完send直接就返回了一个Futrue类吗? 不应该是异步的方式吗? 求解,谢谢楼主
页:
[1]