问题导读
1.本示例中将使用那些细胞数据?如何获取这些数据?
2.要将示例中的原始数据生成什么样的训练集?如何操作?
3.如何提取特征?
上一篇:
Spark 高级分析:第十章第3节 Parquet格式与列式存储
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26548
在本例中,我们将使用公共可用的序列特征数据构建用于转录因子绑定的简单模型。转录因子(Transcription factor,TF)是与基因组中的特定位点结合并帮助控制不同基因表达的蛋白质。因此,它们在确定特定细胞的表型方面至关重要,并且涉及许多生理和疾病过程。ChIP-seq是基于NGS的分析,它允许在特定细胞/组织类型中特定TF的结合位点的全基因组表征。然而,除了ChIP-seq的成本和技术困难之外,还需要对每个组织/TF对进行单独的实验。相比之下,DNase-seq是一种发现全基因组开放染色质区域的方法,并且每种组织类型只需要进行一次。代替通过针对每个组织/TF组合执行测序实验来分析TF结合位点,我们希望仅假设DNase-seq数据的可用性来预测新组织类型中的TF结合位点。
特别地,我们将使用DNase-seq数据以及已知的序列基序数据(来自HT-SELEX)和来自公开可用的ENCODE数据集的其他数据来预测CTCF转录因子的结合位点。我们已经选择了6种不同的细胞类型,有可用的DNase-seq和ChIP-seq数据。训练示例将是脱氧核糖核酸酶超敏反应(HS)峰值,并且标签将从ChIP-seq数据中导出。
我们将使用以下细胞系的数据:
GM12888
常见淋巴母细胞系的研究
K562
女性慢性粒细胞白血病
BJ
皮肤成纤维细胞
HEK293
胚胎肾
H54
胶质母细胞瘤
HepG2
肝细胞癌
首先以narrow Peak格式下载每个细胞系的DNASE数据:
[mw_shl_code=bash,true]hadoop fs -mkdir /user/ds/genomics/dnase
curl -s -L <...DNase URL...> \ #
| gunzip \
| hadoop fs -put - /user/ds/genomics/dnase/sample.DNase.narrowPeak
[...][/mw_shl_code]
接下来是CTCF转录因子的ChIP-seq数据,也是narrow Peak格式,以及GENCODE数据,GTF格式:
[mw_shl_code=bash,true]hadoop fs -mkdir /user/ds/genomics/chip-seq
curl -s -L <...ChIP-seq URL...> \ #
| gunzip \
| hadoop fs -put - /user/ds/genomics/chip-seq/samp.CTCF.narrowPeak
[...][/mw_shl_code]
请注意我们如何用GunZip解压缩数据流,以便在HDFS中存储数据流。现在我们下载一些额外的数据集,从中我们将得到预测的特征。
[mw_shl_code=bash,true]curl -s -L -O \
"http://hgdownload.cse.ucsc.edu/goldenPath/hg19/bigZips/hg19.2bit"[/mw_shl_code]
最后,保存数据以固定的摆动格式提供,这很难作为可分割文件读取。不可能预测一个特定任务为了获得关于邻接坐标的元数据而必须读取的文件的返回距离。因此,我们将wigFIX数据转换成HDF格式的BED格式。
[mw_shl_code=bash,true]hadoop fs -mkdir /user/ds/genomics/phylop
for i in $(seq 1 22); do
curl -s -L <...phyloP.chr$i URL...> \ #
| gunzip \
| adam-submit wigfix2bed \
| hadoop fs -put - "/user/ds/genomics/phylop/chr$i.phyloP.bed"
done
[...][/mw_shl_code]
最后,我们在Spark shell中执行PhyloP数据从text..bed格式到Parquet的一次性转换:
[mw_shl_code=bash,true](sc
.adamBEDFeatureLoad("/user/ds/genomics/phylop_text")
.adamSave("/user/ds/genomics/phylop"))[/mw_shl_code]
从所有这些原始数据中,我们要生成一个具有如下模式的训练集:
1.脱氧核糖核酸酶HS峰ID
2.染色体
3.起点
4.终点
5.最高TF基模PWM分数
6.平均PHOLOP保存分数
7.最大PHOLOP保存评分
8.最小PHOLOP保存评分
9.最接近转录起始位点(TSS)的距离
10.TF恒等式(在这种情况下总是“CTCF”)
11.细胞系
12.TF绑定状态(布尔值;目标变量)
现在我们生成可以用来创建RDD[LabeleDePoT ]的数据集。我们需要为多个细胞系生成数据,因此我们将为每个细胞系定义RDD,并在最后:
[mw_shl_code=scala,true]val cellLines = Vector(
"GM12878", "K562", "BJ", "HEK293", "H54", "HepG2")
val dataByCellLine = cellLines.map(cellLine => {
})[/mw_shl_code]
在开始之前,我们加载一些数据,这些数据将在整个计算过程中使用,包括保存、转录起始位点、人类基因组参考序列以及源自HT-SELEX的CTCF PWM(引用这个)。
[mw_shl_code=scala,true]// Load the human genome reference sequence
val bHg19Data = sc.broadcast(
new TwoBitFile(
new LocalFileByteAccess(
new File("/user/ds/genomics/hg19.2bit"))))
val phylopRDD = (sc.adamLoad[Feature, Nothing]("/user/ds/genomics/phylop")
// clean up a few irregularities in the phylop data
.filter(f => f.getStart <= f.getEnd))
val tssRDD = (sc.adamGTFFeatureLoad(
"/user/ds/genomics/gencode.v18.annotation.gtf")
.filter(_.getFeatureType == "transcript")
.map(f => (f.getContig.getContigName, f.getStart)))
val bTssData = sc.broadcast(tssRDD
// group by contig name
.groupBy(_._1)
// create Vector of TSS sites for each chromosome
.map(p => (p._1, p._2.map(_._2.toLong).toVector))
// collect into local in-memory structure for broadcasting
.collect().toMap)
// CTCF PWM from http://dx.doi.org/10.1016/j.cell.2012.12.009
val bPwmData = sc.broadcast(Vector(
Map('A'->0.4553,'C'->0.0459,'G'->0.1455,'T'->0.3533),
Map('A'->0.1737,'C'->0.0248,'G'->0.7592,'T'->0.0423),
Map('A'->0.0001,'C'->0.9407,'G'->0.0001,'T'->0.0591),
Map('A'->0.0051,'C'->0.0001,'G'->0.9879,'T'->0.0069),
Map('A'->0.0624,'C'->0.9322,'G'->0.0009,'T'->0.0046),
Map('A'->0.0046,'C'->0.9952,'G'->0.0001,'T'->0.0001),
Map('A'->0.5075,'C'->0.4533,'G'->0.0181,'T'->0.0211),
Map('A'->0.0079,'C'->0.6407,'G'->0.0001,'T'->0.3513),
Map('A'->0.0001,'C'->0.9995,'G'->0.0002,'T'->0.0001),
Map('A'->0.0027,'C'->0.0035,'G'->0.0017,'T'->0.9921),
Map('A'->0.7635,'C'->0.0210,'G'->0.1175,'T'->0.0980),
Map('A'->0.0074,'C'->0.1314,'G'->0.7990,'T'->0.0622),
Map('A'->0.0138,'C'->0.3879,'G'->0.0001,'T'->0.5981),
Map('A'->0.0003,'C'->0.0001,'G'->0.9853,'T'->0.0142),
Map('A'->0.0399,'C'->0.0113,'G'->0.7312,'T'->0.2177),
Map('A'->0.1520,'C'->0.2820,'G'->0.0082,'T'->0.5578),
Map('A'->0.3644,'C'->0.3105,'G'->0.2125,'T'->0.1127)))[/mw_shl_code]
现在,我们定义了一些用于特征生成的实用函数,包括标记、PWM评分和TSS距离
[mw_shl_code=scala,true]// fn for finding closest transcription start site
// naive...make this better
def distanceToClosest(loci: Vector[Long], query: Long): Long = {
loci.map(x => abs(x - query)).min
}
// compute a motif score based on the TF PWM
def scorePWM(ref: String): Double = {
val score1 = ref.sliding(bPwmData.value.length).map(s => {
s.zipWithIndex.map(p => bPwmData.value(p._2)(p._1)).product
}).max
val rc = SequenceUtils.reverseComplement(ref)
val score2 = rc.sliding(bPwmData.value.length).map(s => {
s.zipWithIndex.map(p => bPwmData.value(p._2)(p._1)).product
}).max
max(score1, score2)
}
// functions for labeling the DNase peaks as binding sites or not;
// compute overlaps between an interval and a set of intervals
// naive impl - this only works because we know the ChIP-seq peaks
// are non-overlapping (how do we verify this? exercise for the
// reader)
def isOverlapping(i1: (Long, Long), i2: (Long, Long)) =
(i1._2 > i2._1) && (i1._1 < i2._2)
def isOverlappingLoci(loci: Vector[(Long, Long)],
testInterval: (Long, Long)): Boolean = {
@tailrec
def search(m: Int, M: Int): Boolean = {
val mid = m + (M - m) / 2
if (M <= m) {
false
} else if (isOverlapping(loci(mid), testInterval)) {
true
} else if (testInterval._2 <= loci(mid)._1) {
search(m, mid)
} else {
search(mid + 1, M)
}
}
search(0, loci.length)
}[/mw_shl_code]
最后,将身体的“循环”用于计算每个细胞系上的数据。请注意我们如何读取ChIP-seq和DNase数据的文本表示,因为数据集并不太大,因此会损害性能。
首先,我们将DNASE和ChIP-seq数据加载为RDDS。
[mw_shl_code=scala,true]val dnaseRDD = sc.adamNarrowPeakFeatureLoad(
s"/user/ds/genomics/dnase/$cellLine.DNase.narrowPeak")
val chipseqRDD = sc.adamNarrowPeakFeatureLoad(
s"/user/ds/genomics/chip-seq/$cellLine.ChIP-seq.CTCF.narrowPeak")[/mw_shl_code]
然后,我们将在DNase特性上生成目标标签的函数定义为“绑定”或“不绑定”。该函数需要同时访问所有ChIP-seq峰值,因此我们将原始ChIP-seq数据处理为内存中的数据结构,并将其广播到所有节点,作为广播变量bBindingData:
[mw_shl_code=scala,true]val bBindingData = sc.broadcast(
chipseq
// group peaks by chromosome
.groupBy(_.getContig.getContigName.toString) //
// for each chr, for each ChIP-seq peak, extract start/end
.map(p => (p._1, p._2.map(f =>
(f.getStart: Long, f.getEnd: Long)))) //
// for each chr, sort the peaks (non-overlapping)
.map(p => (p._1, p._2.toVector.sortBy(x => x._1))) //
// collect them back into a local in-memory data structure for
// broadcasting
.collect().toMap)[/mw_shl_code]
这个操作为我们提供了一个Map,其中键是染色体名称,值是按位置排序的非重叠(开始,结束)对的向量。现在我们定义实际的标记函数:
[mw_shl_code=scala,true]def generateLabel(f: Feature) = {
val contig = f.getContig.getContigName
if (!bBindingData.value.contains(contig)) {
false
} else {
val testInterval = (f.getStart: Long, f.getEnd: Long)
isOverlappingLoci(bBindingData.value(contig), testInterval)
}
}[/mw_shl_code]
为了计算守恒特征(使用phyloP数据),我们必须将DNase峰与phyloP数据连接起来。
因为我们加入了间隔,我们将使用亚当中的广播区域连接实现,它收集连接的一侧(在这种情况下,较小的DNASE数据),计算非重叠区域,然后通过广播所收集的数据来实现复制的连接。
[mw_shl_code=scala,true]val dnaseWithPhylopRDD = (
BroadcastRegionJoin.partitionAndJoin(sc, dnaseRDD, phylopRDD)
// group the conservation values by DNase peak
.groupBy(x => x._1.getFeatureId)
// compute conservation stats on each peak
.map(x => {
val y = x._2.toSeq
val peak = y(0)._1
val values = y.map(_._2.getValue)
// compute phylop features
val avg = values.reduce(_ + _) / values.length
val m = values.max
val M = values.min
(peak.getFeatureId, peak, avg, m, M)
}))[/mw_shl_code]
现在我们计算每个DNase峰上的最后一组特征,包括目标变量。
[mw_shl_code=scala,true]// generate the final set of tuples
dnaseWithPhylopRDD.map(tup => {
val peak = tup._2
val featureId = peak.getFeatureId
val contig = peak.getContigName.getContigName
val start = peak.getStart
val end = peak.getEnd
val score = scorePWM(
bHg19Data.value.extract(ReferenceRegion(peak)))
val avg = tup._3
val m = tup._4
val M = tup._5
val closest_tss = min(
distanceToClosest(bTssData.value(contig), peak.getStart),
distanceToClosest(bTssData.value(contig), peak.getEnd))
val tf = "CTCF"
val line = cellLine
val bound = generateLabel(peak)
(featureId, contig, start, end, score, avg, m, M, closest_tss,
tf, line, bound)
})[/mw_shl_code]
这个最后的RDD在环路上的每个通过单元线上计算。最后,我们将来自每个细胞系的每个RDD合并,并将这些数据缓存在内存中,以准备从其中去除训练模型。
[mw_shl_code=scala,true]val preTrainingData = dataByCellLine.reduce(_ ++ _)
preTrainingData.cache()
preTrainingData.count() // 801263
preTrainingData.filter(_._12 == true).count() // 220285[/mw_shl_code]
此时,preTrainingData中的数据可以被标准化并转换为RDD[LabeledPoint]以训练分类器,如XXXXX章所述。请注意,应该执行交叉验证,其中在每个折叠中,保存来自其中一个单元格行的数据。
|
|