分享

Spark之计算IRR(内部收益率)

本帖最后由 hanyunsong 于 2020-8-13 20:25 编辑

问题导读:

1. IRR内部收益率的概念是什么?
2. Spark之计算IRR需要做哪些数据准备?
3. Spark与Pandas效率对比如何?




概念

IRR又叫内部收益率,通俗解释就是内部收益率越高,就说明投入的成本相对较少,这时候可以获得的收益也就相对更多些。另外我们也可以理解为要决定一个项目接受与否,但又不适应现实情况的一个贴现率。这两种理解都算是通俗的解释

IRR 内部收益率在Excel和Pandas中有对应的函数,因需要计算的数据量太大所以给出spark解决方案,本文只做实现记录和部分概念讲解。

IRR代码

在网上IRR 内部收益率有对应的Java和Oracle sql实现,Java代码如下。原文链接
[mw_shl_code=java,true]public static double irr(double[] values, double guess) {
        int maxIterationCount = 20;
        double absoluteAccuracy = 1.0E-007D;

        double x0 = guess;

        int i = 0;
        while (i < maxIterationCount) {
            double fValue = 0.0D;
            double fDerivative = 0.0D;
            for (int k = 0; k < values.length; k++) {
                fValue += values[k] / Math.pow(1.0D + x0, k);
                fDerivative += -k * values[k] / Math.pow(1.0D + x0, k + 1);
            }
            double x1 = x0 - fValue / fDerivative;
            if (Math.abs(x1 - x0) <= absoluteAccuracy) {
                return x1;
            }
            x0 = x1;
            i++;
        }
        return (0.0D / 0.0D);
    }[/mw_shl_code]调用函数只需要传递所有数据和一个guess,这个guess从代码中看出应该是一个迭代参数,函数调用如下
[mw_shl_code=java,true]double ret = irr(income,0.1d)* 12 * 100 ;[/mw_shl_code]12和100看自己需要是否乘上去。

将网上的Java代码翻译为Scala代码如下
[mw_shl_code=java,true]import scala.math._
def irr(values:String):(String,Double) = {
    val list = values.split(",")
    val id = list.head
    val fees_flatten = list.tail.sortWith(_.toDouble < _.toDouble)
    val maxIterationCount = 20;
    var x0:Double = 0.00001;
    var fValue:Double = 0.0;
    val fDerivative:Double = 0.0;
    val absoluteAccuracy:Double = 1.0E-007D;
    var i = 0;
    while (i < maxIterationCount) {
        var fValue:Double = 0.0;
        var fDerivative:Double = 0.0;
        var k = 0;
        while ( k  < fees_flatten.length) {
            var v = fees_flatten(k).toDouble
            fValue = fValue + (v / pow(1.0 + x0, k));
            fDerivative = fDerivative + (-k * (fees_flatten(k).toDouble) / pow(1.0 + x0, k + 1));
            k = k + 1;
        }
        var x1 : Double = x0 - fValue / fDerivative;
        if (abs(x1 - x0) <= absoluteAccuracy) {
            return (id,x1);
        }
        x0 = x1;
        i = i + 1;
    }
    val res : Double = 0.0;
    return (id,res);
}[/mw_shl_code]值得一提的是方法的参数被我替换为了String类型。因为集合数据再RDD中是以Compactbuffer的形式存在的,在类型上是Iterable,传递到Scala函数中不容易操作。我以Iterable的形式传递进去后尝试了遍历取值等操作都报错了,所以干脆转化为String自己切分。

数据准备

接下来需要准备IRR计算所需要的数据。这个数据由两部分组成

1- 所有本金的值取负数

2-每期的应付金额

因为只是单纯为了加速计算IRR而不是后面经常用,所以我直接将Hive数据在HDFS上让Spark去读取了,我们只需要将这两部分数据导入到一个有两列的表中即可。table_name (id bigint, fee Double 或者decimal)

Spark代码
[mw_shl_code=java,true]spark-shell

import scala.math._
def irr(values:String):(String,Double) = {
    val list = values.split(",")
    val id = list.head
    val fees_flatten = list.tail.sortWith(_.toDouble < _.toDouble)
    val maxIterationCount = 20;
    var x0:Double = 0.00001;
    var fValue:Double = 0.0;
    val fDerivative:Double = 0.0;
    val absoluteAccuracy:Double = 1.0E-007D;
    var i = 0;
    while (i < maxIterationCount) {
        var fValue:Double = 0.0;
        var fDerivative:Double = 0.0;
        var k = 0;
        while ( k  < fees_flatten.length) {
            var v = fees_flatten(k).toDouble
            fValue = fValue + (v / pow(1.0 + x0, k));
            fDerivative = fDerivative + (-k * (fees_flatten(k).toDouble) / pow(1.0 + x0, k + 1));
            k = k + 1;
        }
        var x1 : Double = x0 - fValue / fDerivative;
        if (abs(x1 - x0) <= absoluteAccuracy) {
            return (id,x1);
        }
        x0 = x1;
        i = i + 1;
    }
    val res : Double = 0.0;
    return (id,res);
}

