spark架构体系
master负责管理,worker负责管理当前节,点启动Master,然后内部会启动一个定时器,定期检测超时的worker,移除超时的Worker 启动Worker,跟Master建立网络链接,将自己的信息(id,内存,cou等信息)注册给Master,Master接受到消息后会将Worker的信息保存起来,保存到内存,也可以持久化到磁盘,然后向Worker发送注册成功的消息Worker接受到了Master发送注册成功的消息,然后启动一个定时器,定期向Master’发送心跳(发送心跳的目的是为了报活),SparkSubmit(Dirver)这个客户端向Master申请资源,Master会看那些Worker符合条件(因为Worker向Master注册的所以会保存Worker的信息),Master会把符合条件的worker筛选出来(在Master内部完成),接下来就会和对应的Worker进行RPC通信Worker就会启动一个新的进程叫Executor,这个Executor以后就会运行真正的计算任务,Executer启动之后会向SparkSubmit反向注册(他是怎么知道Dirver在哪的嫩因为SparkSubmit(Dirver)跟Master通信在和Worker通信在和Executor通信就传过来了),这时候SparkSubmit(Dirver)要是做大量的准备工作task在这个时候就生成了就可以通过网络传给Excutor(也就是计算逻辑在SparkSubmit(Dirver)生成在传给Executor)
Spark中的重要角色
Master :是一个Java进程,接收Worker的注册信息和心跳、移除异常超时的Worker、接收客户端提交的任务、负责资源调度、命令Worker启动Executor。
Worker :是一个Java进程,负责管理当前节点的资源管理,向Master注册并定期发送心跳,负责启动Executor、并监控Executor的状态。
SparkSubmit :是一个Java进程,负责向Master提交任务。
Driver :是很多类的统称,可以认为SparkContext就是Driver,client模式Driver运行在SparkSubmit进程中,cluster模式单独运行在一个进程中,负责将用户编写的代码转成Tasks,然后调度到Executor中执行,并监控Task的状态和执行进度。
Executor :是一个Java进程,负责执行Driver端生成的Task,将Task放入线程中运行。
启动spark Application
./start-all.sh
/opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-shell --master spark://linux01:7077 --executor-memory 1g --total-executor-cores 3
sc 叫 Spark Context (spark上下文信息)只有他才可以生成RDD
sc.textFile("") 是Spark Context上的一个方法用来指定以后从哪里读数据(好比是创建最原始的大集合)
val lines = sc.textFile("hdfs://linux01:8020/4.txt") 比如说从hdfs上读就这么写
(神奇的大集合)RDD简介
RDD并不是一个真正装数据的集合,而是装的描述信息,以后真正干活的是task(task是一个类的实例,黎明装的是属性和方法,new这个类就可以new出来就可以读数据运算数据)
RDD一般是值弹性的分布式数据集,(本质是一个泛型的数据对象,可以理解为数据容器,本身是一个符合类型的数据结构)RDD中并不装真正要计算的数据,而装的是描述信息,描述以后从哪里读取数据,调用了用什么方法,传入了什么函数,以及依赖关系等。
val lines = sc.textFile("hdfs://linux01:8020/4.txt") 指定一个文件的路径
lines.partitions.length 看有几个分区
val words = lines.flatMap(_.split(" ")) 这就是任务主体
words.collect 提交任务
使用spark编写wordcount(就是从hdfs上读再写到hdfs上)
/opt/apps/spark-3.0.1-bin-hadoop3.2/bin/spark-submit --master spark://linux01:7077 --class cn._51doit.demo01.Word.Count /data/spark-1.0-SNAPSHOT.jar hdfs://linux01:8020/4.txt hdfs://linux01:8020/out
这里是放再linux上运行的jar包的命令,有几个路径要改spark的路径,文件的jar路径,文件再hdfs上的路径,文件存到hdfs上的路径
package cn._51doit.demo01.Word
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Counts {
def main(args: Array[String]): Unit = {
//创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD
val conuts = new SparkConf().setAppName("Conuts")
val sc = new SparkContext(conuts)
//第一步创建RDD:指定[以后]从HDFS中读取数据创建RDD
//读取数据
val lines: RDD[String] = sc.textFile(args(0))
//对数据进行亚平
val words = lines.flatMap(_.split(" "))
//将单词和1组合
val value = words.map((_, 1))
//聚合(优点:先在分区内局部聚合,在全局聚合)
val value1 = value.reduceByKey(_ + _)
//排序
val value2 = value1.sortBy(_._2, false)
//将计算好的结果保存到HDFS中
val unit = value2.saveAsTextFile(args(1))
//释放资源
sc.stop()
}
}
使用Java编写worldcount
package cn._51doit.day01;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class JavaWordCount {
public static void main(String[] args) {
//创建SparkContext,只有使用SparkContext才可以向集群申请资源才可以创建RDD
SparkConf cont = new SparkConf().setAppName("JavaWordCount");
//使用JavaSparkContext是对SparkContext的包装
JavaSparkContext sc = new JavaSparkContext(cont);
//使用JavaSparkContext就可以创建JavaRDD
//JavaRDD包装了RDD
JavaRDD<String> lines = sc.textFile(args[0]);
//切分亚平
JavaRDD<String> work = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.stream(s.split("")).iterator();
}
});
//将一和单词组合
JavaPairRDD<String, Integer> wordAdOne = work.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return Tuple2.apply(s, 1);
}
});
//聚合
JavaPairRDD<String, Integer> reduced = wordAdOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//交换顺序
JavaPairRDD<Integer, String> reducedd = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.swap();
}
});
//排序
JavaPairRDD<Integer, String> s = reducedd.sortByKey(false);
//再交换回来
JavaPairRDD<String, Integer> n = s.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return integerStringTuple2.swap();
}
});
//保存到HDFS上
n.saveAsTextFile(args[1]);
//释放资源
sc.stop();
}
}
用lambda表达式写
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class JavaWordCount2 {
public static void main(String[] args) {
//创建SparkContext只有使用SparkContext才可以向集群申请资源才可以创建RDD
SparkConf conf = new SparkConf().setAppName("JavaWordCount2");
//使用JavaSparkContext是对SparkContext的包装
JavaSparkContext jsc = new JavaSparkContext(conf);
//创建RDD
JavaRDD<String> lines = jsc.textFile(args[0]);
//亚平切分
JavaRDD<String> words = lines.flatMap(i -> Arrays.stream(i.split(" ")).iterator());
//和1拼接
JavaPairRDD<String, Integer> wordAdOne = words.mapToPair(i -> new Tuple2<>(i, 1));
//聚合
JavaPairRDD<String, Integer> reduced = wordAdOne.reduceByKey((a, b) -> a + b);
//交换k,v 因为不能根据v排序所以好把v变成k
JavaPairRDD<Integer, String> reduced2 = reduced.mapToPair(i -> i.swap());
//排序
JavaPairRDD<Integer, String> sor = reduced2.sortByKey(false);
//再把k,v的位置交换回来
JavaPairRDD<String, Integer> resout = sor.mapToPair(i -> i.swap());
//上传到hdfs上
resout.saveAsTextFile(args[1]);
//关闭资源
jsc.stop();
}
}
在本地上跑
System.setProperty("HADOOP_USER_NAME","root")
//创建SparkContext,只有使用SparkContext才可以向集群申请资源,才可以创建RDD
val conuts = new SparkConf().setAppName("Conuts").setMaster("local[*]")
val sc = new SparkContext(conuts)
//第一步创建RDD:指定[以后]从HDFS中读取数据创建RDD
//读取数据
val lines: RDD[String] = sc.textFile(args(0))
跟上面对比的话就是多了一个setMaster("local[*]")local[*]就是本地的意思
如果放在hdfs上跑就是加一个伪装System.setProperty("HADOOP_USER_NAME","root")
固定格式伪装成root权限