sunshine_junge 发表于 2014-12-30 15:08:22

Mahout协同过滤框架Taste的源码分析(2)

本帖最后由 sunshine_junge 于 2014-12-30 15:16 编辑


问题导读:

1.如何使用MapReduce计算物品相似度?
2.如何使用MapReduce进行矩阵乘法?
3.如何使用Taste完成推荐?



static/image/hrline/4.gif


推荐过程

主要分成了如下几步来完成推荐
1. 输入数据预处理
2. 获取评分矩阵
3. 计算物品相似度
4. 矩阵乘法
5. 数据过滤
6. 计算推荐
测试数据

user&item12345
133320
244400
355503
444414


基于物品的推荐public final class RecommenderJob extends AbstractJob {
@Override
    public int run(String[] args) throws Exception {
      // 1. 加载参数
      //
      // input(path)
      // 包含用户偏好信息的数据文件目录, 其中文件格式为 userID, itemID[, preferencevalue]
      //
      // output(path)
      // 推荐结果的输出目录
      //
      // similarityClassname (classname)
      // 计算相似度的实现类
      //
      // usersFile (path)
      // 只计算指定用户的推荐结果
      //
      // itemsFile (path)
      // 只在指定的物品中给出推荐结果
      //
      // filterFile (path)
      // 从推荐结果中为指定的用户过滤指定的物品
      //
      // numRecommendations (integer)
      // 给每个用户推荐物品的数量
      //
      // booleanData (boolean)
      // 是否只考虑用户是否评价过物品而不考虑分值
      //
      // maxPrefsPerUser (integer)
      // 用户的最大偏好数量, 默认为10
      //
      // maxSimilaritiesPerItem (integer)
      // 每个物品最多与多少个物品计算相似度, 默认为100
      //
      // minPrefsPerUser (integer)
      // 用户的最小偏好数量, 默认为1
      //
      // maxPrefsPerUserInItemSimilarity (integer)
      // 在每个Item在计算相似度阶段对User的最大采样个数, 默认为1000
      //
      // threshold (double)
      // 计算物品相似度时的门槛

      // 2. 获取评分矩阵
      // PreparePreferenceMatrixJob
      //
      // 3. 计算物品相似度
      // RowSimilarityJob
      //
      // 4. 为矩阵乘法做准备
      // prePartialMultiply1, prePartialMultiply2, partialMultiply
      //
      // 5. 如果filterFile存在则为指定用户过滤指定物品
      // Job itemFiltering
      //
      // 6. 计算推荐
      // Job aggregateAndRecommend
    }
}


评分矩阵索引转换// Job itemIDIndex
//
// 物品ID转换成索引的MapReduce任务
// 将 userID, itemID, pref格式的数据转换为 index, itemID
//
// inputPath      用户偏好信息数据
// outputPath   tempDir/preparePreferenceMatrix/itemIDIndex
// inputFormat    TextInputFormat
// mapper         ItemIDIndexMapper
// reducer      ItemIDIndexReducer
// outputFormat   SequenceFileOutputFormat


示例数据# 输入
1,1,3
1,2,3
1,3,3
1,4,2
2,1,4
2,2,4
2,3,4
3,1,5
3,2,5
3,3,5
3,5,3
4,1,4
4,2,4
4,3,4
4,4,1
4,5,4

# 输出
1   1
2   2
3   3
4   4
5   5


mapper实现
// 解析输出文件, 假定其为long型, 转换为int型
public final class ItemIDIndexMapper extends Mapper<LongWritable, Text, VarIntWritable, VarLongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
      long itemID = Long.parseLong(tokens);
      int index = TasteHadoopUtils.idToIndex(itemID);
      context.write(new VarIntWritable(index), new VarLongWritable(itemID));
    }
}



