Spark for python developers ---Spark与数据的机器学习
本帖最后由 Oner 于 2016-6-3 08:37 编辑问题导读:
1. Spark MLlib 在应用架构中的位置是怎样的?
2. Spark MLlib 算法可分为哪几类?
3.如何进行数据预处理?
4. 如何运行聚类算法?
5. 如何评估模型和结果?
6.如何设置作流程?
static/image/hrline/4.gif
机器学习可以从数据中得到有用的见解. 目标是纵观SparkMLlib,采用合适的算法从数据集中生成见解。对于 Twitter的数据集,采用非监督集群算法来区分与Apache Spark相关的tweets .初始输入是混合在一起的tweets。 首先提取相关特性, 然后在数据集中使用机器学习算法 ,最后评估结果和性能. 本章重点如下:
•了解 SparkMLlib 模块及其算法,还有典型的机器学习流程 .
• 预处理 所采集的Twitter 数据集提取相关特性, 应用非监督集群算法识别ApacheSpark- 相关的tweets.然后, 评估得到的模型和结果.
• 描述Spark 机器学习的流水线.
Spark MLlib 在应用架构中的位置
先看一下数据学习在数据密集型应用架构中的位置,集中关注分析层,准确一点说是机器学习。这是批处理和流处理数据学习的基础,它们只是推测的规则不同。
下图指出了重点, 分析层处理的探索式数据分析工具 SparkSQL和Pandas外 还有机器学习模块.
Spark MLlib 算法分类
SparkMLlib 是一个更新很快的模块,新的算法不断地引入到 Spark中.
下图提供了 SparkMLlib 算法的高层通览,并根据传统机器学习技术的体系或数据的连续性进行了分组:
http://img.blog.csdn.net/20160511170736431
根据数据的类型,将 SparkMLlib 算法分成两栏,性质无序或者数量连续的 . 我们将数据区分为无序的性质数据和数量连续的数据。一个性质数据的例子:在给定大气压,温度,云的类型和呈现,天气是晴朗,干燥,多雨,或者阴天时 时,预测她是否穿着男性上装,这是离散的值。 另一方面,给定了位置,住房面积,房间数,想预测房屋的价格 ,房地产可以通过线性回归预测。 本例中,讨论数量连续的数据。
水平分组反映了所使用的机器学习算法的类型。监督和非监督机器学习取决于训练的数据是否标注。非监督学习的挑战是对非标注数据使用学习算法。目标是发现输入中隐含的结构。 而监督式学习的数据是标注过的。 重点是对连续数据的回归预测以及离散数据的分类.
机器学习的一个重要门类是推荐系统,主要是利用了协同过滤技术。 Amazon和 Netflix有自己非常强大的 推荐系统. 随机梯度下降是适合于Spark分布计算的机器学习优化技术之一。对于处理大量的文本,Spark提供了特性提取和转换的重要的库例如:TF-IDF ,Word2Vec,standardscaler,和normalizer.
监督和非监督式学习
深入研究一下SparkMLlib 提供的传统的机器学习算法 .监督和非监督机器学习取决于训练的数据是否标注.区别无序和连续取决于数据的离散或连续.
下图解释了 SparkMLlib 监督和非监督时算法及预处理技术 :
http://img.blog.csdn.net/20160511171213088
下面是Spakr MLlib中的监督和非监督算法以及与预处理技术:
• Clustering 聚类:一个非监督式机器学习技术,数据是没有标注的,目的是数据中提取结构:
∞ K-Means:从 K个不同聚类中进行数据分片
∞ GaussianMixture: 基于组件的最大化后验概率聚类
∞ Power Iteration Clustering(PIC): 根据图顶点的两两边的相似程度聚类
∞ LatentDirichletAllocation(LDA):用于将文本文档聚类成话题
∞ StreamingK-Means:使用窗口函数将进入的数据流动态的进行K-Means聚类
• DimensionalityReduction:目的在减少特性的数量.基本上,用于削减数据噪音并关注关键特性:
∞ SingularValueDecomposition(SVD):将矩阵中的数据分割成简单的有意义的小片,把初始矩阵生成3个矩阵.
∞ PrincipalComponentAnalysis(PCA):以子空间的低维数据逼近高维数据集.
• RegressionandClassification:当分类器将结果分成类时,回归使用标注过的训练数据来预测输出结果。分类的变量是离散无序的,回归的变量是连续而有序的 :
∞ LinearRegressionModels(linearregression,logisticregression, andsupportvectormachines): 线性回归算法可以表达为凸优化问题,目的是基于有权重变量的向量使目标函数最小化.目标函数通过函数的正则部分控制函数的复杂性,通过函数的损失部分控制函数的误差.
∞ NaiveBayes: 基于给定标签的条件概率来预测,基本假设是变量的相互独立性。
∞ DecisionTrees:它执行了特性空间的递归二元划分,为了最好的划分分割需要将树状节点的信息增益最大化。
∞ Ensemblesoftrees(RandomForestsandGradient-BoostedTrees): 树集成算法结合了多种决策树模型来构建一个高性能的模型,它们对分类和回归是非常直观和成功的。
• IsotonicRegression保序回归:最小化所给数据和可观测响应的均方根误差.
其他学习算法
SparkMLlib还提供了很多其它的算法,广泛地说有三种其它的机器学习算法:
推荐系统,优化算法,和特征提取.
下面当前是MLlib 中的其它算法:
• Collaborativefiltering:是推荐系统的基础,创建用户-物品关联矩阵,目标是填充差异。基于其它用户与物品的关联评分,为没有评分的目标用户推荐物品。在分布式计算中, ALS(AlternatingLeastSquare)是最成功的算法之一:
∞ AlternatingLeastSquares交替最小二乘法:矩阵分解技术采用了隐性反馈,时间效应和置信水平,把大规模的用户物品矩阵分解成低维的用户物品因子,通过替换因子最小化二次损失函数。
• Featureextractionandtransformation:这些是大规模文档处理的基础,包括如下技术:
∞ TermFrequency:搜素引擎使用SearchenginesuseTF-IDF 对语料库中的文档进行等级评分。机器学习中用它来判断一个词在文档或语料库中的重要性.词频统计所确定的权重取决于它在语料库中出现的频率。 词频本身可能存在错误导向比如过分强调了类似 the,of,orand等这样无意义的信息词汇.逆文档频率提供了特异性或者信息量的度量,词汇在语料库中所有文档是否常见。
∞ Word2Vec:包括了两个模型 Skip-Gram 和Continuous BagofWord.Skip-Gram预测了一个给定词汇的邻近词,基于词汇的滑动窗口; ContinuousBagof Words 预测了所给定邻近词汇的当前词是什么.
∞ StandardScaler:作为预处理的一部分,数据集经常通过平均去除和方差缩放来标准化.我们在训练数据上计算平均和标准偏差,并应用同样的变形来测试数据.
∞ Normalizer:在缩放样本时需要规范化. 对于点积或核心方法这样的二次型非常有用。
∞ Featureselection: 通过选择模型中最相关的特征来降低向量空间的维度。
∞ Chi-SquareSelector:一个统计方法来测量两个事件的独立性。
• Optimization:这些 SparkMLlib 优化算法聚焦在各种梯度下降的技术。Spark 在分布式集群上提供了非常有效的梯度下降的实现,通过本地极小的迭代完成梯度的快速下降。迭代所有的数据变量是计算密集型的:
∞ StochasticGradientDescent:最小化目标函数即 微分函数的和.随机梯度下降仅使用了一个训练数据的抽样来更新一个特殊迭代的参数,用来解决大规模稀疏机器学习问题例如文本分类.
• Limited-memoryBFGS(L-BFGS):文如其名,L-BFGS使用了有限内存,适合于SparkMLlib 实现的分布式优化算法。
Spark MLlib data types
MLlib 支持4种数据类型:localvector,labeledpoint,localmatrix, anddistributedmatrix. SparkMLlib算法广泛地使用了这些数据类型:
• Localvector:位于单机,可以是紧密或稀疏的:
∞ Densevector 是传统的 doubles 数组.例如.
∞ Sparsevector使用整数和double值.稀疏向量应该是 (4, ,),表明了向量的维数.
这是 PySpark中一个使用本地向量的例子:
importnumpyasnp
importscipy.sparseassps
frompyspark.mllib.linalgimportVectors
#NumPyarrayfordensevector.
dvect1=np.array()
#Pythonlistfordensevector.
dvect2=
#SparseVectorcreation
svect1=Vectors.sparse(4,,)
#Sparsevectorusingasingle-columnSciPycsc_matrix
svect2=sps.csc_matrix((np.array(),np.array()),shape=(4,1))
• Labeledpoint.一个标注点是监督式学习中的一个有标签的紧密或稀疏向量.在二元标签中,0.0 代表负值,1.0代表正值.
这有一个PySpark中标注点的例子:
frompyspark.mllib.linalgimportSparseVector
frompyspark.mllib.regressionimportLabeledPoint
#Labeled point with a positive label and a dense featurevector.
lp_pos=LabeledPoint(1.0,)
#Labeledpointwithanegativelabelandasparsefeaturevector.
lp_neg=LabeledPoint(0.0,SparseVector(4,,))
• LocalMatrix:本地矩阵位于单机上,拥有整型索引和double的值.
这是一个 PySpark中本地矩阵的例子:
frompyspark.mllib.linalgimportMatrix,Matrices
#Densematrix((1.0,2.0,3.0),(4.0,5.0,6.0))
dMatrix=Matrices.dense(2,3,)
#Sparsematrix((9.0,0.0),(0.0,8.0),(0.0,6.0))
sMatrix=Matrices.sparse(3,2,,,)
• DistributedMatrix:充分利用 RDD的成熟性,分布式矩阵可以在集群中共享 .有4种分布式矩阵类型:RowMatrix,IndexedRowMatrix, CoordinateMatrix,和BlockMatrix:
∞ RowMatrix:使用多个向量的一个RDD,创建无意义索引的行分布式矩阵叫做RowMatrix.
∞ IndexedRowMatrix:行索引是有意义的. 首先使用IndexedRow 类创建一个 带索引行的RDDFirst, 再创建一个 IndexedRowMatrix.
∞ CoordinateMatrix:对于表达巨大而稀疏的矩阵非常有用。CoordinateMatrix 从MatrixEntry的RDD创建,用类型元组 (long,long,or float)来表示。
∞ BlockMatrix:从 子矩阵块的RDDs创建, 子矩阵块形如 ((blockRowIndex,blockColIndex), sub-matrix).
机器学习的工作流和数据流
除了算法,机器学习还需要处理过程,我们将讨论监督和非监督学习的典型流程和数据流.
监督式学习工作流程
在监督式学习中,输入的训练数据集是标注过的.一个重要的话数据实践是分割输入来训练和测试, 以及严重相应的模式.完成监督式学习有6个步骤:
• Collectthedata:这个步骤依赖于前面的章节,保证数据正确的容量和颗粒度,使机器学习算法能够提供可靠的答案.
• Preprocessthedata:通过抽样检查数据质量,添补遗漏的数据值,缩放和规范化数据。同时,定义特征提取处理。典型地,在大文本数据集中,分词,移除停词,词干提取 和TF-IDF. 在监督式学习中,分别将数据放入训练和测试集。我门也实现了抽样的各种策略, 为交叉检验分割数据集。
• Readythedata:准备格式化的数据和算法所需的数据类型。在SparkMLlib中,包括 local vector,dense或 sparsevectors,labeledpoints,localmatrix,distributed matrixwithrowmatrix,indexedrowmatrix,coordinatematrix,和 blockmatrix.
• Model:针对问题使用算法以及获得最适合算法的评估结果,可能有多种算法适合同一问题; 它们的性能存储在评估步骤中以便选择性能最好的一个。我门可以实现一个综合方案或者模型组合来得到最好的结果。
• Optimize:为了一些算法的参数优化,需要运行网格搜索。这些9参数取决于训练,测试和产品调优的阶段。
• Evaluate:最终给模型打分,并综合准确率,性能,可靠性,伸缩性 选择最好的一个模型。 用性能最好的模型来测试数据来探明模型预测的准确性。一旦对调优模型满意,就可以到生产环境处理真正的数据了.
监督式机器学习的工作流程和数据流如下图所示:
非监督式学习工作流程
与监督式学习相对,非监督式学习的初始数据使没有标注的,这是真实生活的情形。通过聚类或降维算法提取数据中的结构, 在非监督式学习中,不用分割数据到训练和测试中,因为数据没有标注,我们不能做任何预测。训练数据的6个步骤与监督式学习中的那些步骤相似。一旦模型训练过了,将评估结果和调优模型,然后发布到生产环境。
非监督式学习是监督式学习的初始步骤。 也即是说,数据降维先于进入学习阶段。
非监督式机器学习的工作流程和数据流表达如下:
Twitter 数据集聚类
感受一下从Twitter提取到的数据,理解数据结构,然后运行 K-Means 聚类算法 .使用非监督式的处理和数据流程,步骤如下:
1. 组合所有的 tweet文件成一个 dataframe.
2. 解析 tweets, 移除停词,提取表情符号,提取URL,并最终规范化词 (如,转化为小写,移除标点符号和数字).
3.特征提取包括以下步骤:
∞ Tokenization:将tweet的文本解析成单个的单词或tokens
∞ TF-IDF:应用 TF-IDF算法从文本分词中创建特征向量
∞ HashTF-IDF:应用哈希函数的TF-IDF
4.运行 K-Means聚类算法.
5.评估 K-Means聚类的结果:
∞ 界定 tweet 的成员关系 和聚类结果
∞ 通过多维缩放和PCA算法执行降维分析到两维
∞ 绘制聚类
6.流水线:
∞ 调优相关聚类K值数目
∞ 测量模型成本
∞ 选择优化的模型
在 Twitter数据集上应用Scikit-Learn
Python有自己的Scikit-Learn机器学习库,是最可靠直观和健壮的工具之一 。使用Pandas和Scikit-Learn运行预处理和非监督式学习。 在用SparkMLlib 完成聚类之前,使用Scikit-Learn来探索数据的抽样是非常有益的。这里混合了 7,540tweets, 它包含了与ApacheSpark,Python相关的tweets, 即将到来的总统选举: 希拉里克林顿 和 唐纳德 ,一些时尚相关的tweets ,LadyGaga和JustinBieber的音乐.在Twitter 数据集上使用 Scikit-Learn并运行K-Means 聚类算法。
先将样本数据加载到 一个 Pandasdataframe:
importpandasaspd
csv_in='C:\\Users\\Amit\\Documents\\IPythonNotebooks\\AN00_Data\\ unq_tweetstxt.csv'
twts_df01=pd.read_csv(csv_in,sep=';',encoding='utf-8')
In:
csv(csv_in,sep=';',encoding='utf-8')
In:
twts_df01.count()
Out:
Unnamed:0 7540
id7540
created_at7540
user_id 7540
user_name 7538
tweet_text7540
dtype:int64
#
#Introspectingthetweetstext
#
In:
twtstxt_ls01
Out:
['RT@deroach_Ismoke:IamNOTvotingfor#hilaryclintonhttp://t.co/jaZZpcHkkJ',
'RT@AnimalRightsJen:#HilaryClintonWhatdoBernieSandersand DonaldTrumpHaveinCommon?:Hehassofarbeenth...http://t.co/t2YRcGCh6…',
'IunderstandwhyBillwasoutbangingotherchicks....... I mean
lookatwhatheismarriedto.....\n@HilaryClinton',
'#HilaryClintonWhatdoBernieSandersandDonaldTrumpHavein Common?:Hehassofarbeenth...http://t.co/t2YRcGCh67#Tcot #UniteBlue']
先从Tweets 的文本中做一个特征提取,使用一个有10000特征和英文停词的TF-IDF 矢量器将数据集向量化:
In:
print("Extractingfeaturesfromthetrainingdatasetusingasparse vectorizer")
t0=time()
Extractingfeaturesfromthetrainingdatasetusingasparse
vectorizer
In:
vectorizer=TfidfVectorizer(max_df=0.5,max_features=10000,
min_df=2,stop_words='english',use_idf=True)
X=vectorizer.fit_transform(twtstxt_ls01)
#
#OutputoftheTFIDFFeaturevectorizer
#
print("donein%fs"%(time()-t0))
print("n_samples:%d,n_features:%d"%X.shape)
print()
donein5.232165s
n_samples:7540,n_features:6638
数据集被分成拥有6638特征的7540个抽样,形成稀疏矩阵给K-Means 聚类算法 ,初始选择7个聚类和最多100次迭代:
In:
km=KMeans(n_clusters=7,init='k-means++',max_iter=100,n_init=1,
verbose=1)
print("Clusteringsparsedatawith%s"%km)
t0=time()
km.fit(X)
print("donein%0.3fs"%(time()-t0))
ClusteringsparsedatawithKMeans(copy_x=True,init='k-means++',max_iter=100,n_clusters=7,n_init=1,
n_jobs=1,precompute_distances='auto',random_state=None,
tol=0.0001,verbose=1)
Initializationcomplete
Iteration 0,inertia13635.141
Iteration 1,inertia6943.485
Iteration 2,inertia6924.093
Iteration 3,inertia6915.004
Iteration 4,inertia6909.212
Iteration 5,inertia6903.848
Iteration 6,inertia6888.606
Iteration 7,inertia6863.226
Iteration 8,inertia6860.026
Iteration 9,inertia6859.338
Iteration10,inertia6859.213
Iteration11,inertia6859.102
Iteration12,inertia6859.080
Iteration13,inertia6859.060
Iteration14,inertia6859.047
Iteration15,inertia6859.039
Iteration16,inertia6859.032
Iteration17,inertia6859.031
Iteration18,inertia6859.029
Convergedatiteration18
donein1.701s
在18次迭代后 K-Means聚类算法收敛,根据相应的关键词看一下7个聚类的结果 .Clusters0和6 是关于音乐和时尚的 JustinBieber和LadyGaga 相关的tweets.
Clusters1和5 是与美国总统大选 DonaldTrump和 HilaryClinton相关的tweets.Clusters2和3是我们感兴趣的ApacheSpark和Python.Cluster4包含了Thailand相关的tweets:
#
#Introspecttoptermspercluster
#
In:
print("Toptermspercluster:")
order_centroids=km.cluster_centers_.argsort()[:,::-1]
terms=vectorizer.get_feature_names()
foriinrange(7):
print("Cluster%d:"%i,end='')
forindinorder_centroids:
print('%s'%terms,end='')
print()
Toptermspercluster:
Cluster0:justinbieberlovemeanrtfollowthankhi https
whatdoyoumeanvideowannahearwhatdoyoumeanviralrorykramerhappylol makingpersondreamjustin
Cluster1:donaldtrumphilaryclintonrthttpstrump2016
realdonaldtrumptrumpgopampjustinbieberpresidentclintonemails oy8ltkstzetcotlikeberniesandershilarypeopleemail
Cluster2:bigdataapachesparkhadoopanalyticsrtsparktraining chennaiibmdatascienceapacheprocessingclouderamapreducedatasap httpsvoratransformingdevelopment
Cluster3:apachesparkpythonhttpsrtsparkdataampdatabricksusing newlearnhadoopibmbigapachecontinuumiobluemixlearningjoinopen
Cluster4:ernestsganttsimbata3jdhm2015elsahel12phuketdailynews dreamintentionsbeyhiveinfrancealmtorta18civipartnership9_a_6
25whu72ep0k7erhvu7wnfdmxxxcm3hosxuh2fxnt5o5rmb0xhpjnbgkqn0dj ovap57ujdhdtzsz3lb6xsunnysai12345sdcvulih6g
Cluster5:trumpdonalddonaldtrumpstarbuckstrumpquote
trumpforpresidentoy8ltkstzehttpszfns7pxysxsillygoystump trump2016newsjeremycoffeecorbynok7vc8aetzrttonight
Cluster6:ladygagagagaladyrthttpslovefollowhorrorcdstory ahshotelamericanjapanhotelhumantraffickingmusicfashiondiet queenahs
我们将通过画图来可视化结果。由于我们有6638个特征的7540个抽样,很难多维可视化,所以通过MDS算法来降维描绘 :
importmatplotlib.pyplotasplt
importmatplotlibasmpl
fromsklearn.manifoldimportMDS
MDS()
#
#BringdowntheMDStotwodimensions(components)aswewillplot theclusters
#
mds=MDS(n_components=2,dissimilarity="precomputed",random_state=1)
pos=mds.fit_transform(dist) #shape(n_components,n_samples)
xs,ys=pos[:,0],pos[:,1]
In:
#
#Setupcolorsperclustersusingadict
#
cluster_colors={0:'#1b9e77',1:'#d95f02',2: '#7570b3',3:'#e7298a',4:'#66a61e',5:'#9990b3',6:'#e8888a'}
#
#setupclusternamesusingadict
#
cluster_names={0:'Music,Pop',
1:'USAPolitics,Election',
2:'BigData,Spark',
3:'Spark,Python',
4:'Thailand',
5:'USAPolitics,Election',
6:'Music,Pop'}
In:
#
#ipythonmagictoshowthematplotlibplotsinline
#
%matplotlibinline
#
#CreatedataframewhichincludesMDSresults,clusternumbersand
tweettextstobedisplayed
#
df=pd.DataFrame(dict(x=xs,y=ys,label=clusters,txt=twtstxt_ls02_
utf8))
ix_start=2000
ix_stop =2050
df01=df
print(df01[['label','txt']])
print(len(df01))
print()
#Groupbycluster
groups=df.groupby('label')
groups01=df01.groupby('label')
#Setuptheplot
fig,ax=plt.subplots(figsize=(17,10))
ax.margins(0.05)
#
#Buildtheplotobject
#
forname,groupingroups01:
ax.plot(group.x,group.y,marker='o',linestyle='',ms=12,
label=cluster_names,color=cluster_colors,
mec='none')
ax.set_aspect('auto')
ax.tick_params(\
axis='x', #settingsforx-axis
which='both', #
bottom='off', #
top='off',#
labelbottom='off')
ax.tick_params(\
axis='y', #settingsfory-axis
which='both', #
left='off', #
top='off',#
labelleft='off')
ax.legend(numpoints=1)#
#
#Addlabelinx,ypositionwithtweettext
#
foriinrange(ix_start,ix_stop):
ax.text(df01.ix['x'],df01.ix['y'],df01.ix['txt'],
size=10)
plt.show()#Displaytheplot
label text
2000 2 b'RT@BigDataTechCon:'
2001 3 b"@4Quant'spresentat"
2002 2 b'CassandraSummit201'
这是Cluster2的图, 由蓝点表示BigData和 Spark; Cluster3,由红点表示Spark 和Python,以及相关的tweets 内容抽样:
利用 Scikit-Learn 的处理结果,已经探索到数据的一些好的见解,现在关注在Twitter数据集上执行Spark MLlib。
预处理数据集
为了准备在数据集上运行聚类算法,现在聚焦特征提取和工程化。我们实例化SparkContext,读取 Twitter数据集到一个Sparkdataframe.然后,对tweet文本数据连续分词,应用哈希词频算法到tokens, 并最终应用InverseDocumentFrequency 算法,重新缩放数据 。代码如下:
In:
#
#ReadcsvinaPandaDF
#
#
importpandasaspd
csv_in='/home/an/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark/data/unq_tweetstxt.csv'
pddf_in=pd.read_csv(csv_in,index_col=None,header=0,sep=';',encoding='utf-8')
In:
sqlContext=SQLContext(sc)
In:
#
#ConvertaPandaDFtoaSparkDF
#
spdf_02=sqlContext.createDataFrame(pddf_in[['id','user_id','user_name','tweet_text']])
In:
spdf_02.show()
In:
spdf_02.take(3)
Out:
http://t.co/VpD7FoqMr0'),
Row(id=638830426727911424,user_id=3276255125,user_name=u'True
Equality',tweet_text=u'ernestsgantt:BeyHiveInFrance:
PhuketDailyNews:dreamintentions:elsahel12:simbata3:JDHM2015:almtorta18:CiviPa\u2026http://t.co/VpD7FoqMr0'),
Row(id=638830425402556417,user_id=3276255125,user_name=u'True
Equality',tweet_text=u'ernestsgantt:BeyHiveInFrance:9_A_6:
ernestsgantt:elsahel12:simbata3:JDHM2015:almtorta18:
CiviPartnership:dr\u2026http://t.co/EMDOn8chPK')]
In:
frompyspark.ml.featureimportHashingTF,IDF,Tokenizer
In:
#
#Tokenizethetweet_text
#
tokenizer=Tokenizer(inputCol="tweet_text",outputCol="tokens")
tokensData=tokenizer.transform(spdf_02)
In:
tokensData.take(1)
Out:
http://t.co/VpD7FoqMr0',
tokens=[u'ernestsgantt:',u'beyhiveinfrance:',u'9_a_6:',
u'dreamintentions:',u'elsahel12:',u'simbata3:',u'jdhm2015:',u'almtorta18:',u'dreamintentions:\u2026',u'http://t.co/vpd7foqmr0'])]
In:
#
#ApplyHashingTFtothetokens
#
hashingTF=HashingTF(inputCol="tokens", outputCol="rawFeatures",numFeatures=2000)
featuresData=hashingTF.transform(tokensData)
In:
featuresData.take(1)
Out:
http://t.co/VpD7FoqMr0',
tokens=[u'ernestsgantt:',u'beyhiveinfrance:',u'9_a_6:',
u'dreamintentions:',u'elsahel12:',u'simbata3:',u'jdhm2015:',u'almtorta18:',u'dreamintentions:\u2026',u'http://t.co/vpd7foqmr0'],
rawFeatures=SparseVector(2000,{74:1.0,97:1.0,100:1.0,160:1.0,185:1.0,742:1.0,856:1.0,991:1.0,1383:1.0,1620:1.0}))]
In:
#
#ApplyIDFtotherawfeaturesandrescalethedata
#
idf=IDF(inputCol="rawFeatures",outputCol="features")
idfModel=idf.fit(featuresData)
rescaledData=idfModel.transform(featuresData)
forfeaturesinrescaledData.select("features").take(3):
print(features)
In:
rescaledData.take(2)
Out:
http://t.co/VpD7FoqMr0',
tokens=[u'ernestsgantt:',u'beyhiveinfrance:',u'9_a_6:',
u'dreamintentions:',u'elsahel12:',u'simbata3:',u'jdhm2015:',u'almtorta18:',u'dreamintentions:\u2026',u'http://t.co/vpd7foqmr0'],
rawFeatures=SparseVector(2000,{74:1.0,97:1.0,100:1.0,160:1.0,185:1.0,742:1.0,856:1.0,991:1.0,1383:1.0,1620:1.0}),
features=SparseVector(2000,{74:2.6762,97:1.8625,100:2.6384,160:2.9985,185:2.7481,742:5.5269,856:4.1406,991:2.9518,1383:4.694,1620:3.073})),
Row(id=638830426727911424,user_id=3276255125,user_name=u'True
Equality',tweet_text=u'ernestsgantt:BeyHiveInFrance:
PhuketDailyNews:dreamintentions:elsahel12:simbata3:
JDHM2015:almtorta18:CiviPa\u2026http://t.co/VpD7FoqMr0',
tokens=,
rawFeatures=SparseVector(2000,{74:1.0,97:1.0,100:1.0,160:1.0,185:1.0,460:1.0,987:1.0,991:1.0,1383:1.0,1620:1.0}),
features=SparseVector(2000,{74:2.6762,97:1.8625,100:2.6384,160:2.9985,185:2.7481,460:6.4432,987:2.9959,991:2.9518,1383:4.694,1620:3.073}))]
In:
rs_pddf=rescaledData.toPandas()
In:
rs_pddf.count()
Out:
id7540
user_id 7540
user_name 7540
tweet_text7540
tokens7540
rawFeatures 7540
features 7540
dtype:int64
In:
feat_lst=rs_pddf.features.tolist()
In:
feat_lst[:2]
Out:
[SparseVector(2000,{74:2.6762,97:1.8625,100:2.6384, 160:2.9985,185:2.7481,742:5.5269,856:4.1406,991:2.9518,1383: 4.694,1620:
3.073}),
SparseVector(2000,{74:2.6762,97:1.8625,100:2.6384, 160:2.9985,185:2.7481,460:6.4432,987:2.9959,991:2.9518,1383: 4.694,1620:3.073})]
运行聚类算法
在Twitter数据集上运行K-Means算法, 作为非标签的tweets,希望看到ApacheSparktweets形成一个聚类。 遵从以前的步骤, 特征的 TF-IDF 稀疏向量转化为一个RDD将被输入到 SparkMLlib程序。初始化K-Means模型为 5聚类,10次迭代:
In:
frompyspark.mllib.clusteringimportKMeans,KMeansModel
fromnumpyimportarray
frommathimportsqrt
In:
#Loadandparsethedata
in_Data=sc.parallelize(feat_lst)
In:
in_Data.take(3)
Out:
[SparseVector(2000,{74:2.6762,97:1.8625,100:2.6384, 160:2.9985,185:2.7481,742:5.5269,856:4.1406,991:2.9518,1383: 4.694,1620:3.073}),
SparseVector(2000,{74:2.6762,97:1.8625,100:2.6384, 160:2.9985,185:2.7481,460:6.4432,987:2.9959,991:2.9518,1383: 4.694,1620:3.073}),
SparseVector(2000,{20:4.3534,74:2.6762,97:1.8625,100:5.2768,185:2.7481,856:4.1406,991:2.9518,1039:3.073,1620:3.073,1864:4.6377})]
In:
in_Data.count()
Out:
7540
In:
#Buildthemodel(clusterthedata)
clusters=KMeans.train(in_Data,5,maxIterations=10,
runs=10,initializationMode="random")
In:
#EvaluateclusteringbycomputingWithinSetSumofSquaredErrors
deferror(point):
center=clusters.centers
returnsqrt(sum())
WSSSE=in_Data.map(lambdapoint:error(point)).reduce(lambdax,y:x +y)
print("WithinSetSumofSquaredError="+str(WSSSE))
评估模型和结果
聚类算法调优的一个方式是改变聚类的个数并验证输出.检查这些聚类,感受一下目前的聚类结果:
In:
cluster_membership=in_Data.map(lambdax:clusters.predict(x))
In:
cluster_idx=cluster_membership.zipWithIndex()
In:
type(cluster_idx)
Out:
pyspark.rdd.PipelinedRDD
In:
cluster_idx.take(20)
Out:
[(3, 0),
(3, 1),
(3, 2),
(3, 3),
(3, 4),
(3, 5),
(1, 6),
(3, 7),
(3, 8),
(3, 9),
(3, 10),
(3, 11),
(3, 12),
(3, 13),
(3, 14),
(1, 15),
(3, 16),
(3, 17),
(1, 18),
(1, 19)]
In:
cluster_df=cluster_idx.toDF()
In:
pddf_with_cluster=pd.concat(,axis=1)
In:
pddf_with_cluster._1.unique()
Out:
array()
In:
pddf_with_cluster==0].head(10)
Out:
Unnamed:0 idcreated_atuser_id user_name tweet_text_1 _2
6227 3 642418116819988480FriSep1119:23:09+00002015
49693598 AjinkyaKale RT@bigdata:DistributedMatrixComputations
i... 0 6227
6257 45642391207205859328FriSep1117:36:13+00002015
937467860 AngelaBassa I'mreading""DistributedMatrix
Comput... 0 6257
6297 119 642348577147064320FriSep1114:46:49+0000
2015 18318677 BenLorica DistributedMatrixComputationsin@
ApacheSpar... 0 6297
In:
pddf_with_cluster==1].head(10)
Out:
Unnamed:0 idcreated_atuser_id user_name tweet_text_1
_2
6 6 638830419090079746TueSep0121:46:55+00002015
2241040634MassimoCarrisi Python:Python:Removing\xa0from
string?-I...16
1517638830380578045953TueSep0121:46:46+00002015
57699376 RafaelMonnerat RT@ramalhoorg:Noitedeautógrafosdo
Fluent... 115
1841638830280988426250TueSep0121:46:22+00002015
951081582 JackBaldwin RT@cloudaus:Weare3/4full!2-day@
swcarpen... 1 18
1942638830276626399232TueSep0121:46:21+00002015
6525302 MasayoshiNakamura PynamoDB#AWS#DynamoDB#Python
http://...1 19
2043638830213288235008TueSep0121:46:06+00002015
3153874869BaltimorePython Flexx:PythonUItookitbasedonweb
technolog... 1 20
2144638830117645516800TueSep0121:45:43+00002015
48474625 RadioFreeDenali Hmm,emerge--depcleanwantstoremove
somethi...1 21
2246638829977014636544TueSep0121:45:10+00002015
154915461 LucianoRamalho NoitedeautógrafosdoFluentPythonno
Garoa...122
2347638829882928070656TueSep0121:44:47+00002015
917320920 bsbafflesbrains @DanSWrightHarperchannelingMonty
Python."... 1 23
2448638829868679954432TueSep0121:44:44+00002015
134280898 LannickTechnology RT@SergeyKalnish:Iam#hiring:
SeniorBacke...1 24
2549638829707484508161TueSep0121:44:05+00002015
2839203454JoshuaJones RT@LindseyPelas:SurvivingMontyPython
inFl... 1 25
In:
pddf_with_cluster==2].head(10)
Out:
Unnamed:0 idcreated_atuser_id user_name tweet_text_1
_2
7280 688 639056941592014848WedSep0212:47:02+00002015
2735137484Chris Atruegayiconwhenwill@ladygaga@Madonna@...
2 7280
In:
pddf_with_cluster==3].head(10)
Out:
Unnamed:0 idcreated_atuser_id user_name tweet_text_1
_2
0 0 638830426971181057TueSep0121:46:57+00002015
3276255125TrueEqualityernestsgantt:BeyHiveInFrance:9_A_6:
dreamint... 3 0
1 1 638830426727911424TueSep0121:46:57+00002015
3276255125TrueEqualityernestsgantt:BeyHiveInFrance:
PhuketDailyNews...3 1
2 2 638830425402556417TueSep0121:46:56+00002015
3276255125TrueEqualityernestsgantt:BeyHiveInFrance:9_A_6:
ernestsg... 3 2
3 3 638830424563716097TueSep0121:46:56+00002015
3276255125TrueEqualityernestsgantt:BeyHiveInFrance:
PhuketDailyNews...3 3
4 4 638830422256816132TueSep0121:46:56+00002015
3276255125TrueEqualityernestsgantt:elsahel12:9_A_6:
dreamintention... 3 4
5 5 638830420159655936TueSep0121:46:55+00002015
3276255125TrueEqualityernestsgantt:BeyHiveInFrance:
PhuketDailyNews...3 5
7 7 638830418330980352TueSep0121:46:55+00002015
3276255125TrueEqualityernestsgantt:elsahel12:9_A_6:
dreamintention... 3 7
8 8 638830397648822272TueSep0121:46:50+00002015
3276255125TrueEqualityernestsgantt:BeyHiveInFrance:
PhuketDailyNews...3 8
9 9 638830395375529984TueSep0121:46:49+00002015
3276255125TrueEqualityernestsgantt:elsahel12:9_A_6:
dreamintention... 3 9
1010638830392389177344TueSep0121:46:49+00002015
3276255125TrueEqualityernestsgantt:BeyHiveInFrance:
PhuketDailyNews...3 10
In:
pddf_with_cluster==4].head(10)
Out:
Unnamed:0 idcreated_atuser_id user_name tweet_text_1
_2
1361 882 642648214454317056SatSep1210:37:28+00002015
27415756 RaymondEnisuoh LAChosenForUS2024OlympicBid-
LA2016See...4 1361
1363 885 642647848744583168SatSep1210:36:01+00002015
27415756 RaymondEnisuoh PrisonSee:https://t.co/x3EKAExeFi………
……... 41363
5412 11640480770369286144SunSep0611:04:49+00002015
3242403023DonaldTrump2016 "igiboooy!@Starbuckshttps://t.
co/97wdL... 4 5412
5428 27640477140660518912SunSep0610:50:24+00002015
3242403023DonaldTrump2016 " @Starbuckshttps://t.co/
wsEYFIefk7"-D...4 5428
5455 61640469542272110592SunSep0610:20:12+00002015
3242403023DonaldTrump2016 "starbucks@StarbucksMamPlaza
https://t.co... 4 5455
5456 62640469541370372096SunSep0610:20:12+00002015
3242403023DonaldTrump2016 "Aaahhhthepumpkinspicelatteis
back,fall...4 5456
5457 63640469539524898817SunSep0610:20:12+00002015
3242403023DonaldTrump2016 "RTkayyleighferry:Ohmygoddd
HarryPotter...45457
5458 64640469537176031232SunSep0610:20:11+00002015
3242403023DonaldTrump2016 "Starbuckshttps://t.co/3xYYXlwNkf
"-Donald... 4 5458
我们以部分样本tweet映射成5个聚类.Cluster0 关于 Spark.Cluster1 关于 Python。Cluster2 关于 LadyGaga。Cluster3 关于Thailand’sPhuket 新闻。 Cluster4关于 DonaldTrump.
构建机器学习流水线
我们希望当优化最佳参数来获得最好执行模型时,能够组合特征提取,准备活动,训练,测试,和预测活动。
在 SparkML 中实现了强大的机器学习流水线,以5行代码准确地捕获了下面的tweet:
SparkML流水线是从 Python’sScikit-Learn中得到了灵感,创建了简洁数据连续转换的声明式语句可以快速地发布可调的模型。
页:
[1]