Spark(RDD 的持久化、排序、二次排序)

  • Post author:
  • Post category:其他




RDD 的持久化

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * RDD persistence demo.
 *
 * <p>Loads a text file into an RDD marked with {@code cache()}, then runs
 * {@code count()} twice and prints the line count and the elapsed wall-clock
 * time of each run so the two timings can be compared.
 */
public class Persion_9 {

    /**
     * Entry point: builds a local Spark context, times two consecutive counts
     * over the cached RDD, and always closes the context afterwards.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Persist_9")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // cache() is chained directly onto the call that creates the RDD.
            // NOTE(review): the original author's note said that calling cache()/persist()
            // as a separate statement has no effect and leads to errors; nothing in this
            // file demonstrates that — verify against the Spark documentation before
            // relying on it.
            JavaRDD<String> lines = sc.textFile("D:\\eclipse\\wc\\scalaworid\\spark.txt")
                    .cache();

            // First action on the RDD.
            printTimedCount(lines);
            // Second action on the same RDD; compare its elapsed time with the first.
            printTimedCount(lines);
        } finally {
            // Release the context even when the job fails.
            sc.close();
        }
    }

    /**
     * Counts the elements of {@code lines}, printing the count and then the
     * elapsed time in milliseconds (the timed window includes printing the count).
     *
     * @param lines the RDD to count
     */
    private static void printTimedCount(JavaRDD<String> lines) {
        long beginTime = System.currentTimeMillis();
        long count = lines.count();
        System.out.println(count);
        long endTime = System.currentTimeMillis();
        System.out.println("消耗的时间" + (endTime - beginTime));
    }
}

在这里插入图片描述


1、对文本文件内的每个单词都统计出其出现的次数。

2、按照每个单词出现次数的数量,降序排序。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.sources.In;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Word count sorted by frequency.
 *
 * <ol>
 *   <li>Counts how many times each word occurs in a text file.</li>
 *   <li>Prints the words ordered by occurrence count, descending.</li>
 * </ol>
 */
public class SortWorldCount {

    /**
     * Entry point: runs the word count on a local Spark context, prints each
     * {@code count : word} pair, and always closes the context afterwards.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SortWorldCount")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile("D:\\eclipse\\wc\\scalaworid\\spark.txt");

            // Split every line into words on a single space.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });

            // Pair every word with an initial count of 1.
            JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

            // Sum the counts per word, giving pairs such as (hello,2) (word,3).
            JavaPairRDD<String, Integer> wordCounts = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer left, Integer right) throws Exception {
                    return left + right;
                }
            });

            // sortByKey orders by key, so swap each pair to (count, word),
            // e.g. (word,3) becomes (3,word), before sorting.
            JavaPairRDD<Integer, String> countAndWord = wordCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> wordCount) throws Exception {
                    return new Tuple2<Integer, String>(wordCount._2, wordCount._1);
                }
            });

            // false = descending order of the count key.
            JavaPairRDD<Integer, String> sortedByCount = countAndWord.sortByKey(false);
            sortedByCount.foreach(new VoidFunction<Tuple2<Integer, String>>() {
                @Override
                public void call(Tuple2<Integer, String> entry) throws Exception {
                    System.out.println(entry._1 + " : "+ entry._2);
                }
            });
        } finally {
            // Release the context even when the job fails.
            sc.close();
        }
    }
}

在这里插入图片描述


1、按照文件中的第一列排序。

2、如果第一列相同,则按照第二列排序。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Int;
import scala.Tuple2;

/**
 * Secondary sort: orders lines by their first column and, when the first
 * columns are equal, by their second column.
 *
 * <ol>
 *   <li>Define a custom key ({@code SecondarySortKey}) that implements Ordered and
 *       Serializable and compares on several columns.</li>
 *   <li>Map the RDD of text lines to a JavaPairRDD whose key is the custom key and
 *       whose value is the original line.</li>
 *   <li>Sort with {@code sortByKey}, which uses the custom key's ordering.</li>
 *   <li>Map again to drop the custom key and keep only the original line.</li>
 * </ol>
 */
public class SecondarySort {

