本帖最后由 feilong 于 2018-12-21 08:09 编辑
问题导读
1.什么是ADAM?
2.如何使用ADAM?
3.如何分析结果?
上一篇:
Spark 高级分析:第十章第1节 解耦存储与建模
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26490
BDG的基因组学工具的核心集叫做ADAM。从一组映射读取开始,这个核心包括可以执行标记复制、基本质量分数重新校准、索引重新对齐和变体调用等任务的工具。ADAM还包含一个命令行接口,它封装内核以便于使用。与HPC相反,这些命令行工具知道Hadoop和HDFS,并且它们中的许多可以在一个集群上自动并行化,而不必手动拆分文件或调度作业。
我们将开始建立ADAM,就像README告诉我们的:
[mw_shl_code=bash,true]git clone https://github.com/bigdatagenomics/adam.git
cd adam
export "MAVEN_OPTS=-Xmx512m -XX:MaxPermSize=128m"
mvn clean package -DskipTests[/mw_shl_code]
ADAM附带了一个提交脚本,该脚本有助于与Spark的spark-submit脚本对接;使用它的最简单方式可能是将其重命名:
[mw_shl_code=bash,true]export $ADAM_HOME=path/to/adam
alias adam-submit="$ADAM_HOME/bin/adam-submit"[/mw_shl_code]
正如在README中所指出的,可以通过$JAVA_OPTS设置附加的JVM选项,或者检查appassembler文档以获取更多信息。此时,应该能够从命令行运行ADAM并获得使用消息:
我们将首先获取包含一些映射NGS读取的.bam文件,将它们转换为对应的BDG格式(本例中为AlignedRecord),并将它们保存到HDFS。首先,我们得到一个合适的.bam文件,并把它放在HDFS:
[mw_shl_code=bash,true]# Note: this file is 16 GB
curl -O ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/data\
/HG00103/alignment/HG00103.mapped.ILLUMINA.bwa.GBR\
.low_coverage.20120522.bam
# or using Aspera instead (which is *much* faster)
ascp -i path/to/asperaweb_id_dsa.openssh -QTr -l 10G \
anonftp@ftp.ncbi.nlm.nih.gov:/1000genomes/ftp/data/HG00103\
/alignment/HG00103.mapped.ILLUMINA.bwa.GBR\
.low_coverage.20120522.bam .
hadoop fs -put HG00103.mapped.ILLUMINA.bwa.GBR\
.low_coverage.20120522.bam /user/ds/genomics[/mw_shl_code]
然后,我们可以使用ADAM转换命令将.bam文件转换为Parquet格式(如下所述)。这将在集群和本地模式下工作:
[mw_shl_code=bash,true]adam-submit \
transform \
/user/ds/genomics/HG00103.mapped.ILLUMINA.bwa.GBR\
.low_coverage.20120522.bam \
/user/ds/genomics/reads/HG00103[/mw_shl_code]
这将使控制台有大量输出,包括跟踪作业进度的URL。让我们看看我们已经产生了什么:
[mw_shl_code=bash,true]$ hadoop fs -du -h /user/ds/genomics/reads/HG00103
0 /user/ds/genomics/reads/HG00103/_SUCCESS
516.9 K /user/ds/genomics/reads/HG00103/_metadata
101.8 M /user/ds/genomics/reads/HG00103/part-r-00000.gz.parquet
101.7 M /user/ds/genomics/reads/HG00103/part-r-00001.gz.parquet
[...]
104.9 M /user/ds/genomics/reads/HG00103/part-r-00126.gz.parquet
12.3 M /user/ds/genomics/reads/HG00103/part-r-00127.gz.parquet[/mw_shl_code]
所得到的数据集是/user/ds/.mics/reads/HG00103/目录中所有文件的串联,其中每个part-*.parquet文件是Spark任务之一的输出。还会注意到,由于柱状存储,数据比..bam文件(下面是gzip)的压缩效率更高:
[mw_shl_code=bash,true]$ hadoop fs -du -h "/user/ds/genomics/HG00103.*.bam"
15.9 G /user/ds/genomics/HG00103. [...] .bam
$ hadoop fs -du -h -s /user/ds/genomics/reads/HG00103
12.6 G /user/ds/genomics/reads/HG00103[/mw_shl_code]
让我们看看在交互会话中这些对象中的一个是什么样子。首先,我们使用ADAM辅助脚本启动Spark shell。它采用与默认Spark脚本相同的参数/选项,但是加载所有必需的JAR,等等。如下所示的例子,运行在Spark on YARN上:
[mw_shl_code=scala,true]export SPARK_HOME=/path/to/spark
$ADAM_HOME/bin/adam-shell
...[/mw_shl_code]
注意,在处理YARN时,交互式SparkShell需要yarn-client客户端模式,因此driver在本地执行。可能还需要适当地设置HADOOP_CONF_DIR或YARN_CONF_DIR。现在我们将对齐的读取数据加载为RDD[AlignmentRecord]:
[mw_shl_code=scala,true]import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.AlignmentRecord
val readsRDD: RDD[AlignmentRecord] = sc.adamLoad(
"/user/ds/genomics/reads/HG00103")
readsRDD.first()[/mw_shl_code]
它打印了大量的日志输出(Spark和Parquet love to log)以及结果本身:
[mw_shl_code=scala,true]res0: org.bdgenomics.formats.avro.AlignmentRecord = {"contig": {"contigName": "X", "contigLength":[/mw_shl_code]
你可能会得到一个不同的读取,因为数据在集群上的分区可能不同,所以无法保证哪个读取会首先返回。
现在我们可以交互式地询问关于数据集的问题,同时在后台集群中执行计算本身。这个数据集有多少读数?
[mw_shl_code=scala,true]readsRDD.count()
...
14/09/11 18:26:05 INFO SparkContext: Starting job: count [...]
...
res16: Long = 160397565[/mw_shl_code]
这个数据集中的读数来自人类所有染色体吗?
[mw_shl_code=scala,true]val uniq_chr = (readsRDD
.map(_.contig.contigName.toString)
.distinct()
.collect())
uniq_chr.sorted.foreach(println)
...
1
10
11
12
[...]
GL000249.1
MT
NC_007605
X
Y
hs37d5
是的。让我们更仔细地分析这个定义:
val uniq_chr = (readsRDD //
.map(_.contig.contigName.toString) //
.distinct() //
.collect()) //[/mw_shl_code]
假设我们正在使用下一代测序来筛查囊性纤维化的携带者,我们的基因型呼叫者给你一些看起来像过早停止密码子的东西,但它既不存在于HGMD,也不存在于Sick.CFTR数据库中。我们想回到原始测序数据,看看潜在的有害基因型调用是否是假阳性。为此,我们需要手动分析映射到该变异位点的所有读数,例如,117149189的7号染色体(参见???):
[mw_shl_code=scala,true]val cftr_reads = (readsRDD
.filter(_.contig.contigName.toString == "7")
.filter(_.start <= 117149189)
.filter(_.end > 117149189)
.collect())
cftr_reads.length // cftr_reads is a local Array[AlignmentRecord]
...
res2: Int = 9[/mw_shl_code]
现在可以手动检查这九个读数,或者通过定制的对准器处理它们,例如,并检查所报告的病原体变体是否为假阳性。对读者的练习:7号染色体的平均覆盖率是多少?(对于在给定位置可靠地进行基因型鉴定来说,这绝对太低了。)
比方说,我们正在运行一个临床实验室,进行这种载体筛查,为临床医生服务。使用Hadoop对原始数据进行归档,可以确保数据保持相对温暖(与磁带归档相比)。除了具有用于实际执行数据处理的可靠系统之外,对于质量控制(QC)或对于需要手动干预的情况(如上文),可以容易地访问所有过去的数据。除了快速获取全部数据外,中心性也使得进行大规模的分析研究变得容易,如群体遗传学、大规模QC分析等。
|
|