Big Data - Spark RDD Operators (3): distinct, union, intersection, subtract, cartesian

distinct

  • distinct is used for deduplication. Since an RDD may contain duplicate elements, calling distinct returns a new RDD with the duplicates removed. Note that this operator involves a shuffle, so it is expensive.



Scala version

package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object DistinctScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("distinctscala")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val distinctRdd = rdd.distinct()
    distinctRdd.collect.foreach(println)
  }
}

Result (output order may vary): aa, bb, cc, dd



Java version

package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class DistinctJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("distinctJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");

        // distinct: return a new RDD with duplicate elements removed
        JavaRDD<String> strRdd = sc.parallelize(strings);
        JavaRDD<String> distinctRdd = strRdd.distinct();
        List<String> collect = distinctRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}

Result (output order may vary): aa, bb, cc, dd
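
For intuition about why distinct needs a shuffle: it can be rewritten in terms of map and reduceByKey, which is essentially how Spark implements it internally. A minimal Scala sketch, reusing rdd from the Scala example above:

val deduped = rdd
  .map(x => (x, null))              // key each element by itself
  .reduceByKey((first, _) => first) // the shuffle happens here; duplicate keys collapse into one
  .map(_._1)                        // drop the dummy value; deduped holds aa, bb, cc, dd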



union

  • RDD1.union(RDD2) merges the two RDDs into one. Note that it does not deduplicate: every element of both inputs is kept.



Scala version

package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object UnionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("unionScala")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val rdd2 = sc.makeRDD(List("aa", "bb", "cc"))
    val unionRdd = rdd2.union(rdd1)
    unionRdd.collect.foreach(println)
  }
}

Result: all eleven elements of the two RDDs, duplicates kept: aa, bb, cc, aa, bb, aa, bb, cc, cc, dd, dd



Java version

package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class UnionJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("unionJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");
        JavaRDD<String> strRdd = sc.parallelize(strings);

        // union: concatenate the two RDDs without deduplicating
        List<String> strings1 = Arrays.asList("aa", "bb", "cc");
        JavaRDD<String> strRdd2 = sc.parallelize(strings1);
        JavaRDD<String> unionRdd = strRdd.union(strRdd2);
        List<String> collect = unionRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}

Result: all eleven elements of the two RDDs, duplicates kept: aa, bb, aa, bb, cc, cc, dd, dd, aa, bb, cc
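
If set semantics are wanted, union can simply be chained with distinct, at the cost of the extra shuffle. A short Scala sketch, reusing rdd1 and rdd2 from the Scala example above:

val setUnion = rdd2.union(rdd1).distinct() // aa, bb, cc, dd -- duplicates removed
// In the Scala API, ++ is an alias for union: rdd2 ++ rdd1 is equivalent to rdd2.union(rdd1)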



intersection

  • RDD1.intersection(RDD2) returns the intersection of the two RDDs, with duplicates removed
  • intersection shuffles its data, so it is a relatively expensive operation



Scala version

package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object IntersectionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("intersectionScala")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val rdd2 = sc.makeRDD(List("aa", "bb", "cc"))
    val intersectionRdd = rdd2.intersection(rdd1)
    intersectionRdd.collect.foreach(println)
  }
}

Result (order may vary): aa, bb, cc



Java version

package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class IntersectionJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("intersectionJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");
        JavaRDD<String> strRdd = sc.parallelize(strings);

        // intersection: elements present in both RDDs, deduplicated
        List<String> strings1 = Arrays.asList("aa", "bb", "cc");
        JavaRDD<String> strRdd2 = sc.parallelize(strings1);
        JavaRDD<String> intersectionRdd = strRdd.intersection(strRdd2);
        List<String> collect = intersectionRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}

Result (order may vary): aa, bb, cc
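
The shuffle in intersection comes from pairing up elements by key on both sides, much like distinct. A rough Scala sketch of the same idea using cogroup (illustrative, and close in spirit to Spark's internal implementation), reusing rdd1 and rdd2 from the Scala example above:

val manualIntersection = rdd1.map(v => (v, null))
  .cogroup(rdd2.map(v => (v, null)))                                     // shuffle: group matching keys from both RDDs
  .filter { case (_, (left, right)) => left.nonEmpty && right.nonEmpty } // keep keys seen on both sides
  .keys                                                                  // aa, bb, cc -- already deduplicated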



subtract

  • RDD1.subtract(RDD2) returns the elements that appear in RDD1 but not in RDD2, without deduplicating



Scala version

package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object SubtractScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("subtractScala")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val rdd2 = sc.makeRDD(List("aa", "bb", "cc"))
    val subtractRdd = rdd1.subtract(rdd2)
    subtractRdd.collect.foreach(println)
  }
}

Result: dd, dd (both copies survive, since subtract does not deduplicate)



Java version

package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class SubtractJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("subtractJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");
        JavaRDD<String> strRdd = sc.parallelize(strings);

        // subtract: elements of strRdd that do not appear in strRdd2 (duplicates kept)
        List<String> strings1 = Arrays.asList("aa", "bb", "cc");
        JavaRDD<String> strRdd2 = sc.parallelize(strings1);
        JavaRDD<String> subtractRdd = strRdd.subtract(strRdd2);
        List<String> collect = subtractRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}

Result: dd, dd
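
To make the no-deduplication behavior concrete, a small Scala sketch with made-up data (left and right are illustrative names, not from the examples above):

val left  = sc.makeRDD(List("aa", "aa", "bb", "cc"))
val right = sc.makeRDD(List("bb"))
left.subtract(right).collect() // Array(aa, aa, cc), order may vary -- both copies of "aa" survive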



cartesian

  • RDD1.cartesian(RDD2) returns the Cartesian product of RDD1 and RDD2, i.e. every possible pair. The result has |RDD1| * |RDD2| elements, so the cost is enormous; use it with great caution



Scala version

package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object CartesianScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("cartesianScala")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List("1", "2", "3"))
    val rdd2 = sc.makeRDD(List("a", "b", "c"))
    val cartesianRdd = rdd1.cartesian(rdd2)
    cartesianRdd.collect.foreach(println)
  }
}

Result (order may vary): (1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c)



Java version

package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class CartesianJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("cartesianJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // cartesian: every pair (element of rdd1, element of rdd2)
        JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("1", "2", "3"));
        JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("a", "b", "c"));
        JavaPairRDD<String, String> cartesianRdd = rdd1.cartesian(rdd2);
        List<Tuple2<String, String>> collect = cartesianRdd.collect();
        for (Tuple2<String, String> tp2 : collect) {
            System.out.println(tp2);
        }
    }
}

Result (order may vary): the nine pairs (1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c)
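
The quadratic blow-up is visible from the counts alone. A minimal Scala sketch, reusing rdd1 and rdd2 from the Scala example above:

val pairs = rdd1.cartesian(rdd2)
println(pairs.count())           // 9 = 3 * 3; the result size is |rdd1| * |rdd2|
println(pairs.partitions.length) // likewise the product of the two inputs' partition counts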



Copyright notice: This is an original article by dsjia2970727, released under the CC 4.0 BY-SA license. Please include a link to the original article and this notice when reposting.