分享

kafka权威指南 第三章第4节 kafka发送消息的三种方式介绍

desehawk 2017-6-27 10:44:05 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 9184

问题导读

1.kafka发送消息有哪三种方式?
2.kafka三种发送消息方式各有什么特点?
3.kafka三种发送消息方式是如何实现的?






直接发送
下面是一种最简单的发送数据的方式
[mw_shl_code=java,true]ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products","France"); // 1
try {
producer.send(record); //2
} catch (Exception e) {
e.printStackTrace(); //3
}[/mw_shl_code]
1 Producer接收是ProducerRecord对象,因此最开始的时候需要创建ProducerRecord的对象。ProducerRecord有好几种构造方法,稍后我们会讲到。上面例子中需要填写接收消息的topic(以啊不能都是字符串类型),想要发送的key和value。key和value的类型必须要和序列化保持一致。
2 使用send()方法发送ProducerRecord对象,就像最开始Kafka的架构一样,消息会先缓存在buffer,然后开启独立的线程发送给broker。send()方法返回一个Java Future对象,对象中包含RecordMetadata,在上面的例子中并没有关心返回值,因此也就不知道消息是否发送成功,这种一般适用于允许丢失消息的情况。比如记录一些日志信息或者是不太重要的应用信息。
3 虽然我们忽略了向broker发送数据时出的错或者是Broker自己出的错,在producer发送数据前如果有错误仍然会抛出异常。有可能是在序列化消息的时候,产生了异常。比如说Buffer已经满了或者是发送线程中断产生的中断异常。

同步发送

[mw_shl_code=java,true]ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record).get();[/mw_shl_code]

这里,使用了Future.get()方法,会等待kafka的确认回复。当broker遇到错误或者应用出现问题时,future接口都会抛出异常,然后我们可以捕获到这个异常进行处理。如果没有错误。将会获得RecordMetadata对象,这个对象包含了消息写入的偏移值。

Producer有两种错误类型。一种是可以通过再次发送消息解决的错误,比如连接出现问题,需要重新连接;或者是"noleader"错误,通过等待一会Leader重新选举完就可以继续。producer可以配置自动重试。另一种是通过重试无法处理的错误,比如消息过大,这种情况下,Producer就不会重试,而是直接抛出异常。

异步发送

设想一下,如果应用跟Kafka集群之间传递消息需要10ms,那么发送100个消息,将需1秒钟的时间。另一方面,如果我们仅仅发送消息,而忽略返回的时间,那么100个消息根本花费不了多长时间。在大多数的情况下,都不需要回复——kafka在消息写入broker之后会返回消息所在的offset,这部分的信息对于producer来说,其实没什么用。另一方面,我们还需要知道消息发送失败后,抛出的异常、错误日志或者是把消息写入"errors标记的文件",稍后再统一处理。

为了异步的发送数据,但是还能处理异常,producer支持消息成功写入后回调。下面就是回调的例子:

[mw_shl_code=java,true]private class DemoProducerCallback implements Callback { //1
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace(); //2
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");//3
producer.send(record, new DemoProducerCallback());//4[/mw_shl_code]

1 为了使用回调方法,需要实现org.apache.kafka.clients.producer.Callback接口,实现它的onCompletion方法。
2 当Kafka返回错误的时候,onCompletion方法会收到一个非null的异常。上面的例子直接打印异常消息,但是如果是生产环境,需要做一些处理错误的操作。
3 记录的创建和之前是一样的
4 需要再发送消息的时候,传入回调的对象


本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条