分享

java版本如何将spark的处理结果存入mysql?

Hentai 发表于 2017-1-19 16:11:01 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 22063
求大神指教一下

已有(11)人评论

跳转到指定楼层
easthome001 发表于 2017-1-19 16:46:04
首先结果肯定能得到的,然后在里面调用数据库驱动,使用相关api直接插入即可。
也就是把mysql相关api引入,然后直接调用插入
回复

使用道具 举报

Hentai 发表于 2017-1-19 16:47:13
easthome001 发表于 2017-1-19 16:46
首先结果肯定能得到的,然后在里面调用数据库驱动,使用相关api直接插入即可。
也就是把mysql相关api引入 ...

不是特别懂  你能写个伪代码么
回复

使用道具 举报

easthome001 发表于 2017-1-19 16:47:21
如下面,唯一的不同就是结果是spark的,这里结果是学生信息而已
package impl;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import db.DBConnection;

import entity.Student;

public class StudentImpl {
       
        public String  joinString(String[] slikes){
                String joinslikes="";
                for(String temp:slikes){
                        joinslikes+=temp+"-";
                }
                System.out.println(joinslikes.substring(0, joinslikes.length()-1));
                return joinslikes.substring(0, joinslikes.length()-1);
        }
       
        public void saveStudent(Student student) throws Exception {
                // 连接数据库,完成数据的录入操作
                //System.out.println("---saveStudent---");
                DBConnection db=new DBConnection();
                String sql="insert into student (sname,slikes,saddress) values (?,?,?)";
                try {
                        PreparedStatement preStmt=db.getConn().prepareStatement(sql);
                        preStmt.setString(1, student.getSname());
                        preStmt.setString(2, student.getSlikes());
                        preStmt.setString(3, student.getSaddress());
                       
                        preStmt.executeUpdate();
                       
                } catch (SQLException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
        }
       
}


回复

使用道具 举报

easthome001 发表于 2017-1-19 16:49:57
这是使用Scala的,嵌入了Java的mysql的驱动。Java代码更好修改了。原理都是一样的

[mw_shl_code=scala,true]import java.sql.{PreparedStatement, Connection, DriverManager}  
import java.util.concurrent.atomic.AtomicInteger  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming._  
import org.apache.spark.streaming.StreamingContext._  
//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?  
object SparkStreamingForPartition {  
  def main(args: Array[String]) {  
    val conf = new SparkConf().setAppName("NetCatWordCount")  
    conf.setMaster("local[3]")  
    val ssc = new StreamingContext(conf, Seconds(5))   
    val dstream = ssc.socketTextStream("hadoopMaster", 9999).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)  
    dstream.foreachRDD(rdd => {  
      //embedded function  
      def func(records: Iterator[(String,Int)]) {  
//Connect the mysql
        var conn: Connection = null  
        var stmt: PreparedStatement = null  
        try {  
          val url = "jdbc:mysql://hadoopMaster:3306/streaming";  
          val user = "root";  
          val password = "hadoop"  
          conn = DriverManager.getConnection(url, user, password)  
          records.foreach(word => {  
            val sql = "insert into wordcounts values (?,?)";  
            stmt = conn.prepareStatement(sql);  
            stmt.setString(1, word._1)  
            stmt.setInt(2, word._2)  
            stmt.executeUpdate();  
          })  
        } catch {  
          case e: Exception => e.printStackTrace()  
        } finally {  
          if (stmt != null) {  
            stmt.close()  
          }  
          if (conn != null) {  
            conn.close()  
          }  
        }  
      }   
      val repartitionedRDD = rdd.repartition(3)  
      repartitionedRDD.foreachPartition(func)  
    })  
    ssc.start()  
    ssc.awaitTermination()  
  }  
}  [/mw_shl_code]


回复

使用道具 举报

Hentai 发表于 2017-1-19 16:50:20
本帖最后由 Hentai 于 2017-1-19 16:52 编辑
easthome001 发表于 2017-1-19 16:47
如下面,唯一的不同就是结果是spark的,这里结果是学生信息而已
package impl;

可能你理解错我的意思了,我给你看看我的代码吧
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
                        /**
                         *
                         */
                        private static final long serialVersionUID = -6731516149554006263L;

                        @Override
                        public Tuple2<String, Integer> call(String s) {
                                return new Tuple2<String, Integer>(s, 1);
                        }
                }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                        
                        private static final long serialVersionUID = 1429329519996995217L;

                        @Override
                        public Integer call(Integer i1, Integer i2) {
                                return i1 + i2;
                        }
                }, new Duration(5000), new Duration(5000)); 我想问的是 如何把这个计算结果通过spark插入数据库

回复

使用道具 举报

easthome001 发表于 2017-1-19 17:03:32
Hentai 发表于 2017-1-19 16:50
可能你理解错我的意思了,我给你看看我的代码吧
JavaPairDStream wordCounts = words.mapToPair(new Pai ...

楼主可能搞错了。spark只有spark相关api,对于mysql他没有这方面的api的。spark是用来处理大数据的。如同hadoop只有hadoop的api,hbase只有hbase的api。hadoop不能插入hbase,只能引入hbase的api才可以。
回复

使用道具 举报

Hentai 发表于 2017-1-19 17:11:16
easthome001 发表于 2017-1-19 17:03
楼主可能搞错了。spark只有spark相关api,对于mysql他没有这方面的api的。spark是用来处理大数据的。如同 ...

嗯  你看看我这段代码是不是应该这样写
wordCounts.foreachRDD(new  VoidFunction<JavaPairRDD<String, Integer>>() {

                       
                        private static final long serialVersionUID = -7774591138569233818L;

                        @Override
                        public void call(JavaPairRDD<String, Integer> arg0) throws Exception {
                               
                                String sql = "insert into table values(?,?)";
                                stmt = conn.prepareStatement(sql);  
                    stmt.setString(1, arg0._1)  ;
                    stmt.setInt(2, arg0._2)  ;
                    stmt.executeUpdate();  
                        }
                });

回复

使用道具 举报

easthome001 发表于 2017-1-19 17:14:14
Hentai 发表于 2017-1-19 17:11
嗯  你看看我这段代码是不是应该这样写
wordCounts.foreachRDD(new  VoidFunction() {

理解的很正确,剩下的楼主可调试下
回复

使用道具 举报

Hentai 发表于 2017-1-19 17:18:20
easthome001 发表于 2017-1-19 17:14
理解的很正确,剩下的楼主可调试下

我刚刚发现一个问题 就是JavaPairRDD<String, Integer> arg0 这个对象里面如何取对应的值,因为这个对象不像tuple2 有_1 _2这两个字段

点评

这是泛型  发表于 2017-1-21 12:07
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条