    /**
     * Entry point: reads the input file, sorts its lines by (first, second)
     * column, prints them, and always closes the Spark context afterwards.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("SecondarySort_12");

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile("D:\\eclipse\\wc\\input\\sort.txt");

            // Example input, two space-separated integers per line:
            //   1 5
            //   2 4
            //   3 6
            //   1 3
            //   2 1
            //
            // After keying, each element is (key : line):
            //   (1,5 : "1 5") (2,4 : "2 4") (3,6 : "3 6") (1,3 : "1 3") (2,1 : "2 1")
            JavaPairRDD<SecondarySortKey, String> keyedLines = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
                @Override
                public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                    String[] columns = line.split(" ");
                    SecondarySortKey key = new SecondarySortKey(
                            Integer.parseInt(columns[0]), Integer.parseInt(columns[1]));
                    return new Tuple2<SecondarySortKey, String>(key, line);
                }
            });

            // Expected order for the example input:
            //   (1,3 : "1 3") (1,5 : "1 5") (2,1 : "2 1") (2,4 : "2 4") (3,6 : "3 6")
            JavaPairRDD<SecondarySortKey, String> sortedPairs = keyedLines.sortByKey();

            // Drop the key; only the original text line is printed.
            JavaRDD<String> sortedLines = sortedPairs.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
                @Override
                public String call(Tuple2<SecondarySortKey, String> pair) throws Exception {
                    return pair._2;
                }
            });

            sortedLines.foreach(new VoidFunction<String>() {
                @Override
                public void call(String line) throws Exception {
                    System.out.println(line);
                }
            });
        } finally {
            // Close in finally so a failing job does not leak the context.
            sc.close();
        }
    }
}


SecondarySortKey类

import scala.math.Ordered;

import java.io.Serializable;

/**
 * Two-column sort key: orders by {@code first}, and by {@code second} when the
 * {@code first} values are equal.
 *
 * <p>Every comparison operator is derived from {@link #compare(SecondarySortKey)},
 * so {@code <}, {@code <=}, {@code >}, {@code >=} and {@code compareTo} cannot
 * disagree with one another. {@code compare} returns 0 exactly when
 * {@link #equals(Object)} holds for two keys of this class.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

    // The columns that take part in the ordering.
    private int first;
    private int second;

    /**
     * Creates a key from the two column values.
     *
     * @param first  primary sort column
     * @param second secondary sort column, used only to break ties on {@code first}
     */
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /** @return the primary sort column */
    public int getFirst() {
        return first;
    }

    /** @return the secondary sort column */
    public int getSecond() {
        return second;
    }

    /** @param first new value of the primary sort column */
    public void setFirst(int first) {
        this.first = first;
    }

    /** @param second new value of the secondary sort column */
    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * Compares this key with {@code that} by {@code first}, then by {@code second}.
     *
     * @param that the key to compare against
     * @return a negative value, zero, or a positive value when this key is less
     *         than, equal to, or greater than {@code that}
     */
    @Override
    public int compare(SecondarySortKey that) {
        // Integer.compare instead of subtraction: a difference of two ints can
        // overflow and flip the sign of the result.
        int byFirst = Integer.compare(this.first, that.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.second, that.second);
    }

    /** @return {@code true} when this key sorts strictly before {@code that} */
    @Override
    public boolean $less(SecondarySortKey that) {
        return compare(that) < 0;
    }

    /** @return {@code true} when this key sorts strictly after {@code that} */
    @Override
    public boolean $greater(SecondarySortKey that) {
        return compare(that) > 0;
    }

    /** @return {@code true} when this key sorts before or equal to {@code that} */
    @Override
    public boolean $less$eq(SecondarySortKey that) {
        return compare(that) <= 0;
    }

    /** @return {@code true} when this key sorts after or equal to {@code that} */
    @Override
    public boolean $greater$eq(SecondarySortKey that) {
        return compare(that) >= 0;
    }

    /**
     * Same ordering as {@link #compare(SecondarySortKey)}.
     *
     * @param that the key to compare against
     * @return a negative value, zero, or a positive value
     */
    @Override
    public int compareTo(SecondarySortKey that) {
        return compare(that);
    }

    /** Two keys are equal when they are of the same class and both columns match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SecondarySortKey that = (SecondarySortKey) o;

        if (first != that.first) return false;
        return second == that.second;
    }

    /** Hash built from both columns, matching {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        int result = first;
        result = 31 * result + second;
        return result;
    }
}

在这里插入图片描述



版权声明:本文为TylerPY原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。