Computing IRR (Internal Rate of Return) with Spark
Last edited by hanyunsong on 2020-8-13 20:25. Questions this post addresses:
1. What is the IRR (internal rate of return)?
2. What data preparation does computing IRR in Spark require?
3. How does Spark compare with Pandas in efficiency?
Concept
IRR, the internal rate of return, can be read informally in two ways: a higher IRR means the invested cost is relatively small and the attainable return relatively large; alternatively, it is the discount rate used to decide whether to accept a project, even though that rate may not match real market conditions. Both are loose, intuitive readings.
Excel and Pandas both provide an IRR function, but the amount of data to compute here is too large for them, so this post gives a Spark solution. It is only an implementation record plus some notes on the concept.
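To make the definition concrete: the IRR is the discount rate at which the net present value (NPV) of the cash flows equals zero. A minimal sketch, with cash-flow numbers invented purely for illustration:

```java
public class NpvDemo {
    // NPV of a cash-flow series at discount rate r:
    // values[0] is the initial outlay (negative), values[k] the payment in period k.
    public static double npv(double[] values, double r) {
        double sum = 0.0;
        for (int k = 0; k < values.length; k++) {
            sum += values[k] / Math.pow(1.0 + r, k);
        }
        return sum;
    }

    public static void main(String[] args) {
        // Invest 1000 now, receive 600 in each of the next two periods.
        double[] flows = {-1000.0, 600.0, 600.0};
        // At r = 0 the NPV is just the sum: -1000 + 600 + 600 = 200.
        System.out.println(npv(flows, 0.0));
        // The IRR is the r where the NPV crosses zero; for this series it is
        // the root of -1000 + 600/(1+r) + 600/(1+r)^2 = 0, about 13.07%,
        // so the NPV evaluated there is close to zero.
        System.out.println(npv(flows, 0.1307));
    }
}
```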
IRR Code
Java and Oracle SQL implementations of IRR can be found online; the Java code is below. (original link)
public static double irr(double[] values, double guess) {
int maxIterationCount = 20;
double absoluteAccuracy = 1.0E-007D;
double x0 = guess;
int i = 0;
while (i < maxIterationCount) {
double fValue = 0.0D;
double fDerivative = 0.0D;
for (int k = 0; k < values.length; k++) {
fValue += values[k] / Math.pow(1.0D + x0, k);
fDerivative += -k * values[k] / Math.pow(1.0D + x0, k + 1);
}
double x1 = x0 - fValue / fDerivative;
if (Math.abs(x1 - x0) <= absoluteAccuracy) {
return x1;
}
x0 = x1;
i++;
}
return Double.NaN; // did not converge within maxIterationCount
}
Calling the function only requires passing the full cash-flow array and a guess; judging from the code, guess is the starting point for the Newton iteration. The call looks like this:
double ret = irr(income, 0.1d) * 12 * 100; whether to multiply by 12 and 100 depends on what you need.
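Assembling the function and the call above into a runnable sketch; the cash flows here (a principal of 1200 repaid in twelve installments of 110) are invented for illustration:

```java
public class IrrDemo {
    // Newton's method IRR, as in the snippet above (with values[k] indexed).
    public static double irr(double[] values, double guess) {
        int maxIterationCount = 20;
        double absoluteAccuracy = 1.0E-7;
        double x0 = guess;
        for (int i = 0; i < maxIterationCount; i++) {
            double fValue = 0.0;       // NPV at rate x0
            double fDerivative = 0.0;  // derivative of NPV at x0
            for (int k = 0; k < values.length; k++) {
                fValue += values[k] / Math.pow(1.0 + x0, k);
                fDerivative += -k * values[k] / Math.pow(1.0 + x0, k + 1);
            }
            double x1 = x0 - fValue / fDerivative; // Newton step
            if (Math.abs(x1 - x0) <= absoluteAccuracy) {
                return x1;
            }
            x0 = x1;
        }
        return Double.NaN; // did not converge
    }

    public static void main(String[] args) {
        // Period 0: pay out the 1200 principal; periods 1..12: receive 110.
        double[] income = new double[13];
        income[0] = -1200.0;
        for (int k = 1; k <= 12; k++) income[k] = 110.0;
        double monthly = irr(income, 0.1);
        // x 12 annualizes the per-month rate, x 100 turns it into a percentage.
        System.out.println(monthly * 12 * 100);
    }
}
```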
Translating the Java code from the web into Scala gives the following.
import scala.math._
def irr(values:String):(String,Double) = {
val list = values.split(",")
val id = list.head
val fees_flatten = list.tail.sortWith(_.toDouble < _.toDouble) // sort ascending so the negative principal comes first
val maxIterationCount = 20;
var x0:Double = 0.00001;
val absoluteAccuracy:Double = 1.0E-007D;
var i = 0;
while (i < maxIterationCount) {
var fValue:Double = 0.0;
var fDerivative:Double = 0.0;
var k = 0;
while (k < fees_flatten.length) {
val v = fees_flatten(k).toDouble
fValue = fValue + (v / pow(1.0 + x0, k));
fDerivative = fDerivative + (-k * v / pow(1.0 + x0, k + 1));
k = k + 1;
}
var x1 : Double = x0 - fValue / fDerivative;
if (abs(x1 - x0) <= absoluteAccuracy) {
return (id,x1);
}
x0 = x1;
i = i + 1;
}
// did not converge within maxIterationCount; fall back to 0.0
return (id, 0.0);
}
Worth noting: I changed the function's parameter to a String. Grouped data in an RDD is stored as a CompactBuffer, typed as Iterable, which is awkward to work with inside a Scala function; when I passed the Iterable in, my attempts to traverse and index it all failed, so I simply convert the data to a String and split it myself.
Data Preparation
Next, prepare the data the IRR calculation needs. It consists of two parts:
1. all principal amounts, taken as negative values
2. the amount due in each period
Since this is a one-off acceleration of the IRR calculation rather than something used regularly afterwards, I simply had Spark read the Hive data off HDFS; all we need is to load both parts into a table with two columns: table_name (id bigint, fee double or decimal)
Spark Code
spark-shell
import scala.math._
// the irr(values: String): (String, Double) function is identical to the one defined above; paste it in first
val lines = sc.textFile("/xx/xx/data.csv")
val even_tup = lines.map(x => (x.split(",")(0),x.split(",")(1))).filter(x => x._1.toInt % 2 == 0).map(x => (x._1.toInt,x._2.toDouble))
val odd_tup = lines.map(x => (x.split(",")(0),x.split(",")(1))).filter(x => x._1.toInt % 2 == 1).map(x => (x._1.toInt,x._2.toDouble))
val merged = even_tup.cogroup(odd_tup)
merged.filter(x => x._2._1.mkString(",").length > 0).map(x => (x._1,x._2._1)).map{ case (k,v) => s"""$k,${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble)).toDF().insertInto(table_name,true)
merged.filter(x => x._2._2.mkString(",").length > 0).map(x => (x._1,x._2._2)).map{ case (k,v) => s"""$k,${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble)).toDF().insertInto(table_name,false)
/xx/xx/data.csv is where my data is stored on HDFS. Define the function in spark-shell first, shape the data with the operators above, pass each record to the custom function, and store the results in Hive: records with even ids go into one batch and records with odd ids into the other. The results here are not multiplied by 12 * 100; if you need that, the merged part becomes:
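As far as I can tell, the parity split plus cogroup amounts to grouping all fees by id, with the two parity classes just partitioning the work. A plain-Java sketch of that net data flow, without the Spark API (input lines are made up; in the real job each step runs distributed):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupDemo {
    // Mimics the RDD pipeline: each input line is "id,fee"; all fees for the
    // same id are collected (as cogroup does per key) into one
    // "id,fee1,fee2,..." record, ready to be passed to irr().
    public static List<String> toIrrRecords(List<String> lines) {
        Map<Integer, StringBuilder> byId = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            int id = Integer.parseInt(parts[0]);
            byId.computeIfAbsent(id, k -> new StringBuilder(parts[0]))
                .append(",").append(parts[1]);
        }
        List<String> records = new ArrayList<>();
        for (StringBuilder sb : byId.values()) records.add(sb.toString());
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical file contents: id 2 has a principal and two payments.
        List<String> lines = List.of("2,-200.0", "2,110.0", "2,110.0",
                                     "3,-500.0", "3,600.0");
        System.out.println(toIrrRecords(lines)); // one record per id
    }
}
```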
merged.map(x => (x._1,x._2._1)).map{ case (k,v) => s"""$k,${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1,x._2 * 100 * 12)).toDF().insertInto(result_table_name)
Multiplying by 12 annualizes the per-period (monthly) rate, and by 100 converts it to a percentage. Pick a few of the results and compare them against Excel to make sure nothing is off.
++ 2020-08-12 addition ++++++++++++++++++ start
Reading Hive Data Directly
One more addition: execute it through a shell script that reads the data directly from Hive and writes the results back to Hive, which makes it more convenient to run.
$ cat sparkShell_command.sh
#!/bin/sh
input_table=$1
result_table=${input_table}_result
cat << EOF
import scala.math._
def irr(values:String):(String,Double) = {
val list = values.split(",")
val id = list.head
val fees_flatten = list.tail.sortWith(_.toDouble < _.toDouble)
val maxIterationCount = 20;
var x0:Double = 0.00001;
val absoluteAccuracy:Double = 1.0E-007D;
var i = 0;
while (i < maxIterationCount) {
var fValue:Double = 0.0;
var fDerivative:Double = 0.0;
var k = 0;
while (k < fees_flatten.length) {
val v = fees_flatten(k).toDouble
fValue = fValue + (v / pow(1.0 + x0, k));
fDerivative = fDerivative + (-k * v / pow(1.0 + x0, k + 1));
k = k + 1;
}
var x1 : Double = x0 - fValue / fDerivative;
if (abs(x1 - x0) <= absoluteAccuracy) {
return (id,x1);
}
x0 = x1;
i = i + 1;
}
return (id, 0.0); // did not converge within maxIterationCount
}
sqlContext.sql("create table if not exists ${result_table}(id bigint , irr double)")
val even = sqlContext.sql("select * from ${input_table}").rdd.map(x => (x(0).toString,x(1).toString)).filter(x => x._1.toInt % 2 == 0).map(x => (x._1.toInt,x._2.toDouble))
val odd = sqlContext.sql("select * from ${input_table}").rdd.map(x => (x(0).toString,x(1).toString)).filter(x => x._1.toInt % 2 == 1).map(x => (x._1.toInt,x._2.toDouble))
val merged = even.cogroup(odd)
merged.filter(x => x._2._1.mkString(",").length > 0).map(x => (x._1,x._2._1)).map{ case (k,v) => s"""\$k,\${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble)).toDF().insertInto("${result_table}",false)
merged.filter(x => x._2._2.mkString(",").length > 0).map(x => (x._1,x._2._2)).map{ case (k,v) => s"""\$k,\${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble)).toDF().insertInto("${result_table}",false)
EOF
The code between cat << EOF and EOF is the later revision that reads directly from Hive. The script is invoked like this:
bash sparkShell_command.sh table_name | spark-shell
The script's argument is the input data table; the output table is named after it as table_name_result. Note that the input table must have two columns (id bigint, fee double), and the fee column must include the total principal as a negative amount.
As you can see, it still ends up running inside spark-shell.
++ 2020-08-12 addition ++++++++++++++++++ end
Efficiency Compared with Pandas
On our company's data volume (which I can't disclose), a colleague's Python script took 2.5 hours to process the data; I estimated that even after optimization the Python code would still need close to thirty minutes, whereas Spark finished in about ten seconds.
I'll optimize the Spark code further when it's needed again; the above is just a record.
Source: https://blog.csdn.net/weixin_39445556/article/details/107929751