分享

sparkStreaming读取sqlserver,然后把读出来的数据插入到hbase

remarkzhao 发表于 2017-10-31 16:45:25 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 16116
弱弱请教以下各位大神:

      我现在有个场景,sqlserver里的数据每时每刻都在增加,现在想用sparkstreaming把增量的数据读取过来,插入到hbase

      比如现在有个医院里的检查结果表,有病人ID字段,检查报告字段,我想用sparkstreaming 每过15秒去拉去一次增量数据,然后用病人ID为rowkey,
检查报告为某列的列值插入到hbase里。然后这几天研究了以下sparkstreaming 下面是我写的代码,但是感觉真的写不下去了,不知道该咋写了。。



object SparkStreamingDemo {

  def getData(): ResultSet = {

    val conn_str = "jdbc:sqlserver://192.168.1.21;username=sa;password=yishidb;database=CDRDB16;useUnicode=true&characterEncoding=UTF-8"

    val conn = DriverManager.getConnection(conn_str)

    try {
      // Configure to be Read Only
      val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

      // Execute Query,查询检查表 ,有patientId,describe属性。
      val rs = statement.executeQuery("select PATIENT_ID,EXAM_FINDING_DESC from DC_EXAM_RESULT")
      // Iterate Over ResultSet

      return rs;
    }
    finally {
      conn.close
    }
  }
  def main(args: Array[String]) {

    //创建spark实例
    val sparkConf = new SparkConf().setAppName("QueueStream")
    sparkConf.setMaster("local")
    // 创建sparkStreamingContext ,
    val ssc = new StreamingContext(sparkConf, Seconds(15))
    val rs = getData()

   val map = new util.HashMap[String,String]()
    while(rs.next()){
      val patientid = rs.getString("PATIENT_ID")
      val describe = rs.getString("EXAM_FINDING_DESC")
      map.put("patientid",patientid)
      map.put("describe" ,describe)
    }

    val iHBaseutils =  new IHBaseUtils();
    val hbaseRDD =  ssc.sparkContext.makeRDD(map,10)






    ssc.start()
    ssc.awaitTermination()

  }



}







我的思路是把读出来的数据放到队列流里,然后再剖开,用RDD来计算。。我现在有两个疑问:

如何让我的数据每15秒去sqlserver里拉一次数据

队列流该如何处理数据  

已有(7)人评论

跳转到指定楼层
remarkzhao 发表于 2017-10-31 16:57:35
回复

使用道具 举报

remarkzhao 发表于 2017-10-31 17:18:36
刚还看到一种方案:

MySQL不支持选择某段时间内update的数据吧。不过,你可以考虑在你的表中增加一个时间字段,然后自己实现一个InputDStream,根据batch time来把某个时间范围的数据拿出来。Spark SQL提供了JDBC的接口,可以利用Spark SQL来生成InputDStream.compute所需要的RDD。

作者:朱诗雄
链接:https://www.zhihu.com/question/34648671/answer/59612136
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
回复

使用道具 举报

desehawk 发表于 2017-10-31 20:16:58
下面仅供参考,有问题可以讨论:
首先楼主应该会读取sqlserver形成Dstreaming.然后将Dstreaming通过spark streaming提供的窗口函数将其窗口化,这时候程序会自动按照窗口设置的时间去读取。读取完毕之后,通过Dstreaming.foreachrdd去保存到hbase中。
对于窗口函数资料
Spark Streaming的窗口操作
http://www.aboutyun.com/forum.php?mod=viewthread&tid=16504


写到数据库中的资料
【求助】Spark如果在写入多个HBase表中
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12121

Spark中文手册4:Spark之基本概念(2)

http://www.aboutyun.com/forum.php?mod=viewthread&tid=11516



回复

使用道具 举报

remarkzhao 发表于 2017-11-1 14:41:55
desehawk 发表于 2017-10-31 20:16
下面仅供参考,有问题可以讨论:
首先楼主应该会读取sqlserver形成Dstreaming.然后将Dstreaming通过spark  ...

hi,这是我昨晚尝试着去改的。。

object SparkStreamingDemo {

  def main(args: Array[String]) {

    //创建spark实例
    val sparkConf = new SparkConf().setAppName("QueueStream")
    sparkConf.setMaster("local")
    // 创建sparkStreamingContext ,
    val ssc = new StreamingContext(sparkConf, Seconds(15))
    var rddQueue = new Queue[RDD[String]]()
    val inputStream = ssc.queueStream(rddQueue)
    val iHBaseUtils = new IHBaseUtils()


   inputStream.foreachRDD(rdd =>{
      def func(records: Seq[String]): Unit ={
           while(!records.equals(null)){

                 records.flatMap(_.split(":")).(x_.1)
             val describe = records.splitAt(1)


             iHBaseUtils.put("ExamResult",patientId,"JieGuo","describe",describe)


           }
      }
     val repartitionedRDD = rdd.repartition(3)
     repartitionedRDD.foreachPartition(func)
   })







    ssc.start()
    val seq = conn()

    for (i <- 1 to 3) {

      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(seq,10)
      }
      Thread.sleep(3000)
    }



