分享

java版本 spark streaming 存mysql的问题

Hentai 发表于 2017-2-6 11:10:08 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 15834
我想把统计结果出入数据库 可是无法存入到mysql里面 已经不诶困扰了很久了 ,求大神讲解一下


wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>,Time>() {

                       
                        private static final long serialVersionUID = 1807802659186487251L;

                        @Override
                        public void call(JavaPairRDD<String, Integer> arg0, Time time) throws Exception {
                                // TODO 自动生成的方法存根
                                arg0.foreach(new VoidFunction<Tuple2<String,Integer>>() {

                                       
                                        private static final long serialVersionUID = -4954196778252003366L;

                                        @Override
                                        public void call(Tuple2<String, Integer> tuple2) throws Exception {
                                                // TODO 自动生成的方法存根
                                                MysqlUtil.insert(tuple2._1, tuple2._2);
                                        }
                                });
                        }
                });

已有(8)人评论

跳转到指定楼层
sstutu 发表于 2017-2-6 13:34:48
插入挺简单的,只要你会通过Java插入mysql。
spark streaming 也是调用jdbc,然后通过调用mysql 的sql语句直接插入即可  
//插入mysql数据库
   end.foreachRDD(wd => wd.foreachPartition(
      data => {
        val conn = ConnectPool.getConn("root", "1714004716", "hh15", "dg")
//        val conn = ConnectPool.getConn("root", "1714004716", "h15", "dg")
        //插入数据
//        conn.prepareStatement("insert into t_word2(word,num) values('tom',23)").executeUpdate()
        try {
          for (row <- data) {
            println("input data is " + row._1 + "  " + row._2)
            val sql = "insert into t_word2(word,num) values(" + "'" + row._1 + "'," + row._2 + ")"
            conn.prepareStatement(sql).executeUpdate()
          }
        }finally {
          conn.close()
        }

回复

使用道具 举报

Hentai 发表于 2017-2-6 14:01:44
sstutu 发表于 2017-2-6 13:34
插入挺简单的,只要你会通过Java插入mysql。
spark streaming 也是调用jdbc,然后通过调用mysql 的sql语句 ...

你这个好像是用scala写的吧?我想看看java是怎么的
回复

使用道具 举报

sstutu 发表于 2017-2-6 14:06:26
Hentai 发表于 2017-2-6 14:01
你这个好像是用scala写的吧?我想看看java是怎么的


原理都是一样的,翻译成Java即可
Java版
[mw_shl_code=java,true]package com.sectong;
import java.io.Serializable;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
public class Main implements Serializable {
/**
*
*/
private static final long serialVersionUID = -8513279306224995844L;
private static final String MYSQL_USERNAME = "demo";
private static final String MYSQL_PWD = "demo";
private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://192.168.1.91:3306/demo";
private static final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkSaveToDb").setMaster("local"));
private static final SQLContext sqlContext = new SQLContext(sc);
public static void main(String[] args) {
// Sample data-frame loaded from a JSON file
DataFrame usersDf = sqlContext.read().json("users.json");
// Save data-frame to MySQL (or any other JDBC supported databases)
Properties connectionProperties = new Properties();
connectionProperties.put("user", MYSQL_USERNAME);
connectionProperties.put("password", MYSQL_PWD);
// write dataframe to jdbc mysql
usersDf.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "users", connectionProperties);
}
}[/mw_shl_code]
上面只是例子,当然都需要修改
回复

使用道具 举报

sstutu 发表于 2017-2-6 14:08:44
回复

使用道具 举报

Hentai 发表于 2017-2-6 14:24:59
sstutu 发表于 2017-2-6 14:08
相关源码
https://github.com/jiekechoo/spark-jdbc-apps

版主 你可能误会我的意思了,我是想知道spark streaming的java版本如何用foreachRDD把数据插入mysql的,不是想问java如何插入数据库
回复

使用道具 举报

starrycheng 发表于 2017-2-6 14:38:36
Hentai 发表于 2017-2-6 14:24
版主 你可能误会我的意思了,我是想知道spark streaming的java版本如何用foreachRDD把数据插入mysql的, ...

[mw_shl_code=java,true]import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object dataToMySQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("use the foreachRDD write data to mysql").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(10))

    val streamData = ssc.socketTextStream("master",9999)
    val wordCount = streamData.map(line =>(line.split(",")(0),1)).reduceByKeyAndWindow(_+_,Seconds(60))
    val hottestWord = wordCount.transform(itemRDD => {
      val top3 = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })


    hottestWord.foreachRDD( rdd =>{
      rdd.foreachPartition(partitionOfRecords =>{
        val connect = scalaConnectPool.getConnection
        connect.setAutoCommit(false)
        val stmt = connect.createStatement()
        partitionOfRecords.foreach(record =>{
          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')")
        })
        stmt.executeBatch()
        connect.commit()
      }
      )
    }
    )



    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
[/mw_shl_code]
回复

使用道具 举报

starrycheng 发表于 2017-2-6 15:08:20
这个是Java版的
[mw_shl_code=java,true]package com.dt.spark.streaming

import com.dt.spark.common.ConnectPool
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
* 以网站热词排名为例,将处理结果写到MySQL中
* Created by dinglq on 2016/5/3.
*/
object WriteDataToMySQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WriteDataToMySQL")
    val ssc = new StreamingContext(conf,Seconds(5))
    // 假设socket输入的数据格式为:searchKeyword,time
    val ItemsStream = ssc.socketTextStream("spark-master",9999)
    // 将输入数据变成(searchKeyword,1)
    var ItemPairs = ItemsStream.map(line =>(line.split(",")(0),1))

     val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,Seconds(60),Seconds(10))
    //ssc.checkpoint("/user/checkpoints/")
    // val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,(v1:Int,v2:Int)=> v1-v2,Seconds(60),Seconds(10))
    /**
     * 接下来需要对热词的频率进行排序,而DStream没有提供sort的方法。那么我们可以实现transform函数,用RDD的sortByKey实现
     */
    val hottestWord = ItemCount.transform(itemRDD => {
      val top3 = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })

    hottestWord.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords =>{
        val conn = ConnectPool.getConnection
        conn.setAutoCommit(false);  //设为手动提交
        val  stmt = conn.createStatement();

        partitionOfRecords.foreach( record => {

          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')");
        })

        stmt.executeBatch();
        conn.commit();  //提交事务

      })
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}[/mw_shl_code]

更多可参考

通过Spark Streaming的foreachRDD把处理后的数据写入外部存储系统中

回复

使用道具 举报

Hentai 发表于 2017-2-6 15:09:18
starrycheng 发表于 2017-2-6 15:08
这个是Java版的
[mw_shl_code=java,true]package com.dt.spark.streaming

不对吧      这是scala版本的
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条