I. Running a Spark program in local mode
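The Spark version used here is 2.3.2 (with Scala 2.11.8 and Hadoop 2.7.4), as declared in the pom.xml of this section.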
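1. Install IDEA on CentOS 7. The activation code has to be claimed from the "IDEA tutorial" entry in the top-left corner of the site the link points to, following the prompts there; it cannot be claimed directly.
2. Create a Maven project and change the file type.
Create the first Scala file in the Maven project; this is mainly a matter of installing the Scala plugin.
A Scala SDK must be added first. If none is installed, download one; if one is already available, simply select it.
Note: the versions configured in pom.xml must match the Scala SDK imported into IDEA, and the Hadoop and Spark versions must be compatible with each other. The Scala version Spark itself starts with must also match the one imported into IDEA: for example, if Spark reports Scala 2.12.x at startup, IDEA should use a 2.12 SDK as well.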
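The version rule in the note can be checked from code. The following is a minimal sketch, not part of the original project: the object `ScalaVersionCheck` and its two helpers are hypothetical names introduced for illustration. It reads the version of the Scala library on the classpath and compares its binary version (the first two components) with the `_2.11` suffix of the Spark artifact id.

```scala
// Hypothetical helper, not part of the original project.
object ScalaVersionCheck {
  // Reduce a full Scala version such as "2.11.8" to its binary version "2.11".
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  // True when an artifact id such as "spark-core_2.11" targets the given binary version.
  def artifactMatches(artifactId: String, scalaBinary: String): Boolean =
    artifactId.endsWith("_" + scalaBinary)

  def main(args: Array[String]): Unit = {
    // Version of the Scala library that is actually on the classpath.
    val running = scala.util.Properties.versionNumberString
    val binary = binaryVersion(running)
    println(s"Scala on classpath: $running (binary $binary)")
    println(s"spark-core_2.11 compatible: ${artifactMatches("spark-core_2.11", binary)}")
  }
}
```

Running this inside the IDEA project shows which Scala SDK the project really uses; if the second line prints false, the SDK and the artifact suffix in pom.xml disagree.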
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sparkWord</groupId>
    <artifactId>sparkWordSpace</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.3.2</spark.version>
    </properties>
    <dependencies>
        <!--Scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
WordCount.scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf and set the appName and the master address
    val sparkconf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[2]")
      .set("spark.testing.memory", "2147480000")
    // 2. Create the SparkContext, the entry point of every computation;
    //    it creates the DAGScheduler and the TaskScheduler
    val sparkContext = new SparkContext(sparkconf)
    // 3. Read the data file; an RDD can be thought of simply as a collection,
    //    here one whose elements are Strings
    val data: RDD[String] = sparkContext.textFile("/root/words.txt")
    // 4. Split every line to get all the words
    val words: RDD[String] = data.flatMap(_.split(" "))
    // 5. Count each word as 1, producing (word, 1) pairs
    val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1))
    // 6. Sum the counts per word; the first underscore is the running total,
    //    the second is the incoming value
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // 7. Collect the result and print it
    val finalResult: Array[(String, Int)] = result.collect()
    println(finalResult.toBuffer)
    // 8. Stop the SparkContext
    sparkContext.stop()
  }
}
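To see what steps 4 to 6 do without starting Spark, the same pipeline can be sketched on an ordinary in-memory Scala collection. This is only an illustration under stated assumptions: `WordCountSteps` is a hypothetical name, the sample lines are made up, and `groupBy` followed by a sum stands in for `reduceByKey(_ + _)`, which plain collections do not have.

```scala
// Hypothetical illustration, not part of the original project.
object WordCountSteps {
  // Same pipeline as the RDD version, on an in-memory Seq instead of an RDD.
  def count(lines: Seq[String]): Map[String, Int] = {
    // Step 4: split every line into words.
    val words: Seq[String] = lines.flatMap(line => line.split(" ").toSeq)
    // Step 5: pair every word with the count 1.
    val wordAndOne: Seq[(String, Int)] = words.map(w => (w, 1))
    // Step 6: stand-in for reduceByKey(_ + _): group the pairs by word, then sum the 1s.
    wordAndOne.groupBy(_._1).map { case (word, pairs) => (word, pairs.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq("hello spark", "hello hadoop") // made-up input
    println(count(sample))
  }
}
```

One detail carries over to the RDD version: splitting on a single space turns two consecutive spaces into an empty string, which is then counted as a word.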
II. Running a Spark program in cluster mode
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sparkWord</groupId>
    <artifactId>sparkWordSpace</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <!-- Dependency versions -->
    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.3.2</spark.version>
    </properties>
    <dependencies>
        <!--Scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>
WordCount_Online.scala
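import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Word-count program, packaged as a JAR and submitted to the cluster
object WordCount_Online {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf and set the appName
    //    (no master here; it is passed to spark-submit with --master)
    val sparkconf = new SparkConf().setAppName("WordCount_Online")
    // 2. Create the SparkContext, the entry point of every computation;
    //    it creates the DAGScheduler and the TaskScheduler
    val sparkContext = new SparkContext(sparkconf)
    // 3. Read the data file given as the first argument; an RDD can be thought of
    //    simply as a collection, here one whose elements are Strings
    val data: RDD[String] = sparkContext.textFile(args(0))
    // 4. Split every line to get all the words
    val words: RDD[String] = data.flatMap(_.split(" "))
    // 5. Count each word as 1, producing (word, 1) pairs
    val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1))
    // 6. Sum the counts per word; the first underscore is the running total,
    //    the second is the incoming value
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // 7. Save the result to HDFS at the path given as the second argument
    result.saveAsTextFile(args(1))
    // 8. Stop the SparkContext
    sparkContext.stop()
  }
}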
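The program reads its input and output paths from `args(0)` and `args(1)`, so a submission that omits either path fails with an `ArrayIndexOutOfBoundsException`. One possible guard is sketched below; `SubmitArgs` and `Paths` are hypothetical names, and the usage message is an assumption rather than anything the original program prints.

```scala
// Hypothetical helper, not part of the original project.
object SubmitArgs {
  // Holder for the two positional arguments.
  final case class Paths(input: String, output: String)

  // Accept exactly two arguments; anything else yields a usage message.
  def parse(args: Array[String]): Either[String, Paths] = args match {
    case Array(in, out) => Right(Paths(in, out))
    case _              => Left("usage: WordCount_Online <input path> <output path>")
  }

  def main(args: Array[String]): Unit = parse(args) match {
    case Right(p) =>
      println(s"input=${p.input} output=${p.output}")
    case Left(msg) =>
      System.err.println(msg)
      sys.exit(1)
  }
}
```

Calling such a check at the top of `main`, before the SparkContext is created, would let a bad submission stop with a readable message instead of a stack trace.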
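In the Maven Projects panel, double-click the package option to build the JAR.
A target folder is generated under the project.
Start the ZooKeeper, Hadoop, and Spark clusters.
Finally, submit the generated sparkWordSpace-1.0-SNAPSHOT.jar: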
bin/spark-submit --master spark://hadoop01:7077 \
--class WordCount_Online \
--executor-memory 1g \
--total-executor-cores 1 \
/root/sparkWordCountSpace/target/sparkWordSpace-1.0-SNAPSHOT.jar \
/spark/test/words.txt \
/spark/test/out
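After the job finishes, the output directory holds part files in which each line is the `toString` of one `(String, Int)` pair, for example `(hello,2)`. If those lines need to be read back into pairs, a small parser along the following lines could be used. It is a sketch only: `OutputLine` is a hypothetical name and the sample lines are made up.

```scala
// Hypothetical helper, not part of the original project.
object OutputLine {
  // Parse one line such as "(hello,2)"; None if the line does not have that shape.
  def parse(line: String): Option[(String, Int)] = {
    val trimmed = line.trim
    if (trimmed.startsWith("(") && trimmed.endsWith(")")) {
      val body = trimmed.substring(1, trimmed.length - 1)
      // Split on the last comma so a word that itself contains a comma stays whole.
      val idx = body.lastIndexOf(',')
      if (idx < 0) None
      else scala.util.Try(body.substring(idx + 1).toInt).toOption
        .map(n => (body.substring(0, idx), n))
    } else None
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq("(hello,2)", "(spark,1)", "not a pair") // made-up lines
    sample.foreach(l => println(s"$l -> ${parse(l)}"))
  }
}
```

Note that `saveAsTextFile` refuses to write into a directory that already exists, so /spark/test/out has to be removed or a different output path chosen before the job is submitted a second time.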