confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
  // Create a bin for each distinct score value, count weighted positives and
  // negatives within each bin, and then sort by score values in descending order.
  // Group and aggregate by predicted score: for each distinct score (sorted in
  // descending order), count its weighted positives and negatives based on the label,
  // producing (score, BinaryLabelCounter) pairs.
  val counts = scoreLabelsWeight.combineByKey(
    createCombiner = (labelAndWeight: (Double, Double)) =>
      // count weighted positives and negatives
      new BinaryLabelCounter(0.0, 0.0) += (labelAndWeight._1, labelAndWeight._2),
    mergeValue = (c: BinaryLabelCounter, labelAndWeight: (Double, Double)) =>
      c += (labelAndWeight._1, labelAndWeight._2),
    mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
  ).sortByKey(ascending = false) // guarantees ordering both within and across partitions
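  // A tiny worked example (made-up data, all weights 1.0) of what `counts` holds at
  // this point; the BinaryLabelCounter contents are sketched as {positives, negatives}:
  //   scoreLabelsWeight: (0.8, (1.0, 1.0)), (0.8, (0.0, 1.0)), (0.6, (1.0, 1.0)), (0.4, (0.0, 1.0))
  //   counts (sorted by score, descending):
  //     (0.8, {1.0, 1.0})   // one positive and one negative were scored 0.8
  //     (0.6, {1.0, 0.0})
  //     (0.4, {0.0, 1.0})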
  // The size of binnedCounts depends on numBins.
  val binnedCounts =
    if (numBins == 0) {
      // numBins == 0 means no binning: binnedCounts keeps every distinct score,
      // which is why the result can become very large without binning.
      // Binning is strongly recommended!
      counts
    } else { // binning
      val countsSize = counts.count() // number of distinct score values
      // Group the iterator into chunks of about countsSize / numBins points,
      // so that the resulting number of bins is about numBins.
      val grouping = countsSize / numBins // size of each bin
      if (grouping < 2) {
        // numBins was more than half of the size; no real point in down-sampling to bins
        logInfo(s"Curve is too small ($countsSize) for $numBins bins to be useful")
        counts
      } else {
        // Note the mapPartitions call: binning is done independently inside each
        // partition, which explains why the result can contain more than numBins
        // points when the input RDD is not repartitioned first.
        counts.mapPartitions { iter =>
          if (iter.hasNext) {
            var score = Double.NaN
            var agg = new BinaryLabelCounter()
            var cnt = 0L
            iter.flatMap { pair =>
              score = pair._1
              agg += pair._2
              cnt += 1
              if (cnt == grouping) {
                // The last score seen is the smallest score of the bin, since scores
                // arrive in descending order; everything at or above it has been
                // accumulated. The first element of ret is the threshold within this
                // partition, and agg holds the corresponding label counts.
                val ret = (score, agg)
                // reset the counter and emit the accumulated bin
                agg = new BinaryLabelCounter()
                cnt = 0
                Some(ret)
              } else None
            } ++ {
              if (cnt > 0) {
                Iterator.single((score, agg))
              } else Iterator.empty
            }
          } else Iterator.empty
        }
      }
    }

  // Compute per-partition totals: simply sum the positive and negative counts.
  val agg = binnedCounts.values.mapPartitions { iter =>
    val agg = new BinaryLabelCounter()
    iter.foreach(agg += _)
    Iterator(agg)
  }.collect() // collect the per-partition counts to the driver

  // Compute cumulative counts across partitions: scanLeft yields an array that
  // aggregates the counts of all partitions, of length numPartitions + 1.
  val partitionwiseCumulativeCounts =
    agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
  val totalCount = partitionwiseCumulativeCounts.last
  logInfo(s"Total counts: $totalCount")

  // Cumulate within each partition: every score first inherits the running total of
  // all preceding partitions, then adds the preceding scores inside its own partition.
  val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
    (index: Int, iter: Iterator[(Double, BinaryLabelCounter)]) => {
      // start from the cumulative count of the preceding partitions,
      // then accumulate the bins of this partition one by one
      val cumCount = partitionwiseCumulativeCounts(index)
      iter.map { case (score, c) =>
        cumCount += c
        (score, cumCount.clone())
      }
    }, preservesPartitioning = true)
  cumulativeCounts.persist()

  // Build the confusion matrix for each threshold; once we have the confusion
  // matrices, any metric can be computed from the formula we pass in.
  val confusions = cumulativeCounts.map { case (score, cumCount) =>
    (score, BinaryConfusionMatrixImpl(cumCount, totalCount).asInstanceOf[BinaryConfusionMatrix])
  }
  (cumulativeCounts, confusions)
}
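To see the effect of numBins end to end, here is a minimal usage sketch. The data is made up and a running SparkContext named sc is assumed; roc() and areaUnderROC() are computed from the confusions RDD built above.

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Hypothetical (score, label) pairs; labels are 0.0 / 1.0.
val scoreAndLabels = sc.parallelize(Seq(
  (0.9, 1.0), (0.8, 1.0), (0.7, 0.0), (0.6, 1.0), (0.4, 0.0), (0.1, 0.0)))

// numBins = 100 down-samples the curve so the collected ROC/PR points stay small
// even for huge score sets; as noted above, per-partition binning can still yield
// slightly more than numBins points.
val metrics = new BinaryClassificationMetrics(scoreAndLabels, 100)
metrics.roc().collect().foreach(println) // (FPR, TPR) points derived from `confusions`
println(s"AUC = ${metrics.areaUnderROC()}")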