分享

Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码下载

 

问题导读

1.Kafka和Spark Streaming Java版本集成并将数据实时写入HBase,pom.xml是如何配置的?
2.HBaseCounterIncrementor.java都实现了什么功能?
3.SparkStreamingFromFlumeToHBaseExample.java实现了哪些功能?









Kafka和Spark Streaming Java版本集成并将数据实时写入HBase
mvn配置pom.xml


  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3.         <modelVersion>4.0.0</modelVersion>
  4.         <groupId>spaek</groupId>
  5.         <artifactId>spark</artifactId>
  6.         <version>1</version>
  7.         <packaging>jar</packaging>
  8.         <properties>
  9.                 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  10.         </properties>
  11.         <dependencies>
  12.                 <dependency>
  13.                         <groupId>org.apache.spark</groupId>
  14.                         <artifactId>spark-streaming_2.10</artifactId>
  15.                         <version>1.2.0</version>
  16.                         <scope>provided</scope>
  17.                 </dependency>
  18.                 <dependency>
  19.                         <groupId>org.apache.spark</groupId>
  20.                         <artifactId>spark-streaming-kafka_2.10</artifactId>
  21.                         <version>1.2.0</version>
  22.                 </dependency>
  23.                 <dependency>
  24.                         <groupId>org.clojure</groupId>
  25.                         <artifactId>clojure</artifactId>
  26.                         <version>1.6.0</version>
  27.                 </dependency>
  28.                 <dependency>
  29.                         <groupId>com.google.guava</groupId>
  30.                         <artifactId>guava</artifactId>
  31.                         <version>11.0.2</version>
  32.                 </dependency>
  33.                 <dependency>
  34.                         <groupId>org.apache.hbase</groupId>
  35.                         <artifactId>hbase-client</artifactId>
  36.                         <version>0.98.4-hadoop2</version>
  37.                 </dependency>
  38.                 <dependency>
  39.                         <groupId>com.google.protobuf</groupId>
  40.                         <artifactId>protobuf-java</artifactId>
  41.                         <version>2.5.0</version>
  42.                 </dependency>
  43.                 <dependency>
  44.                         <groupId>io.netty</groupId>
  45.                         <artifactId>netty</artifactId>
  46.                         <version>3.6.6.Final</version>
  47.                 </dependency>
  48.                 <dependency>
  49.                         <groupId>org.apache.hbase</groupId>
  50.                         <artifactId>hbase-common</artifactId>
  51.                         <version>0.98.4-hadoop2</version>
  52.                 </dependency>
  53.                 <dependency>
  54.                         <groupId>org.apache.hbase</groupId>
  55.                         <artifactId>hbase-protocol</artifactId>
  56.                         <version>0.98.4-hadoop2</version>
  57.                 </dependency>
  58.                 <dependency>
  59.                         <groupId>org.apache.zookeeper</groupId>
  60.                         <artifactId>zookeeper</artifactId>
  61.                         <version>3.4.5</version>
  62.                 </dependency>
  63.                 <dependency>
  64.                         <groupId>org.cloudera.htrace</groupId>
  65.                         <artifactId>htrace-core</artifactId>
  66.                         <version>2.01</version>
  67.                 </dependency>
  68.         </dependencies>
  69.         <build>
  70.                 <plugins>
  71.                         <!-- Bind the maven-assembly-plugin to the package phase this will create
  72.                                 a jar file without the storm dependencies suitable for deployment to a cluster. -->
  73.                         <plugin>
  74.                                 <artifactId>maven-assembly-plugin</artifactId>
  75.                                 <configuration>
  76.                                         <descriptorRefs>
  77.                                                 <descriptorRef>jar-with-dependencies</descriptorRef>
  78.                                         </descriptorRefs>
  79.                                         <archive>
  80.                                                 <manifest>
  81.                                                         <mainClass />
  82.                                                 </manifest>
  83.                                         </archive>
  84.                                 </configuration>
  85.                                 <executions>
  86.                                         <execution>
  87.                                                 <id>make-assembly</id>
  88.                                                 <phase>package</phase>
  89.                                                 <goals>
  90.                                                         <goal>single</goal>
  91.                                                 </goals>
  92.                                         </execution>
  93.                                 </executions>
  94.                         </plugin>
  95.                         <plugin>
  96.                                 <groupId>com.theoryinpractise</groupId>
  97.                                 <artifactId>clojure-maven-plugin</artifactId>
  98.                                 <extensions>true</extensions>
  99.                                 <configuration>
  100.                                         <sourceDirectories>
  101.                                                 <sourceDirectory>src/clj</sourceDirectory>
  102.                                         </sourceDirectories>
  103.                                 </configuration>
  104.                                 <executions>
  105.                                         <execution>
  106.                                                 <id>compile</id>
  107.                                                 <phase>compile</phase>
  108.                                                 <goals>
  109.                                                         <goal>compile</goal>
  110.                                                 </goals>
  111.                                         </execution>
  112.                                 </executions>
  113.                         </plugin>
  114.                         <plugin>
  115.                                 <groupId>org.codehaus.mojo</groupId>
  116.                                 <artifactId>exec-maven-plugin</artifactId>
  117.                                 <version>1.2.1</version>
  118.                                 <executions>
  119.                                         <execution>
  120.                                                 <goals>
  121.                                                         <goal>exec</goal>
  122.                                                 </goals>
  123.                                         </execution>
  124.                                 </executions>
  125.                                 <configuration>
  126.                                         <executable>java</executable>
  127.                                         <includeProjectDependencies>true</includeProjectDependencies>
  128.                                         <includePluginDependencies>false</includePluginDependencies>
  129.                                         <classpathScope>compile</classpathScope>
  130.                                         <mainClass>${storm.topology}</mainClass>
  131.                                 </configuration>
  132.                         </plugin>
  133.                 </plugins>
  134.         </build>
  135. </project>
