Section 3: Hudi-Hive data synchronization for a unified lakehouse. CDH 6.3.2 has a version-compatibility problem: Spark can query tables backed by HoodieParquetRealtimeInputFormat, but Hive queries against them fail


Integrating with Hive

(1) Copy the hudi-hadoop-mr-bundle-0.9.0.jar built from the Hudi source tree into Hive's lib directory so Hive can read Hudi tables, then restart the hiveserver2 service.

Alternatively, skip the lib directory and run ADD JAR before executing the Hive SQL (example below).

First, distribute the jar to the hiveserver2 node:

[xxx@xxx target]# rsync -rvl hudi-hadoop-mr-bundle-0.9.0.jar xxx@xxx:/data/software/

With that, the jar is in place on the Hive side.
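If you take the ADD JAR route instead, the bundle can be registered per session. A minimal sketch from beeline or the Hive CLI, assuming the jar sits at the /data/software/ path used in the rsync above (ADD JAR resolves local paths on the hiveserver2 host):

ADD JAR /data/software/hudi-hadoop-mr-bundle-0.9.0.jar;
LIST JARS;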

Writing the test data

The following Java class generates a member.log file:

import com.alibaba.fastjson.JSONObject;

import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.util.Random;

public class TestJson {
    public static void main(String[] args) throws FileNotFoundException {
        // Redirect stdout to member.log so every printed JSON line lands in the file
        PrintStream mytxt = new PrintStream("member.log");
        PrintStream out = System.out;
        System.setOut(mytxt);
        Random random = new Random();
        for (int i = 0; i < 1000000; i++) {
            JSONObject model = new JSONObject();
            model.put("uid", i);
            model.put("fullname", "王" + i);
            model.put("ad_id", random.nextInt(9));
            model.put("iconurl", "-");
            model.put("dt", "20200918");
            model.put("dn", "WebA");
            model.put("uuid", i);
            System.out.println(model.toJSONString());
        }
        // Restore the original stdout
        System.setOut(out);
    }
}

Put the generated member.log onto HDFS at /tmp/ods/member.log.
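A sketch of the upload, assuming member.log was generated in the current directory:

hdfs dfs -mkdir -p /tmp/ods
hdfs dfs -put member.log /tmp/ods/member.log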

The pom.xml, set up so that some of the dependencies get shaded into the application jar:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dxt</groupId>
    <artifactId>sparkDataFrame</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-client</artifactId>
            <scope>provided</scope>
            <version>0.9.0</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark-bundle_2.11</artifactId>
            <scope>provided</scope>
            <version>0.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-hadoop-mr-bundle</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-annotations</artifactId>
                </exclusion>
            </exclusions>
            <version>0.9.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <scope>provided</scope>
            <version>2.4.4</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.scala-lang</groupId>-->
<!--            <artifactId>scala-library</artifactId>-->
<!--            <scope>provided</scope>-->
<!--            <version>2.11.12</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <scope>provided</scope>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <scope>provided</scope>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <scope>provided</scope>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <scope>provided</scope>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.spark-project.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1.spark2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.66</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <!-- Plugin for compiling Scala -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- Plugin for compiling Java -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Packaging (shade) plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Next, develop the Spark code that writes the data. After the write, the tables are created automatically in Hive and the data can be queried:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.TABLE_NAME
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object TableOperator {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test_operator").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession);
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    val commitTime = System.currentTimeMillis().toString // generate a commit time
    val resultDF = sparkSession.read.json("/tmp/ods/member.log")
      .withColumn("ts", lit(commitTime)) // add a ts timestamp column
      .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn"))) // partition value built from two fields
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    resultDF.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid") // record key
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // precombine field: timestamp used to pick the latest record on update
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hudipartition") // Hudi partition column
      .option("hoodie.table.name", "member") // Hudi table name ("hoodie.table.name")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://cdh01:10000") // hiveserver2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default") // Hive database to sync into
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // Hive table name to sync into
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt,dn") // Hive partition columns
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to Hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // move records to the new partition directory when their partition value changes
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: HBASE, INMEMORY, BLOOM or GLOBAL_BLOOM; GLOBAL_BLOOM is required so records can still be found after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/hivetest")
  }

}
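To verify the write, the table can be read back through the Hudi datasource. A minimal sketch, assuming it runs in the same Hive-enabled SparkSession and using the save path from above (the two trailing globs cover the dt/dn partition directories):

def queryData(sparkSession: SparkSession) = {
  sparkSession.read.format("hudi")
    .load("/tmp/hudi/hivetest/*/*") // glob over the dt/dn partition directories
    .show(10)
}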


Error 1

Exception in thread "main" java.lang.ClassNotFoundException: org.apache.hive.jdbc.HiveDriver

When packaging the Spark application jar, the Hive JDBC driver must be packaged into it.
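One way to do that with the pom above: hive-jdbc is declared at the default compile scope (not provided), so the shade plugin packs the driver into the fat jar:

<dependency>
    <groupId>org.spark-project.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1.spark2</version>
</dependency>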


Error 2

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hudi/DataSourceWriteOptions$

Pass the locally built hudi-spark-bundle_2.11-0.9.0.jar to spark-submit via the --jars option; do not rely on the hudi-spark-bundle_2.11-0.9.0.jar pulled from Maven through pom.xml.

The Maven dependencies are fine at compile time, but at spark-submit time use the locally built bundle, because the local build contains the source-code modifications:
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-client</artifactId>
    <scope>provided</scope>
    <version>0.9.0</version>
    <type>pom</type>
</dependency>


Error 3

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.DataSourceUtils$.PARTITIONING_COLUMNS_KEY()Ljava/lang/String;

This is one of the differences between Spark 2.4.4 and CDH Spark 2.4.0. It was patched in the Hudi 0.9 source before compiling, which is why on CDH you must use the locally built jar rather than the pre-built one pulled from Maven.

Test submit script:

spark-submit \
--class com.dxt.hudi.TableOperator \
--driver-memory 4G \
--executor-memory 4G \
--jars hudi-spark-bundle_2.11-0.9.0.jar \
sparkDataFrame-1.0-SNAPSHOT.jar

The spark-submit run succeeds, and two new tables appear under Hive's default database:

member_ro

member_rt

Output of show create table for member_rt and member_ro:

CREATE EXTERNAL TABLE `member_rt`(	
  `_hoodie_commit_time` string, 	
  `_hoodie_commit_seqno` string, 	
  `_hoodie_record_key` string, 	
  `_hoodie_partition_path` string, 	
  `_hoodie_file_name` string, 	
  `ad_id` bigint, 	
  `fullname` string, 	
  `iconurl` string, 	
  `uid` bigint, 	
  `uuid` bigint, 	
  `ts` string, 	
  `hudipartition` string)	
PARTITIONED BY ( 	
  `dt` string, 	
  `dn` string)	
ROW FORMAT SERDE 	
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 	
WITH SERDEPROPERTIES ( 	
  'hoodie.query.as.ro.table'='false', 	
  'path'='/tmp/hudi/hivetest') 	
STORED AS INPUTFORMAT 	
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' 	
OUTPUTFORMAT 	
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'	
LOCATION	
  'hdfs://nameservice1/tmp/hudi/hivetest'	
TBLPROPERTIES (	
  'last_commit_time_sync'='20210918173310', 	
  'spark.sql.sources.provider'='hudi', 	
  'spark.sql.sources.schema.numPartCols'='2', 	
  'spark.sql.sources.schema.numParts'='1', 	
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"ad_id","type":"long","nullable":true,"metadata":{}},{"name":"fullname","type":"string","nullable":true,"metadata":{}},{"name":"iconurl","type":"string","nullable":true,"metadata":{}},{"name":"uid","type":"long","nullable":true,"metadata":{}},{"name":"uuid","type":"long","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"hudipartition","type":"string","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}},{"name":"dn","type":"string","nullable":true,"metadata":{}}]}', 	
  'spark.sql.sources.schema.partCol.0'='dt', 	
  'spark.sql.sources.schema.partCol.1'='dn', 	
  'transient_lastDdlTime'='1631957618')	
CREATE EXTERNAL TABLE `member_ro`(	
  `_hoodie_commit_time` string, 	
  `_hoodie_commit_seqno` string, 	
  `_hoodie_record_key` string, 	
  `_hoodie_partition_path` string, 	
  `_hoodie_file_name` string, 	
  `ad_id` bigint, 	
  `fullname` string, 	
  `iconurl` string, 	
  `uid` bigint, 	
  `uuid` bigint, 	
  `ts` string, 	
  `hudipartition` string)	
PARTITIONED BY ( 	
  `dt` string, 	
  `dn` string)	
ROW FORMAT SERDE 	
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 	
WITH SERDEPROPERTIES ( 	
  'hoodie.query.as.ro.table'='true', 	
  'path'='/tmp/hudi/hivetest') 	
STORED AS INPUTFORMAT 	
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 	
OUTPUTFORMAT 	
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'	
LOCATION	
  'hdfs://nameservice1/tmp/hudi/hivetest'	
TBLPROPERTIES (	
  'last_commit_time_sync'='20210918173310', 	
  'spark.sql.sources.provider'='hudi', 	
  'spark.sql.sources.schema.numPartCols'='2', 	
  'spark.sql.sources.schema.numParts'='1', 	
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"ad_id","type":"long","nullable":true,"metadata":{}},{"name":"fullname","type":"string","nullable":true,"metadata":{}},{"name":"iconurl","type":"string","nullable":true,"metadata":{}},{"name":"uid","type":"long","nullable":true,"metadata":{}},{"name":"uuid","type":"long","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"hudipartition","type":"string","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}},{"name":"dn","type":"string","nullable":true,"metadata":{}}]}', 	
  'spark.sql.sources.schema.partCol.0'='dt', 	
  'spark.sql.sources.schema.partCol.1'='dn', 	
  'transient_lastDdlTime'='1631957617')	

The tables come in the two input formats listed below. With the Hive query engine shipped in CDH 6.3.2, tables backed by HoodieParquetInputFormat can be queried, but queries against HoodieParquetRealtimeInputFormat tables fail, presumably again due to version incompatibility. Spark, however, can read the HoodieParquetRealtimeInputFormat Hive table (see the Spark SQL sketch below).

HoodieParquetRealtimeInputFormat

HoodieParquetInputFormat
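As a quick check of the Spark side, both views can be queried through Spark SQL. A minimal sketch, assuming a Hive-enabled SparkSession (or spark-shell) launched with the same --jars hudi-spark-bundle as above:

// snapshot (realtime) view of the merge-on-read table
sparkSession.sql("select uid, fullname, dt, dn from default.member_rt").show(10)
// read-optimized view
sparkSession.sql("select count(*) from default.member_ro").show()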

The next section digs into the root cause.

(A merge-on-read table produces both _rt and _ro Hive tables; a copy-on-write table produces a single table without the _rt/_ro suffix.)
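For comparison, switching the write above to copy-on-write only needs the table-type option changed; the rest of insertData stays the same (a sketch):

.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) // copy-on-write: a single Hive table, no _rt/_ro views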



Copyright notice: this is an original article by dengxt, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.