0103 spark应用提交时加载本地配置文件及解析方式

  • Post author:
  • Post category:其他


将 Spark 应用提交到集群时,如果需要加载本地(驱动节点)上的配置文件,可以把该文件的路径作为应用参数传入,例如:

...
# Options come before the application jar; everything after the jar is passed to main(args),
# so args(0) is the properties-file URI below.
spark-submit \
 --class ... \
 --jars ... \
 yourApp.jar \
 file:///yourLocalFilePath/conf.properties

在解析该配置文件时,通常我们将其处理为HashMap形式,通过key-value方式实现便捷的配置参数读取。具体实现方式如下:

package com.gendlee1991.utils


import java.io.{FileInputStream, FileNotFoundException, InputStreamReader}
import java.util.Properties

import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}

import scala.collection.mutable.HashMap

/**
 * Key-value configuration read from a `.properties` file.
 *
 * The constructor loads the file at `propertiesPath` once, through Hadoop's `FileSystem`
 * API (see [[loadProPertiesFile]]). [[loadLocal]] can later add entries from a file on
 * the local disk. For a key that appears more than once, the later load overwrites the
 * earlier value.
 *
 * @param propertiesPath path or URI of the properties file to load on construction
 */
class BigDataConf(propertiesPath: String) extends Serializable
{

    // @transient lazy: the logger is not serialized with this object (e.g. when the
    // object is captured in a Spark closure). It is re-created on first use after
    // deserialization.
    @transient lazy val log = LogFactory.getLog(classOf[BigDataConf])

    // Loaded key -> value pairs; both loaders add to this map.
    private val settings = new HashMap[String, String]()
    load(propertiesPath)

    /**
     * Loads `propertiesPath` through [[loadProPertiesFile]].
     *
     * @param propertiesPath path or URI of the properties file
     */
    def load(propertiesPath: String): Unit =
    {
        loadProPertiesFile(propertiesPath)
    }

    /**
     * Reads a UTF-8 `.properties` file through Hadoop's `FileSystem` and adds its
     * entries (values trimmed) to the settings. Afterwards it prints every setting
     * to stdout.
     *
     * `Path.getFileSystem` picks the file system, so the path's scheme (for example
     * `file://` or `hdfs://`) decides where the file is read from. A missing file is
     * reported on stdout and in the error log instead of being thrown. Other I/O
     * errors propagate to the caller.
     *
     * @param propertiesPath path or URI of the properties file
     */
    def loadProPertiesFile(propertiesPath: String): Unit =
    {
        var in: FSDataInputStream = null
        var inr: InputStreamReader = null
        try
        {
            val conf = new Configuration()
            val path = new Path(propertiesPath)
            val fs = path.getFileSystem(conf)

            in = fs.open(path)
            // Decode explicitly as UTF-8; Properties.load(InputStream) would assume ISO-8859-1.
            inr = new InputStreamReader(in, "utf-8")
            val prop = new Properties()
            prop.load(inr)
            addAll(prop)
        }
        catch
        {
            case e: FileNotFoundException => reportMissingFile(e)
        }
        finally
        {
            // `in` is closed separately in case the reader was never created.
            if (inr != null)
            {
                inr.close()
            }
            if (in != null)
            {
                in.close()
            }
        }

        printSettings()
    }

    /**
     * Reads a UTF-8 `.properties` file from the local file system with `java.io`,
     * bypassing Hadoop. It adds the entries (values trimmed) to the settings and then
     * prints every setting to stdout.
     *
     * Decoding and trimming are the same as in [[loadProPertiesFile]], so both
     * loaders yield identical values for the same file. A missing file is reported
     * on stdout and in the error log instead of being thrown.
     *
     * @param propertiesPath local file-system path of the properties file
     */
    def loadLocal(propertiesPath: String): Unit =
    {
        var in: FileInputStream = null
        var inr: InputStreamReader = null
        try
        {
            in = new FileInputStream(propertiesPath)
            // Previously loaded from the raw stream (ISO-8859-1), which garbled non-ASCII values.
            inr = new InputStreamReader(in, "utf-8")
            val prop = new Properties()
            prop.load(inr)
            addAll(prop)
        }
        catch
        {
            case e: FileNotFoundException => reportMissingFile(e)
        }
        finally
        {
            if (inr != null)
            {
                inr.close()
            }
            if (in != null)
            {
                in.close()
            }
        }

        printSettings()
    }

    /** Copies every entry of `prop` into the settings, trimming each value. */
    private def addAll(prop: Properties): Unit =
    {
        val keys = prop.propertyNames()
        while (keys.hasMoreElements)
        {
            val key = keys.nextElement().toString
            settings += ((key, prop.getProperty(key).trim))
        }
    }

    /** Reports a missing properties file on stdout and in the error log. */
    private def reportMissingFile(e: FileNotFoundException): Unit =
    {
        println("*" * 40)
        println(e.getMessage)
        log.error(e.getMessage, e)
        println("*" * 40)
    }

    /**
     * Prints every loaded setting to stdout between two separator lines.
     *
     * NOTE(review): this echoes every value, including any credentials in the file,
     * to the driver's stdout.
     */
    private def printSettings(): Unit =
    {
        println("*" * 40)
        settings.foreach(println)
        println("*" * 40)
    }

    /**
     * Returns the value for `key`.
     *
     * @throws NoSuchElementException if `key` is not set
     */
    def get(key: String): String =
    {
        settings.getOrElse(key, throw new NoSuchElementException(key))
    }

    /** Returns the value for `key`, or `defaultValue` if `key` is not set. */
    def get(key: String, defaultValue: String): String =
    {
        settings.getOrElse(key, defaultValue)
    }

    /** Returns an immutable snapshot of all settings. */
    def getAll: Map[String, String] =
    {
        settings.toMap
    }

    /** Returns the value for `key` as an `Option` (`None` if not set). */
    def getOption(key: String): Option[String] =
    {
        settings.get(key)
    }

    /**
     * Returns the value for `key` parsed as an `Int`, or `defaultValue` if not set.
     *
     * @throws NumberFormatException if the value is set but is not a valid integer
     */
    def getInt(key: String, defaultValue: Int): Int =
    {
        getOption(key).map(_.toInt).getOrElse(defaultValue)
    }

    /**
     * Returns the value for `key` parsed as a `Long`, or `defaultValue` if not set.
     *
     * @throws NumberFormatException if the value is set but is not a valid long
     */
    def getLong(key: String, defaultValue: Long): Long =
    {
        getOption(key).map(_.toLong).getOrElse(defaultValue)
    }

    /**
     * Returns the value for `key` parsed as a `Double`, or `defaultValue` if not set.
     *
     * @throws NumberFormatException if the value is set but is not a valid double
     */
    def getDouble(key: String, defaultValue: Double): Double =
    {
        getOption(key).map(_.toDouble).getOrElse(defaultValue)
    }
}

使用方式:

1、通过提交脚本传入本地配置文件路径后,该路径会作为参数传给主函数。假设它是第一个参数,则:

/**
 * Example entry point: builds a [[BigDataConf]] from the properties-file path passed
 * as the first application argument and reads one setting from it.
 *
 * @param args application arguments; args(0) is the properties-file path or URI
 */
def main(args: Array[String]): Unit = {
  // ... other initialization ...
  val bigdataConf = new BigDataConf(args(0))

  // BigDataConf exposes get/getOption/getInt/...; there is no getString method.
  val para1Value = bigdataConf.get("para1_key")

  // ... application logic using para1Value ...
}

参考文献:

http://github.com/thisbigdata/spark-bonus



版权声明:本文为gendlee1991原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。