reducer实现
// 将索引与itemID的对应关系写入到输出中
public final class ItemIDIndexReducer extends Reducer<VarIntWritable, VarLongWritable, VarIntWritable, VarLongWritable> {
    @Override
    protected void reduce(VarIntWritable index,
            Iterable<VarLongWritable> possibleItemIDs, Context context)
            throws IOException, InterruptedException {
      long minimumItemID = Long.MAX_VALUE;
      for (VarLongWritable varLongWritable : possibleItemIDs) {
            long itemID = varLongWritable.get();
            if (itemID < minimumItemID) {
                minimumItemID = itemID;
            }
      }
      if (minimumItemID != Long.MAX_VALUE) {
            context.write(index, new VarLongWritable(minimumItemID));
      }
    }
}


用户向量转换
// Job toUserVectors ~
//
// 获取每个用户对物品偏好的向量, 即由原始数据获取 userID, <itemID, pref> 格式的向量
//
// inputPath      用户偏好信息数据
// outputPath   tempDir/preparePreferenceMatrix/userVectors
// inputFormat    TextInputFormat
// mapper         ToItemPrefsMapper
// reducer      ToUserVectorsReducer
// outputFormat   SequenceFileOutputFormat


示例数据
# 输入同上
# 输出
1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
2 : (1,4.0) (2,4.0) (3,4.0)
3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)


mapper实现
public abstract class ToEntityPrefsMapper extends
      Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
      String[] tokens = DELIMITER.split(value.toString());
      long userID = Long.parseLong(tokens);
      long itemID = Long.parseLong(tokens);
      // 是否需要进行转置
      if (itemKey ^ transpose) {
            long temp = userID;
            userID = itemID;
            itemID = temp;
      }
      // 是否不考虑评分值
      if (booleanData) {
            context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
      } else {
            float prefValue = tokens.length > 2 ? Float.parseFloat(tokens) + ratingShift : 1.0f;
            context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
      }
    }
}



reducer实现
public final class ToUserVectorsReducer
      extends
      Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

    @Override
    protected void reduce(VarLongWritable userID,
            Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
      // 将相同用户对物品的偏好信息存储到向量
      Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
      for (VarLongWritable itemPref : itemPrefs) {
            int index = TasteHadoopUtils.idToIndex(itemPref.get());
            float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref)
                  .getPrefValue() : 1.0f;
            userVector.set(index, value);
      }

      // 只有当物品数量达到最小值时才以userID为key写入到输出
      if (userVector.getNumNondefaultElements() >= minPreferences) {
            VectorWritable vw = new VectorWritable(userVector);
            vw.setWritesLaxPrecision(true);
            // 统计用户数量
            context.getCounter(Counters.USERS).increment(1);
            context.write(userID, vw);
      }
    }



统计用户数量
// 将用户数量写入到临时目录 tempDir/preparePreferenceMatrix/numUsers.bin
public static final String NUM_USERS = "numUsers.bin";
int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
HadoopUtil.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());



物品向量
// Job toUserVectors
//
// 获取每个物品与其关联用户的向量, 由userID, <itemID, pref> 转换为 itemID, <userID, pref>
//
// inputPath      tempDir/preparePreferenceMatrix/userVectors
// outputPath   tempDir/preparePreferenceMatrix/ratingMatrix
// inputFormat    TextInputFormat
// mapper         ToItemVectorsMapper
// reducer      ToItemVectorsReducer
// outputFormat   SequenceFileOutputFormat


示例数据
# 输入即上一个任务的输出

# 输出
1 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
2 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
3 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
4 : (1,2.0) (4,1.0)
5 : (3,3.0) (4,4.0)


计算相似度

向量赋权
// Job normsAndTranspose
//
// 使用指定的相似度算法对每行计算权重,
// 即由 itemID, <userID, pref> 计算得到 userID, <itemID, pref>
//
// inputPath      tempDir/preparePreferenceMatrix/ratingMatrix
// outputPath   tempDir/weights
// inputFormat    SequenceFileInputFormat
// mapper         VectorNormMapper
// reducer      MergeVectorsReducer
// outputFormat   SequenceFileOutputFormat


示例数据
# 输入 itemID, <userID, pref>
1 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
2 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
3 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
4 : (1,2.0) (4,1.0)
5 : (3,3.0) (4,4.0)

