spark 机器学习流水线 pineline所组合的转换器的入参出参问题

  • Post author:
  • Post category:其他




背景

机器学习中我们几乎肯定会使用pineline来组合转换器和预估器的,然后在使用pineline来进行fit操作进行机器学习,那么问题就来了。我们pineline组合的这些转化器,也就是比如StringIndexer,OheEncoder, VectorAssemble的时候出入参是怎么样的呢?StringIndexer参数的输出肯定是要作为OheEncoder的输入吗?同理OheEncoder的输出肯定要是VectorAssemble的输入吗? 本文就来回答这个问题,顺带着学习下如何保存和恢复训练好的pineline模型



技术验证

我们这里使用spark 快速大数据分析中的大部分代码作为例子,只是之前他是运行在了spark3.0上。我这里的代码是运行在了spark2.3版本上,先来看一下完整的代码:

from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.regression import IsotonicRegression

appname = "test"  # 任务名称
master = "local"  # 单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
# spark_driver_host = "10.0.0.248"

try:
    # conf = SparkConf().setAppName(appname).setMaster(master).set("spark.driver.host", spark_driver_host) # 集群
    conf = SparkConf().setAppName(appname).setMaster(master)  # 本地
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.functions import udf
    from pyspark.sql.types import *

    spark = SparkSession.builder.appName('basemodel').master('local').getOrCreate()

    from pyspark.sql.functions import avg, lit

    datapath = '''D://spark/spark/spark-2.3.0-bin-hadoop2.7/data/mllib/''';
    filePath = datapath + "databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet"
    airbnbDF = spark.read.parquet(filePath)
    (trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

    from pyspark.ml.feature import OneHotEncoder, StringIndexer, OneHotEncoderEstimator

    print(trainDF.dtypes)
    categoricalCols = [field for (field, dataType) in trainDF.dtypes
                       if dataType == "string"]
    indexOutputCols = [x + "Index" for x in categoricalCols]
    oheOutputCols = [x + "OHE" for x in categoricalCols]
    # ['host_is_superhostIndex', 'cancellation_policyIndex', 'instant_bookableIndex', 'neighbourhood_cleansedIndex',
    #  'property_typeIndex', 'room_typeIndex', 'bed_typeIndex']
    host_is_superhost_stringIndexer = StringIndexer(inputCol='host_is_superhost',
                                                    outputCol='host_is_superhostIndex',
                                                    handleInvalid="skip")

    cancellation_policy_stringIndexer = StringIndexer(inputCol='cancellation_policy',
                                                      outputCol='cancellation_policyIndex',
                                                      handleInvalid="skip")

    instant_bookable_stringIndexer = StringIndexer(inputCol='instant_bookable',
                                                   outputCol='instant_bookableIndex',
                                                   handleInvalid="skip")

    neighbourhood_cleansed_stringIndexer = StringIndexer(inputCol='neighbourhood_cleansed',
                                                         outputCol='neighbourhood_cleansedIndex',
                                                         handleInvalid="skip")

    property_type_stringIndexer = StringIndexer(inputCol='property_type',
                                                outputCol='property_typeIndex',
                                                handleInvalid="skip")

    room_type_stringIndexer = StringIndexer(inputCol='room_type',
                                            outputCol='room_typeIndex',
                                            handleInvalid="skip")

    bed_type_stringIndexer = StringIndexer(inputCol='bed_type',
                                           outputCol='bed_typeIndex',
                                           handleInvalid="skip")

    oheEncoder = OneHotEncoderEstimator(inputCols=indexOutputCols,
                                        outputCols=oheOutputCols)

    from pyspark.ml.feature import VectorAssembler

    numericCols = [field for (field, dataType) in trainDF.dtypes
                   if ((dataType == "double") & (field != "price"))]
    assemblerInputs = oheOutputCols + numericCols
    vecAssembler = VectorAssembler(inputCols=assemblerInputs,
                                   outputCol="features")

    from pyspark.ml.regression import LinearRegression

    lr = LinearRegression(labelCol="price", featuresCol="features")

    from pyspark.ml import Pipeline

    stages = [host_is_superhost_stringIndexer, cancellation_policy_stringIndexer, instant_bookable_stringIndexer,
              neighbourhood_cleansed_stringIndexer,
              property_type_stringIndexer, room_type_stringIndexer,
              bed_type_stringIndexer, oheEncoder, vecAssembler, lr]
    pipeline = Pipeline(stages=stages)

    pipelineModel = pipeline.fit(trainDF)
    predDF = pipelineModel.transform(testDF)
    predDF.select("features", "price", "prediction").show(5)

    from pyspark.ml.evaluation import RegressionEvaluator

    regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

    rmse = regressionEvaluator.evaluate(predDF)
    print(f"RMSE is {rmse}")

    r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
    print(f"R2 is {r2}")

    datapath = '''D://spark/spark/spark-2.3.0-bin-hadoop2.7/data/mllib/''';
    pipelinePath = datapath + "databricks-datasets/sf-airbnb/lr-pipeline-model"
    pipelineModel.write().overwrite().save(pipelinePath)

    spark.stop()
    print('计算成功!')
except:
    traceback.print_exc()  # 返回出错信息
    print('连接出错!')

这里和我们讨论的最相关的代码片段是:

    stages = [host_is_superhost_stringIndexer, cancellation_policy_stringIndexer, instant_bookable_stringIndexer,
              neighbourhood_cleansed_stringIndexer,
              property_type_stringIndexer, room_type_stringIndexer,
              bed_type_stringIndexer, oheEncoder, vecAssembler, lr]



结论

pineline流水线组合了上面的这些StringIndexer,OheEncoder和VecAssembler,其中值得注意的是OheEncoder的输入是由多个StringIndexer的输出来组成的,并且OheEncoder的输出加上了numericCols列才组成了VecAssembler的输入列。

所以这里我们可以知道流水线中的这些转化器的输入输出并不一定要一环扣一环,也就是说中途的过程中一环的输出可以组合上其他的列或者表达式,然后再作为另一个转化器的输入列.



关于机器学习模型的保存和加载

我们先看下如何加载保存好的机器学习模型

from pyspark import SparkConf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
import traceback
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.regression import IsotonicRegression


appname = "test"  # 任务名称
master = "local"  # 单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
# spark_driver_host = "10.0.0.248"

try:
    # conf = SparkConf().setAppName(appname).setMaster(master).set("spark.driver.host", spark_driver_host) # 集群
    conf = SparkConf().setAppName(appname).setMaster(master)  # 本地
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.functions import udf
    from pyspark.sql.types import *

    spark = SparkSession.builder.appName('basemodel').master('local').getOrCreate()

    from pyspark.sql.functions import avg, lit
    datapath = '''D://spark/spark/spark-2.3.0-bin-hadoop2.7/data/mllib/''';
    filePath = datapath + "databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet"
    airbnbDF = spark.read.parquet(filePath)
    (trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

    pipelinePath = datapath + "databricks-datasets/sf-airbnb/lr-pipeline-model"
    from pyspark.ml import PipelineModel

    savedPipelineModel = PipelineModel.load(pipelinePath)

    predDF = savedPipelineModel.transform(testDF)
    predDF.select("features", "price", "prediction").show(5)
    predDF.show(5)
    coefficient = savedPipelineModel.stages[-1].coefficients[0]
    intercept = savedPipelineModel.stages[-1].intercept
    print(coefficient)
    print(intercept)
    regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

    rmse = regressionEvaluator.evaluate(predDF)
    print(f"RMSE is {rmse}")

    r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
    print(f"R2 is {r2}")
    spark.stop()
    print('计算成功!')
except:
    traceback.print_exc()  # 返回出错信息
    print('连接出错!')


从这段代码中,

savedPipelineModel = PipelineModel.load(pipelinePath)

我们可以知道在加载机器学习模型时是需要指定Model的,比如是这里的PineLineModel,或者LinearRegressionModel等,为了保持模型加载的一致性,我们保持的时候最后也是使用PineLineMode去保持模型,这样我们就可以使用统一的加载代码加载模型文件了



版权声明:本文为lixia0417mul2原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。