spark

  • Post author:
  • Post category:其他




spark架构体系

master负责管理,worker负责管理当前节,点启动Master,然后内部会启动一个定时器,定期检测超时的worker,移除超时的Worker 启动Worker,跟Master建立网络链接,将自己的信息(id,内存,cou等信息)注册给Master,Master接受到消息后会将Worker的信息保存起来,保存到内存,也可以持久化到磁盘,然后向Worker发送注册成功的消息Worker接受到了Master发送注册成功的消息,然后启动一个定时器,定期向Master’发送心跳(发送心跳的目的是为了报活),SparkSubmit(Dirver)这个客户端向Master申请资源,Master会看那些Worker符合条件(因为Worker向Master注册的所以会保存Worker的信息),Master会把符合条件的worker筛选出来(在Master内部完成),接下来就会和对应的Worker进行RPC通信Worker就会启动一个新的进程叫Executor,这个Executor以后就会运行真正的计算任务,Executer启动之后会向SparkSubmit反向注册(他是怎么知道Dirver在哪的嫩因为SparkSubmit(Dirver)跟Master通信在和Worker通信在和Executor通信就传过来了),这时候SparkSubmit(Dirver)要是做大量的准备工作task在这个时候就生成了就可以通过网络传给Excutor(也就是计算逻辑在SparkSubmit(Dirver)生成在传给Executor)
在这里插入图片描述



Spark中的重要角色

Master :是一个Java进程,接收Worker的注册信息和心跳、移除异常超时的Worker、接收客户端提交的任务、负责资源调度、命令Worker启动Executor。

Worker :是一个Java进程,负责管理当前节点的资源管理,向Master注册并定期发送心跳,负责启动Executor、并监控Executor的状态。

SparkSubmit :是一个Java进程,负责向Master提交任务。

Driver :是很多类的统称,可以认为SparkContext就是Driver,client模式Driver运行在SparkSubmit进程中,cluster模式单独运行在一个进程中,负责将用户编写的代码转成Tasks,然后调度到Executor中执行,并监控Task的状态和执行进度。

Executor :是一个Java进程,负责执行Driver端生成的Task,将Task放入线程中运行。

在这里插入图片描述



启动spark Application

./start-all.sh

/opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-shell --master spark://linux01:7077 --executor-memory 1g --total-executor-cores 3
sc     叫  Spark Context (spark上下文信息)只有他才可以生成RDD
sc.textFile("")    是Spark Context上的一个方法用来指定以后从哪里读数据(好比是创建最原始的大集合)
 val lines = sc.textFile("hdfs://linux01:8020/4.txt")  比如说从hdfs上读就这么写



(神奇的大集合)RDD简介

RDD并不是一个真正装数据的集合,而是装的描述信息,以后真正干活的是task(task是一个类的实例,黎明装的是属性和方法,new这个类就可以new出来就可以读数据运算数据)

RDD一般是值弹性的分布式数据集,(本质是一个泛型的数据对象,可以理解为数据容器,本身是一个符合类型的数据结构)RDD中并不装真正要计算的数据,而装的是描述信息,描述以后从哪里读取数据,调用了用什么方法,传入了什么函数,以及依赖关系等。

 val lines = sc.textFile("hdfs://linux01:8020/4.txt")  指定一个文件的路径      
lines.partitions.length                         看有几个分区
val words = lines.flatMap(_.split(" "))         这就是任务主体
words.collect                                    提交任务   



使用spark编写wordcount(就是从hdfs上读再写到hdfs上)

/opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-submit --master spark://linux01:7077 --class cn._51doit.demo01.Word.Count /data/spark-1.0-SNAPSHOT.jar hdfs://linux01:8020/4.txt hdfs://linux01:8020/out
这里是放再linux上运行的jar包的命令,有几个路径要改spark的路径,文件的jar路径,文件再hdfs上的路径,文件存到hdfs上的路径
package cn._51doit.demo01.Word

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Counts {
  def main(args: Array[String]): Unit = {
    //创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD
    val conuts = new SparkConf().setAppName("Conuts")
    val sc = new SparkContext(conuts)

    //第一步创建RDD:指定[以后]从HDFS中读取数据创建RDD
    //读取数据
    val lines: RDD[String] = sc.textFile(args(0))

    //对数据进行亚平
    val words = lines.flatMap(_.split(" "))

    //将单词和1组合
    val value = words.map((_, 1))

    //聚合(优点:先在分区内局部聚合,在全局聚合)
    val value1 = value.reduceByKey(_ + _)

    //排序
    val value2 = value1.sortBy(_._2, false)

    //将计算好的结果保存到HDFS中
    val unit = value2.saveAsTextFile(args(1))

    //释放资源
    sc.stop()
  }
}



使用Java编写worldcount

package cn._51doit.day01;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;


public class JavaWordCount {
    public static void main(String[] args) {
        //创建SparkContext,只有使用SparkContext才可以向集群申请资源才可以创建RDD
        SparkConf cont = new SparkConf().setAppName("JavaWordCount");
        //使用JavaSparkContext是对SparkContext的包装
        JavaSparkContext sc = new JavaSparkContext(cont);

        //使用JavaSparkContext就可以创建JavaRDD
        //JavaRDD包装了RDD
        JavaRDD<String> lines = sc.textFile(args[0]);

        //切分亚平
        JavaRDD<String> work = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split("")).iterator();
            }
        });

        //将一和单词组合
        JavaPairRDD<String, Integer> wordAdOne = work.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return Tuple2.apply(s, 1);
            }
        });

        //聚合
        JavaPairRDD<String, Integer> reduced = wordAdOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //交换顺序
        JavaPairRDD<Integer, String> reducedd = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.swap();
            }
        });

        //排序
        JavaPairRDD<Integer, String> s = reducedd.sortByKey(false);

        //再交换回来
        JavaPairRDD<String, Integer> n = s.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return integerStringTuple2.swap();
            }
        });

        //保存到HDFS上
        n.saveAsTextFile(args[1]);
        //释放资源
        sc.stop();
  }     
    }



用lambda表达式写

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;

public class JavaWordCount2 {
    public static void main(String[] args) {
        //创建SparkContext只有使用SparkContext才可以向集群申请资源才可以创建RDD
        SparkConf conf = new SparkConf().setAppName("JavaWordCount2");
        //使用JavaSparkContext是对SparkContext的包装
        JavaSparkContext jsc = new JavaSparkContext(conf);

        //创建RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);

        //亚平切分
        JavaRDD<String> words = lines.flatMap(i -> Arrays.stream(i.split(" ")).iterator());

        //和1拼接
        JavaPairRDD<String, Integer> wordAdOne = words.mapToPair(i -> new Tuple2<>(i, 1));

        //聚合
        JavaPairRDD<String, Integer> reduced = wordAdOne.reduceByKey((a, b) -> a + b);

        //交换k,v 因为不能根据v排序所以好把v变成k
        JavaPairRDD<Integer, String> reduced2 = reduced.mapToPair(i -> i.swap());

        //排序
        JavaPairRDD<Integer, String> sor = reduced2.sortByKey(false);

        //再把k,v的位置交换回来
        JavaPairRDD<String, Integer> resout = sor.mapToPair(i -> i.swap());

        //上传到hdfs上
        resout.saveAsTextFile(args[1]);

        //关闭资源
        jsc.stop();
    }
}



在本地上跑

 System.setProperty("HADOOP_USER_NAME","root")
    //创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD
    val conuts = new SparkConf().setAppName("Conuts").setMaster("local[*]")
    val sc = new SparkContext(conuts)

    //第一步创建RDD:指定[以后]从HDFS中读取数据创建RDD
    //读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    跟上面对比的话就是多了一个setMaster("local[*]")local[*]就是本地的意思
    如果放在hdfs上跑就是加一个伪装System.setProperty("HADOOP_USER_NAME","root")
    固定格式伪装成root权限



版权声明:本文为m0_52106226原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。