分享

Spark 高级分析:第十一章第1节 PySpark概述

本帖最后由 feilong 于 2019-1-18 07:55 编辑

问题导读

1.
什么是PySpark
2.
PySpark经常用来做什么
3.
PySpark内部原理是什么



Spark 高级分析:第十章第5节 示例:从1000个基因组项目中查询基因型
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26608


第十一章用PySpark和Thunder分析神经影像学数据

成像设备和自动化的进步导致了关于大脑功能的大量数据的产生。虽然过去的实验可能只从大脑中的少数电极或少量的脑切片的静态图像产生时间序列数据,但是当今的技术可以在生物体活动时从大区域的大量神经元中采样大脑活动。的确,奥巴马政府已经批准了“大脑”计划,该计划有崇高的技术发展目标,例如,能够同时记录小鼠大脑中每个神经元在长期内的电活动。尽管测量技术的突破是必须的,但产生的数据量将为生物学创造全新的范例。

在本章中,我们将介绍用于通过Python与Spark交互的PySpark API,以及在PySpark之上开发的Thunder项目,它一般用于处理大量的时间序列数据,特别是神经成像数据。PySpark是用于探索性大数据分析的特别灵活的工具,因为它很好地集成了PyData生态系统的其他部分,包括用于可视化的matplotlib,甚至还集成了用于“可执行文档”的IPython Notebook(Jupyter)。

我们将利用这些工具来理解斑马鱼大脑的一些结构。使用Thunder,我们将对大脑的不同区域(代表神经元组)进行聚类,以发现斑马鱼随时间的行为活动模式。

Python是许多数据科学家最喜欢的工具,因为它的高级语法和广泛的包库。Spark生态系统已经认识到Python在数据分析环境中的重要性,并开始投资于使用Spark的Python API,尽管Python与JVM集成存在历史困难。

用于科学计算和数据科学的Python

Python已经成为科学计算和数据科学的热门工具。它现在正被用于许多传统上使用MATLAB、R或Mathematica的应用程序。原因包括:一种易于使用和学习的高级语言。一个广泛的类库系统,从小众数字计算到网络抓取实用程序,再到数据可视化工具。轻松地与C/C++代码接口,允许访问高性能的库,包括BLAS/LAPACK/ALTLAS1。

需要特别记住的一些库包括:.numpy/scipy/matplotlib:这些库概括了典型的MATLAB功能,包括快速数组操作、科学函数和广泛使用的受MATLAB启发的绘图库. pandas:这个库提供了与R的data.frame类似的功能,且具有更高的性能,用于引导.scikit-learn/statsmodels:这些库提供了机器学习算法(例如,分类/回归、聚类、矩阵分解)和统计模型.nltk:一个自然语言处理库的高质量实现。还有许多其他的;请参阅这里的大列表:https://github.com/vinta/awesome-python

启动PySpark:
[mw_shl_code=bash,true]export IPYTHON=1 # PySpark can use the IPython shell
pyspark --master ... --num-executors ...[/mw_shl_code]
可以使用spark-submit提交Python脚本,它将检测脚本上的.py扩展。PySpark通过设置环境变量IPYTHON=1支持使用IPython shell,这是我们普遍推荐的。当Python shell启动时,它创建一个Python SparkContext对象,通过该对象我们与集群交互。一旦SparkContext可用,PySpark API就非常类似于Scala API。
[mw_shl_code=python,true]raw_data = sc.textFile('path/to/csv/data') # RDD[string]
# filter, split on comma, parse floats to get a RDD[list[float]]
data = (raw_data
.filter(lambda x: x.startswith("#"))
.map(lambda x: map(float, x.split(','))))
data.take(5)[/mw_shl_code]
就像在Scala API中一样,我们加载文本文件,过滤以#开头的行,并将CSV数据解析为浮点值列表。例如,传递给筛选器和映射的Python函数非常灵活。它们必须接受Python对象并返回Python对象(在过滤器的情况下,返回值被解释为布尔值)。唯一的限制是必须使用cloudpickle(其中包括匿名lambda函数)对python函数对象进行序列化,并且必须在python执行器进程的PYTHONPATH 上提供闭包中引用的任何必要模块。为了确保引用模块的可用性,必须安装这些模块集群范围和在python执行器进程的PYTHONPATH上可用,或者相应的模块zip/egg文件必须由spark显式分布,然后spark将它们添加到PYTHONPATH 中。后一个功能可以通过调用sc.addpyfile()来实现。

PySparkRDD只是Python对象的RDD:与Python列表一样,它们可以存储具有混合类型的对象(因为在下面,所有对象都是PyObject的实例)。

PySpark API在某种程度上可能落后于Scala API,因此在某些情况下,Scala中的特性变得更快。然而,除了核心API之外,已经存在MLlib的Python API,例如,在Thunder中使用的Python API。

图片1.png
了解一下PySpark是如何实现的,以便简化调试,以及了解可能的性能陷阱,这是很有用的。

当PySpark的Python解释器启动时,它还启动一个JVM,通过套接字与该JVM进行通信。PySpark使用Py4J项目来处理此通信。JVM充当实际的Spark驱动程序,并加载一个JavaSparkContext,该JavaSparkContext通过集群与Spark执行器通信。将Python API调用SparkContext,然后转换为Java API调用JavaSparkContext。例如,PySpark的sc.textFile()的实现将向JavaSparkContext的.textFile方法发送一个调用,JavaSparkContext最终与Spark执行器JVM通信以从HDFS加载文本数据。

集群上的Spark执行器为每个核心启动一个Python解释器,当需要执行用户代码时,它们通过管道与Python解释器进行通信。本地PySpark客户端中的PythonRDD对应于本地JVM中的PythonRDD对象。与RDD相关联的数据实际上存储在Spark JVM中作为Java对象。例如,在Python解释器中运行sc.textFile()将调用JavaSparkContexts textFile方法,该方法将数据加载为集群中的Java字符串对象。类似地,使用newAPIHadoopFile 加载一个Parquet/AVRO文件将将对象加载为Java Avro对象。

当对Python RDD进行API调用时,将使用任何相关代码(例如,Python lambda函数)序列化并分发给执行器。然后将数据从Java对象转换为Python兼容的表示(例如,pickle对象),并通过管道流到执行器相关的Python解释器。在解释器中执行任何必要的Python处理,所得到的数据作为RDD(默认情况下作为pickle对象)存储在JVM中。

Python对可执行代码序列化的内置支持不如Scala强大。因此,PySpark的作者不得不使用由现在已经停用的PiCloud构建的名为“cloudpickle”的自定义模块。

IPython Notebook是一个用于探索性分析和用作计算“实验室笔记本”的极好环境。它允许用户集成文本、图像、可执行代码(在Python和现在其他语言中),还支持托管平台以及其他特性。虽然IPython Notebook对Spark支持得很好,但是要正确配置它需要一些小心,因为PySpark必须以特定的方式初始化。有关详细信息,请参阅此博客文章:http://blog.cloudera.com/blog/2014/08/how-to-use-ipythonnotebook-with-apache-spark/



最新经典文章,欢迎关注公众号



已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条