# 输出 userID, <itemID, pref>
1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
2 : (1,4.0) (2,4.0) (3,4.0)
3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)

# norms.bin
(1,66.0) (2,66.0) (3,66.0) (4,5.0) (5,25.0)


mapper实现public class RowSimilarityJob extends AbstractJob {
    public static class VectorNormMapper extends
            Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
      @Override
      protected void map(IntWritable row, VectorWritable vectorWritable,
                Context ctx) throws IOException, InterruptedException {
            // 输入文件为itemID--Vector<userID, pref>格式的item向量
            // 使用指定的相似度算法进行处理
            Vector rowVector = similarity.normalize(vectorWritable.get());

            int numNonZeroEntries = 0;
            double maxValue = Double.MIN_VALUE;


            // 遍历对某个物品存在非零偏好信息的所有用户
            Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
            while (nonZeroElements.hasNext()) {
                Vector.Element element = nonZeroElements.next();
                RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);

                // 将 itemID - <userID, pref> 变换为 userID - <itemID, pref>
                partialColumnVector.setQuick(row.get(), element.get());
                ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));

                // 记录用户数量和最高分值
                numNonZeroEntries++;
                if (maxValue < element.get()) {
                  maxValue = element.get();
                }
            }

            // 如果需要过滤最小分值则记录对当前物品评分的用户总数及最大分值
            if (threshold != NO_THRESHOLD) {
                nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
                maxValues.setQuick(row.get(), maxValue);
            }
            // 记录每个物品的norm值, 不同的相似度算法有不同的实现
            // 欧几里得距离计算的是平方和
            norms.setQuick(row.get(), similarity.norm(rowVector));

            ctx.getCounter(Counters.ROWS).increment(1);
      }
      @Override
      protected void cleanup(Context ctx) throws IOException,
                InterruptedException {
            super.cleanup(ctx);
            // 向reducer传递特殊key值, 以便于特殊处理
            ctx.write(new IntWritable(NORM_VECTOR_MARKER), new VectorWritable(norms));
            ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new VectorWritable(nonZeroEntries));
            ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new VectorWritable(maxValues));
      }
    }
}

reducer实现public class RowSimilarityJob extends AbstractJob {
    public static class MergeVectorsReducer extends
            Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
      @Override
      protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
                throws IOException, InterruptedException {

            // 按照物品进行合并
            Vector partialVector = Vectors.merge(partialVectors);

            if (row.get() == NORM_VECTOR_MARKER) {
                // 将每个物品的规范分值写入到 tempDir/norms.bin
                Vectors.write(partialVector, normsPath, ctx.getConfiguration());
            } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
                // 将最大分值写入到tempDir/maxValues.bin
                Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
            } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
                // 将非零偏好的用户数量写入到 tempDir/numNonZeroEntries.bin
                Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
            } else {
                // 其它的写入到默认的输出目录 tempDir/weights
                ctx.write(row, new VectorWritable(partialVector));
            }
      }
    }
}

相似度计算// 计算物品两两间相似度矩阵
// 由 userID, <itemID, pref> 计算获得 itemA, <itemO, similarity>
//
// inputPath      tempDir/weights
// outputPath   tempDir/pairwiseSimilarity
// inputFormat    SequenceFileInputFormat
// mapper         CooccurrencesMapper
// reducer      SimilarityReducer
// outputFormat   SequenceFileOutputFormat

示例数据# 输入
1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
2 : (1,4.0) (2,4.0) (3,4.0)
3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)

