分享

kafka权威指南 第三章第6节 分区器

xingoo 2017-7-8 10:34:48 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 7091
本帖最后由 xingoo 于 2017-7-9 11:54 编辑

问题导读
1 生产者中消息时如何决定写入哪个分区中的?
2 如何自定义分区器?




在前面的例子中,创建ProducerRecord对象的时候需要指定topic、key和value。也可以只配置topic和value,key的值会默认为null,大部分的应用都会直接提供key。key的作用有两个:一个是给消息添加了额外的信息;另一个重要的作用就是通过它可以决定消息持久化到哪个partition。注意,相同的key的数据肯定会进入同一个分区。这就意味着如果一个进程只消费一个分区的数据,那么相同key的消息都会被一个相同的进程消费。

创建带有key-value的记录可以按照下面的方式创建:
[mw_shl_code=java,true]ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");[/mw_shl_code]

如果创建没有key的记录,可以像下面这样:
[mw_shl_code=java,true]ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");[/mw_shl_code]这里的key会被设置为null,说明这个客户是没有名字属性的。

当key为null并且使用了默认的分区器,那么消息将会随机分布到主题的各个分区。消息写入时会采用Round-robin模式在各个分区之间负载均衡。


如果key存在并且使用的也是默认的分区器,Kafka将会对这个key取哈希值(使用的是自己的算法,因此哈希值不会随着JDK的升级更新而改变),然后会通过这个哈希值决定消息写入哪个分区。需要注意的是,相同的key总是会进入相同的那个分区,所以需要使用主题所有的分区做哈希映射,而不能仅仅是可用的分区。这就意味如果有的分区损坏不可用了,那么消息会直接报错,而不是重新选择一个其他的分区写入。这是一种比较极端的情况,因为一般情况下你都会对分区进行备份达到高可用。

另外需要主要的是key的映射需要保证分区数是不变的。举个例子,如果分区数是不变的,那么045189这个用户以这个id做为key,它通过哈希计算决定写入34分区,那么当读取的时候也只需要从这个分区就能读到这条数据。但是如果此时新增了分区,消息就不能保证还能写入34分区了,旧的消息将会保存在34分区中,但是新的消息将会被分配到新的分区中。因此最佳实践的方式是,在最开始设计解决方案的时候,就规划好分区的数量,之后就不要再做任何改变了。

实现用户自定义分区策略

到目前为止,我们描述的还都是默认的分区器,当然也是大多数场景下使用的。Kafka不会限制你必须使用hash分区器,因为有时候也需要使用其他的方式分区。比如,你是一个B2B的服务提供商,你最大的客户是一个叫做banana的设备制造公司,并且它提供了公司10%的业务,如果使用默认的分区器,banana的记录会跟其他的记录一样分配。这就可能导致写入banana的这个分区压力会比其他的分区大得多,很有可能就会出现磁盘空间不足、进程处理速度下降的问题。因此我们希望单独把banana分配到指定的分区中,这时就需要自定义分区器了。

[mw_shl_code=java,true]import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceOf String)))
            throw new InvalidRecordException("We expect all messages to have customername as key")
        // banana将会进入最后一个分区
        if (((String) key).equals("Banana"))
            return numPartitions; // Banana will always go to last partition
        // Other records will get hashed to the rest of the partitions
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
    public void close() {}
}[/mw_shl_code]
分区器接口主要包括configure和partition以及close方法。这里我们仅仅实现一下partition方法,当然其实我们也需要把客户的name属性进行配置转换一下,而不是直接在分区中硬编码。
这里仅仅考虑字符串的场景,其他的类型就直接抛出异常了。

本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条