When submitting a Spark application to the cluster, we sometimes need to load a configuration file from the local (driver) node, for example:
...
spark-submit \
  --class ... \
  --jars ... \
  yourApp.jar \
  file:///yourLocalFilePath/conf.properties
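For illustration, the properties file being loaded might look like the following (the keys and values here are hypothetical):

# conf.properties (hypothetical example)
para1_key=para1_value
batch.size=500
threshold=0.8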
When parsing such a configuration file, we typically load it into a HashMap so that parameters can be read conveniently by key-value lookup. A concrete implementation:
package com.gendlee1991.utils
import java.io.{FileInputStream, FileNotFoundException, InputStreamReader}
import java.util.Properties
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}
import scala.collection.mutable.HashMap
class BigDataConf(propertiesPath: String) extends Serializable
{
  // @transient + lazy: commons-logging Log implementations are generally not
  // serializable, and instances of this class may be shipped to executors.
  @transient lazy val log = LogFactory.getLog(classOf[BigDataConf])

  private val settings = new HashMap[String, String]()

  load(propertiesPath)

  def load(propertiesPath: String): Unit =
  {
    loadPropertiesFile(propertiesPath)
  }

  // Reads the properties file through the Hadoop FileSystem API, so both
  // file:// and hdfs:// paths are supported.
  def loadPropertiesFile(propertiesPath: String): Unit =
  {
    var in: FSDataInputStream = null
    var inr: InputStreamReader = null
    try
    {
      val conf = new Configuration()
      val path = new Path(propertiesPath)
      val fs = path.getFileSystem(conf)
      in = fs.open(path)
      val prop = new Properties()
      inr = new InputStreamReader(in, "utf-8")
      prop.load(inr)
      val keys = prop.propertyNames()
      while (keys.hasMoreElements)
      {
        val key = keys.nextElement().toString
        settings += ((key, prop.getProperty(key).trim))
      }
    }
    catch
    {
      case e: FileNotFoundException =>
        println("*" * 40)
        println(e.getMessage)
        log.error(e.getMessage, e)
        println("*" * 40)
    }
    finally
    {
      if (inr != null)
      {
        inr.close()
      }
      if (in != null)
      {
        in.close()
      }
    }
    println("*" * 40)
    settings.foreach(println)
    println("*" * 40)
  }

  // Alternative loader that reads straight from the driver's local file
  // system; useful when no Hadoop configuration is available.
  def loadLocal(propertiesPath: String): Unit =
  {
    var in: FileInputStream = null
    try
    {
      in = new FileInputStream(propertiesPath)
      val prop = new Properties()
      prop.load(in)
      val keys = prop.propertyNames()
      while (keys.hasMoreElements)
      {
        val key = keys.nextElement().toString
        settings += ((key, prop.getProperty(key).trim))
      }
    }
    catch
    {
      case e: FileNotFoundException =>
        println("*" * 40)
        println(e.getMessage)
        log.error(e.getMessage, e)
        println("*" * 40)
    }
    finally
    {
      if (in != null)
      {
        in.close()
      }
    }
    println("*" * 40)
    settings.foreach(println)
    println("*" * 40)
  }

  /** Get a parameter; throws NoSuchElementException if the key is not set */
  def get(key: String): String =
  {
    settings.getOrElse(key, throw new NoSuchElementException(key))
  }

  /** Get a parameter, falling back to a default if not set */
  def get(key: String, defaultValue: String): String =
  {
    settings.getOrElse(key, defaultValue)
  }

  /** Get all parameters as an immutable map */
  def getAll: Map[String, String] =
  {
    settings.toMap
  }

  /** Get a parameter as an Option */
  def getOption(key: String): Option[String] =
  {
    settings.get(key)
  }

  /** Get a parameter as an integer, falling back to a default if not set */
  def getInt(key: String, defaultValue: Int): Int =
  {
    getOption(key).map(_.toInt).getOrElse(defaultValue)
  }

  /** Get a parameter as a long, falling back to a default if not set */
  def getLong(key: String, defaultValue: Long): Long =
  {
    getOption(key).map(_.toLong).getOrElse(defaultValue)
  }

  /** Get a parameter as a double, falling back to a default if not set */
  def getDouble(key: String, defaultValue: Double): Double =
  {
    getOption(key).map(_.toDouble).getOrElse(defaultValue)
  }
}
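Note that because loadPropertiesFile resolves the path through Hadoop's FileSystem API, the same class can read configuration from HDFS as well as from the driver's local disk. A quick sketch (the hdfs:// path below is a hypothetical example):

// Path.getFileSystem picks the FileSystem implementation that matches the
// URI scheme, so both of these work with the same loader:
val localConf = new BigDataConf("file:///yourLocalFilePath/conf.properties")
val hdfsConf = new BigDataConf("hdfs:///user/someuser/conf.properties")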
Usage:
1. Pass the local file path through the submit script; it then arrives as an argument to the main function. Assuming it is the first argument:
def main(args: Array[String]): Unit = {
  ...
  val bigdataConf = new BigDataConf(args(0))
  val para1_value = bigdataConf.get("para1_key")
  ...
  ...
}
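Beyond plain get, the typed accessors work the same way, and because BigDataConf extends Serializable the instance can be captured in closures that run on the executors. A minimal, self-contained sketch (the keys batch.size, threshold and para1_key are hypothetical, matching the example properties file above):

import org.apache.spark.sql.SparkSession

object ConfDemo {
  def main(args: Array[String]): Unit = {
    val conf = new BigDataConf(args(0))

    // Typed accessors fall back to the supplied default if the key is absent.
    val batchSize = conf.getInt("batch.size", 100)
    val threshold = conf.getDouble("threshold", 0.5)

    val spark = SparkSession.builder().appName("ConfDemo").getOrCreate()
    val sc = spark.sparkContext

    // BigDataConf is Serializable, so it can be captured by a closure and
    // read on the executors as well.
    val tagged = sc.parallelize(1 to batchSize)
      .filter(_ > threshold * batchSize)
      .map(n => conf.get("para1_key", "default") + ": " + n)

    tagged.take(5).foreach(println)
    spark.stop()
  }
}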