desehawk 发表于 2017-2-10 16:18:36

Spark sql操作postgresql数据库【附源码】



问题导读


1.本文如何实现读取Postgresql某一张表的数据为DataFrame?
2.如何写入Postgresql某张表中?
3.如何提交程序?


static/image/hrline/4.gif

源码git地址
完整项目源码github

spark sql支持多个数据源,比如,sparkSQL可以源自多个数据源:jsonFile、parquetFile、hive等,下面是postgresql 。

概述:Spark postgresql jdbc 数据库连接和写入操作源码解读,详细记录了SparkSQL对数据库的操作,通过Java程序,在本地开发和运行。整体为,Spark建立数据库连接,读取数据,将DataFrame数据写入另一个数据库表中。附带完整项目源码(完整项目源码github)。1.首先在postgreSQL中创建一张测试表,并插入数据。1.1. 在postgreSQL中的postgres用户下,创建 products
CREATE TABLE products (
    product_no integer,
    name text,
    price numeric
);


1.2. 在 products 插入数据


INSERT INTO products (product_no, name, price) VALUES
    (1, 'Cheese', 9.99),
    (2, 'Bread', 1.99),
    (3, 'Milk', 2.99);
查看数据库写入结果。





2.编写SPARK程序。2.1.读取Postgresql某一张表的数据为DataFrame
SparkPostgresqlJdbc.java

Properties connectionProperties = new Properties();


//增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
connectionProperties.put("user","postgres");
connectionProperties.put("password","123456");
connectionProperties.put("driver","org.postgresql.Driver");

//SparkJdbc读取Postgresql的products表内容
Dataset<Row> jdbcDF = spark.read()
      .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");

//显示jdbcDF数据内容
jdbcDF.show();

2.2.写入Postgresql某张表中


//将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
jdbcDF.write().mode("append")
      .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

3.运行程序,并查看结果

3.1.直接在intellij IDEA(社区版)中运行。

a.在运行按钮的“Edit Configeration”中的VM option中添加“-Dspark.master=local”




3.2.在终端(Terminal)中运行。
/opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
--class "SparkPostgresqlJdbc" \
--master local \
--driver-class-path /home/xiaolei/.m2/repository/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar \
target/SparkPostgresqlJdbc-1.0-SNAPSHOT.jar


其中 --driver-class-path 指定下载的postgresql JDBC数据
库驱动路径,命令执行要在项目的根目录中(/home/xiaolei/Data/GS/Spark/SparkPostgresqlJdbc)。查看Spark写入数据库中的数据

4.以下为项目中主要源码

4.1.项目配置源码pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wangxiaolei</groupId>
    <artifactId>SparkPostgresqlJdbc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
      <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
      </dependency>
      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
      </dependency>
      <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.4.1212</version>
      </dependency>
    </dependencies>
</project>


4.2.java源码

SparkPostgresqlJdbc.java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
* MIT.
* Author: wangxiaolei(王小雷).
* Date:17-2-9.
* Project:SparkPostgresqlJdbc.
*/
public class SparkPostgresqlJdbc {
    public static void main (String[] args) {

      SparkSession spark = SparkSession
                .builder()
                .appName("SparkPostgresqlJdbc")
                .config("spark.some.config.option","some-value")
                .getOrCreate();
    //启动runSparkPostgresqlJdbc程序
      runSparkPostgresqlJdbc(spark);

      spark.stop();

    }

    private static void runSparkPostgresqlJdbc(SparkSession spark){
      //new一个属性
      System.out.println("确保数据库已经开启,并创建了products表和插入了数据");
      Properties connectionProperties = new Properties();


      //增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
      System.out.println("增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)");
      connectionProperties.put("user","postgres");
      connectionProperties.put("password","123456");
      connectionProperties.put("driver","org.postgresql.Driver");



      //SparkJdbc读取Postgresql的products表内容
      System.out.println("SparkJdbc读取Postgresql的products表内容");
      Dataset<Row> jdbcDF = spark.read()
                .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
      //显示jdbcDF数据内容
      jdbcDF.show();



      //将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是"error"模式。
      jdbcDF.write().mode("append")
                .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

    }
}


来自:csdn
作者:王小雷-多面手


页: [1]
查看完整版本: Spark sql操作postgresql数据库【附源码】