# 输出
1 : (2,1.0) (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
2 : (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
3 : (4,0.122828568570857) (5,0.1566130288262323)
4 : (5,0.1757340838011157)


mapper实现public class RowSimilarityJob extends AbstractJob {
    public static class CooccurrencesMapper extends
            Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
      @Override
      protected void map(IntWritable column, VectorWritable occurrenceVector,
                Context ctx) throws IOException, InterruptedException {
            // 获取某个用户关注的所有物品, 按照索引排序
            Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
            Arrays.sort(occurrences, BY_INDEX);

            int cooccurrences = 0;
            int prunedCooccurrences = 0;
            // 计算物品两两间由相似度实现类的 aggregate 方法求得的值
            // 按照 itemA - <itemO, aggregate> 格式写入到输出
            // 使用测试数据将得到
            // user1:
            //   1 : <2,9.0> <3,9.0> <4,6.0>
            //   2 : <1,9.0> <3,9.0> <4,6.0>
            //   3 : <1,9.0> <2,9.0> <4,6.0>
            //   4 : <1,6.0> <2,6.0> <3,6.0>
            // user2:
            //   1 : <2,16.0> <3,16.0>
            //   2 : <1,16.0> <3,16.0>
            //   3 : <1,16.0> <2,16.0>
            for (int n = 0; n < occurrences.length; n++) {
                Vector.Element occurrenceA = occurrences;
                Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
                for (int m = n; m < occurrences.length; m++) {
                  Vector.Element occurrenceB = occurrences;
                  if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
                        dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
                        cooccurrences++;
                  } else {
                        prunedCooccurrences++;
                  }
                }
                ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
            }
            ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
            ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
      }
    }
}

reducer实现public class RowSimilarityJob extends AbstractJob {
    public static class SimilarityReducer extends
            Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
      @Override
      protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
                throws IOException, InterruptedException {
            // 上一步mapper过程得到 itemA - <itemO, aggregate> 格式的向量
            // 累加相同物品由不同用户计算出的分值
            // 1 : <2,(9+16+25+16)> <3,(9+16+25+16)> <4,(6+4)> <5,(15+16)>
            // 2 : <1,(9+16+25+16)> <3,(9+16+25+16)> <4,(6+4)> <5,(15+16)>
            Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
            Vector dots = partialDotsIterator.next().get();
            while (partialDotsIterator.hasNext()) {
                Vector toAdd = partialDotsIterator.next().get();
                Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
                while (nonZeroElements.hasNext()) {
                  Vector.Element nonZeroElement = nonZeroElements.next();
                  dots.setQuick(nonZeroElement.index(),
                            dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
                }
            }

            // 创建一个相同大小的向量存储两两物品间的相似度
            Vector similarities = dots.like();

            // norms 由上一步的MapReduce任务计算得出
            // 这里直接从HDFS存储路径上加载文件构建成向量
            // (1,66.0) (2,66.0) (3,66.0) (4,5.0) (5,25.0)
            double normA = norms.getQuick(row.get());

            // 使用相似度实现类的similarity方法计算两个物品的相似度
            // 参数为:由相似度实现类的 aggregate 方法得到的值, 两个物品的norm值
            // 欧几里得距离的实现为
            //
            // double euclideanDistance = Math.sqrt(normA - 2 * dots + normB);
            // return 1.0 / (1.0 + euclideanDistance);
            //
            // 1与2的相似度为 1.0 / = 1.0
            // 1与4的相似度为 1.0 / = 0.122828568570857
            Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
            while (dotsWith.hasNext()) {
                Vector.Element b = dotsWith.next();
                double similarityValue = similarity.similarity(b.get(), normA,
                        norms.getQuick(b.index()), numberOfColumns);
                if (similarityValue >= treshold) {
                  similarities.set(b.index(), similarityValue);
                }
            }
            if (excludeSelfSimilarity) {
                similarities.setQuick(row.get(), 0);
            }
            ctx.write(row, new VectorWritable(similarities));
      }
    }
}
矩阵变换// Job asMatrix

// 矩阵变换, 为每个物品查找TopN相似的其它物品
// 最后仍然是 itemA, <itemO, similarity> 格式的向量
//
// inputPath      tempDir/pairwiseSimilarity
// outputPath   tempDir/similarityMatrix
// inputFormat    SequenceFileInputFormat
// mapper         UnsymmetrifyMapper
// reducer      MergeToTopKSimilaritiesReducer
// outputFormat   SequenceFileOutputFormat
示例数据// Job asMatrix

