Spark RDD Operators (Part 3): distinct, union, intersection, subtract, cartesian
distinct
- distinct removes duplicate elements. An RDD may contain duplicates, and calling distinct returns a new RDD in which each element appears only once; note that this operator involves a shuffle, so it is fairly expensive.
Scala version
package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object DistinctScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("distinctscala")
    val sc = new SparkContext(conf)
    val distinctRdd = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val a = distinctRdd.distinct()
    a.collect.foreach(println)
  }
}
Result: the collected RDD contains aa, bb, cc, dd, each exactly once (the order may vary between runs).
Java version
package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class DistinctJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("distinctJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");
        // distinct
        JavaRDD<String> strRdd = sc.parallelize(strings);
        JavaRDD<String> distinctRdd = strRdd.distinct();
        List<String> collect = distinctRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}
Result: the collected RDD again contains aa, bb, cc, dd, each exactly once (order may vary).
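distinct also accepts an explicit target partition count for the shuffle it performs. The snippet below is only a minimal sketch, not part of the original post, and it reuses the SparkContext sc created in the examples above:

// Sketch only: distinct(numPartitions) controls the parallelism of the shuffle.
val words = sc.makeRDD(List("aa", "bb", "aa", "cc"), 4)
val unique = words.distinct(2)      // deduplicate and leave the result in 2 partitions
println(unique.getNumPartitions)    // 2
unique.collect.foreach(println)     // aa, bb, cc in some order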
union
- RDD1.union(RDD2) concatenates the two RDDs into a single RDD; duplicates are not removed.
Scala version
package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object DistinctScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("distinctscala")
    val sc = new SparkContext(conf)
    val distinctRdd = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val unionRdd = sc.makeRDD(List("aa", "bb", "cc"))
    val b = unionRdd.union(distinctRdd)
    b.collect.foreach(println)
  }
}
Result: all 11 elements of the two RDDs, with duplicates kept (the three elements of unionRdd followed by the eight elements of distinctRdd).
Java version
package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DistinctJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("distinctJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");
        JavaRDD<String> strRdd = sc.parallelize(strings);
        // union
        List<String> strings1 = new ArrayList<String>();
        strings1.add("aa");
        strings1.add("bb");
        strings1.add("cc");
        JavaRDD<String> strRdd2 = sc.parallelize(strings1);
        JavaRDD<String> unionRdd = strRdd.union(strRdd2);
        List<String> collect = unionRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}
Result: all 11 elements of the two RDDs, with duplicates kept (the eight elements of strRdd followed by aa, bb, cc).
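Because union keeps duplicates, chaining distinct after it is a common way to get true set-union semantics. A minimal sketch with illustrative names, reusing the sc from the examples above:

// Sketch only: union concatenates; distinct removes the duplicates afterwards.
val r1 = sc.makeRDD(List("aa", "bb", "cc"))
val r2 = sc.makeRDD(List("aa", "dd"))
r1.union(r2).collect.foreach(println)            // aa, bb, cc, aa, dd (duplicates kept)
r1.union(r2).distinct().collect.foreach(println) // aa, bb, cc, dd (order may vary)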
intersection
- RDD1.intersection(RDD2) returns the elements that appear in both RDDs, with duplicates removed.
- intersection shuffles data, so it is relatively expensive.
Scala version
package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object DistinctScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("distinctscala")
    val sc = new SparkContext(conf)
    val distinctRdd = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val unionRdd = sc.makeRDD(List("aa", "bb", "cc"))
    val c = unionRdd.intersection(distinctRdd)
    c.collect.foreach(println)
  }
}
Result: aa, bb, cc, the elements common to both RDDs, each exactly once (order may vary).
Java version
package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DistinctJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("distinctJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");
        JavaRDD<String> strRdd = sc.parallelize(strings);
        List<String> strings1 = new ArrayList<String>();
        strings1.add("aa");
        strings1.add("bb");
        strings1.add("cc");
        JavaRDD<String> strRdd2 = sc.parallelize(strings1);
        JavaRDD<String> intersectionRdd = strRdd.intersection(strRdd2);
        List<String> collect = intersectionRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}
Result: aa, bb, cc, each exactly once (order may vary).
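Note that intersection deduplicates even when both inputs contain repeated values. A minimal, illustrative sketch reusing the sc from the examples above:

// Sketch only: the intersection result contains each common element once.
val left  = sc.makeRDD(List("aa", "aa", "bb", "cc"))
val right = sc.makeRDD(List("aa", "aa", "bb", "dd"))
left.intersection(right).collect.foreach(println) // aa, bb (each printed once)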
subtract
- RDD1.subtract(RDD2) returns the elements that appear in RDD1 but not in RDD2, without removing duplicates.
Scala version
package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object DistinctScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("distinctscala")
    val sc = new SparkContext(conf)
    val distinctRdd = sc.makeRDD(List("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd"))
    val unionRdd = sc.makeRDD(List("aa", "bb", "cc"))
    val d = distinctRdd.subtract(unionRdd)
    d.collect.foreach(println)
  }
}
Result: dd, dd (the elements of distinctRdd that do not appear in unionRdd, with duplicates kept).
Java version
package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DistinctJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("distinctJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("aa", "bb", "aa", "bb", "cc", "cc", "dd", "dd");
        JavaRDD<String> strRdd = sc.parallelize(strings);
        List<String> strings1 = new ArrayList<String>();
        strings1.add("aa");
        strings1.add("bb");
        strings1.add("cc");
        JavaRDD<String> strRdd2 = sc.parallelize(strings1);
        JavaRDD<String> subtractRdd = strRdd.subtract(strRdd2);
        List<String> collect = subtractRdd.collect();
        for (String str : collect) {
            System.out.println(str);
        }
    }
}
Result: dd, dd (the elements of strRdd that do not appear in strRdd2, with duplicates kept).
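Unlike intersection, subtract keeps the duplicates of the left-hand RDD, which is why dd is printed twice above. A minimal, illustrative sketch reusing the sc from the examples above:

// Sketch only: subtract does not deduplicate the left-hand RDD.
val left  = sc.makeRDD(List("aa", "aa", "bb", "dd", "dd"))
val right = sc.makeRDD(List("aa", "bb"))
left.subtract(right).collect.foreach(println) // dd, dd (both copies survive)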
cartesian
- RDD1.cartesian(RDD2) returns the Cartesian product of RDD1 and RDD2. The result has count(RDD1) * count(RDD2) elements, so the cost grows very quickly; use it with caution.
Scala version
package nj.zb.sparkstu

import org.apache.spark.{SparkConf, SparkContext}

object DistinctScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("distinctscala")
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(List("1", "2", "3"))
    val rdd2 = sc.makeRDD(List("a", "b", "c"))
    val e = rdd1.cartesian(rdd2)
    e.collect.foreach(println)
  }
}
Result: the 9 pairs (1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c).
Java version
package nj.zb.sparkstu;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class DistinctJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("distinctJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // cartesian
        JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("1", "2", "3"));
        JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("a", "b", "c"));
        JavaPairRDD<String, String> cartesianRdd = rdd1.cartesian(rdd2);
        List<Tuple2<String, String>> collect = cartesianRdd.collect();
        for (Tuple2<String, String> tp2 : collect) {
            System.out.println(tp2);
        }
    }
}
Result: the same 9 pairs, printed as Tuple2 values such as (1,a).
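The cost of cartesian comes directly from its output size: count(RDD1) * count(RDD2) elements. A minimal, illustrative sketch reusing the sc from the examples above:

// Sketch only: a 3-element RDD crossed with a 3-element RDD yields 9 pairs.
val nums    = sc.makeRDD(List("1", "2", "3"))
val letters = sc.makeRDD(List("a", "b", "c"))
println(nums.cartesian(letters).count()) // 9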
Copyright notice: this is an original article by dsjia2970727, released under the CC 4.0 BY-SA license; please include a link to the original and this notice when reposting.