Environment versions:
Scala version: 2.12.10
Hive version: 2.3.3
Hadoop version: 2.7.7
A recent requirement in our big data development work had to be implemented in Scala. The requirement is as follows:
Write code that reads the source table's metadata from MySQL via JDBC, automatically creates the corresponding table in Hive, and automatically refreshes the metadata.
Analysis: the code takes its inputs as parameters, creates the table in Hive automatically, and no manual metadata refresh is needed.
The implementation steps are as follows:
Step 1:
Write a JDBC utility class that connects to the database and reads the table's column data.
The implementation is shown below; it returns a Set of Maps (one Map per result row).
import java.sql.{Array => _, _}

object JDBCUtools {

  // Open a JDBC connection; also used by the Hive helper methods below
  private def getConnection(url: String, user: String, password: String, driver: String): Connection = {
    Class.forName(driver)
    DriverManager.getConnection(url, user, password)
  }

  private def closeConnection(connection: Connection): Unit = {
    if (connection != null) {
      connection.close()
    }
  }

  // Execute a query and return every row as a Map of upper-cased column name -> value
  def executeQuery(url: String, user: String, password: String, driver: String, sql: String, database: String*): Set[Map[String, Object]] = {
    var conn: Connection = null
    var stat: Statement = null
    var resultSet: ResultSet = null
    var result = Set[Map[String, Object]]()
    try {
      conn = getConnection(url, user, password, driver)
      stat = conn.createStatement()
      // Optionally switch to the given database before running the query
      if (database != null && database.nonEmpty) {
        stat.executeUpdate("use " + database(0))
      }
      resultSet = stat.executeQuery(sql)
      val rsm: ResultSetMetaData = resultSet.getMetaData
      val colNum: Int = rsm.getColumnCount
      while (resultSet.next()) {
        var map = Map[String, Object]()
        for (i <- 0 until colNum) {
          map += (rsm.getColumnName(i + 1).toUpperCase() -> resultSet.getObject(i + 1))
        }
        result += map
      }
    } catch {
      case e: Exception =>
        throw new Exception(e)
    } finally {
      if (resultSet != null) {
        resultSet.close()
      }
      if (stat != null) {
        stat.close()
      }
      if (conn != null) {
        closeConnection(conn)
      }
    }
    result
  }
}
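As a quick sanity check, the utility can be called like this. This is only a sketch; the host, database, credentials, and table name below are placeholders, not the real environment values:

// Hypothetical connection parameters -- replace with your own environment
val rows = JDBCUtools.executeQuery(
  "jdbc:mysql://localhost:3306/test?useSSL=false",
  "root",
  "password",
  "com.mysql.jdbc.Driver",
  "select * from information_schema.`COLUMNS` where table_name = 'user_info'"
)
// Each element is one row as a Map: upper-cased column name -> value
rows.foreach(println)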
The test data is shown below:
Step 2:
Write a Scala method that fetches the metadata of the target MySQL table. The Scala method and the SQL used to query MySQL's metadata are as follows:
/**
 * Query the MySQL table's column metadata
 *
 * @param url      MySQL JDBC URL
 * @param user     user name
 * @param password password
 * @param driver   JDBC driver class
 * @param table    table name
 * @return set of column-metadata maps for the table
 */
def mysqlSetInit(url: String, user: String, password: String, driver: String, table: String): Set[Map[String, Object]] = {
  val sql =
    s"""
       |SELECT
       |TABLE_SCHEMA as SCHEMA_NAME,
       |TABLE_NAME,
       |COLUMN_NAME,
       |COLUMN_COMMENT
       |FROM information_schema.`COLUMNS`
       |where table_name='$table'
       |""".stripMargin
  val columnInfo: Set[Map[String, Object]] = JDBCUtools.executeQuery(
    url,
    user,
    password,
    driver,
    sql
  )
  columnInfo
}
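For illustration, calling the method against a hypothetical table named user_info (the URL and credentials are again placeholders) returns one Map per column, keyed by the upper-cased column names selected in the SQL:

val columnSet = mysqlSetInit(
  "jdbc:mysql://localhost:3306/test?useSSL=false",
  "root",
  "password",
  "com.mysql.jdbc.Driver",
  "user_info"
)
// Shape of one element of the returned Set (actual values depend on your schema):
// Map(SCHEMA_NAME -> test, TABLE_NAME -> user_info, COLUMN_NAME -> id, COLUMN_COMMENT -> primary key)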
Step 3:
Write a Scala method that connects to Hive via JDBC and executes the Hive CREATE TABLE statement. The code is as follows:
/**
 * Execute a DDL statement
 *
 * @param url      Hive JDBC URL
 * @param user     user name
 * @param password password
 * @param driver   JDBC driver class
 * @param sql      DDL statement
 */
def executeDdlSql(url: String, user: String, password: String, driver: String, sql: String): Unit = {
  var conn: Connection = null
  try {
    conn = getConnection(url, user, password, driver)
    val pst = conn.prepareStatement(sql)
    pst.execute()
    pst.close()
  } catch {
    case e: Exception =>
      // logger is the logging instance defined on the enclosing object (not shown above)
      logger.error(s"executeDdlSql error:$e")
  } finally {
    closeConnection(conn)
  }
}
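A minimal call sketch, assuming HiveServer2 is listening on the default port 10000; the host, credentials, database, and table name are placeholders:

JDBCUtools.executeDdlSql(
  "jdbc:hive2://localhost:10000/ods",   // placeholder HiveServer2 URL
  "hive",
  "",
  "org.apache.hive.jdbc.HiveDriver",
  "create table if not exists ods.demo_tbl (id STRING) STORED AS TEXTFILE"
)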
Step 4:
Write a Scala method that iterates over the Set, maps the source columns to a Hive schema, and calls the DDL method above to create the table.
/**
 * Read the MySQL source-table metadata and create the mapped table in Hive
 *
 * @param tableComment    table comment
 * @param url             Hive JDBC URL
 * @param user            Hive user
 * @param password        Hive password
 * @param columnSet       source-table column metadata set
 * @param updateFrequency update frequency (y/m/d/h)
 * @param updateType      full/incremental load (a/i)
 * @param partitioned     partition name and comment separated by a space; pass multiple partitions as separate arguments, format "name comment", e.g. "dt time partition", "add_type sync type"
 */
def createTableToHive(tableComment: String, url: String, user: String, password: String, columnSet: Set[Map[String, Object]],
                      updateFrequency: String, updateType: String, partitioned: String*): Unit = {
  var tableName = ""
  var db = ""
  val columnsInfo = new StringBuilder
  var isPartitioned = ""
  // Build the PARTITIONED BY clause from the "name comment" pairs
  if (partitioned != null && partitioned.nonEmpty) {
    val par = new StringBuilder(" partitioned by (")
    partitioned.foreach(x => {
      if (x != null && x.nonEmpty && x.trim.indexOf(" ") != -1) {
        par.append(x.substring(0, x.indexOf(" ")) + " STRING COMMENT '" + x.substring(x.indexOf(" ") + 1, x.length) + "',")
      }
    })
    if (par.lastIndexOf(",") != -1) {
      isPartitioned = par.replace(par.lastIndexOf(","), par.length, ")\n").toString
    }
  }
  // Iterate over the column set: map every source column to a STRING column with its original comment
  for (i <- columnSet) {
    columnsInfo.append("\t" + i("COLUMN_NAME") + " STRING COMMENT '" + i("COLUMN_COMMENT").toString + "',\n")
    if ("".equals(tableName)) {
      tableName = i("TABLE_NAME").toString
    }
    if ("".equals(db)) {
      db = i("SCHEMA_NAME").toString
    }
  }
  // ODS-layer table name: ods_<db>_<table>_<frequency><type>
  val tn = s"ods_$db" + s"_$tableName" + s"_$updateFrequency" + s"$updateType"
  val st = s"create external table if not exists ods." + tn + " (\n" + columnsInfo + "\tetl_time STRING COMMENT 'data sync time'\n) COMMENT '" + tableComment + "'\n" + isPartitioned + " STORED AS TEXTFILE\n LOCATION 'hdfs://chint-bigdata-cluster/user/hive/warehouse/ods/" + db + "/" + tn + "'"
  JDBCUtools.executeDdlSql(url,
    user,
    password,
    "org.apache.hive.jdbc.HiveDriver", st)
}
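Putting the pieces together, a call sketch using the columnSet returned by mysqlSetInit above; HiveServer2 address, credentials, and the source table user_info are placeholder assumptions:

val columnSet = mysqlSetInit(
  "jdbc:mysql://localhost:3306/test?useSSL=false",
  "root", "password", "com.mysql.jdbc.Driver", "user_info")

createTableToHive(
  "user info table",
  "jdbc:hive2://localhost:10000/ods",
  "hive", "",
  columnSet,
  "d",   // daily frequency
  "i",   // incremental load
  "dt time partition", "add_type sync type")

// Roughly generates a statement of this shape (column list depends on the source table):
// create external table if not exists ods.ods_test_user_info_di ( ... ) COMMENT 'user info table'
//  partitioned by (dt STRING COMMENT 'time partition',add_type STRING COMMENT 'sync type')
//  STORED AS TEXTFILE
//  LOCATION 'hdfs://chint-bigdata-cluster/user/hive/warehouse/ods/test/ods_test_user_info_di'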
Step 5:
Write a Scala method to refresh the Hive metadata. Here I simply use the INVALIDATE METADATA statement (an Impala command) to discard the cached metadata. The implementation is as follows:
/**
 * Execute an update statement such as a metadata refresh
 *
 * @param sql refresh-metadata or other update statement
 */
def refreshMetadata(url: String, user: String, password: String, driver: String, sql: String): Unit = {
  var conn: Connection = null
  try {
    conn = getConnection(url, user, password, driver)
    val pst = conn.prepareStatement(sql)
    pst.executeUpdate()
    pst.close()
  } catch {
    case e: Exception =>
      logger.error(s"refreshMetadata error:$e")
  } finally {
    closeConnection(conn)
  }
}
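Since INVALIDATE METADATA is an Impala statement, this call has to be sent to an Impala daemon rather than HiveServer2. A minimal sketch, assuming refreshMetadata lives on JDBCUtools alongside the other helpers and that an impalad on the default port 21050 is reachable with the Hive JDBC driver in NOSASL mode; the host and table name are placeholders:

JDBCUtools.refreshMetadata(
  "jdbc:hive2://impala-host:21050/;auth=noSasl",   // placeholder Impala endpoint
  "", "",
  "org.apache.hive.jdbc.HiveDriver",
  "INVALIDATE METADATA ods.ods_test_user_info_di"
)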
This completes the automated table creation from MySQL to Hive. The result looks like this:
Step 6:
The CREATE TABLE statement has been printed above and the table has already been created in Hive, so now we go to the Hue platform to check whether the table was created successfully.
The result is as follows:
Created successfully!!!