复制代码


java代码SparkStreamingFromFlumeToHBaseExample.java

  1. package org.apache.spark.examples.streaming;
  2. import java.util.Arrays;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.regex.Pattern;
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.api.java.JavaPairRDD;
  8. import org.apache.spark.api.java.function.FlatMapFunction;
  9. import org.apache.spark.api.java.function.Function;
  10. import org.apache.spark.api.java.function.Function2;
  11. import org.apache.spark.api.java.function.PairFunction;
  12. import org.apache.spark.api.java.function.VoidFunction;
  13. import org.apache.spark.broadcast.Broadcast;
  14. import org.apache.spark.streaming.Duration;
  15. import org.apache.spark.streaming.Time;
  16. import org.apache.spark.streaming.api.java.JavaDStream;
  17. import org.apache.spark.streaming.api.java.JavaPairDStream;
  18. import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
  19. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  20. import org.apache.spark.streaming.kafka.KafkaUtils;
  21. import scala.Tuple2;
  22. import com.google.common.base.Optional;
  23. import com.google.common.collect.Lists;
  24. public class SparkStreamingFromFlumeToHBaseExample {
  25.   private static final Pattern SPACE = Pattern.compile(" ");
  26.   public static void main(String[] args) {
  27.     if (args.length == 0) {
  28.       System.err
  29.           .println("Usage: SparkStreamingFromFlumeToHBaseWindowingExample {master} {host} {port} {table} {columnFamily} {windowInSeconds} {slideInSeconds");
  30.       System.exit(1);
  31.     }
  32.     // String master = args[0];
  33.     // String host = args[1];
  34.     // int port = Integer.parseInt(args[2]);
  35.     String tableName = "test";// args[3];
  36.     String columnFamily = "f";// args[4];
  37.     // int windowInSeconds = 3;// Integer.parseInt(args[5]);
  38.     // int slideInSeconds = 1;// Integer.parseInt(args[5]);
  39.     String zkQuorum = "localhost";
  40.     String group = "test-consumer-group";
  41.     String topicss = "test";
  42.     String numThread = "2";
  43.     Duration batchInterval = new Duration(5000);
  44.     // Duration windowInterval = new Duration(windowInSeconds * 1000);
  45.     // Duration slideInterval = new Duration(slideInSeconds * 1000);
  46.     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
  47.     JavaStreamingContext jssc =
  48.         new JavaStreamingContext(sparkConf, new Duration(2000));
  49.     final Broadcast<String> broadcastTableName =
  50.         jssc.sparkContext().broadcast(tableName);
  51.     final Broadcast<String> broadcastColumnFamily =
  52.         jssc.sparkContext().broadcast(columnFamily);
  53.     // JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
  54.     int numThreads = Integer.parseInt(numThread);
  55.     Map<String, Integer> topicMap = new HashMap<String, Integer>();
  56.     String[] topics = topicss.split(",");
  57.     for (String topic : topics) {
  58.       topicMap.put(topic, numThreads);
  59.     }
  60.     JavaPairReceiverInputDStream<String, String> messages =
  61.         KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
  62.     JavaDStream<String> lines =
  63.         messages.map(new Function<Tuple2<String, String>, String>() {
  64.           @Override
  65.           public String call(Tuple2<String, String> tuple2) {
  66.             return tuple2._2();
  67.           }
  68.         });
  69.     JavaDStream<String> words =
  70.         lines.flatMap(new FlatMapFunction<String, String>() {
  71.           @Override
  72.           public Iterable<String> call(String x) {
  73.             return Lists.newArrayList(SPACE.split(x));
  74.           }
  75.         });
  76.     JavaPairDStream<String, Integer> lastCounts =
  77.         messages.map(new Function<Tuple2<String, String>, String>() {
  78.           @Override
  79.           public String call(Tuple2<String, String> tuple2) {
  80.             return tuple2._2();
  81.           }
  82.         }).flatMap(new FlatMapFunction<String, String>() {
  83.           @Override
  84.           public Iterable<String> call(String x) {
  85.             return Lists.newArrayList(SPACE.split(x));
  86.           }
  87.         }).mapToPair(new PairFunction<String, String, Integer>() {
  88.           @Override
  89.           public Tuple2<String, Integer> call(String s) {
  90.             return new Tuple2<String, Integer>(s, 1);
  91.           }
  92.         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
  93.           @Override
  94.           public Integer call(Integer x, Integer y) throws Exception {
  95.             // TODO Auto-generated method stub
  96.             return x.intValue() + y.intValue();
  97.           }
  98.         });
  99.     lastCounts
  100.         .foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  101.           @Override
  102.           public Void call(JavaPairRDD<String, Integer> values, Time time)
  103.               throws Exception {
  104.             values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
  105.               @Override
  106.               public void call(Tuple2<String, Integer> tuple) throws Exception {
  107.                 HBaseCounterIncrementor incrementor =
  108.                     HBaseCounterIncrementor.getInstance(
  109.                         broadcastTableName.value(),
  110.                         broadcastColumnFamily.value());
  111.                 incrementor.incerment("Counter", tuple._1(), tuple._2());
  112.                 System.out.println("Counter:" + tuple._1() + "," + tuple._2());
  113.               }
  114.             });
  115.             return null;
  116.           }
  117.         });
  118.     jssc.start();
  119.   }
  120. }
