本帖最后由 feilong 于 2018-12-28 08:18 编辑
问题导读
1.什么是Parquet?
2.Parquet底层原理是什么?
3.如何定义使用谓词类?
上一篇:
Spark 高级分析:第十章第2节用ADAM CLI摄取基因组学数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26522
在上一节中,我们了解了如何操作潜在的大量排序数据,而不用担心底层存储的细节或执行的并行性。然而,值得注意的是,ADAM项目使用了Parquet文件格式,它提供了我们在这里介绍的一些相当大的性能优势。
Parquet是一种开源文件格式规范和一组读/写器实现,我们建议这些实现用于分析查询(一次写入,多次读取)中使用的数据。它主要基于Google’s Dremel system中使用的底层数据存储格式,并且具有与Avro、Thrift和Protocol Buffers兼容的数据模型。特别地,它支持大多数常见的数据库类型(例如,int、double、string等),以及数组和记录,包括嵌套类型。重要的是,它是一个列文件格式,这意味着来自许多记录的特定列的值被连续存储在磁盘上,这种物理数据布局允许更高效的数据编码/压缩,并且通过最小化必须被读取/反序列化的数据量而显著减少查询时间。为每一列指定不同的编码/压缩方案的Parquet支持;并且每个列支持游程长度编码、字典编码和增量编码。
另一个有助于提高性能的Parquet的特点是“谓词下推”。在上面的CFTR查询中,Spark在决定是否通过谓词之前,必须反序列化/实现每个单例AlignmentRecord。这导致大量浪费I/O和CPU时间。Parquet读取器实现允许我们提供一个谓词类,该类在实现完整记录之前,只反序列化做出决策所需的列。
例如,要使用谓词下推实现我们的CFTR查询,我们必须首先定义一个合适的谓词类,用于测试AlignmentRecord是否在目标轨迹中:
[mw_shl_code=scala,true]import org.bdgenomics.adam.predicates.ColumnReaderInput._
import org.bdgenomics.adam.predicates.ADAMPredicate
import org.bdgenomics.adam.predicates.RecordCondition
import org.bdgenomics.adam.predicates.FieldCondition
class CftrLocusPredicate extends ADAMPredicate[AlignmentRecord] {
override val recordCondition = RecordCondition[AlignmentRecord](
FieldCondition(
"contig.contigName", (x: String) => x == "chr7"),
FieldCondition(
"start", (x: Long) => x <= 117149189),
FieldCondition(
"end", (x: Long) => x >= 117149189))
}[/mw_shl_code] 请注意,对于谓词工作,Parquet读取必须实例化类本身。这意味着必须将此代码编译到jar中,并通过将其添加到Spark类路径。在完成之后,谓词可以使用如下:
[mw_shl_code=scala,true]val cftr_reads = sc.adamLoad[AlignmentRecord, CftrLocusPredicate](
"/user/ds/genomics/reads/HG00103",
Some(classOf[CftrLocusPredicate])).collect()[/mw_shl_code]
|