Serialization: The Writable Interface



The basic serialization types often cannot satisfy every requirement. For example, to pass a custom bean object inside the Hadoop framework, that object must implement the Writable serialization interface.


Implementation Pattern

  1. The class must implement the Writable interface.
  2. Deserialization creates the object via reflection through the no-argument constructor, so a no-argument constructor is required.
public CustomBean() {
	super();
}
  3. Override the serialization method.
@Override
public void write(DataOutput out) throws IOException {
	....
}
  4. Override the deserialization method.
@Override
public void readFields(DataInput in) throws IOException {
	....
}
  5. To make the result data easy to read, override the bean's toString() method; a custom field separator can be used.

Notes

  1. The field order during deserialization must exactly match the field order used during serialization.
  2. If a custom bean is used as the key (K) in the Mapper's output KV pair, it must also implement the Comparable interface, because the Shuffle phase of the MapReduce framework requires that keys be sortable!
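The field-order rule can be checked with plain java.io streams, since Writable's write() and readFields() are built on the same DataOutput/DataInput calls. This standalone sketch (independent of Hadoop; the class name and sample values are illustrative) shows a round trip that only works because the read order matches the write order:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FieldOrderDemo {
    public static void main(String[] args) throws IOException {
        // "Serialize": write a long, then a UTF string -- this fixes the field order
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(1116L);      // selfDuration
        out.writeUTF("001577c3");  // deviceId

        // "Deserialize": reading in the same order recovers the fields intact
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        long selfDuration = in.readLong();
        String deviceId = in.readUTF();
        System.out.println(selfDuration + "\t" + deviceId);

        // Reading in a different order (e.g. readUTF() first) would misinterpret
        // the raw bytes and typically throw or return garbage.
    }
}
```

Because the byte stream carries no field names, the read side has nothing but position to go on, which is exactly why readFields() must mirror write().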

Writable Interface Example

Requirement:

Compute the total content playback duration for each smart-speaker device.

Raw log format:

001 001577c3 kar_890809 120.196.100.99 1116 954 200
log id | device id | appkey (hardware partner) | network ip | self-owned content duration (sec) | third-party content duration (sec) | network status code

Data download

Link: https://pan.baidu.com/s/170U3UIdfYAUdN8aFuGEAXg
Extraction code: h0ih

Expected output:

001577c3 11160 9540 20700
device id | self-owned content duration (sec) | third-party content duration (sec) | total duration

Approach:

  • Prepare the bean: SpeakBean

  • map types: input LongWritable, Text; output Text (device id), SpeakBean

  • map logic: parse each value into a SpeakBean and emit it

  • reduce: the input key is the device id (Text type), the input value is the collection of SpeakBeans sharing that device id; the output key is the device id, the output value is a SpeakBean

Code

SpeakBean

package com.my.mr.speak;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//This class is the value type of the map output KV pair, so it must implement the Writable serialization interface
public class SpeakBean implements Writable {

    //Fields
    private Long selfDuration;//self-owned content duration
    private Long thirdPartDuration;//third-party content duration
    private String deviceId;//device id
    private Long sumDuration;//total duration

    //No-argument constructor, required because deserialization creates the object via reflection

    public SpeakBean() {
    }


    //Serialization method: writes the fields out to the network or to a file
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(selfDuration);
        out.writeLong(thirdPartDuration);
        out.writeUTF(deviceId);
        out.writeLong(sumDuration);
    }

    //Convenience constructor

    public SpeakBean(Long selfDuration, Long thirdPartDuration, String deviceId) {
        this.selfDuration = selfDuration;
        this.thirdPartDuration = thirdPartDuration;
        this.deviceId = deviceId;
        this.sumDuration = this.selfDuration + this.thirdPartDuration;
    }

    //Deserialization method: the field order must exactly match write()
    @Override
    public void readFields(DataInput in) throws IOException {
        this.selfDuration = in.readLong();//self-owned duration
        this.thirdPartDuration = in.readLong();//third-party duration
        this.deviceId = in.readUTF();//device id
        this.sumDuration = in.readLong();//total duration
    }


    public Long getSelfDuration() {
        return selfDuration;
    }

    public void setSelfDuration(Long selfDuration) {
        this.selfDuration = selfDuration;
    }

    public Long getThirdPartDuration() {
        return thirdPartDuration;
    }

    public void setThirdPartDuration(Long thirdPartDuration) {
        this.thirdPartDuration = thirdPartDuration;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public Long getSumDuration() {
        return sumDuration;
    }

    public void setSumDuration(Long sumDuration) {
        this.sumDuration = sumDuration;
    }


    //Override toString() so the output data is easy to read

    @Override
    public String toString() {
        return selfDuration + "\t" + thirdPartDuration + "\t" + deviceId + "\t" + sumDuration;
    }
}
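As noted earlier, a bean used as the Mapper's output key must also be sortable. A minimal sketch of the comparison logic is shown below; the class name and descending-by-total-duration ordering are illustrative assumptions, and it is written against plain java.lang.Comparable so it compiles without Hadoop. In a real job the bean would implement org.apache.hadoop.io.WritableComparable<SpeakBean> instead, with the same compareTo body plus write()/readFields():

```java
//Hypothetical sketch: ordering beans by total duration, descending.
public class SortableSpeakBean implements Comparable<SortableSpeakBean> {
    private final Long sumDuration;

    public SortableSpeakBean(Long sumDuration) {
        this.sumDuration = sumDuration;
    }

    @Override
    public int compareTo(SortableSpeakBean other) {
        // descending order: the larger total duration sorts first
        return other.sumDuration.compareTo(this.sumDuration);
    }

    public static void main(String[] args) {
        SortableSpeakBean a = new SortableSpeakBean(20700L);
        SortableSpeakBean b = new SortableSpeakBean(9540L);
        System.out.println(a.compareTo(b) < 0);
    }
}
```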

SpeakMapper

package com.my.mr.speak;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//Four type parameters: two KV pairs
//First KV pair: the map input types; k --> byte offset of the line, v --> the line's text
//Second KV pair: the map output types; k --> map output key type, v --> map output value type
public class SpeakMapper extends Mapper<LongWritable, Text, Text, SpeakBean> {
    /*
    1. Convert the incoming Text to a String.
    2. Split on tabs to get the self-owned duration, third-party duration, and device id; wrap them in a SpeakBean.
    3. Emit: k --> device id, v --> the SpeakBean.
     */
    Text device_id = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        1. Convert the incoming Text to a String
        final String line = value.toString();
//        2. Split on tabs to get the self-owned duration, third-party duration, and device id
        final String[] fields = line.split("\t");
        //self-owned content duration
        String selfDuration = fields[fields.length - 3];
        //third-party content duration
        String thirdPartDuration = fields[fields.length - 2];
        //device id
        String deviceId = fields[1];
        final SpeakBean bean = new SpeakBean(Long.parseLong(selfDuration), Long.parseLong(thirdPartDuration), deviceId);
//        3. Emit: k --> device id, v --> the SpeakBean
        device_id.set(deviceId);
        context.write(device_id, bean);
    }
}

SpeakReducer

package com.my.mr.speak;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SpeakReducer extends Reducer<Text, SpeakBean, Text, SpeakBean> {
    @Override
    protected void reduce(Text key, Iterable<SpeakBean> values, Context context) throws IOException, InterruptedException {
        //initialize the duration accumulators
        Long self_duration = 0L;
        Long third_part_duration = 0L;

        //reduce key: one key from the map output
        //reduce values: all values that share that key, gathered into one collection
        //reduce logic: iterate over the values and accumulate the durations
        for (SpeakBean bean : values) {
            final Long selfDuration = bean.getSelfDuration();
            final Long thirdPartDuration = bean.getThirdPartDuration();
            self_duration += selfDuration;
            third_part_duration += thirdPartDuration;
        }
        //wrap the totals in a bean and emit it
        final SpeakBean bean = new SpeakBean(self_duration, third_part_duration, key.toString());
        context.write(key, bean);
    }
}

SpeakDriver

package com.my.mr.speak;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SpeakDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "speakDriver");
        //set the jar by locating the driver class
        job.setJarByClass(SpeakDriver.class);
        //the mapper and reducer to use
        job.setMapperClass(SpeakMapper.class);
        job.setReducerClass(SpeakReducer.class);
        //map output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(SpeakBean.class);
        //reduce output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SpeakBean.class);
        //input and output data paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //submit the job
        final boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

Run

Set the data input and output paths.

(screenshot: run configuration with the input and output paths as program arguments)

Result

(screenshot: the job's output file)

Summary

Use case: the basic serialization types often cannot satisfy every requirement.

Bean essentials: define the bean and implement the serialization interface; if it is used as a key and must be sorted, also implement the Comparable interface.

Writing the MapReduce job: clarify the requirement, then determine the input and output types and the processing logic for map and reduce respectively.


Copyright notice: this is an original article by StromCruise, released under the CC 4.0 BY-SA license. When reposting, please include a link to the original article along with this notice.