复制代码


java代码CounterMap.java

  1. package org.apache.spark.examples.streaming;
  2. import java.util.HashMap;
  3. import java.util.Map.Entry;
  4. import java.util.Set;
  5. public class CounterMap {
  6.   HashMap<String, Counter> map = new HashMap<String, Counter>();
  7.   
  8.   public void increment(String key, long increment) {
  9.     Counter count = map.get(key);
  10.     if (count == null) {
  11.       count = new Counter();
  12.       map.put(key, count);
  13.     }
  14.     count.value += increment;
  15.   }
  16.   
  17.   
  18.   public long getValue(String key) {
  19.     Counter count = map.get(key);
  20.     if (count != null) {
  21.       return count.value;
  22.     } else {
  23.       return 0;
  24.     }
  25.   }
  26.   
  27.   public Set<Entry<String, Counter>> entrySet() {
  28.     return map.entrySet();
  29.   }
  30.   
  31.   public void clear() {
  32.     map.clear();
  33.   }
  34.   
  35.   public static class Counter {
  36.     public long value;
  37.   }
  38.   
  39.   
  40. }
复制代码


java代码HBaseCounterIncrementor.java
  1. package org.apache.spark.examples.streaming;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import java.util.Map.Entry;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.hbase.HBaseConfiguration;
  7. import org.apache.hadoop.hbase.client.HTable;
  8. import org.apache.hadoop.hbase.client.Increment;
  9. import org.apache.hadoop.hbase.util.Bytes;
  10. import org.apache.spark.examples.streaming.CounterMap;
  11. import org.apache.spark.examples.streaming.CounterMap.Counter;
  12. public class HBaseCounterIncrementor {
  13.   static HBaseCounterIncrementor singleton;
  14.   static String tableName;
  15.   static String columnFamily;
  16.   static HTable hTable;
  17.   static long lastUsed;
  18.   static long flushInterval;
  19.   static CloserThread closerThread;
  20.   static FlushThread flushThread;
  21.   static HashMap<String, CounterMap> rowKeyCounterMap =
  22.       new HashMap<String, CounterMap>();
  23.   static Object locker = new Object();
  24.   private HBaseCounterIncrementor(String tableName, String columnFamily) {
  25.     HBaseCounterIncrementor.tableName = tableName;
  26.     HBaseCounterIncrementor.columnFamily = columnFamily;
  27.   }
  28.   public static HBaseCounterIncrementor getInstance(String tableName,
  29.       String columnFamily) {
  30.     if (singleton == null) {
  31.       synchronized (locker) {
  32.         if (singleton == null) {
  33.           singleton = new HBaseCounterIncrementor(tableName, columnFamily);
  34.           initialize();
  35.         }
  36.       }
  37.     }
  38.     return singleton;
  39.   }
  40.   private static void initialize() {
  41.     if (hTable == null) {
  42.       synchronized (locker) {
  43.         if (hTable == null) {
  44.           Configuration hConfig = HBaseConfiguration.create();
  45.           try {
  46.             hTable = new HTable(hConfig, tableName);
  47.             updateLastUsed();
  48.           } catch (IOException e) {
  49.             throw new RuntimeException(e);
  50.           }
  51.           flushThread = new FlushThread(flushInterval);
  52.           flushThread.start();
  53.           closerThread = new CloserThread();
  54.           closerThread.start();
  55.         }
  56.       }
  57.     }
  58.   }
  59.   public void incerment(String rowKey, String key, int increment) {
  60.     incerment(rowKey, key, (long) increment);
  61.   }
  62.   public void incerment(String rowKey, String key, long increment) {
  63.     CounterMap counterMap = rowKeyCounterMap.get(rowKey);
  64.     if (counterMap == null) {
  65.       counterMap = new CounterMap();
  66.       rowKeyCounterMap.put(rowKey, counterMap);
  67.     }
  68.     counterMap.increment(key, increment);
  69.     initialize();
  70.   }
  71.   private static void updateLastUsed() {
  72.     lastUsed = System.currentTimeMillis();
  73.   }
  74.   protected void close() {
  75.     if (hTable != null) {
  76.       synchronized (locker) {
  77.         if (hTable != null) {
  78.           if (hTable != null && System.currentTimeMillis() - lastUsed > 30000) {
  79.             flushThread.stopLoop();
  80.             flushThread = null;
  81.             try {
  82.               hTable.close();
  83.             } catch (IOException e) {
  84.               // TODO Auto-generated catch block
  85.               e.printStackTrace();
  86.             }
  87.             hTable = null;
  88.           }
  89.         }
  90.       }
  91.     }
  92.   }
  93.   public static class CloserThread extends Thread {
  94.     boolean continueLoop = true;
  95.     @Override
  96.     public void run() {
  97.       while (continueLoop) {
  98.         if (System.currentTimeMillis() - lastUsed > 30000) {
  99.           singleton.close();
  100.           break;
  101.         }
  102.         try {
  103.           Thread.sleep(60000);
  104.         } catch (InterruptedException e) {
  105.           e.printStackTrace();
  106.         }
  107.       }
  108.     }
  109.     public void stopLoop() {
  110.       continueLoop = false;
  111.     }
  112.   }
  113.   protected static class FlushThread extends Thread {
  114.     long sleepTime;
  115.     boolean continueLoop = true;
  116.     public FlushThread(long sleepTime) {
  117.       this.sleepTime = sleepTime;
  118.     }
  119.     @Override
  120.     public void run() {
  121.       while (continueLoop) {
  122.         try {
  123.           flushToHBase();
  124.         } catch (IOException e) {
  125.           e.printStackTrace();
  126.           break;
  127.         }
  128.         try {
  129.           Thread.sleep(sleepTime);
  130.         } catch (InterruptedException e) {
  131.           e.printStackTrace();
  132.         }
  133.       }
  134.     }
  135.     private void flushToHBase() throws IOException {
  136.       synchronized (hTable) {
  137.         if (hTable == null) {
  138.           initialize();
  139.         }
  140.         updateLastUsed();
  141.         for (Entry<String, CounterMap> entry : rowKeyCounterMap.entrySet()) {
  142.           CounterMap pastCounterMap = entry.getValue();
  143.           rowKeyCounterMap.put(entry.getKey(), new CounterMap());
  144.           Increment increment = new Increment(Bytes.toBytes(entry.getKey()));
  145.           boolean hasColumns = false;
  146.           for (Entry<String, Counter> entry2 : pastCounterMap.entrySet()) {
  147.             increment.addColumn(Bytes.toBytes(columnFamily),
  148.                 Bytes.toBytes(entry2.getKey()), entry2.getValue().value);
  149.             hasColumns = true;
  150.           }
  151.           if (hasColumns) {
  152.             updateLastUsed();
  153.             hTable.increment(increment);
  154.           }
  155.         }
  156.         updateLastUsed();
  157.       }
  158.     }
  159.     public void stopLoop() {
  160.       continueLoop = false;
  161.     }
  162.   }
  163. }
