分享

Kafka Java API 之Producer源码解析

hiqj 2014-12-27 22:49:39 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 42599
本帖最后由 pig2 于 2014-12-27 23:33 编辑
问题导读

1.Kafka提供了哪个类作为java producer的api?
2.Producer类哪两种发送方式,默认是那种方式?



Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式:   
   1、 默认是sync方式
即producer的调用类在消息真正发送到队列中去以后才返回,其工作原理如下:   
  
    1)、new Producer
    当我们new了一个kafka java api提供的Producer类时,其底层,实际会产生两个核心类的实例:Producer(Scala的,不是java api提供的那个)、DefaultEventHandler。在创建的同时,会默认new一个ProducerPool,即我们每new一个java的Producer类,就会有一个scala的Producer、EventHandler和ProducerPool!     

    2)、 producer.send()
    当我们调用java的producer.send方法时,底层调用scala的Producer的send方法,其内部其实调的是eventhandler.handle(message)方法。
       a)、 eventHandler会首先序列化该消息
                 eventHandler.serialize(events)
      b)、 然后会根据传入的broker信息、topic信息,去取最新的该topic的metadata信息
    BrokerPartitionInfo.updateInfo
     | -> ClientUtils.fetchTopicMetadata   //创建一个topicMetadataRequest,并随机的选取传入的broker信息中任何一个去取metadata,直到取到为止
             | -> val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i)) //对随机选到的broker会创建一个SyncProducer
                         | -> SyncProducer.send  //发送topicMetadataRequest到该broker去取metadata,获得该topic所对应的所有的broker信息

    看到这里也就明白了为什么kafka的api里面并不要求你提供完整的整个kafka集群的broker信息,而是任选一个或几个。因为在这里它会去你提供的broker取该topic的最新的所有的broker信息。

    这里要注意的是,用于发送topicMetadataRequest的SyncProducer虽然是用ProducerPool.createSyncProducer方法建出来的,但用完并不还回pool,而是直接Close,所以会发现有INFO log打出来
  1.     [INFO] <main> Connected toxxx.xxx.xxx.xxx:9092 for producing
  2. [INFO] <main> Disconnecting fromxxx.xxx.xxx.xxx:9092
复制代码

    注意:
    这个刷新metadata并不仅在第一次初始化时做。为了能适应kafka broker运行中因为各种原因挂掉、paritition改变等变化,eventHandler会定期的再去刷新一次该metadata,刷新的间隔用参数topic.metadata.refresh.interval.ms定义,默认值是10分钟。

    这里有四点需要强调:
        i)、不调用send, 不会建立socket,不会去定期刷新metadata
        ii)、在每次取metadata时,kafka会单独开一个socket去取metadata,开完再关掉。
        iii)、根据取得的最新的完整的metadata,刷新Pool中到broker的连接(第一次建立时,pool里面是空的)
        iiii)、每10分钟的刷新会直接重新把到每个broker的socket连接重建,意味着在这之后的第一个请求会有几百毫秒的延迟。如果不想要该延迟,把topic.metadata.refresh.interval.ms值改为-1,这样只有在发送失败时,才会重新刷新。Kafka的集群中如果某个partition所在的broker挂了,可以检查错误后重启重新加入集群,手动做rebalance,producer的连接会再次断掉,直到rebalance完成,那么刷新后取到的连接着中就会有这个新加入的broker。

    在ClientUtils.fetchTopicMetadata调用完成后,回到BrokerPartitionInfo.updateInfo继续执行,在其末尾,pool会根据上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)

    注意:
    在ProducerPool中,SyncProducer的数目是由该topic的partition数目控制的,即每一个SyncProducer对应一个broker,内部封了一个到该broker的socket连接。
    每次刷新时,会把已存在SyncProducer给close掉,即关闭socket连接,然后新建SyncProducer,即新建socket连接,去覆盖老的。
    如果不存在,则直接创建新的。
     
        d)、然后,才是真正发送数据
    dispatchSerializedData(outstandingProduceRequests)
     
        e)、如果发送失败,会进行重试。重试时,又会刷新metadata,而kafka的leader选举需要一定的时间,所以这次刷新可能需要等待,最大等待时间由参数retry.backoff.ms(默认为100)定义。
    重试最大次数由参数message.send.max.retries定义默认为3

   2、async方式通过参数producer.type控制,例子:
  1.     Properties p = new Properties();
  2. props.put("producer.type", "async");
  3. ProducerConfig config = new ProducerConfig(props);
  4. producer = new Producer<String, byte[]>(config);   
复制代码
   async方式与sync方式的不同在于,在初始化scala的producer时,会创建一个ProducerSendThread对象。然后,在调用send时,它并不是直接调用eventHandler.handle方法,而是把消息放入一个长度由queue.buffering.max.messages参数定义的队列(默认10000),当队列满足以下两种条件时,会由ProducerSendThread触发eventHandler.handle方法,把队列中的消息作为一个batch发送
        a)、时间超过queue.buffering.max.ms定义的值,默认5000ms
        b)、队列中当前消息个数超过batch.num.messages定义的值,默认200

    3、结论:
1). Kafka提供的java api中的Producer,底层只是维护该topic到每个broker的连接,并不是一个传统意义上的连接池。在使用sync方式时,我们应该自己实现一个连接池,里面包含若干Producer对象,以实现最大化写入效率。我自己写了一个简单的:https://github.com/EdisonXu/simple-kafka-producer-pool
2). 在写入的数据频率不高或要求获得写入结果时,应使用sync方式,否则会因async的等待时间引入额外的延迟
3). 在写入的数据频率很高时,应使用async方式,以batch的形式写入,获得最大效率

已有(1)人评论

跳转到指定楼层
dearboll 发表于 2016-10-24 14:18:26
默认的不是调用完send直接就返回了一个Futrue类吗? 不应该是异步的方式吗? 求解,谢谢楼主
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条