desehawk 发表于 2015-3-22 23:44:26

Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码下载


问题导读

1.Kafka和Spark Streaming Java版本集成并将数据实时写入HBase,pom.xml是如何配置的?
2.HBaseCounterIncrementor.java都实现了什么功能?
3.SparkStreamingFromFlumeToHBaseExample.java实现了哪些功能?



static/image/hrline/4.gif





Kafka和Spark Streaming Java版本集成并将数据实时写入HBase
mvn配置pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>spaek</groupId>
      <artifactId>spark</artifactId>
      <version>1</version>
      <packaging>jar</packaging>

      <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
      <dependencies>

                <dependency>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-streaming_2.10</artifactId>
                        <version>1.2.0</version>
                        <scope>provided</scope>
                </dependency>
                <dependency>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-streaming-kafka_2.10</artifactId>
                        <version>1.2.0</version>
                </dependency>
                <dependency>
                        <groupId>org.clojure</groupId>
                        <artifactId>clojure</artifactId>
                        <version>1.6.0</version>
                </dependency>
                <dependency>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                        <version>11.0.2</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.hbase</groupId>
                        <artifactId>hbase-client</artifactId>
                        <version>0.98.4-hadoop2</version>
                </dependency>
                <dependency>
                        <groupId>com.google.protobuf</groupId>
                        <artifactId>protobuf-java</artifactId>
                        <version>2.5.0</version>
                </dependency>
                <dependency>
                        <groupId>io.netty</groupId>
                        <artifactId>netty</artifactId>
                        <version>3.6.6.Final</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.hbase</groupId>
                        <artifactId>hbase-common</artifactId>
                        <version>0.98.4-hadoop2</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.hbase</groupId>
                        <artifactId>hbase-protocol</artifactId>
                        <version>0.98.4-hadoop2</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                        <version>3.4.5</version>
                </dependency>
                <dependency>
                        <groupId>org.cloudera.htrace</groupId>
                        <artifactId>htrace-core</artifactId>
                        <version>2.01</version>
                </dependency>
      </dependencies>

      <build>
                <plugins>
                        <!-- Bind the maven-assembly-plugin to the package phase this will create
                              a jar file without the storm dependencies suitable for deployment to a cluster. -->
                        <plugin>
                              <artifactId>maven-assembly-plugin</artifactId>
                              <configuration>
                                        <descriptorRefs>
                                                <descriptorRef>jar-with-dependencies</descriptorRef>
                                        </descriptorRefs>
                                        <archive>
                                                <manifest>
                                                      <mainClass />
                                                </manifest>
                                        </archive>
                              </configuration>
                              <executions>
                                        <execution>
                                                <id>make-assembly</id>
                                                <phase>package</phase>
                                                <goals>
                                                      <goal>single</goal>
                                                </goals>
                                        </execution>
                              </executions>
                        </plugin>

                        <plugin>
                              <groupId>com.theoryinpractise</groupId>
                              <artifactId>clojure-maven-plugin</artifactId>
                              <extensions>true</extensions>
                              <configuration>
                                        <sourceDirectories>
                                                <sourceDirectory>src/clj</sourceDirectory>
                                        </sourceDirectories>
                              </configuration>
                              <executions>
                                        <execution>
                                                <id>compile</id>
                                                <phase>compile</phase>
                                                <goals>
                                                      <goal>compile</goal>
                                                </goals>
                                        </execution>
                              </executions>
                        </plugin>

                        <plugin>
                              <groupId>org.codehaus.mojo</groupId>
                              <artifactId>exec-maven-plugin</artifactId>
                              <version>1.2.1</version>
                              <executions>
                                        <execution>
                                                <goals>
                                                      <goal>exec</goal>
                                                </goals>
                                        </execution>
                              </executions>
                              <configuration>
                                        <executable>java</executable>
                                        <includeProjectDependencies>true</includeProjectDependencies>
                                        <includePluginDependencies>false</includePluginDependencies>
                                        <classpathScope>compile</classpathScope>
                                        <mainClass>${storm.topology}</mainClass>
                              </configuration>
                        </plugin>
                </plugins>
      </build>

</project>

java代码SparkStreamingFromFlumeToHBaseExample.java

package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

