本帖最后由 feilong 于 2018-11-30 09:22 编辑
问题导读
1.什么是多元正态分布?
2.将多元正态分布拟合到我们的数据中需要做什么?
3.如何进行数据试验?
关注最新经典文章,欢迎关注公众号
Spark 高级分析:第九章第6,7节 权重因子的确定和数据抽样
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26386
多元正态分布可以通过考虑因素之间的相关信息来帮助。多元正态分布中的每个样本都是一个向量。给定除了一个以外的所有维度的值,该维度上的值分布是正态的。但是,在它们的联合分布中,变量不是独立的。
多变量法线是参数化的平均沿每个维度和一个矩阵,描述每对维度之间的协方差。当协方差矩阵是对角线时,多变量法线会独立地减少到沿每个维度的采样,但是在非对角线中放置非零值有助于捕获变量之间的关系。
风险价值文献通常描述一个步骤,其中因子权重被转换(去相关),以便进行抽样。Apache Commons Math MultivariateNormalDistribution在我们的封装下为这一步做好了准备。
为了将多元正态分布拟合到我们的数据中,首先我们需要找到它的样本均值和协方差:
[mw_shl_code=scala,true]import org.apache.commons.math3.stat.correlation.Covariance
val factorCov = new Covariance(factorMat).getCovarianceMatrix().
getData()
val factorMeans = factorsReturns.
map(factor => factor.sum / factor.size).toArray[/mw_shl_code]
然后,我们可以简单地创建一个参数化的分布:
[mw_shl_code=scala,true]import org.apache.commons.math3.distribution.MultivariateNormalDistribution
val factorsDist = new MultivariateNormalDistribution(factorMeans,factorCov)[/mw_shl_code]
从中取样一组市场条件:
[mw_shl_code=scala,true]factorsDist.sample()
res1: Array[Double] = Array(2.6166887901169384, 2.596221643793665, 1.4224088720128492, 55.00874247284987)
factorsDist.sample()
res2: Array[Double] = Array(-8.622095499198096, -2.5552498805628256, 2.3006882454319686, -75.4850042214693)[/mw_shl_code]
有了每个票据的模型和取样因子返回的程序,我们现在有了运行实际试验所需的部件。因为运行试验是非常计算密集的,我们最终将转向Spark,帮助我们并行化它们。在每次试验中,我们想取样一组风险因素,用它们来预测每种票据的回报,并把所有的回报相加以求出完整的试验损失。为了实现一个代表性的分布,我们希望运行成千上万或数百万的这些试验。
我们在如何并行化仿真方面有一些选择。我们可以沿着试验、票据或两者并行化。为了沿着这两个并行化,我们将创建一个RDD的工具,一个RDD的试验参数,然后使用笛卡尔变换来生成所有的对的RDD。这是最一般的方法,但也有一些缺点。首先,它需要明确地创建一个RDD的试验参数,我们可以避免使用随机种子的一些技巧。第二,它需要洗牌操作。 [mw_shl_code=scala,true]val randomSeed = 1496
val instrumentsRdd = ...
def trialLossesForInstrument(seed: Long, instrument: Array[Double])
: Array[(Int, Double)] = {
...
}
instrumentsRdd.flatMap(trialLossesForInstrument(randomSeed, _)).
reduceByKey(_ + _)[/mw_shl_code] 通过这种方法,数据被划分在instruments RDD,每个票据通过flatmap变换计算和产生基于试验的损失。在所有任务中使用相同的随机种子意味着我们将产生相同的试验序列。reduceByKey将所有与同一试验相对应的损失合计。这种方法的一个缺点是,它仍然需要洗牌O(|instruments|* |trials|) 数据。
我们针对几千个票据数据的模型数据足够小,以适合每个执行器的内存,并且一些背包计算显示,即使对于大约一百万个票据和数百个因素,这种情况可能仍然存在。100万个工具乘以500个因子乘以存储每个因子权重的双倍所需的8个字节,大约等于4GB,小到足以适合大多数现代集群机上的每个执行器。这意味着一个好的选择是在广播变量中分配票据数据。每个执行器具有票据数据的完整副本的优点是,每次试验的总损失可以在一台机器上计算。不需要聚合。
随着分区试验方法(我们将使用),我们开始与RDD的种子。我们希望在每个分区中有一个不同的种子,以便每个分区产生不同的试验。 [mw_shl_code=scala,true]val parallelism = 1000
val baseSeed = 1496
val seeds = (baseSeed until baseSeed + parallelism)
val seedRdd = sc.parallelize(seeds, parallelism)[/mw_shl_code] 随机数生成是一个耗时和CPU密集的过程。虽然我们这里没有使用这个技巧,但是预先生成一组随机数并在多个作业中使用它通常是有用的。同样的随机数不应该用在单个作业中,因为这将违反蒙特卡罗假设的随机值是独立分布的。如果我们要走这条路线,我们将用文本文件替换并行化,并加载一个随机数SRDD。
对于每个种子,我们要生成一组试验参数,并观察这些参数对所有票据的影响。让我们从头开始编写一个函数,该函数计算单次试验下单个票据的返回。我们简单地应用我们先前为该票据训练的线性模型。回归参数的票据阵列的长度比试验阵列的长度大1,因为票据阵列的第一个元素包含截取项。 [mw_shl_code=scala,true]def instrumentTrialReturn(instrument: Array[Double],
trial: Array[Double]): Double = {
var instrumentTrialReturn = instrument(0)
var i = 0
while (i < trial.length) {//我们在这里使用while循环,而不是使用更有效的Scala构造,因为这是一个性能关键区域。
instrumentTrialReturn += trial(i) * instrument(i+1)
i += 1
}
instrumentTrialReturn
}[/mw_shl_code] 然后,为了计算一次试验的全部回报,我们简单地总结了所有票据的回报: [mw_shl_code=scala,true]def trialReturn(trial: Array[Double],
instruments: Seq[Array[Double]]): Double = {
var totalReturn = 0.0
for (instrument <- instruments) {
totalReturn += instrumentTrialReturn(instrument, trial)
}
totalReturn
}[/mw_shl_code] 最后,我们需要在每一个任务中产生一系列的试验。因为选择随机数是整个过程的一个重要部分,所以使用一个强随机数生成器是很重要的,它需要很长的时间来重复。公文学数学包括一个有利于这一点的梅森扭曲器实现。我们使用它来从多元正态分布采样如上所述。请注意,我们正在应用之前在生成的因子返回上定义的featurize方法,以便将它们转换为模型中使用的特征表示。 [mw_shl_code=scala,true]def trialReturns(seed: Long, numTrials: Int,
factorCovariances: Array[Array[Double]]): Seq[Double] = {
val multivariateNormal = new MultivariateNormalDistribution(
val trialReturns = new Array[Double](numTrials)
val trialFactorReturns = multivariateNormal.sample()
trialReturns(i) = trialReturn(trialFeatures, instruments)
trialReturns[/mw_shl_code] 随着我们的脚手架完成,我们可以使用它来计算RDD,其中每个元素是来自单个试验的总回报。因为票据数据(包括每个票据的每个因子特征的权重)是大的,所以我们使用广播变量。这确保它只需要每个执行器反序列化一次。 [mw_shl_code=scala,true]val bFactorWeights = sc.broadcast(factorWeights)
trialReturns(_, numTrials / parallelism,
bFactorWeights.value, factorMeans, factorCov))[/mw_shl_code] 如果你还记得,我们一直在混淆所有这些数据的全部原因是为了计算VaR。试验现在形成了投资组合收益的经验分布。为了计算5%的VaR,我们需要找到一种回报,这样我们期望在5%的时间里做的比它差,而在5%的时间里做的比它好。根据我们的经验分布,这非常简单,只要发现5%的试验不如95%的试验好于5%的值。我们可以通过采取有序的行动来把最坏的5%的试验交给司机。我们的VAR是这个子集中最好的试验的返回。 [mw_shl_code=scala,true]math.max(numTrials / 20, 1))
val varFivePercent = topLosses.last
varFivePercent: Double = -1751.6806481676153[/mw_shl_code]
|