Since the details differ across Kafka client versions, here is a summary of the relevant code.
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

/**
 * @author sunzq
 * @since 2017/8/29
 */
public class Application {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");
        props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(GROUP_ID_CONFIG, "test08291103");
        // props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");
        props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // start from the earliest offset when the group has no committed offset yet
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Kerberos-secured cluster: authenticate over SASL without TLS
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // topic name: test9
        consumer.subscribe(Collections.singleton("test9"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
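This first example assumes the Kerberos login configuration is supplied outside the code. A minimal sketch of what that looks like (the paths and principal below are placeholders, not values from this article); depending on the client version you may also need to set sasl.kerberos.service.name, which the later examples set to "kafka":

// Sketch only: point the JVM at the Kerberos and JAAS configuration before
// creating the consumer. Paths and the principal are placeholders.
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
System.setProperty("java.security.auth.login.config", "/etc/kafka/jaas.conf");

// where jaas.conf contains a KafkaClient section such as:
// KafkaClient {
//   com.sun.security.auth.module.Krb5LoginModule required
//   useKeyTab=true
//   keyTab="/etc/security/keytabs/user.keytab"
//   principal="user@EXAMPLE.COM";
// };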
Further reading:
https://www.cnblogs.com/kischn/p/7447306.html
4. Writing the producer code
package com.cloudera;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by fayson on 2017/10/24.
 */
public class MyProducer {

    public static String TOPIC_NAME = "test3";

    public static void main(String[] args) {
        // Kerberos client configuration must be in place before the producer is created
        System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        // System.setProperty("sun.security.krb5.debug", "true");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9092,ip-172-31-26-80.ap-southeast-1.compute.internal:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String message = "Message-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, message);
            producer.send(record);
            System.out.println(key + "----" + message);
        }
        producer.close();
    }
}
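Note that producer.send() is asynchronous: with the loop above, an authentication or authorization failure may only show up in the logs after all ten lines have printed. A quick way to fail fast while testing (a sketch, not part of the original article) is to block on the Future that send() returns:

// Sketch: block on the Future returned by send() so broker-side errors
// (e.g. an authorization failure) surface immediately as an ExecutionException.
try {
    producer.send(new ProducerRecord<String, String>(TOPIC_NAME, "key-0", "Message-0")).get();
} catch (InterruptedException | java.util.concurrent.ExecutionException e) {
    // e.getCause() carries the underlying Kafka exception
    e.printStackTrace();
}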
5. Writing the consumer code
package com.cloudera;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by fayson on 2017/10/24.
 */
public class MyConsumer {

    private static String TOPIC_NAME = "test3";

    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9092,ip-172-31-26-80.ap-southeast-1.compute.internal:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // read fixed partitions directly instead of using group subscription
        TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);
        TopicPartition partition1 = new TopicPartition(TOPIC_NAME, 1);
        TopicPartition partition2 = new TopicPartition(TOPIC_NAME, 2);
        consumer.assign(Arrays.asList(partition0, partition1, partition2));

        while (true) {
            try {
                Thread.sleep(10000L);
                System.out.println();
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: (" + record.key() + "," + record.value() + ") at offset " + record.offset());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
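The example above relies on auto-commit every second, so a crash can replay records processed since the last commit. If you need tighter at-least-once semantics, a common variation (a sketch, assuming ENABLE_AUTO_COMMIT_CONFIG is set to "false" instead) is to commit manually after handling each batch:

// Sketch: manual offset commit, assuming enable.auto.commit=false was set above.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        // process the record first ...
        System.out.println(record.key() + " -> " + record.value());
    }
    // ... then commit, so committed offsets never run ahead of processed records
    consumer.commitSync();
}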
Further reading:
https://cloud.tencent.com/developer/article/1078162
Utility class (MyProperties)
package deng.yb.kafka_kerberos.utils;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class MyProperties extends Properties {

    private Properties properties;

    private static final String JAAS_TEMPLATE =
            "KafkaClient {\n"
            + "com.sun.security.auth.module.Krb5LoginModule required\n"
            + "useKeyTab=true\n"
            + "keyTab=\"%1$s\"\n"
            + "principal=\"%2$s\";\n"
            + "};";

    public MyProperties() {
        properties = new Properties();
    }

    public MyProperties self() {
        return this;
    }

    public MyProperties put(String key, String value) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.put(key, value);
        return self();
    }

    // common Kerberos consumer settings
    public static MyProperties initKerberos() {
        return new MyProperties()
                .put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer")
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
                .put("security.protocol", "SASL_PLAINTEXT")
                .put("sasl.kerberos.service.name", "kafka");
    }

    // common Kerberos producer settings
    public static MyProperties initProducer() {
        return new MyProperties()
                .put(ProducerConfig.ACKS_CONFIG, "all")
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
                .put("security.protocol", "SASL_PLAINTEXT")
                .put("sasl.kerberos.service.name", "kafka");
    }

    public Properties getProperties() {
        return properties;
    }

    // write a temporary jaas.conf and point the JVM at it
    public static void configureJAAS(String keyTab, String principal) {
        String content = String.format(JAAS_TEMPLATE, keyTab, principal);
        File jaasConf = null;
        PrintWriter writer = null;
        try {
            jaasConf = File.createTempFile("jaas", ".conf");
            writer = new PrintWriter(jaasConf);
            writer.println(content);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                writer.close();
            }
            if (jaasConf != null) {
                jaasConf.deleteOnExit();
            }
        }
        if (jaasConf != null) {
            System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
        }
    }
}
Producer class
package deng.yb.kafka_kerberos;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import deng.yb.kafka_kerberos.utils.MyProperties;

public class Producer {

    // target topic
    public static String TOPIC_NAME = "kafak2hdfs";

    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf",
                Thread.currentThread().getContextClassLoader().getResource("krb5.conf").getPath());
        // generate the temporary jaas.conf file
        MyProperties.configureJAAS(
                Thread.currentThread().getContextClassLoader().getResource("wms_dev.keytab").getPath(),
                "wms_dev@WONHIGH.COM");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        // System.setProperty("sun.security.krb5.debug", "true");

        // initialize the Kerberos producer settings
        MyProperties props = MyProperties.initProducer();
        // Kafka broker addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "bi-slave1:9092,bi-slave2:9092,bi-slave3:9092");

        org.apache.kafka.clients.producer.Producer<String, String> producer =
                new KafkaProducer<String, String>(props.getProperties());
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String message = "Message-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, message);
            producer.send(record);
            System.out.println(key + "----" + message);
        }
        producer.close();
    }
}
Consumer class
package deng.yb.kafka_kerberos;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import deng.yb.kafka_kerberos.utils.MyProperties;

public class Comsumer {

    private static String TOPIC_NAME = "kafak2hdfs";

    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf", Thread.currentThread()
                .getContextClassLoader().getResource("krb5.conf").getPath());
        // generate the temporary jaas.conf file
        MyProperties.configureJAAS(
                Thread.currentThread().getContextClassLoader().getResource("wms_dev.keytab").getPath(),
                "wms_dev@WONHIGH.COM");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

        // initialize the Kerberos consumer settings
        MyProperties props = MyProperties.initKerberos();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "bi-slave1:9092,bi-slave2:9092,bi-slave3:9092");

        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(props.getProperties());
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // Alternatively, assign fixed partitions instead of subscribing:
        // consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0),
        //         new TopicPartition(TOPIC_NAME, 1), new TopicPartition(TOPIC_NAME, 2)));

        while (true) {
            try {
                Thread.sleep(1000);
                System.out.println();
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: (" + record.key()
                            + "," + record.value() + ") at offset "
                            + record.offset());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
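One caveat with these two classes: getResource("krb5.conf").getPath() only yields a usable filesystem path while running from exploded classes (e.g. inside the IDE). If the keytab and krb5.conf are packaged inside a jar, the Kerberos libraries cannot read them through that path. A workaround sketch (the helper below is hypothetical, not from the original article) is to copy the resource to a temporary file first:

import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: copy a classpath resource to a temp file and return its
// path, so Kerberos config/keytab resources also work when packaged in a jar.
public class ResourceExtractor {
    public static String extract(String resourceName) throws Exception {
        File tmp = File.createTempFile("krb-", "-" + resourceName);
        tmp.deleteOnExit();
        try (InputStream in = ResourceExtractor.class.getClassLoader()
                .getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IllegalArgumentException("resource not found: " + resourceName);
            }
            Files.copy(in, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return tmp.getAbsolutePath();
    }
}

The returned path can then be handed to System.setProperty("java.security.krb5.conf", ...) and to MyProperties.configureJAAS(...) in place of the getResource(...).getPath() calls above.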