Waterdrop大数据同步数据配置以及MongoDB数据导入导出

  • Post author:
  • Post category:其他


1、waterdrop由mongodb同步到clickhouse集群配置

spark {
  # Spark resources for the MongoDB -> ClickHouse sync job.
  # NOTE: quotes must be straight ASCII quotes; the original post used
  # typographic quotes, which Waterdrop's HOCON parser rejects.
  spark.app.name = "Waterdrop0923"
  spark.executor.instances = 40
  spark.executor.cores = 2
  spark.executor.memory = "3g"
}

input {
  mongodb {
    # MongoDB connection URI
    readconfig.uri = "mongodb://root:root@ip:port/ONLINE?authSource=admin"
    # Equivalent to a database in a relational DBMS
    readconfig.database = "ONLINE"
    # Equivalent to a table in a relational DBMS
    readconfig.collection = "tablename"
    # MongoDB password. Even though it is already embedded in the URI above,
    # omitting this option causes an error (a pitfall we hit), so keep it.
    readconfig.password = "root"
    # Class name of the partitioner used to split the collection.
    # Default: MongoDefaultPartitioner. Alternatives:
    #   MongoSamplePartitioner: uses average document size and random sampling
    #     to determine suitable partitions.
    #   MongoShardedPartitioner: partitions by chunks; requires read access
    #     to the config database.
    #   MongoSplitVectorPartitioner: uses the splitVector command on a
    #     standalone or the primary; requires the privilege to run splitVector.
    #   MongoPaginateByCountPartitioner: creates a fixed number of partitions;
    #     issues a query per partition.
    #   MongoPaginateBySizePartitioner: creates partitions by data size;
    #     issues a query per partition.
    readconfig.spark.mongodb.input.partitioner = "MongoShardedPartitioner"
    # Shard key for the partitioner (uncomment when needed):
    # readconfig.spark.mongodb.input.partitionerOptions.shardkey = "TRADESN"
    # Alias under which the data is registered, referenced by the filter sql below
    result_table_name = "tasks"
    # num_partitions = 500
  }
}

filter {
  sql {
    # SparkSQL over the "tasks" alias registered by the input section.
    # ${trandate} is a Waterdrop variable spliced into the string; the original
    # post's typographic quotes are replaced with the standard
    #   "...'"${var}"'..." splice form, otherwise the config does not parse.
    sql = "select tradesn,trandate from tasks where trandate='"${trandate}"'"
    # num_partitions = 500
  }

  # Repartition the intermediate dataset
  repartition {
    num_partitions = 500
  }

  # Type conversions: cast amount/fee fields to double before output
  convert {
    source_field = "orderamount"
    new_type = "double"
  }

  convert {
    source_field = "payee_amount"
    new_type = "double"
  }

  convert {
    source_field = "accountamount"
    new_type = "double"
  }

  convert {
    source_field = "channelamount"
    new_type = "double"
  }

  convert {
    source_field = "refundamount"
    new_type = "double"
  }

  convert {
    source_field = "cashamount"
    new_type = "double"
  }

  convert {
    source_field = "uncashamount"
    new_type = "double"
  }

  convert {
    source_field = "creditamount"
    new_type = "double"
  }

  convert {
    source_field = "src_fee"
    new_type = "double"
  }

  convert {
    source_field = "payee_fee"
    new_type = "double"
  }

  convert {
    source_field = "payer_fee"
    new_type = "double"
  }

  convert {
    source_field = "payer_recharge_fee"
    new_type = "double"
  }

  convert {
    source_field = "payer_oncededuct_amount"
    new_type = "double"
  }

  convert {
    source_field = "settlement_amount"
    new_type = "double"
  }

  convert {
    source_field = "calcfeeamount"
    new_type = "double"
  }
}

output {
  clickhouse {
    # Socket timeout in milliseconds for the ClickHouse JDBC connection
    clickhouse.socket_timeout = 50000
    # Comma-separated list of ClickHouse HTTP endpoints (host:8123)
    host = "IP:8123,ip:8123"
    database = "ysdw"
    table = "ods"
    # Columns written to the target table; must exist in the filtered dataset
    fields = ["tradesn", "trandate"]
    username = "default"
    password = "clickhouse"
  }
}

2、oracle到clickhouse集群

spark {
  # Spark resources for the Oracle -> ClickHouse sync job
  spark.app.name = "orcleToHive"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  jdbc {
    driver = "oracle.jdbc.driver.OracleDriver"
    url = "jdbc:oracle:thin:@IP:1521:DC"
    # There is no separate database option, so the table name may be
    # schema-qualified (useful when one server hosts multiple schemas)
    table = "seq"
    user = "123"
    password = "123"
    # Alias under which the data is registered for the filter stage
    result_table_name = "seq_temp"
  }
}

filter {
  # Plugin names are lowercase; the original "Sql" would not resolve
  # (the first config correctly uses lowercase "sql").
  sql {
    # NOTE(review): the input above registers its data as "seq_temp", but this
    # stage references "seq" — verify the two names actually match at runtime.
    table_name = "seq"
    # Query the data
    sql = "select sn,accountid,org_no from seq a where a.sn='20130130000000393006'"
  }

  repartition {
    num_partitions = 500
  }

  # Type conversions: amounts to double, createtime to string
  convert {
    source_field = "preamount"
    new_type = "double"
  }

  convert {
    source_field = "amount"
    new_type = "double"
  }

  convert {
    source_field = "cash_amount"
    new_type = "double"
  }

  convert {
    source_field = "uncash_amount"
    new_type = "double"
  }

  convert {
    source_field = "credit_amount"
    new_type = "double"
  }

  convert {
    source_field = "createtime"
    new_type = "string"
  }
}

output {
  clickhouse {
    # Socket timeout in milliseconds for the ClickHouse JDBC connection
    clickhouse.socket_timeout = 50000
    host = "ip:8123,ip:8123"
    database = "dw"
    table = "seq"
    # Columns written to the target table
    fields = ["sn", "accountid", "undotype", "zy", "note", "custtype", "org_no"]
    username = "default"
    password = "clickhouse"
  }
}

3、hive到clickhouse集群

spark {
  # Enable the Hive catalog so the hive input plugin can read Hive tables
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  hive {
    # Spark SQL executed against Hive to produce the input dataset
    pre_sql = "select cast(last_up_time as long),sort_id,trandate from ods_trade.paybill where trandate='20200923'"
    # Alias under which the result is registered for later stages
    result_table_name = "mcc_cd"
  }
}

4、mongodb导出数据到另外一个库的集合

# Export a collection from database YSDW and pipe it straight into collection
# table2 of database YSDM on the same host. The original post used en dashes
# ("–") and typographic quotes, which the shell would pass through as literal
# garbage; long options must start with "--" and quotes must be straight ASCII.
mongoexport -h IP:50000 -u root -p root -d YSDW -c table1 --authenticationDatabase=admin -q "{AC_DT:'$dateq'}" | mongoimport -h IP:50000 -u root -p root -d YSDM -c table2 --authenticationDatabase=admin --numInsertionWorkers 1 --writeConcern='{w:1}' ;



版权声明:本文为yilushunfengli原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。