// 矩阵变换, 为每个物品查找TopN相似的其它物品
// 最后仍然是 itemA, <itemO, similarity> 格式的向量
//
// inputPath      tempDir/pairwiseSimilarity
// outputPath   tempDir/similarityMatrix
// inputFormat    SequenceFileInputFormat
// mapper         UnsymmetrifyMapper
// reducer      MergeToTopKSimilaritiesReducer
// outputFormat   SequenceFileOutputFormat

mapper实现public class RowSimilarityJob extends AbstractJob {
    public static class UnsymmetrifyMapper extends
            Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
      @Override
      protected void map(IntWritable row,
                VectorWritable similaritiesWritable, Context ctx)
                throws IOException, InterruptedException {
            // 这里的输入格式为 itemA, <itemO, similarity>
            Vector similarities = similaritiesWritable.get();

            // 存储转置后的向量
            Vector transposedPartial = similarities.like();

            // 根据参数设置存储最多 maxSimilaritiesPerRow 个与当前物品相似的物品
            TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);

            Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
            while (nonZeroElements.hasNext()) {
                Vector.Element nonZeroElement = nonZeroElements.next();
                topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));

                // 转置向量里按照 <itemA, similarity> 格式存储
                // 再以 itemO, <itemA, similarity> 格式写入到输出
                transposedPartial.setQuick(row.get(), nonZeroElement.get());
                ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
                transposedPartial.setQuick(row.get(), 0.0);
            }

            // 将与当前物品TopN相似的物品以 itemA, <itemO, similarity> 格式写入到输出
            Vector topKSimilarities = similarities.like();
            for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
                topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
            }
            ctx.write(row, new VectorWritable(topKSimilarities));
      }
    }
}

reducer实现public class RowSimilarityJob extends AbstractJob {
    public static class MergeToTopKSimilaritiesReducer extends
            Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
      @Override
      protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
                throws IOException, InterruptedException {
            // 将mapper过程中得到的两种向量 itemO, <itemA, similarity> 与 itemA, <itemO, similarity> 按相同物品进行合并
            Vector allSimilarities = Vectors.merge(partials);
            // 再次求TopN
            Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
            // 最后输出 itemA, <itemO, similarity> 格式的向量
            ctx.write(row, new VectorWritable(topKSimilarities));
      }
    }
}

矩阵乘法准备

准备过程1// Job prePartialMultiply1