复制代码


mvn package打包后将spark-1-jar-with-dependencies.jar上传到spark集群运行
  1. /root/spark-1.2.0-bin-hadoop2.3/bin/spark-submit --class org.apache.spark.examples.streaming.SparkStreamingFromFlumeToHBaseExample --master local[8]   /root/spark-1-jar-with-dependencies.jar  100
复制代码


Hbase创建相应的表
  1. hbase(main):002:0> create 'test', 'f'
复制代码


启动kafka的server和producer
  1. [root@n1 kafka-0.8.1]# bin/kafka-console-producer.sh --broker-list n1:9092 --topic test
  2. [root@n1 kafka-0.8.1]# bin/kafka-server-start.sh config/server.properties
复制代码



在producer端输入字符后HBase中会插入相应记录如下

20150117004831453.png





代码下载:
链接:http://pan.baidu.com/s/1i3IgElZ 密码:
游客,如果您要查看本帖隐藏内容请回复







已有(274)人评论

跳转到指定楼层
wxhanshan 发表于 2015-3-23 08:44:34
Hbase创建相应的表
回复

使用道具 举报

zhangfuhuo 发表于 2015-3-23 09:04:58
好资料,感谢分享
回复

使用道具 举报

271592448 发表于 2015-3-23 09:11:22
回复

使用道具 举报

lgwhwyhd 发表于 2015-3-23 09:20:01
正需要,谢共享!
回复

使用道具 举报

012huang 发表于 2015-3-23 09:34:17
非常有帮助,感谢分享
回复

使用道具 举报

laneypeng 发表于 2015-3-23 10:06:27
谢谢分享,好资料

回复

使用道具 举报

diandidemeng 发表于 2015-3-23 10:19:54
感谢楼主的辛苦劳做。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条