    ssc.awaitTermination()

  }

  def conn(): Seq[String] = {

    val conn_str = "jdbc:sqlserver://192.168.1.21;username=sa;password=yishidb;database=CDRDB16;useUnicode=true&characterEncoding=UTF-8"
    //classOf[com.mysql.jdbc.Driver]
    //  Class.forName("com.mysql.jdbc.Driver").newInstance();
    val conn = DriverManager.getConnection(conn_str)

    try {
      // Configure to be Read Only
      val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

      // Execute Query,查询检查表 ,有patientId,describe属性。
      val rs = statement.executeQuery("select PATIENT_ID,EXAM_FINDING_DESC from DC_EXAM_RESULT")
      // Iterate Over ResultSet
      var seq = Seq("")

      while(rs.next()){
        val patientId = rs.getString("PATIENT_ID")
        val describe = rs.getString("EXAM_FINDING_DESC")
        seq = seq :+ patientId + ":"+ describe

      }

      return seq;
    }
    finally {
      conn.close
    }
  }


我现在有3个疑问:
1. def conn中我把resultset里的两个字段用冒号拼在了一起放在一个SEQ里进行返回,那么在sparkstreaming里我该如何进行分割  取出,这点我卡了很久没写出来。。。
2.     for (i <- 1 to 3) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(seq,10)
      }
      Thread.sleep(3000)
    }
这段代码我到现在都没看到到底是什么意思?是同步?还是线程安全,每句我都有疑问。
3.Seq的数据在inputstream割出来之后插入hbase方法具体改如何写,现在一片茫然啊。。下面附上我的IHBaseUtils类

public class IHBaseUtils implements Serializable,HBaseUtils {

    public static Admin admin = null;
    public static Connection conn = null;
    public static String zkNodes = "master,slave01,slave02";
    public IHBaseUtils() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkNodes);
        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void createHTable(String tableName, String col, Integer versions) throws Exception {

        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            System.out.println(tableName + "表已存在,先删除……");
            System.out.println("开始建立新表……");
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(col.getBytes());
            hColumnDescriptor.setMaxVersions(versions);
            tableDesc.addFamily(hColumnDescriptor);
            admin.createTable(tableDesc);
            System.out.println(tableName + "新表已建立");
        } else {
            System.out.println(tableName + "表不存在,开始建表…………");
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(col.getBytes());
            hColumnDescriptor.setMaxVersions(versions);
            tableDesc.addFamily(hColumnDescriptor);
            admin.createTable(tableDesc);
            System.out.println(tableName + "表已建立");
        }

    }


    public void put(String tableName, String row, String columnFamily,
                    String column, String data) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put p1 = new Put(Bytes.toBytes(row));
        p1.addColumn(columnFamily.getBytes(), column.getBytes(), data.getBytes());
        table.put(p1);
        System.out.println("put'" + row + "'," + columnFamily + ":" + column
                + "','" + data + "'");
    }


    public List get(String tableName, String row) throws IOException {

        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(row.getBytes());
        get.setMaxVersions();
        Result result = table.get(get);
        List<KeyValue> list = result.list();

           List<ExamResult> examResults = new ArrayList<>();
           for(final KeyValue kv: list) {
               ExamResult examResult = new ExamResult();
               examResult.setDescribe(Bytes.toString(kv.getValue()));
               examResult.setPatientId(Bytes.toString(kv.getRow()));
               examResults.add(examResult);
           }

               return examResults;
               // if (raw.length == 1) {
               // for(final KeyValue kv: list){
               // examResult = new ExamResult();
               //    examResult.setPatientId(kv.getRow().toString().getBytes());
               // examResult.setDescribe(Bytes.toString(kv.getValue()));
               //     doc.setTitle(new String(doc.getTitle().getBytes("gbk"),"UTF-8"));
               //  examResult.setPatientId(Bytes.toString(kv.getRow()));
               //     System.out.println("PatienId is " + examResult.getPatientId() + "Decribe is " + examResult.getDescribe());
           }


回复

使用道具 举报

zstu 发表于 2017-11-13 14:09:17
你这个不是增量的读取数据库的数据吧,每次都是全量的读取然后插入到hbase中。
回复

使用道具 举报

remarkzhao 发表于 2017-12-4 16:39:31
zstu 发表于 2017-11-13 14:09
你这个不是增量的读取数据库的数据吧,每次都是全量的读取然后插入到hbase中。

好像是。我迷茫了。
回复

使用道具 举报

zstu 发表于 2017-12-8 10:23:35
这个增量读取sqlserver好像不行,把数据发送到kafka中消费,然后存hbase
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条