Spark是一个极为优秀的大数据框架,在大数据批处理上基本无人能敌,流处理上也有一席之地,机器学习则是当前正火热AI人工智能的驱动引擎,在大数据场景下如何发挥AI技术成为优秀的大数据挖掘工程师必备技能。本文结合机器学习思想与Spark框架代码结构来实现分布式机器学习过程,希望与大家一起学习进步~
目录
本文采用的组件版本为:
Ubuntu 19.10、Jdk 1.8.0_241、Scala 2.11.12、Hadoop 3.2.1、Spark 2.4.5
,老规矩先开启一系列Hadoop、Spark服务与Spark-shell窗口:
集成方法是一种学习算法,它创建由一组其他基础模型组成的模型。spark.mllib支持两种主要的集成算法:GradientBoostedTrees和RandomForest。两者都使用决策树作为其基础模型。
1.
随机森林概念
随机森林是决策树的集合。随机森林是用于分类和回归的最成功的机器学习模型之一。他们结合了许多决策树,以减少过度拟合的风险。像决策树一样,随机森林处理分类特征,扩展到多类分类设置,不需要特征缩放,并且能够捕获非线性和特征交互。
spark.mllib支持使用连续和分类功能对二进制和多类分类以及进行回归的随机森林。spark.mllib使用现有的决策树实现来实现随机森林。请参阅决策树指南以获取有关树的更多信息。
通过讨论各种参数,我们包括一些使用随机森林的准则。由于决策树指南中介绍了一些决策树参数,因此我们将其省略。
2
随机森林参数
我们提到的前两个参数是最重要的,对其进行调整通常可以提高性能:
-
numTrees:
森林中的树木数量。
增加树的数量将减少预测的方差,从而提高模型的测试时间准确性。训练时间在树数量上大致呈线性增加。
-
maxDepth:
森林中每棵树的最大深度。
深度的增加使模型更具表现力和功能。但是,深树需要更长的训练时间,也更容易过度拟合。通常,使用随机森林比使用单个决策树训练更深的树是可以接受的。与随机森林相比,一棵树更可能过度拟合(由于对森林中的多棵树进行平均而减少了方差)。
接下来的两个参数通常不需要调整。但是,可以对其进行调整以加快培训速度。
-
subsamplingRate:
此参数指定用于训练森林中每棵树的数据集的大小,作为原始数据集大小的一部分。
建议使用默认值(1.0),但降低此比例可以加快训练速度。
-
featureSubsetStrategy:
用作在每个树节点处分割的候选要素的数量。
该数字指定为特征总数的分数或函数。
减少此数字将加快训练速度,但是如果太低,有时会影响性能。
3.
随机森林实例
下面的示例演示如何加载LIBSVM数据文件,将其解析为LabeledPoint的RDD,然后使用随机森林进行分类。计算测试误差以测量算法准确性。
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// 加载和解析数据文件
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
//将数据分为训练集和测试集(保留30%进行测试)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// 训练一个随机森林模型
// 空的categoricalFeaturesInfo表示所有要素都是连续的。
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // 在实际中使用最多
val featureSubsetStrategy = "auto" // 让模型自行选择
val impurity = "gini"
val maxDepth = 4
val maxBins = 32
val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
// 在测试集上评估模型并计算误差
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification forest model:\n ${model.toDebugString}")
// 保存和加载模型
model.save(sc, "target/tmp/myRandomForestClassificationModel")
val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")
回归实例限于篇幅就不做全部展示,需要注意与分类不同的是评估准则:
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
4.
随机森林源码
在利用随机森林进行预测时,调用的predict方法扩展自TreeEnsembleModel,它是树结构组合模型的表示,其核心代码如下所示:
//不同的策略采用不同的预测方法
def predict(features: Vector): Double = {
(algo, combiningStrategy) match {
case (Regression, Sum) =>
predictBySumming(features)
case (Regression, Average) =>
predictBySumming(features) / sumWeights
case (Classification, Sum) => // binary classification
val prediction = predictBySumming(features)
// TODO: predicted labels are +1 or -1 for GBT. Need a better way to store this info.
if (prediction > 0.0) 1.0 else 0.0
case (Classification, Vote) =>
predictByVoting(features)
case _ =>
throw new IllegalArgumentException()
}
}
private def predictBySumming(features: Vector): Double = {
val treePredictions = trees.map(_.predict(features))
//两个向量的内集
blas.ddot(numTrees, treePredictions, 1, treeWeights, 1)
}
//通过投票选举
private def predictByVoting(features: Vector): Double = {
val votes = mutable.Map.empty[Int, Double]
trees.view.zip(treeWeights).foreach { case (tree, weight) =>
val prediction = tree.predict(features).toInt
votes(prediction) = votes.getOrElse(prediction, 0.0) + weight
}
votes.maxBy(_._2)._1
}
5.
梯度提升树概念
梯度提升树(GBT)是决策树的集合。GBT迭代地训练决策树,以最小化损失函数。像决策树一样,GBT处理分类特征,扩展到多类分类设置,不需要特征缩放,并且能够捕获非线性和特征相互作用。
spark.mllib使用连续和分类功能支持GBT用于二进制分类和回归。spark.mllib使用现有的决策树实现来实现GBT。请参阅决策树指南以获取有关树的更多信息。
注意:GBT尚不支持多类分类。对于多类问题,请使用决策树或随机森林。
下表列出了spark.mllib中GBT当前支持的损失。请注意,每种损失都适用于分类或回归之一,但不能同时适用于两者。
表示法:N =实例数。yi =实例i的标签。xi =实例i的特征。F(xi)=模型的预测标签,例如i。
6.
GBT参数
通过讨论各种参数,我们包括一些使用GBT的准则。由于决策树指南中介绍了一些决策树参数,因此我们将其省略。
-
loss:有关损失及其对任务的适用性(分类与回归)的信息,请参见上面的部分。根据数据集的不同,不同的损失可能会产生明显不同的结果。
-
numIterations:设置集合中树木的数量。每次迭代都会生成一棵树。增加此数字可使模型更具表现力,从而提高训练数据的准确性。但是,如果测试时间精度太大,则可能会降低测试时间精度。
-
learningRate:不需要调整此参数。如果算法行为似乎不稳定,则减小该值可以提高稳定性。
-
算法:使用树[Strategy]参数设置算法或任务(分类与回归)。
7.
GBT实例
下面的示例演示如何加载LIBSVM数据文件,将其解析为LabeledPoint的RDD,然后使用带有日志丢失的梯度增强树进行分类。计算测试误差以测量算法准确性。
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// 训练一个GBT模型
// 默认使用对数损失函数
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 3 // 在实际中使用更多的迭代次数
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 5
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification GBT model:\n ${model.toDebugString}")
model.save(sc, "target/tmp/myGradientBoostingClassificationModel")
val sameModel = GradientBoostedTreesModel.load(sc, "target/tmp/myGradientBoostingClassificationModel")
同理,回归实例限于篇幅就不做全部展示,需要注意与分类不同的是评估准则更变为MSE:
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
8.
GBT源码分析
利用梯度提升树进行预测时,调用的predict方法扩展自TreeEnsembleModel,它是树结构组合模型的表示,其核心代码如下所示:
//不同的策略采用不同的预测方法
def predict(features: Vector): Double = {
(algo, combiningStrategy) match {
case (Regression, Sum) =>
predictBySumming(features)
case (Regression, Average) =>
predictBySumming(features) / sumWeights
//用于梯度提升树,转换为1 或者 0
case (Classification, Sum) => // binary classification
val prediction = predictBySumming(features)
// TODO: predicted labels are +1 or -1 for GBT. Need a better way to store this info.
if (prediction > 0.0) 1.0 else 0.0
case (Classification, Vote) =>
predictByVoting(features)
case _ =>
throw new IllegalArgumentException()
}
}
private def predictBySumming(features: Vector): Double = {
val treePredictions = trees.map(_.predict(features))
//两个向量的内集
blas.ddot(numTrees, treePredictions, 1, treeWeights, 1)
}
Spark集成树的内容至此结束,有关Spark的基础文章可参考前文:
参考链接:
http://spark.apache.org/docs/latest/mllib-ensembles.html
https://github.com/endymecy/spark-ml-source-analysis
历史推荐
数据分析与挖掘
数据结构与算法
机器学习与大数据组件
欢迎关注,感谢“在看”,随缘稀罕~