Guiding questions
1. For the Java-based integration of Kafka and Spark Streaming that writes data to HBase in real time, how is pom.xml configured?
2. What does HBaseCounterIncrementor.java implement?
3. What does SparkStreamingFromFlumeToHBaseExample.java implement?
Integrating Kafka with Spark Streaming (Java) and writing the data to HBase in real time
Maven configuration: pom.xml
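A minimal sketch of the dependencies such a pom.xml needs is shown below. The versions are assumptions based on the Spark 1.2.0 / Kafka 0.8.1 / HBase cluster used later in this post, not a copy of the original pom.xml; the spark-1-jar-with-dependencies.jar name also suggests the maven-assembly-plugin (jar-with-dependencies descriptor) or the shade plugin is configured in the build section.
- <!-- Dependency sketch only; versions are assumptions, adjust to your cluster -->
- <dependencies>
-   <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-core_2.10</artifactId>
-     <version>1.2.0</version>
-   </dependency>
-   <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-streaming_2.10</artifactId>
-     <version>1.2.0</version>
-   </dependency>
-   <dependency>
-     <groupId>org.apache.spark</groupId>
-     <artifactId>spark-streaming-kafka_2.10</artifactId>
-     <version>1.2.0</version>
-   </dependency>
-   <dependency>
-     <groupId>org.apache.hbase</groupId>
-     <artifactId>hbase-client</artifactId>
-     <version>0.98.6-hadoop2</version>
-   </dependency>
-   <!-- Guava is used directly by the example (Lists, Optional) -->
-   <dependency>
-     <groupId>com.google.guava</groupId>
-     <artifactId>guava</artifactId>
-     <version>14.0.1</version>
-   </dependency>
- </dependencies>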
Java code: SparkStreamingFromFlumeToHBaseExample.java (despite the "FromFlume" in its name, this example reads from Kafka)
- package org.apache.spark.examples.streaming;
-
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.regex.Pattern;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.apache.spark.broadcast.Broadcast;
- import org.apache.spark.streaming.Duration;
- import org.apache.spark.streaming.Time;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaPairDStream;
- import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka.KafkaUtils;
-
- import scala.Tuple2;
-
- import com.google.common.base.Optional;
- import com.google.common.collect.Lists;
-
- public class SparkStreamingFromFlumeToHBaseExample {
-
- private static final Pattern SPACE = Pattern.compile(" ");
-
- public static void main(String[] args) {
- if (args.length == 0) {
- System.err
- .println("Usage: SparkStreamingFromFlumeToHBaseWindowingExample {master} {host} {port} {table} {columnFamily} {windowInSeconds} {slideInSeconds");
- System.exit(1);
- }
-
- // String master = args[0];
- // String host = args[1];
- // int port = Integer.parseInt(args[2]);
- String tableName = "test";// args[3];
- String columnFamily = "f";// args[4];
- // int windowInSeconds = 3;// Integer.parseInt(args[5]);
- // int slideInSeconds = 1;// Integer.parseInt(args[5]);
-
- String zkQuorum = "localhost";
- String group = "test-consumer-group";
- String topicss = "test";
- String numThread = "2";
-
- Duration batchInterval = new Duration(5000);
- // Duration windowInterval = new Duration(windowInSeconds * 1000);
- // Duration slideInterval = new Duration(slideInSeconds * 1000);
-
- SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
- JavaStreamingContext jssc =
- new JavaStreamingContext(sparkConf, batchInterval);
-
- final Broadcast<String> broadcastTableName =
- jssc.sparkContext().broadcast(tableName);
- final Broadcast<String> broadcastColumnFamily =
- jssc.sparkContext().broadcast(columnFamily);
-
- // JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
-
- int numThreads = Integer.parseInt(numThread);
- Map<String, Integer> topicMap = new HashMap<String, Integer>();
- String[] topics = topicss.split(",");
- for (String topic : topics) {
- topicMap.put(topic, numThreads);
- }
-
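- // Receiver-based Kafka input stream: consumes the topics in topicMap through
- // the given ZooKeeper quorum and consumer group, using numThreads threads per topic.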
- JavaPairReceiverInputDStream<String, String> messages =
- KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
-
- JavaDStream<String> lines =
- messages.map(new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) {
- return tuple2._2();
- }
- });
-
- JavaDStream<String> words =
- lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
- }
- });
-
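- // Per-batch word count: split each Kafka message on spaces, emit (word, 1)
- // pairs and reduce them by key within the batch.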
- JavaPairDStream<String, Integer> lastCounts =
- messages.map(new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) {
- return tuple2._2();
- }
- }).flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
- }
- }).mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-
- @Override
- public Integer call(Integer x, Integer y) throws Exception {
- return x.intValue() + y.intValue();
- }
- });
-
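- // For every batch, write the word counts to HBase as counter increments:
- // row key "Counter", one column per word in the broadcast column family.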
- lastCounts
- .foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
-
- @Override
- public Void call(JavaPairRDD<String, Integer> values, Time time)
- throws Exception {
-
- values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
-
- @Override
- public void call(Tuple2<String, Integer> tuple) throws Exception {
- HBaseCounterIncrementor incrementor =
- HBaseCounterIncrementor.getInstance(
- broadcastTableName.value(),
- broadcastColumnFamily.value());
- incrementor.increment("Counter", tuple._1(), tuple._2());
- System.out.println("Counter:" + tuple._1() + "," + tuple._2());
-
- }
- });
-
- return null;
- }
- });
-
- jssc.start();
- jssc.awaitTermination();
-
- }
- }
Java code: CounterMap.java
- package org.apache.spark.examples.streaming;
-
- import java.util.HashMap;
- import java.util.Map.Entry;
- import java.util.Set;
-
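- // In-memory map from String keys to long counters; used to buffer increments
- // before they are flushed to HBase.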
- public class CounterMap {
- HashMap<String, Counter> map = new HashMap<String, Counter>();
-
- public void increment(String key, long increment) {
- Counter count = map.get(key);
- if (count == null) {
- count = new Counter();
- map.put(key, count);
- }
- count.value += increment;
- }
-
-
- public long getValue(String key) {
- Counter count = map.get(key);
- if (count != null) {
- return count.value;
- } else {
- return 0;
- }
- }
-
- public Set<Entry<String, Counter>> entrySet() {
- return map.entrySet();
- }
-
- public void clear() {
- map.clear();
- }
-
- public static class Counter {
- public long value;
- }
-
-
- }
Java code: HBaseCounterIncrementor.java
- package org.apache.spark.examples.streaming;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map.Entry;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Increment;
- import org.apache.hadoop.hbase.util.Bytes;
-
- import org.apache.spark.examples.streaming.CounterMap;
- import org.apache.spark.examples.streaming.CounterMap.Counter;
-
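- // Singleton that buffers counter increments per row key and column qualifier,
- // flushes them to the HBase table as Increment operations on a background
- // FlushThread, and closes the HTable once it has been idle for 30 seconds.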
- public class HBaseCounterIncrementor {
-
- static HBaseCounterIncrementor singleton;
- static String tableName;
- static String columnFamily;
- static HTable hTable;
- static long lastUsed;
- static long flushInterval = 5000; // how often (ms) the FlushThread pushes buffered counters to HBase
- static CloserThread closerThread;
- static FlushThread flushThread;
- static HashMap<String, CounterMap> rowKeyCounterMap =
- new HashMap<String, CounterMap>();
- static Object locker = new Object();
-
- private HBaseCounterIncrementor(String tableName, String columnFamily) {
- HBaseCounterIncrementor.tableName = tableName;
- HBaseCounterIncrementor.columnFamily = columnFamily;
- }
-
- public static HBaseCounterIncrementor getInstance(String tableName,
- String columnFamily) {
-
- if (singleton == null) {
- synchronized (locker) {
- if (singleton == null) {
- singleton = new HBaseCounterIncrementor(tableName, columnFamily);
- initialize();
- }
- }
- }
- return singleton;
- }
-
- private static void initialize() {
- if (hTable == null) {
- synchronized (locker) {
- if (hTable == null) {
- Configuration hConfig = HBaseConfiguration.create();
- try {
- hTable = new HTable(hConfig, tableName);
- updateLastUsed();
-
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- flushThread = new FlushThread(flushInterval);
- flushThread.start();
- closerThread = new CloserThread();
- closerThread.start();
- }
- }
- }
- }
-
- public void increment(String rowKey, String key, int increment) {
- increment(rowKey, key, (long) increment);
- }
-
- public void increment(String rowKey, String key, long increment) {
- CounterMap counterMap = rowKeyCounterMap.get(rowKey);
- if (counterMap == null) {
- counterMap = new CounterMap();
- rowKeyCounterMap.put(rowKey, counterMap);
- }
- counterMap.increment(key, increment);
-
- initialize();
- }
-
- private static void updateLastUsed() {
- lastUsed = System.currentTimeMillis();
- }
-
- protected void close() {
- if (hTable != null) {
- synchronized (locker) {
- if (hTable != null) {
- if (hTable != null && System.currentTimeMillis() - lastUsed > 30000) {
- flushThread.stopLoop();
- flushThread = null;
- try {
- hTable.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- hTable = null;
- }
- }
- }
- }
- }
-
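- // Closes the shared HTable after it has been idle for more than 30 seconds.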
- public static class CloserThread extends Thread {
-
- boolean continueLoop = true;
-
- @Override
- public void run() {
- while (continueLoop) {
-
- if (System.currentTimeMillis() - lastUsed > 30000) {
- singleton.close();
- break;
- }
-
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- public void stopLoop() {
- continueLoop = false;
- }
- }
-
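- // Periodically swaps out the buffered CounterMaps and applies them to HBase
- // as Increment operations, then sleeps for sleepTime milliseconds.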
- protected static class FlushThread extends Thread {
- long sleepTime;
- boolean continueLoop = true;
-
- public FlushThread(long sleepTime) {
- this.sleepTime = sleepTime;
- }
-
- @Override
- public void run() {
- while (continueLoop) {
- try {
- flushToHBase();
- } catch (IOException e) {
- e.printStackTrace();
- break;
- }
-
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- private void flushToHBase() throws IOException {
- synchronized (locker) {
- if (hTable == null) {
- initialize();
- }
- updateLastUsed();
-
- for (Entry<String, CounterMap> entry : rowKeyCounterMap.entrySet()) {
- CounterMap pastCounterMap = entry.getValue();
- rowKeyCounterMap.put(entry.getKey(), new CounterMap());
-
- Increment increment = new Increment(Bytes.toBytes(entry.getKey()));
-
- boolean hasColumns = false;
- for (Entry<String, Counter> entry2 : pastCounterMap.entrySet()) {
- increment.addColumn(Bytes.toBytes(columnFamily),
- Bytes.toBytes(entry2.getKey()), entry2.getValue().value);
- hasColumns = true;
- }
- if (hasColumns) {
- updateLastUsed();
- hTable.increment(increment);
- }
- }
- updateLastUsed();
- }
- }
-
- public void stopLoop() {
- continueLoop = false;
- }
- }
-
- }
After building with mvn package, upload spark-1-jar-with-dependencies.jar to the Spark cluster and run it:
- /root/spark-1.2.0-bin-hadoop2.3/bin/spark-submit --class org.apache.spark.examples.streaming.SparkStreamingFromFlumeToHBaseExample --master local[8] /root/spark-1-jar-with-dependencies.jar 100
Create the corresponding table in HBase
- hbase(main):002:0> create 'test', 'f'
Start the Kafka server and producer
- [root@n1 kafka-0.8.1]# bin/kafka-server-start.sh config/server.properties
- [root@n1 kafka-0.8.1]# bin/kafka-console-producer.sh --broker-list n1:9092 --topic test
After typing text into the producer, the corresponding counter records are written to HBase.
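A quick way to verify this is to inspect the counter row from the HBase shell (the actual values depend on what was typed into the producer):
- hbase(main):003:0> scan 'test'
- hbase(main):004:0> get 'test', 'Counter'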
Code download:
Link: http://pan.baidu.com/s/1i3IgElZ  Password: