本帖最后由 ayou 于 2015-5-6 08:37 编辑
输入:文档ID+“\t”+文档内容
ID1 Hello world
ID2 Hello spark
输出:关键词+"\t"+文档ID
Hello ID1 ID2
world ID1
spark ID2
代码如下:
请根据自己机器情况修改路径
- package Spark1
-
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.SparkContext._
- import scala.collection.mutable
-
- /**
- *
- * Created by youxingzhi on 15-5-3.
- */
- object InvertedIndex {
- def main (args: Array[String]) {
- val conf = new SparkConf().setAppName("InvertedIndex").setMaster("spark://192.168.1.170:7077")
- val spark = new SparkContext(conf)
- spark.addJar("/home/youxingzhi/IdeaProjects/WordCount/out/artifacts/Spark1_jar/Spark1.jar")
- //textFile可以通过设置第二个参数来指定slice个数(slice与Hadoop里的split/block概念对应,一个task处理一个slice)。Spark默认将Hadoop上一个block对应为一个slice,但可以调大slice的个数,但不能比block的个数小,这就需要知道HDFS上一个文件的block数目,可以通过50070的dfs的jsp来查看。
- val words = spark.textFile("hdfs://master:8020/InvertedIndex",1).map(file=>file.split("\t")).
- map(item =>{
- (item(0),item(1))
- }).flatMap(file => {
- var map = mutable.Map[String,String]()
- val words = file._2.split(" ").iterator
- val doc = file._1
- while(words.hasNext){
- map+=(words.next() -> doc)
- }
- map
- })
-
- //save to file
- words.reduceByKey(_+" "+_).map(x=>{
- x._1+"\t"+x._2
- }).saveAsTextFile("hdfs://master:8020/test3")
- }
- }
复制代码
|