Java_ODPS-D2-离线数仓-4-自定义函数UDTF,一进多出,处理复杂事件json串
原始数据和结果都自行想象吧 或者 翻翻笔记p89 p92
FunctionStudio
-
新建一个项目gmall_udtf,运行环境选udfjava
-
新建一个FlatEventUDTF
-
pom.xml中加入fastjson依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28.odps</version>
</dependency>
- 编辑udtf,导包
package com.alibaba.dataworks.udtf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
/**
 * UDTF that flattens a JSON array of events into one output row per event.
 *
 * <p>Input: a single string holding a JSON array. Each element is read as a JSON
 * object with the keys {@code ett}, {@code en} and {@code kv}.
 *
 * <p>Output per element: {@code (bigint ett, string en, string kv)}; the SQL callers
 * alias these as event_time, event_name and event_json.
 */
@Resolve({"string->bigint,string,string"})
public class FlatEventUDTF extends UDTF {

    /** No per-task initialisation is required. */
    @Override
    public void setup(ExecutionContext ctx) throws UDFException {
    }

    /**
     * Parses the event array in {@code args[0]} and forwards one row per element.
     *
     * <p>A NULL, blank, or JSON {@code null} argument produces no rows. Null array
     * elements are skipped. An element without an {@code ett} value is forwarded with
     * a NULL event time so the event itself is not lost.
     *
     * @param args the UDTF arguments; only {@code args[0]} (the event array text) is read
     * @throws UDFException declared by the UDTF contract
     * @throws NumberFormatException if {@code ett} is present but is not a valid long
     */
    @Override
    public void process(Object[] args) throws UDFException {
        // The caller may pass SQL NULL (e.g. when the JSON path is missing from the log line).
        if (args == null || args.length == 0 || args[0] == null) {
            return;
        }
        String event = (String) args[0];

        // fastjson's parseArray can return null (blank or literal "null" text), not an empty array.
        JSONArray events = JSON.parseArray(event);
        if (events == null) {
            return;
        }

        for (int i = 0; i < events.size(); i++) {
            JSONObject item = events.getJSONObject(i);
            if (item == null) {
                // A null entry inside the array carries no event data.
                continue;
            }

            String ett = item.getString("ett");
            String eventName = item.getString("en");
            String eventJson = item.getString("kv");

            // Long.parseLong(null) would throw; emit NULL for a missing timestamp instead.
            Long eventTime = (ett == null || ett.isEmpty()) ? null : Long.valueOf(ett);

            // Output types must match the @Resolve signature: bigint, string, string.
            forward(eventTime, eventName, eventJson);
        }
    }

    /** No resources to release. */
    @Override
    public void close() throws UDFException {
    }
}
DataStudio
- 提交到DataStudio后,测试一下
-- Smoke test for the FlatEventUDTF function: one input row in, many rows out.
-- '$.et' is the events entry of the log line.
SELECT  FLATEVENTUDTF(GET_JSON_OBJECT(log_string, '$.et')) AS (event_time, event_name, event_json)
FROM    ods_base_log
WHERE   ds = '00000000';
-
UDTF在sql中的实际应用
列转行
LATERAL VIEW FLATEVENTUDTF(GET_JSON_OBJECT(log_string,'$.et')) event_view AS event_time,event_name,event_json
-- Manually load start events from the ODS layer into the DWD layer.
-- INSERT ... SELECT maps columns by position, not by alias, so the SELECT list must
-- follow the dwd_start_log column order:
--   mid, user_id, version_code, version_name, lang, source, os, area, model, brand,
--   sdk_version, email, height_width, network, lng, lat,
--   entry, open_ad_type, action, loading_time, detail, event_time, ds, hh, mm
-- NOTE(review): order taken from the SELECT * header of dwd_start_log; confirm against the table DDL.
INSERT OVERWRITE TABLE dwd_start_log PARTITION(ds,hh,mm)
SELECT  GET_JSON_OBJECT(log_string,'$.cm.mid') mid
       ,GET_JSON_OBJECT(log_string,'$.cm.uid') user_id
       ,GET_JSON_OBJECT(log_string,'$.cm.vc') version_code
       ,GET_JSON_OBJECT(log_string,'$.cm.vn') version_name
       ,GET_JSON_OBJECT(log_string,'$.cm.l') lang
       ,GET_JSON_OBJECT(log_string,'$.cm.sr') source
       ,GET_JSON_OBJECT(log_string,'$.cm.os') os
       ,GET_JSON_OBJECT(log_string,'$.cm.ar') area
       ,GET_JSON_OBJECT(log_string,'$.cm.md') model
       ,GET_JSON_OBJECT(log_string,'$.cm.ba') brand
       ,GET_JSON_OBJECT(log_string,'$.cm.sv') sdk_version
       ,GET_JSON_OBJECT(log_string,'$.cm.g') email
       ,GET_JSON_OBJECT(log_string,'$.cm.hw') height_width
        -- NOTE(review): assumes the network type is stored under cm.nw; verify against the log format.
       ,GET_JSON_OBJECT(log_string,'$.cm.nw') network
       ,GET_JSON_OBJECT(log_string,'$.cm.ln') lng
       ,GET_JSON_OBJECT(log_string,'$.cm.la') lat
       ,GET_JSON_OBJECT(event_view.event_json,'$.entry') entry
       ,GET_JSON_OBJECT(event_view.event_json,'$.open_ad_type') open_ad_type
       ,GET_JSON_OBJECT(event_view.event_json,'$.action') action
       ,GET_JSON_OBJECT(event_view.event_json,'$.loading_time') loading_time
       ,GET_JSON_OBJECT(event_view.event_json,'$.detail') detail
       ,event_view.event_time
       ,ds
       ,hh
       ,mm
FROM    ods_base_log
-- Expand the '$.et' event array: one output row per event.
LATERAL VIEW FLATEVENTUDTF(GET_JSON_OBJECT(log_string,'$.et')) event_view AS event_time,event_name,event_json
WHERE   ds = '00000000'
AND     event_view.event_name = 'start'
;
-- Check the load result. The load above writes partition ds='00000000',
-- so the check must read that same partition.
SELECT * FROM dwd_start_log WHERE ds = '00000000' LIMIT 3;
-- mid user_id version_code version_name lang source os area model brand sdk_version email height_width network lng lat entry open_ad_type action loading_time detail event_time ds hh mm
-- 0 0 9 1.0.7 es V 8.1.1 MX HTC-0 HTC V2.5.9 640*960 QED8OB5D@gmail.com 640*960 -112.5 29.2 3 13 1 2 1583531265269 00000000 01 15
版权声明:本文为weixin_44345917原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。