val lines = sc.textFile("/xx/xx/data.csv")
val odd_tup = lines.map(x => (x.split(",")(0),x.split(",")(1))).filter(x => x._1.toInt % 2 == 0).map(x => (x._1.toInt,x._2.toDouble))
val even_tup =  lines.map(x => (x.split(",")(0),x.split(",")(1))).filter(x => x._1.toInt % 2 == 1).map(x => (x._1.toInt,x._2.toDouble))
val merged = odd_tup.cogroup(even_tup)
merged.filter(x => x._2._1.mkString(",").length > 0).map(x => (x._1,x._2._1)).map{ case (k,v) => s"""$k,${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble )).toDF().insertInto(table_name,true)
merged.filter(x => x._2._2.mkString(",").length > 0).map(x => (x._1,x._2._2)).map{ case (k,v) => s"""$k,${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble)).toDF().insertInto(table_name,false)[/mw_shl_code]/xx/xx/data.csv是我在HDFS上的数据存储路径,先在spark中构建好函数,然后用算子整理好数据后传递到自定义函数中得到结果存储到Hive中。此处得到的结果是没有乘12 * 100的,如果需要乘12 * 100则merged部分如下
[mw_shl_code=java,true]merged.map(x => (x._1,x._2._1)).map{ case (k,v) => s"""$k,${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1,x._2 * 100 * 12)).toDF().insertInto(result_table_name)[/mw_shl_code]得到的结果挑几笔跟Excel对比下以免出错。

++20200812++++++++++++++++++start

直接读取hive数据

加一段,使用shell脚本执行,直接读取hive数据,计算结果写入hive。这样执行起来更加方便。
[mw_shl_code=java,true][azkaban@exec101 di_lwx]$ cat sparkShell_command.sh

#!/usr/bin/sh

input_table=$1
result_table=${input_table}_result

cat << EOF
import scala.math._
def irr(values:String):(String,Double) = {
    val list = values.split(",")
    val id = list.head
    val fees_flatten = list.tail.sortWith(_.toDouble < _.toDouble)
    val maxIterationCount = 20;
    var x0:Double = 0.00001;
    var fValue:Double = 0.0;
    val fDerivative:Double = 0.0;
    val absoluteAccuracy:Double = 1.0E-007D;
    var i = 0;
    while (i < maxIterationCount) {
        var fValue:Double = 0.0;
        var fDerivative:Double = 0.0;
        var k = 0;
        while ( k  < fees_flatten.length) {
            var v = fees_flatten(k).toDouble
            fValue = fValue + (v / pow(1.0 + x0, k));
            fDerivative = fDerivative + (-k * (fees_flatten(k).toDouble) / pow(1.0 + x0, k + 1));
            k = k + 1;
        }
        var x1 : Double = x0 - fValue / fDerivative;
        if (abs(x1 - x0) <= absoluteAccuracy) {
            return (id,x1);
        }
        x0 = x1;
        i = i + 1;
    }
    val res : Double = 0.0;
    return (id,res);
}
sqlContext.sql("create table if not exists ${result_table}(id bigint , irr double)")
val odd = sqlContext.sql("select * from ${input_table}").rdd.map(x => (x(0).toString,x(1).toString)).filter(x => x._1.toInt % 2 == 0).map(x => (x._1.toInt,x._2.toDouble))
val even = sqlContext.sql("select * from ${input_table}").rdd.map(x => (x(0).toString,x(1).toString)).filter(x => x._1.toInt % 2 == 1).map(x => (x._1.toInt,x._2.toDouble))
val merged = odd.cogroup(even)
merged.filter(x => x._2._2.mkString(",").length > 0).map(x => (x._1,x._2._2)).map{ case (k,v) => s"""\$k,\${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble)).toDF().insertInto("${result_table}",false)
merged.filter(x => x._2._1.mkString(",").length > 0).map(x => (x._1,x._2._1)).map{ case (k,v) => s"""\$k,\${v.mkString(",")}"""}.map( x => irr(x)).map(x => (x._1.toInt,x._2.toDouble)).toDF().insertInto("${result_table}",false)

EOF[/mw_shl_code]cat EOF中间是后来改的直接读取hive的代码。脚本调用方式如下
[mw_shl_code=bash,true]bash sparkShell_command.sh  table_name | spark-shell[/mw_shl_code]脚本的输入参数是数据表,输出的表是 数据表_result。需要注意,数据表要有两列数据(id bigint, fee double),在fee中需要包含总数-amount。
其实可以看出来,实际上还是在spark-shell中执行的。

++20200812++++++++++++++++++ end

与Pandas效率对比

以上代码,在公司数据量的基础上(不方便透露)同事写的Python脚本处理数据用2.5h,我预估了一下Python代码即便优化后执行也要接近三十分钟,而spark处理只用十秒钟。

spark代码后续用到再优化吧,以上只做记录。


文章来源:https://blog.csdn.net/weixin_39445556/article/details/107929751


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条