feilong 发表于 2017-12-8 10:28:54

Spark 高级分析:第三章第4,5节

本帖最后由 feilong 于 2017-12-8 10:31 编辑

问题导读

1.如何构建一个模型?
2.怎样进行抽样检测?
3.广播变量是什么?如何使用?


http://www.aboutyun.com/static/image/hrline/4.gif


上一篇:Spark 高级分析:第三章第3节
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23439&extra=


4.构建一个模型对Spark MLlib的ALS的使用来说虽然数据集的格式几乎是正确的,但它需要两个额外的转换。首先,应该应用别名数据集将所有艺术家ID转换为规范ID,如果存在的话。其次,数据应该被转换为评级对象,这是用户-产品-价值数据的抽象。除了名称,Rating适用于隐式数据。还要注意的是,MLlib所指的“产品”会贯穿其API,这个例子也是如此,但在这里“产品”是指艺术家。底层模型并不完全是推荐产品,也不是为了向人们推荐产品。import org.apache.spark.mllib.recommendation._
val bArtistAlias = sc.broadcast(artistAlias)
val trainData = rawUserArtistData.map { line =>
val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
val finalArtistID =
bArtistAlias.value.getOrElse(artistID, artistID)//如果有艺名则取艺名,否则取原名
Rating(userID, finalArtistID, count)
}.cache()
artistalias先前创建的映射可以直接引用在RDD的map()函数,即使它是在driver上的本地Map。这会生效,因为它将自动复制每个任务。然而,它并不小,在内存中消耗大约15兆字节,并且以序列化的形式消耗至少数兆字节。由于许多任务在一个JVM中执行,所以发送和存储如此多的数据副本是浪费的。
作为替代,为artistAlias创建一个广播的变量bartistalias。这使得Spark在每个集群中的每个executor中只发送一个副本。当有数千个任务,并且在每个executor上并行执行多个任务时,这可以节省重要的网络流量和内存。
Broadcast Variables 广播变量当Spark运行一个stage时,它会创建一个代表将要在那个stage运行任务所需信息的二进制对象,称之为闭包,它会被执行。闭包包含了所以的在driver引用的数据结构。Spark将其发送给集群每个executor。
广播变量适用于当许多任务访问同一个不变的数据结构。它们扩展了对任务闭包的正常处理:
[*]在每个executor缓存数据为原生java对象,所以他们不需要为每个任务进行反序列化
[*]跨多个作业和阶段缓存数据

例如,假设一个依赖于一个大字典的自然语言处理应用程序。广播字典只允许将它传输给每个executor一次:val dict = ...
val bDict = sc.broadcast(dict)
...
def query(path: String) = {
sc.textFile(path).map(l => score(l, bDict.value))
...
}
cache()函数将Spark的计算结果RDD临时存储在集群内存,这有助于ALS的迭代,ALS会访问10次以上数据。如果没有缓存,则每次访问数据时都要从原始数据重复计算生成RDD。Spark UI存储页面会显示有多少RDD被缓存了,使用了多少内存,如图3-2所示。这个示例消耗了大概900M集群内存。

图3-2 Spark UI 存储页,显示缓存的RDD内存使用
最后,便可以创建这样一个模型:val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
这是一个MatrixFactorizationModel模型。取决于集群,操作可能需要几分钟或更多时间。与一些机器学习模型相比,它的最终形式可能只包含几个参数或系数,这种模型是巨大的。它包含模型中每个用户和产品的10个值的特征向量,这种情况下有超过170万个。该模型包含了这些大的用户特征和产品特征矩阵作为它们自己的RDD。
若要得到更多特征向量,可以试试以下代码。注意特征向量是一个10个数组成的数组,数组不会自动以可读形式打印。要想该向量可读可以使用msString()函数,一种Scala中常用的方法,将集合元素连接到分隔字符串中。model.userFeatures.mapValues(_.mkString(", ")).first
...
(4293,-0.3233030601963864, 0.31964527593541325,
0.49025505511361034, 0.09000932568001832, 0.4429537767744912,
0.4186675713407441, 0.8026858843673894, -0.4841300444834003,
-0.12485901532338621, 0.19795451025931002)
trainimplicit()的其他参数是超参数,其值可以影响模型推荐的质量。这些将在后面解释。更重要的第一个问题是,模型有什么好处吗?它能产生好的推荐吗?
5.抽样检查推荐
我们应该首先看看艺术家推荐是否有任何直观的意义,通过检查一个用户,他或她的播放记录,以及对该用户的推荐。以用户2093760为例。提取该用户已听过的艺术家的id并打印其名字。这意味着为该用户搜索艺术家id的输入,然后通过这些id筛选艺术家的集合,以便顺序收集和打印这些名字:val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).
filter { case Array(user,_,_) => user.toInt == 2093760 }
val existingProducts =
rawArtistsForUser.map { case Array(_,artist,_) => artist.toInt }.
collect().toSet
artistByID.filter { case (id, name) =>
existingProducts.contains(id)
}.values.collect().foreach(println)
...
David Gray
Blackalicious
Jurassic 5
The Saw Doctors
Xzibit这些艺术家看起来像是主流流行音乐和嘻哈音乐的混合体。侏罗纪5的粉丝吗?记住,这是2005。如果你想知道,Saw Doctors是爱尔兰流行的爱尔兰摇滚乐队。
可以给该用户推荐5个类似的艺术家:val recommendations = model.recommendProducts(2093760, 5)
recommendations.foreach(println)
...
Rating(2093760,1300642,0.02833118412903932)
Rating(2093760,2814,0.027832682960168387)
Rating(2093760,1037970,0.02726611004625264)
Rating(2093760,1001819,0.02716011293509426)
Rating(2093760,4605,0.027118271894797333)包含Rating对象的的结果由用户id,艺术家id,和一个数值组成。尽管是一个评级的字段,它并不是一个估计的评级。对于这种类型的ALS算法,它是一个不透明的值,通常在0到1之间,更高的值意味着更好的建议。它不是一个概率,但可以看作是一个0 / 1值的估计值,该值指示用户是否将分别与艺术家交互。
在提取了推荐的艺术家id之后,可以查询艺术家的名字val recommendedProductIDs = recommendations.map(_.product).toSet
artistByID.filter { case (id, name) =>
recommendedProductIDs.contains(id)
}.values.collect().foreach(println)
...Green DayLinkin ParkMetallicaMy Chemical RomanceSystem of a Down
其结果是混合了流行朋克和金属。乍一看,这并不是一套很好的推荐。虽然这些都是很受欢迎的艺术家,但他们并没有表现出个性化的倾听习惯。

页: [1]
查看完整版本: Spark 高级分析:第三章第4,5节