This example uses mongo-spark-connector_2.11 version 2.0.0 together with Spark's spark-core_2.11 version 2.0.2:
[mw_shl_code=xml,true]<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>[/mw_shl_code]
Sample code:
[mw_shl_code=scala,true]import com.mongodb.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.bson._

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("Mingdao-Score")
  // The connector also honours the MongoDB driver's readPreference setting, e.g.
  // .set("spark.mongodb.input.readPreference.name", "secondaryPreferred")
  // to read data from secondaries only.
  .set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")
  .set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")
val sc = new SparkContext(conf)

// Create an RDD backed by the input collection.
val originRDD = MongoSpark.load(sc)

// Build a date-range condition; start and end are assumed to be java.util.Date
// values defined elsewhere. Note that dateQuery is not attached to the pipeline
// below; see the sketch after this block for one way to use it.
val dateQuery = new BsonDocument()
  .append("$gte", new BsonDateTime(start.getTime))
  .append("$lt", new BsonDateTime(end.getTime))

// Build the $match stage of the aggregation pipeline.
val matchQuery = new Document("$match", BsonDocument.parse("{\"type\":\"1\"}"))

// Build the $project stage.
val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"userid\":\"$userid\",\"message\":\"$message\"}"))

// Push the pipeline down to MongoDB so that only matching, projected documents are loaded.
val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))

// Example: count the number of message characters per user.
val rdd1 = aggregatedRDD.keyBy(x => {
  Map("userid" -> x.get("userid"))
})
val rdd2 = rdd1.groupByKey.map(t => {
  (t._1, t._2.map(x => {
    x.getString("message").length
  }).sum)
})
rdd2.collect().foreach(x => {
  println(x)
})

// Save the statistics to the database/collection specified by spark.mongodb.output.uri.
// MongoSpark.save expects Document elements, so convert the (key, count) tuples first;
// the output field names here are illustrative.
val resultDocs = rdd2.map { case (key, total) =>
  new Document("userid", key("userid")).append("messageLength", Int.box(total))
}
MongoSpark.save(resultDocs)[/mw_shl_code]
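The dateQuery document in the example is built but never attached to the pipeline. Below is a minimal sketch of how it could be combined with the type filter, assuming the documents carry a date field named createTime (a hypothetical field name, not part of the original example):
[mw_shl_code=scala,true]// Sketch: a single $match stage filtering by type and by the hypothetical createTime field.
val matchWithDate = new Document("$match",
  new Document("type", "1").append("createTime", dateQuery))
// Reuse the $project stage and push both stages down to MongoDB.
val filteredRDD = originRDD.withPipeline(Seq(matchWithDate, projection1))[/mw_shl_code]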