批量进行One-hot-encoder且进行特征字段拼接,并完成模型训练demo

  • Post author:
  • Post category:其他



import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}
import org.apache.spark.ml.feature.VectorAssembler
import ml.dmlc.xgboost4j.scala.spark.{XGBoostEstimator, XGBoostClassificationModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.PipelineModel



val data = (spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/Affairs.csv"))

data.createOrReplaceTempView("res1")

val affairs = "case when affairs>0 then 1 else 0 end as affairs,"
val df = (spark.sql("select " + affairs +
  "gender,age,yearsmarried,children,religiousness,education,occupation,rating" +
  " from res1 "))
  
  
val categoricals = df.dtypes.filter(_._2 == "StringType") map (_._1)

val indexers = categoricals.map(
  c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
)

val encoders = categoricals.map(
  c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc").setDropLast(false)
)
  
  
val colArray_enc = categoricals.map(x => x + "_enc")
val colArray_numeric = df.dtypes.filter(_._2 != "StringType") map (_._1)

val final_colArray = (colArray_numeric ++ colArray_enc).filter(!_.contains("affairs"))

val vectorAssembler = new VectorAssembler().setInputCols(final_colArray).setOutputCol("features")

/*
val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler))
pipeline.fit(df).transform(df)
*/



///
// Create an XGBoost Classifier 
val xgb = new XGBoostEstimator(Map("num_class" -> 2, "num_rounds" -> 5, "objective" -> "binary:logistic", "booster" -> "gbtree")).setLabelCol("affairs").setFeaturesCol("features")


// XGBoost paramater grid
val xgbParamGrid = (new ParamGridBuilder()
   .addGrid(xgb.round, Array(10))
   .addGrid(xgb.maxDepth, Array(10,20))
   .addGrid(xgb.minChildWeight, Array(0.1))
   .addGrid(xgb.gamma, Array(0.1))
   .addGrid(xgb.subSample, Array(0.8))
   .addGrid(xgb.colSampleByTree, Array(0.90))
   .addGrid(xgb.alpha, Array(0.0))
   .addGrid(xgb.lambda, Array(0.6))
   .addGrid(xgb.scalePosWeight, Array(0.1))
   .addGrid(xgb.eta, Array(0.4))
   .addGrid(xgb.boosterType, Array("gbtree"))
   .addGrid(xgb.objective, Array("binary:logistic")) 
   .build())

// Create the XGBoost pipeline
val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler, xgb))


// Setup the binary classifier evaluator
val evaluator = (new BinaryClassificationEvaluator()
   .setLabelCol("affairs")
   .setRawPredictionCol("prediction")
   .setMetricName("areaUnderROC"))


// Create the Cross Validation pipeline, using XGBoost as the estimator, the
// Binary Classification evaluator, and xgbParamGrid for hyperparameters
val cv = (new CrossValidator()
   .setEstimator(pipeline)
   .setEvaluator(evaluator)
   .setEstimatorParamMaps(xgbParamGrid)
   .setNumFolds(3)
   .setSeed(0))


 // Create the model by fitting the training data
val xgbModel = cv.fit(df)

 // Test the data by scoring the model
val results = xgbModel.transform(df)



// Print out a copy of the parameters used by XGBoost, attention pipeline
(xgbModel.bestModel.asInstanceOf[PipelineModel]
  .stages(5).asInstanceOf[XGBoostClassificationModel]
  .extractParamMap().toSeq.foreach(println))

results.select("affairs","prediction").show

println("---Confusion Matrix------")
results.stat.crosstab("affairs","prediction").show()


// What was the overall accuracy of the model, using AUC
val auc = evaluator.evaluate(results)
println("----AUC--------")
println("auc="+auc)


转载于:https://my.oschina.net/kyo4321/blog/2050706