public class SparkStreamingFromFlumeToHBaseExample {

private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) {
    if (args.length == 0) {
      System.err
          .println("Usage: SparkStreamingFromFlumeToHBaseWindowingExample {master} {host} {port} {table} {columnFamily} {windowInSeconds} {slideInSeconds");
      System.exit(1);
    }

    // String master = args;
    // String host = args;
    // int port = Integer.parseInt(args);
    String tableName = "test";// args;
    String columnFamily = "f";// args;
    // int windowInSeconds = 3;// Integer.parseInt(args);
    // int slideInSeconds = 1;// Integer.parseInt(args);

    String zkQuorum = "localhost";
    String group = "test-consumer-group";
    String topicss = "test";
    String numThread = "2";

    Duration batchInterval = new Duration(5000);
    // Duration windowInterval = new Duration(windowInSeconds * 1000);
    // Duration slideInterval = new Duration(slideInSeconds * 1000);

    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    JavaStreamingContext jssc =
      new JavaStreamingContext(sparkConf, new Duration(2000));

    final Broadcast<String> broadcastTableName =
      jssc.sparkContext().broadcast(tableName);
    final Broadcast<String> broadcastColumnFamily =
      jssc.sparkContext().broadcast(columnFamily);

    // JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);

    int numThreads = Integer.parseInt(numThread);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = topicss.split(",");
    for (String topic : topics) {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);

    JavaDStream<String> lines =
      messages.map(new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
          }
      });

    JavaDStream<String> words =
      lines.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
          }
      });

    JavaPairDStream<String, Integer> lastCounts =
      messages.map(new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
          }
      }).flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
          }
      }).mapToPair(new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {

          @Override
          public Integer call(Integer x, Integer y) throws Exception {
            // TODO Auto-generated method stub
            return x.intValue() + y.intValue();
          }
      });

    lastCounts
      .foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {

          @Override
          public Void call(JavaPairRDD<String, Integer> values, Time time)
            throws Exception {

            values.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                HBaseCounterIncrementor incrementor =
                  HBaseCounterIncrementor.getInstance(
                        broadcastTableName.value(),
                        broadcastColumnFamily.value());
                incrementor.incerment("Counter", tuple._1(), tuple._2());
                System.out.println("Counter:" + tuple._1() + "," + tuple._2());

            }
            });

            return null;
          }
      });

    jssc.start();

}
}

java代码CounterMap.java

package org.apache.spark.examples.streaming;

import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;

public class CounterMap {
HashMap<String, Counter> map = new HashMap<String, Counter>();

public void increment(String key, long increment) {
    Counter count = map.get(key);
    if (count == null) {
      count = new Counter();
      map.put(key, count);
    }
    count.value += increment;
}


public long getValue(String key) {
    Counter count = map.get(key);
    if (count != null) {
      return count.value;
    } else {
      return 0;
    }
}

public Set<Entry<String, Counter>> entrySet() {
    return map.entrySet();
}

public void clear() {
    map.clear();
}

public static class Counter {
    public long value;
}


}

java代码HBaseCounterIncrementor.java
package org.apache.spark.examples.streaming;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.spark.examples.streaming.CounterMap;
import org.apache.spark.examples.streaming.CounterMap.Counter;

public class HBaseCounterIncrementor {

static HBaseCounterIncrementor singleton;
static String tableName;
static String columnFamily;
static HTable hTable;
static long lastUsed;
static long flushInterval;
static CloserThread closerThread;
static FlushThread flushThread;
static HashMap<String, CounterMap> rowKeyCounterMap =
      new HashMap<String, CounterMap>();
static Object locker = new Object();

private HBaseCounterIncrementor(String tableName, String columnFamily) {
    HBaseCounterIncrementor.tableName = tableName;
    HBaseCounterIncrementor.columnFamily = columnFamily;
}

public static HBaseCounterIncrementor getInstance(String tableName,
      String columnFamily) {

    if (singleton == null) {
      synchronized (locker) {
      if (singleton == null) {
          singleton = new HBaseCounterIncrementor(tableName, columnFamily);
          initialize();
      }
      }
    }
    return singleton;
}

private static void initialize() {
    if (hTable == null) {
      synchronized (locker) {
      if (hTable == null) {
          Configuration hConfig = HBaseConfiguration.create();
          try {
            hTable = new HTable(hConfig, tableName);
            updateLastUsed();

          } catch (IOException e) {
            throw new RuntimeException(e);
          }
          flushThread = new FlushThread(flushInterval);
          flushThread.start();
          closerThread = new CloserThread();
          closerThread.start();
      }
      }
    }
}

public void incerment(String rowKey, String key, int increment) {
    incerment(rowKey, key, (long) increment);
}

public void incerment(String rowKey, String key, long increment) {
    CounterMap counterMap = rowKeyCounterMap.get(rowKey);
    if (counterMap == null) {
      counterMap = new CounterMap();
      rowKeyCounterMap.put(rowKey, counterMap);
    }
    counterMap.increment(key, increment);

    initialize();
}

private static void updateLastUsed() {
    lastUsed = System.currentTimeMillis();
}

protected void close() {
    if (hTable != null) {
      synchronized (locker) {
      if (hTable != null) {
          if (hTable != null && System.currentTimeMillis() - lastUsed > 30000) {
            flushThread.stopLoop();
            flushThread = null;
            try {
            hTable.close();
            } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            }

            hTable = null;
          }
      }
      }
    }
}

