Integrating Kafka with Spark Streaming (Java) and Writing Data to HBase in Real Time, with Code Download
Guiding questions
1. How is pom.xml configured for the Kafka + Spark Streaming (Java) integration that writes data to HBase in real time?
2. What does HBaseCounterIncrementor.java implement?
3. What does SparkStreamingFromFlumeToHBaseExample.java implement?
Integrating Kafka with Spark Streaming (Java) and writing data to HBase in real time
Maven configuration (pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>spark</groupId>
<artifactId>spark</artifactId>
<version>1</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.4-hadoop2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.6.Final</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>0.98.4-hadoop2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>0.98.4-hadoop2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.cloudera.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>2.01</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Bind the maven-assembly-plugin to the package phase; this creates a jar
bundling the non-provided dependencies, suitable for deployment to the cluster. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass />
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.theoryinpractise</groupId>
<artifactId>clojure-maven-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<sourceDirectories>
<sourceDirectory>src/clj</sourceDirectory>
</sourceDirectories>
</configuration>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>${storm.topology}</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
Java code: SparkStreamingFromFlumeToHBaseExample.java
package org.apache.spark.examples.streaming;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
public class SparkStreamingFromFlumeToHBaseExample {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
if (args.length == 0) {
System.err
.println("Usage: SparkStreamingFromFlumeToHBaseWindowingExample {master} {host} {port} {table} {columnFamily} {windowInSeconds} {slideInSeconds");
System.exit(1);
}
// String master = args;
// String host = args;
// int port = Integer.parseInt(args);
String tableName = "test";// args;
String columnFamily = "f";// args;
// int windowInSeconds = 3;// Integer.parseInt(args);
// int slideInSeconds = 1;// Integer.parseInt(args);
String zkQuorum = "localhost";
String group = "test-consumer-group";
String topicss = "test";
String numThread = "2";
Duration batchInterval = new Duration(5000);
// Duration windowInterval = new Duration(windowInSeconds * 1000);
// Duration slideInterval = new Duration(slideInSeconds * 1000);
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
JavaStreamingContext jssc =
new JavaStreamingContext(sparkConf, new Duration(2000));
final Broadcast<String> broadcastTableName =
jssc.sparkContext().broadcast(tableName);
final Broadcast<String> broadcastColumnFamily =
jssc.sparkContext().broadcast(columnFamily);
// JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
int numThreads = Integer.parseInt(numThread);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = topicss.split(",");
for (String topic : topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
JavaDStream<String> lines =
messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
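// Note: the 'words' DStream built below is never used; the word-count
// pipeline that follows starts again from 'messages' and repeats the same split.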
JavaDStream<String> words =
lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> lastCounts =
messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
}).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
}).mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer x, Integer y) throws Exception {
return x.intValue() + y.intValue();
}
});
lastCounts
.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> values, Time time)
throws Exception {
values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
HBaseCounterIncrementor incrementor =
HBaseCounterIncrementor.getInstance(
broadcastTableName.value(),
broadcastColumnFamily.value());
incrementor.increment("Counter", tuple._1(), tuple._2());
System.out.println("Counter:" + tuple._1() + "," + tuple._2());
}
});
return null;
}
});
jssc.start();
jssc.awaitTermination();
}
}
Java code: CounterMap.java
package org.apache.spark.examples.streaming;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;
public class CounterMap {
HashMap<String, Counter> map = new HashMap<String, Counter>();
public void increment(String key, long increment) {
Counter count = map.get(key);
if (count == null) {
count = new Counter();
map.put(key, count);
}
count.value += increment;
}
public long getValue(String key) {
Counter count = map.get(key);
if (count != null) {
return count.value;
} else {
return 0;
}
}
public Set<Entry<String, Counter>> entrySet() {
return map.entrySet();
}
public void clear() {
map.clear();
}
public static class Counter {
public long value;
}
}
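CounterMap is a plain in-memory accumulator: increment() adds a delta to a per-key long, getValue() reads it back, and clear() empties the buffer. A minimal standalone usage sketch follows (the demo class name and sample words are made up for illustration; the increment/getValue/clear calls are the ones defined above):
package org.apache.spark.examples.streaming;
/** Tiny demo of CounterMap: buffer a few word counts in memory and read them back. */
public class CounterMapDemo {
  public static void main(String[] args) {
    CounterMap counts = new CounterMap();
    for (String word : new String[] {"spark", "kafka", "spark"}) {
      counts.increment(word, 1); // same call the incrementor makes per (key, delta)
    }
    System.out.println("spark = " + counts.getValue("spark")); // 2
    System.out.println("kafka = " + counts.getValue("kafka")); // 1
    counts.clear(); // note: HBaseCounterIncrementor swaps in a fresh CounterMap instead of clearing
    System.out.println("after clear, spark = " + counts.getValue("spark")); // 0
  }
}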
Java code: HBaseCounterIncrementor.java
package org.apache.spark.examples.streaming;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.examples.streaming.CounterMap;
import org.apache.spark.examples.streaming.CounterMap.Counter;
public class HBaseCounterIncrementor {
static HBaseCounterIncrementor singleton;
static String tableName;
static String columnFamily;
static HTable hTable;
static long lastUsed;
static long flushInterval = 1000; // milliseconds between flushes; nonzero so FlushThread does not spin
static CloserThread closerThread;
static FlushThread flushThread;
static HashMap<String, CounterMap> rowKeyCounterMap =
new HashMap<String, CounterMap>();
static Object locker = new Object();
private HBaseCounterIncrementor(String tableName, String columnFamily) {
HBaseCounterIncrementor.tableName = tableName;
HBaseCounterIncrementor.columnFamily = columnFamily;
}
public static HBaseCounterIncrementor getInstance(String tableName,
String columnFamily) {
if (singleton == null) {
synchronized (locker) {
if (singleton == null) {
singleton = new HBaseCounterIncrementor(tableName, columnFamily);
initialize();
}
}
}
return singleton;
}
private static void initialize() {
if (hTable == null) {
synchronized (locker) {
if (hTable == null) {
Configuration hConfig = HBaseConfiguration.create();
try {
hTable = new HTable(hConfig, tableName);
updateLastUsed();
} catch (IOException e) {
throw new RuntimeException(e);
}
flushThread = new FlushThread(flushInterval);
flushThread.start();
closerThread = new CloserThread();
closerThread.start();
}
}
}
}
public void increment(String rowKey, String key, int increment) {
increment(rowKey, key, (long) increment);
}
public void increment(String rowKey, String key, long increment) {
CounterMap counterMap = rowKeyCounterMap.get(rowKey);
if (counterMap == null) {
counterMap = new CounterMap();
rowKeyCounterMap.put(rowKey, counterMap);
}
counterMap.increment(key, increment);
initialize();
}
private static void updateLastUsed() {
lastUsed = System.currentTimeMillis();
}
protected void close() {
if (hTable != null) {
synchronized (locker) {
if (hTable != null) {
if (hTable != null && System.currentTimeMillis() - lastUsed > 30000) {
flushThread.stopLoop();
flushThread = null;
try {
hTable.close();
} catch (IOException e) {
e.printStackTrace();
}
hTable = null;
}
}
}
}
}
public static class CloserThread extends Thread {
boolean continueLoop = true;
@Override
public void run() {
while (continueLoop) {
if (System.currentTimeMillis() - lastUsed > 30000) {
singleton.close();
break;
}
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stopLoop() {
continueLoop = false;
}
}
protected static class FlushThread extends Thread {
long sleepTime;
boolean continueLoop = true;
public FlushThread(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public void run() {
while (continueLoop) {
try {
flushToHBase();
} catch (IOException e) {
e.printStackTrace();
break;
}
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void flushToHBase() throws IOException {
if (hTable == null) {
initialize();
}
synchronized (locker) {
updateLastUsed();
for (Entry<String, CounterMap> entry : rowKeyCounterMap.entrySet()) {
CounterMap pastCounterMap = entry.getValue();
rowKeyCounterMap.put(entry.getKey(), new CounterMap());
Increment increment = new Increment(Bytes.toBytes(entry.getKey()));
boolean hasColumns = false;
for (Entry<String, Counter> entry2 : pastCounterMap.entrySet()) {
increment.addColumn(Bytes.toBytes(columnFamily),
Bytes.toBytes(entry2.getKey()), entry2.getValue().value);
hasColumns = true;
}
if (hasColumns) {
updateLastUsed();
hTable.increment(increment);
}
}
updateLastUsed();
}
}
public void stopLoop() {
continueLoop = false;
}
}
}
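For context on what FlushThread batches: each buffered (rowKey, key, value) triple becomes one column of a single HBase Increment per row. Below is a minimal sketch of the equivalent direct call, assuming the same HBase 0.98 client API, table 'test', and column family 'f' used above (the class name is illustrative). Issuing this per record instead of buffering would mean one RPC per word, which is exactly what the in-memory CounterMap avoids.
package org.apache.spark.examples.streaming;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.util.Bytes;
/** One unbuffered counter increment, i.e. what flushToHBase() issues per row after batching. */
public class DirectIncrementSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "test");
    try {
      Increment increment = new Increment(Bytes.toBytes("Counter")); // row key
      increment.addColumn(Bytes.toBytes("f"), Bytes.toBytes("hello"), 1L); // f:hello += 1
      table.increment(increment);
    } finally {
      table.close();
    }
  }
}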
After running mvn package, upload spark-1-jar-with-dependencies.jar to the Spark cluster and submit it. A receiver-based Kafka stream needs at least two local cores, so use local[2] rather than local when running locally; the trailing 100 is only there to satisfy the args.length check.
/root/spark-1.2.0-bin-hadoop2.3/bin/spark-submit --class org.apache.spark.examples.streaming.SparkStreamingFromFlumeToHBaseExample --master local[2] /root/spark-1-jar-with-dependencies.jar 100
Create the corresponding table in HBase:
hbase(main):002:0> create 'test', 'f'
Start the Kafka broker and a console producer:
# bin/kafka-server-start.sh config/server.properties
# bin/kafka-console-producer.sh --broker-list n1:9092 --topic test
After typing words into the producer, the corresponding counter cells are incremented in the HBase table 'test'.
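To spot-check the result programmatically, here is a minimal read-back sketch using the same HBase 0.98 client API as above. The class name CounterReadCheck and the sample word "hello" are illustrative assumptions; the row key "Counter", table 'test', and column family 'f' match the incrementor code. Counter values are stored as 8-byte longs, hence Bytes.toLong.
package org.apache.spark.examples.streaming;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
/** Reads one counter cell back from the 'test' table to verify the pipeline. */
public class CounterReadCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "test");
    try {
      // Row key "Counter" with the word as qualifier, as written by HBaseCounterIncrementor.
      Get get = new Get(Bytes.toBytes("Counter"));
      get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("hello"));
      Result result = table.get(get);
      byte[] value = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("hello"));
      if (value == null) {
        System.out.println("No counter yet for 'hello'");
      } else {
        System.out.println("hello = " + Bytes.toLong(value)); // increments are 8-byte longs
      }
    } finally {
      table.close();
    }
  }
}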
Code download:
Link: http://pan.baidu.com/s/1i3IgElZ  Password: ****