// 为每个物品添加对自身的相似度, 并将向量转换为 VectorOrPrefWritable 类型
// 最后得到 itemA, <itemO, similarity>, 即每个物品与其它物品的相似度
//
// inputPath      tempDir/similarityMatrix
// outputPath   tempDir/prePartialMultiply1
// inputFormat    SequenceFileInputFormat
// mapper         SimilarityMatrixRowWrapperMapper
// reducer      Reducer
// outputFormat   SequenceFileOutputFormat
示例数据# 输入
1 : (2,1.0) (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
2 : (1,1.0) (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
3 : (1,1.0) (2,1.0) (4,0.122828568570857) (5,0.1566130288262323)
4 : (1,0.122828568570857) (2,0.122828568570857) (3,0.122828568570857) (5,0.1757340838011157)
5 : (1,0.1566130288262323) (2,0.1566130288262323) (3,0.1566130288262323) (4,0.1757340838011157)

# 输出
1 : (1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
2 : (1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
3 : (1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
4 : (1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
5 : (1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)

mapper实现public final class SimilarityMatrixRowWrapperMapper
      extends
      Mapper<IntWritable, VectorWritable, VarIntWritable, VectorOrPrefWritable> {
    @Override
    protected void map(IntWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
      // 这里的输入格式为 itemA, <itemO, similarity>
      Vector similarityMatrixRow = value.get();

      // 将当前物品与自己的相似度设置 Double.NaN
      similarityMatrixRow.set(key.get(), Double.NaN);

      // 将向量转换为 VectorOrPrefWritable 类型
      context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
    }
}

reducer实现// org.apache.hadoop.mapreduce.Reducer
// 使用hadoop默认的reducer实现, 即仅仅合并相同的key
// 但在mapper过程中的输入即已是合并完成的向量
// 所以这里的reducer过程并无实际意义

准备过程2// Job prePartialMultiply2
//
// 把用户偏好信息变换为以物品为key的向量
// 即由 userID, <itemID, pref> 变换为 itemID, <userID, pref>
// 最后得到每个用户对同一个物品的偏好
//
// inputPath      tempDir/preparePreferenceMatrix/userVectors
// outputPath   tempDir/prePartialMultiply2
// inputFormat    SequenceFileInputFormat
// mapper         UserVectorSplitterMapper
// reducer      Reducer
// outputFormat   SequenceFileOutputFormat
示例数据# 输入
1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
2 : (1,4.0) (2,4.0) (3,4.0)
3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)

# 输出
1 : (1,3.0)
1 : (2,4.0)
1 : (3,5.0)
1 : (4,4.0)
2 : (1,3.0)
2 : (2,4.0)
2 : (3,5.0)
2 : (4,4.0)
3 : (1,3.0)
3 : (2,4.0)
3 : (3,5.0)
3 : (4,4.0)
4 : (1,2.0)
4 : (4,1.0)
5 : (3,3.0)
5 : (4,4.0)
mapper实现public final class UserVectorSplitterMapper
      extends
      Mapper<VarLongWritable, VectorWritable, VarIntWritable, VectorOrPrefWritable> {
    @Override
    protected void map(VarLongWritable key, VectorWritable value,
            Context context) throws IOException, InterruptedException {
      long userID = key.get();

      // 是否过滤用户
      if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
            return;
      }
      Vector userVector = maybePruneUserVector(value.get());

      // 获取每个用户对物品的偏好信息
      // 将 userID, <itemID, pref> 变换为 itemID, <userID, pref>
      Iterator<Vector.Element> it = userVector.iterateNonZero();
      VarIntWritable itemIndexWritable = new VarIntWritable();
      VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
      while (it.hasNext()) {
            Vector.Element e = it.next();
            itemIndexWritable.set(e.index());
            vectorOrPref.set(userID, (float) e.get());
            context.write(itemIndexWritable, vectorOrPref);
      }
    }
}
reducer实现// org.apache.hadoop.mapreduce.Reducer
// 使用hadoop默认的reducer实现, 即仅仅合并相同的key
数据拼接// Job partialMultiply

// 根据用户偏好以及物品相似度获得
// itemA, <<itemO, similarity>, userIDs, prefValues> 格式的输出

// inputPath      tempDir/prePartialMultiply1, tempDir/prePartialMultiply2
// outputPath   tempDir/partialMultiply
// inputFormat    SequenceFileInputFormat
// mapper         Mapper
// reducer      ToVectorAndPrefReducer
// outputFormat   SequenceFileOutputFormat</span>
示例数据<span style="font-weight: normal;"># 输入
# prePartialMultiply1
1 : (1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
2 : (1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
3 : (1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
4 : (1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
5 : (1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)

# prePartialMultiply2
1 : (1,3.0)
1 : (2,4.0)
1 : (3,5.0)
1 : (4,4.0)
2 : (1,3.0)
2 : (2,4.0)
2 : (3,5.0)
2 : (4,4.0)
3 : (1,3.0)
3 : (2,4.0)
3 : (3,5.0)
3 : (4,4.0)
4 : (1,2.0)
4 : (4,1.0)
5 : (3,3.0)
5 : (4,4.0)

# 输出
1 :
<1,3.0> <2,4.0> <4,4.0> <3,5.0>
(1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
2 :
<1,3.0> <2,4.0> <4,4.0> <3,5.0>
(1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
3 :
<1,3.0> <2,4.0> <4,4.0> <3,5.0>
(1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
4 :
<1,2.0> <4,1.0>
(1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
5 :
<3,3.0> <4,4.0>
(1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)

mapper实现// org.apache.hadoop.mapreduce.Mapper
// 使用hadoop默认的mapper实现, 即仅仅读取输入

reducer实现public final class ToVectorAndPrefReducer
      extends
      Reducer<VarIntWritable, VectorOrPrefWritable, VarIntWritable, VectorAndPrefsWritable> {
    @Override
    protected void reduce(VarIntWritable key, Iterable<VectorOrPrefWritable> values, Context context)
            throws IOException, InterruptedException {

      List<Long> userIDs = Lists.newArrayList();
      List<Float> prefValues = Lists.newArrayList();
      Vector similarityMatrixColumn = null;

      // 从输入中分别读取所有用户对当前物品的打分以及所有其它物品与当前物品的相似度
      for (VectorOrPrefWritable value : values) {
            if (value.getVector() == null) {
                userIDs.add(value.getUserID());
                prefValues.add(value.getValue());
            } else {
                if (similarityMatrixColumn != null) {
                  throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
                }
                similarityMatrixColumn = value.getVector();
            }
      }

      if (similarityMatrixColumn == null) {
            return;
      }
      // 以 itemA, <<itemO, similarity>, userIDs, prefValues> 格式写入到输出
      VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
      context.write(key, vectorAndPrefs);
    }
}<b></span></b>

进行推荐物品过滤
// 如果启动RecommenderJob时设置了 filterFile 则执行此任务
// 最后将得到一个黑名单, 用于在推荐过程进行过滤
//
// inputPath      存储被过滤物品的数据目录
// outputPath   tempDir/explicitFilterPath
// inputFormat    TextInputFormat
// mapper         ItemFilterMapper
// reducer      ItemFilterAsVectorAndPrefsReducer
// outputFormat   SequenceFileOutputFormat

mapper实现public class ItemFilterMapper extends
      Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
    @Override
    protected void map(LongWritable key, Text line, Context ctx)
            throws IOException, InterruptedException {

      // 仅仅读取了要为哪些用户过滤哪些物品
      // 因为做矩阵乘法准备的过程中生成的数据都以物品为key
      // 因此这里的输出为 itemID, userID
      String[] tokens = SEPARATOR.split(line.toString());
      long userID = Long.parseLong(tokens);
      long itemID = Long.parseLong(tokens);
      ctx.write(new VarLongWritable(itemID), new VarLongWritable(userID));
    }
}

reducer实现public class ItemFilterAsVectorAndPrefsReducer extends
      Reducer<VarLongWritable, VarLongWritable, VarIntWritable, VectorAndPrefsWritable> {
    @Override
    protected void reduce(VarLongWritable itemID,
            Iterable<VarLongWritable> values, Context ctx) throws IOException,
            InterruptedException {

      // 索引转换
      int itemIDIndex = TasteHadoopUtils.idToIndex(itemID.get());

      // 将被过滤物品的相似度设置为Double.NaN, 以便于识别
      Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
      vector.set(itemIDIndex, Double.NaN);

      List<Long> userIDs = Lists.newArrayList();
      List<Float> prefValues = Lists.newArrayList();

      // 将用户对被过滤物品的喜欢统一设置为1.0
      for (VarLongWritable userID : values) {
            userIDs.add(userID.get());
            prefValues.add(1.0f);
      }

      ctx.write(new VarIntWritable(itemIDIndex), new VectorAndPrefsWritable(vector, userIDs, prefValues));
    }
}

统计和推荐// Job aggregateAndRecommend
//
// 根据用户对每个物品的偏好信息进行计算
// 最后返回为每个用户给出TopN个推荐
//
// inputPath      tempDir/partialMultiply, tempDir/explicitFilterPath
// outputPath   推荐结果的输出目录
// inputFormat    SequenceFileInputFormat
// mapper         PartialMultiplyMapper
// reducer      AggregateAndRecommendReducer
// outputFormat   TextOutputFormat

示例数据# 输入
1 :
<1,3.0> <2,4.0> <4,4.0> <3,5.0>
(1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
2 :
<1,3.0> <2,4.0> <4,4.0> <3,5.0>
(1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
3 :
<1,3.0> <2,4.0> <4,4.0> <3,5.0>
(1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
4 :
<1,2.0> <4,1.0>
(1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
5 :
<3,3.0> <4,4.0>
(1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)

# 输出
1   
2   
3   
mapper实现public final class PartialMultiplyMapper extends
      Mapper<VarIntWritable, VectorAndPrefsWritable, VarLongWritable, PrefAndSimilarityColumnWritable> {

    @Override
    protected void map(VarIntWritable key,
            VectorAndPrefsWritable vectorAndPrefsWritable, Context context)
            throws IOException, InterruptedException {

      // 从输入中分别读取当前物品的相似度向量, 即 itemA, <itemO, similarity>
      // 以及所有用户对当前物品的偏好信息, 即 <userID, pref> , 这里将其分别存储在两个列表中
      Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
      List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
      List<Float> prefValues = vectorAndPrefsWritable.getValues();

      VarLongWritable userIDWritable = new VarLongWritable();
      PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();

      // 遍历用户, 输出 userID, <pref, <itemO, similarity>>
      // 比如
      // 5 : <3,3.0> <4,4.0>
      // (1,0.15661302208900452)
      // (2,0.15661302208900452)
      // (3,0.15661302208900452)
      // (4,0.17573408782482147)
      // (5,NaN)
      // 变换为
      // 3 : <3.0,
      // (1,0.15661302208900452)
      // (2,0.15661302208900452)
      // (3,0.15661302208900452)
      // (4,0.17573408782482147)>
      // 4 : <4.0,
      // (1,0.15661302208900452)
      // (2,0.15661302208900452)
      // (3,0.15661302208900452)
      // (4,0.17573408782482147)>
      for (int i = 0; i < userIDs.size(); i++) {
            long userID = userIDs.get(i);
            float prefValue = prefValues.get(i);
            if (!Float.isNaN(prefValue)) {
                prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
                userIDWritable.set(userID);
                context.write(userIDWritable, prefAndSimilarityColumn);
            }
      }
    }
}
reducer实现
public final class AggregateAndRecommendReducer
      extends
      Reducer<VarLongWritable, PrefAndSimilarityColumnWritable, VarLongWritable, RecommendedItemsWritable> {

    @Override
    protected void reduce(VarLongWritable userID, Iterable<PrefAndSimilarityColumnWritable> values, Context context)
            throws IOException, InterruptedException {
      // 针对任务运行参数分别处理
      if (booleanData) {
            reduceBooleanData(userID, values, context);
      } else {
            reduceNonBooleanData(userID, values, context);
      }
    }
    private void reduceNonBooleanData(VarLongWritable userID,
            Iterable<PrefAndSimilarityColumnWritable> values, Context context)
            throws IOException, InterruptedException {

      Vector numerators = null;

      Vector denominators = null;

      Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);

      // 遍历用户有过偏好信息的所有物品
      // 2 <4.0, (2,1.0) (3,1.0)
      //   (4,0.12282856553792953) (5,0.15661302208900452)>
      // 2 <4.0, (1,1.0) (3,1.0)
      //   (4,0.12282856553792953) (5,0.15661302208900452)>
      // 2 <4.0, (1,1.0) (2,1.0)
      //   (4,0.12282856553792953) (5,0.15661302208900452)>

      for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
            Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
            float prefValue = prefAndSimilarityColumn.getPrefValue();

            // 为每一个需要给出预测分值的物品记录相似物品的打分次数
            Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();
            while (usedItemsIterator.hasNext()) {
                int itemIDIndex = usedItemsIterator.next().index();
                numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
            }
            // 计算加权值所需的分子
            // 构成由 similarity * pref 组成的新向量
            // <4, 0.12282856553792953 * 4.0 +
            //   0.12282856553792953 * 4.0 +
            //   0.12282856553792953 * 4.0>
            // <5, 0.15661302208900452 * 4.0 +
            //   0.15661302208900452 * 4.0 +
            //   0.15661302208900452 * 4.0>
            numerators = numerators == null ?
                prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)
                : numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn






引用:http://matrix-lisp.github.io/blog/2013/12/26/mahout-taste-source-2/


songyl525 发表于 2014-12-30 15:23:11

写的很详细,我没有细看,思路就是这样的
页: [1]
查看完整版本: Mahout协同过滤框架Taste的源码分析(2)