public static class CloserThread extends Thread {

    boolean continueLoop = true;

    @Override
    public void run() {
      while (continueLoop) {

      if (System.currentTimeMillis() - lastUsed > 30000) {
          singleton.close();
          break;
      }

      try {
          Thread.sleep(60000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      }
    }

    public void stopLoop() {
      continueLoop = false;
    }
}

protected static class FlushThread extends Thread {
    long sleepTime;
    boolean continueLoop = true;

    public FlushThread(long sleepTime) {
      this.sleepTime = sleepTime;
    }

    @Override
    public void run() {
      while (continueLoop) {
      try {
          flushToHBase();
      } catch (IOException e) {
          e.printStackTrace();
          break;
      }

      try {
          Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      }
    }

    private void flushToHBase() throws IOException {
      synchronized (hTable) {
      if (hTable == null) {
          initialize();
      }
      updateLastUsed();

      for (Entry<String, CounterMap> entry : rowKeyCounterMap.entrySet()) {
          CounterMap pastCounterMap = entry.getValue();
          rowKeyCounterMap.put(entry.getKey(), new CounterMap());

          Increment increment = new Increment(Bytes.toBytes(entry.getKey()));

          boolean hasColumns = false;
          for (Entry<String, Counter> entry2 : pastCounterMap.entrySet()) {
            increment.addColumn(Bytes.toBytes(columnFamily),
                Bytes.toBytes(entry2.getKey()), entry2.getValue().value);
            hasColumns = true;
          }
          if (hasColumns) {
            updateLastUsed();
            hTable.increment(increment);
          }
      }
      updateLastUsed();
      }
    }

    public void stopLoop() {
      continueLoop = false;
    }
}

}

mvn package打包后将spark-1-jar-with-dependencies.jar上传到spark集群运行
/root/spark-1.2.0-bin-hadoop2.3/bin/spark-submit --class org.apache.spark.examples.streaming.SparkStreamingFromFlumeToHBaseExample --master local   /root/spark-1-jar-with-dependencies.jar100

Hbase创建相应的表
hbase(main):002:0> create 'test', 'f'

启动kafka的server和producer
# bin/kafka-console-producer.sh --broker-list n1:9092 --topic test
# bin/kafka-server-start.sh config/server.properties


在producer端输入字符后HBase中会插入相应记录如下







代码下载:
链接:http://pan.baidu.com/s/1i3IgElZ 密码:**** Hidden Message *****






fzleejm 发表于 2015-3-23 01:21:26

xiex......

wxhanshan 发表于 2015-3-23 08:44:34

Hbase创建相应的表

zhangfuhuo 发表于 2015-3-23 09:04:58

好资料,感谢分享

271592448 发表于 2015-3-23 09:11:22

{:soso_e179:}

lgwhwyhd 发表于 2015-3-23 09:20:01

正需要,谢共享!

012huang 发表于 2015-3-23 09:34:17

非常有帮助,感谢分享

admln 发表于 2015-3-23 09:56:36

look look

laneypeng 发表于 2015-3-23 10:06:27

谢谢分享,好资料

diandidemeng 发表于 2015-3-23 10:19:54

感谢楼主的辛苦劳做。
页: [1] 2 3 4 5 6 7 8 9 10
查看完整版本: Kafka和Spark Streaming Java版本集成并将数据实时写入HBase及代码下载