本帖最后由 feilong 于 2018-6-1 09:15 编辑
问题导读
1.什么是词干?如何进行词形还原?如何用代码实现?
2.如何利用Spark MLib计算TF-IDF?
3.计算TF-IDF有哪些步骤和注意点?
上一篇:Spark 高级分析:第六章第2,3节 解析和准备数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=24565&extra=
词形还原
使用纯文本,下一步就是把它变成一个词语集。这一步需要关心几个点。首先,像这样的普通词汇占据了空间,但最好不要给模型提供有用的信息。过滤掉一个停止词列表既可以节省空间,又可以提高保真度。第二,具有相同含义的词语通常会有略微不同的形式。例如,猴子和猴子不应该被分开。国有化和国有化也没有。将这些不同的屈折形式合并成单一的术语称为词干或引理。词干是指以启发式为基础的技术,用于在单词的结尾截断字符,而lemmatization则是指更有原则的方法。例如,前者可能截断吸引到dr,而后者可能更正确地输出draw。Stanford Core NLP项目提供了一个优秀的lemmatizer,它具有一个Scala可以利用的Java API。下面的代码片段获取了纯文本文档的RDD,并将它和过滤出的stop单词进行了过滤。
[mw_shl_code=scala,true]mport edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._
def plainTextToLemmas(text: String, stopWords: Set[String])
: Seq[String] = {
val props = new Properties()
props.put("annotators", "tokenize, ssplit, pos, lemma")
val pipeline = new StanfordCoreNLP(props)
val doc = new Annotation(text)
pipeline.annotate(doc)
val lemmas = new ArrayBuffer[String]()
val sentences = doc.get(classOf[SentencesAnnotation])
for (sentence <- sentences;
token <- sentence.get(classOf[TokensAnnotation])) {
val lemma = token.get(classOf[LemmaAnnotation])
if (lemma.length > 2 && !stopWords.contains(lemma)
&& isOnlyLetters(lemma)) {
lemmas += lemma.toLowerCase
}
}
lemmas
}
val lemmatized = plainText.map(plainTextToLemmas(_, stopWords))[/mw_shl_code]
计算TF-IDF
在这一点上,lemmatized指的是一系列词语的RDD,每个术语对应于一个文档。下一步是计算每个文档中每个术语的频率,以及整个语料库中的每个词语。下面的代码为每个文档构建了一个词语映射表:
[mw_shl_code=scala,true]import scala.collection.mutable.HashMap
val docTermFreqs = lemmatized.map(terms => {
val termFreqs = terms.foldLeft(new HashMap[String, Int]()) {
(map, term) => {
map += term -> (map.getOrElse(term, 0) + 1)
map
}
}
termFreqs
})[/mw_shl_code]
由此产生的RDD将至少在此之后使用两次:计算逆文档率并计算最终的term文档矩阵。所以在内存中缓存是一个好办法:
[mw_shl_code=scala,true]docTermFreqs.cache()[/mw_shl_code]
对于计算文档频率(对于每个词语来说,它在整个语料库中出现的文档数)的计算方法是值得考虑的。第一个使用聚合操作来在每个分区上构建一个词语的本地图,然后将所有这些映射合并到驱动程序中。聚合接受两个函数:将记录合并到每个分区结果对象的函数,以及将两个结果对象合并在一起的函数。在这种情况下,每个记录都是一个文档中频率的一幅图,结果对象是一组文档中的频率图。当聚集的记录和结果对象具有相同的时间(例如,在一个和)中,reduce是有用的,但是当类型不同时,就像他们在这里做的那样,聚合是一个更强大的替代方法。
[mw_shl_code=scala,true]val zero = new HashMap[String, Int]()
def merge(dfs: HashMap[String, Int], tfs: HashMap[String, Int])
: HashMap[String, Int] = {
tfs.keySet.foreach { term =>
dfs += term -> (dfs.getOrElse(term, 0) + 1)
}
dfs
}
def comb(dfs1: HashMap[String, Int], dfs2: HashMap[String, Int])
: HashMap[String, Int] = {
for ((term, count) <- dfs2) {
dfs1 += term -> (dfs1.getOrElse(term, 0) + count)
}
dfs1
}
docTermFreqs.aggregate(zero)(merge, comb)[/mw_shl_code]
在整个语料库上运行它:
[mw_shl_code=scala,true]java.lang.OutOfMemoryError: Java heap space[/mw_shl_code]
这到底是怎么回事?看来,所有文件的全部项无法都装入内存,而且使驱动程序不堪重负。那有多少项呢?
[mw_shl_code=scala,true]docTermFreqs.flatMap(_.keySet).distinct().count()
...
res0: Long = 9014592[/mw_shl_code]
其中许多词语都是垃圾,或者只出现在语料库中。过滤不那么频繁的词语可以提高性能和消除噪音。一个合理的选择是,除了最常用的N个单词之外,其余的单词都不存在,其中N是成千上万个单词中的一个。以下代码以分布式方式计算文档频率。这类似于用来展示一个简单的MapReduce程序的经典单词count job。一个具有词语的键值对和数字1是在文档中每一个单独出现一个词语的情况下发出的,并且每个词语的数据集都有一个还原键和这些数字。
[mw_shl_code=scala,true]val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).
reduceByKey(_ + _)[/mw_shl_code]
上面的top操作返回给驱动程序最高值的N个记录。自定义排序用于允许它在term-count对上进行操作。
val numTerms = 50000
val ordering = Ordering.by[(String, Int), Int](_._2)
val topDocFreqs = docFreqs.top(numTerms)(ordering)
使用文档频率,可以计算逆文档频率。计算这些在驱动程序上而不是在执行器中每一个词被引用节省了一些冗余浮点数:
[mw_shl_code=scala,true]val idfs = docFreqs.map{
case (term, count) => (term, math.log(numDocs.toDouble / count))
}.toMap[/mw_shl_code]
词频和逆文档频率构成计算TF-IDF向量所需的数字。然而,还有一个最后的问题:数据目前驻留在由字符串构成的映射中,但是将这些数据输入到MLlib中需要将它们转换成由整数键入的矢量。要从前者生成后者,为每个术语分配一个惟一的ID:
[mw_shl_code=scala,true]val termIds = idfs.keys.zipWithIndex.toMap[/mw_shl_code]
因为ID映射是相当大的,我们将在几个不同的地方使用它。
[mw_shl_code=scala,true]val bTermIds = sc.broadcast(termIds).value[/mw_shl_code]
最后,我们通过为每个文档创建一个tf - idf加权向量来将其连接在一起。请注意,我们使用稀疏向量,因为每个文档只包含全部词集的一小部分。MLlib的稀疏向量可以通过给出一个大小和一个索引值对来构造。
[mw_shl_code=scala,true]import org.apache.spark.mllib.linalg.Vectors
val vecs = docTermFreqs.map(termFreqs => {
val docTotalTerms = termFreqs.values().sum
val termScores = termFreqs.filter {
case (term, freq) => bTermIds.containsKey(term)
}.map{
case (term, freq) => (bTermIds(term),
bIdfs(term) * termFreqs(term) / docTotalTerms)
}.toSeq
Vectors.sparse(bTermIds.size, termScores)
})[/mw_shl_code]
|
|