regan 发表于 2016-9-7 21:00:16

如何在Spark sql上扩展自己的插件?spark开源项目spark-avro

本帖最后由 regan 于 2016-9-17 12:15 编辑

      spark-avro是一个开源的建立在SparkSQL之上的免费开源插件。github地址:https://github.com/databricks/spark-avro.git。
这里正好关注了一下,分析了一下spark-avro的源代码并在此基础上进行二次开发。
      首先Spark是基于Scala语言实现的大数据计算引擎,Scala提供的简易型编程模型——模式匹配、隐式转换等功能,为Spark的实现提供了良好的基础。跟读Spark源码可以看到有大量的Case语句及implicits语句。这是这些实用的语言特性,为程序的编写及实现提供了最大的灵活性。
      spark-avro是基于SparkSQL开发的一个用于读取和保存avro格式文件的插件。提供的avro方法用于装载avro压缩格式的文件并转换成DataFrame,avro方法也可以保存DataFrame为avro格式的文件。
      在包对象中有如下的定义:      

package object avro {/**
   * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
   * the DataFileWriter
   */
implicit class AvroDataFrameWriter(writer: DataFrameWriter) {
    def avro(str:String):Unit = writer.format("com.databricks.spark.avro").save(str)
}

/**
   * Adds a method, `avro`, to DataFrameReader that allows you to read avro files using
   * the DataFileReade
   */
implicit class AvroDataFrameReader(reader: DataFrameReader) {
    def avro(str:String):DataFrame = reader.format("com.databricks.spark.avro").load(str)
}
}
       在avro包对象中,定义了两个隐式类,分别是AvroDataFrameWriter主构造器参数为DataFrameWriter参数、AvroDataFrameReader主构造器参数为DataFrameReader。两个隐式类中都定义了一个avro方法,该方法接受一个路径参数。第一个avro用于向指定路径写avro文件,第二个avro方法读取指定路径下的avro文件。       下面来看一下测试程序:

test("test save and load") {// Test if load works as expected
TestUtils.withTempDir { tempDir =>
    val df = spark.read.avro(episodesFile)
    assert(df.count == 8)


    val tempSaveDir = s"$tempDir/save/"

    df.write.avro(tempSaveDir)
    val newDf = spark.read.avro(tempSaveDir)
    assert(newDf.count == 8)
}
}

      该程序依赖spark2.0版本,程序中的spark类型是SparkSession,在spark上调用read,read是一个DataFrameReader对象,要知道DataFrameReader对象上原本是没有avro方法的,但是把spark-avro插件加入工程后,为啥就有了avro方法?前面你已经知道,在spark-avro插件的包对象avro中定义了两个隐式类,其中一个隐式类接受DataFrameReader参数。当调用DataFrameReader上的avro方法时,Scala会发现原DataFrameReader上是没有avro方法的,但是在失败之前Scala还会尝试通过隐式转换功能能不能找到适当的方法,即一个定义为implicit的类或方法。这里Scala发现AvroDataFrameReader是有avro方法的,因此不会报错,会直接调用AvroDataFrameReader隐式类的avro方法。DataFrameWriter也是一样,DataFrameReader上是没有avro方法的,但是隐式类AvroDataFrameWriter这个隐式类中是有定义avro方法的,所以调用df.write.avro方法不会报错,而是调用插件中的隐式对象AvroDataFrameWriter上的avro方法。
      从这里已经可以看出,Scala中的隐式转化的强大之处了吧。你可以通过提供一个隐式类或隐式函数,动态扩展某个对象上的行为,而你不必关心这会导致出错,因为Scala会为你完成通过隐式转换查找到对应的类或方法。要使用implicit定义隐式类或隐式方法。使用方法是*:将要扩展行为的类包装在隐式类的构造函数中或者隐式方法的参数中。
至此,你已经可以通过DataFrameReader或DataFrameWriter调用avro方法了,前提是在你的工程中加入spark-avro插件。但是故事到此仅仅是一个开始,好戏还在后头。下面来分析一下,DataFrameReader如何通过avro方法读取avro文件的。
spark-avro通过avro方法读取文件:
学习开源软件最有效的方法就是跟踪源代码了,哈哈。i'am a happy runner! just do it! go!调用DataFrameReader的avro方法对应源代码有如下的调用:

def avro(str:String):DataFrame = reader.format("com.databricks.spark.avro").load(str)
      从上面可以看出,其实avro方法做了一件事,那就是在原DataFrameReader基础上调用format方法,设置avro所在的包路径,该设置会在更底层通过反射得到需要扩展的类的信息。接着将str参数传入DataFrameReader原生的load方法,跟进去:

def load(path: String): DataFrame = {option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}
def load(paths: String*): DataFrame = {
sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
         第二个load方法中,sparkSession上的baseRelationToDataFrame方法接收一个BaseRelation类型的参数,该参数由DataSource的apply方法提供
sparkSession的baseRelationToDataFrame方法如下:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {Dataset.ofRows(self, LogicalRelation(baseRelation))
}
ofRows方法接收一个LogicalRelation逻辑关系对象,该对象由DataSource中的resolveRelation方法产生。ofRows方法源代码如下所以:

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset(sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
}
ofRows方法中将执行逻辑计划。执行流程如下。
上面这些部分共同参与到SQL的执行过程中,步骤如下:
1)SQL语句经过SqlParser解析成UnresolvedLogicalPlan;
2)使用analyzer结合数据字典(catalog)进行绑定,生成ResolvedLogicalPlan;
3)使用optimizer对ResolvedLogicalPlan进行优化,生成OptimizedLogicalPlan;
4)使用SparkPlan将LogicalPlan转换成PhysicalPlan;
5)使用prepareForExecution将PhysicalPlan转换成可执行物理计划;
6)使用execute()执行可执行物理计划,生成SchemaRDD。7)最后将生成并返回DataFrame对象向

有兴趣的可以一步一的跟踪一下源代码


lingyufeng 发表于 2016-9-9 09:13:32

谢谢分享
页: [1]
查看完整版本: 如何在Spark sql上扩展自己的插件?spark开源项目spark-avro