本地模式和集群模式运行spark程序

  • Post author:
  • Post category:其他




一、本地模式运行spark程序

spark版本如下:

在这里插入图片描述

1、

centos7安装idea

,激活码到链接要求的网址左上角idea教程那边,按提示才能领取,不能直接领取。

2、

创建maven工程

,并修改文件类型。

在这里插入图片描述


maven中创建第一个Scala

,主要是下载插件的方式。

必须要先添加scala sdk

在这里插入图片描述

在这里插入图片描述

如果没有要先去下载,有就直接选择

在这里插入图片描述


配置好spark对应版本的Scala

注意:pom.xml里的配置文件需要和你idea导入的scala sdk的版本相对于,并且hadoop和spark的版本也要对应,spark启动的Scala版本与idea导入的scala版本的相对应,比如spark启动为2.12.x,idea导入的也应该是2.12的版本。

在这里插入图片描述

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sparkWord</groupId>
    <artifactId>sparkWordSpace</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.3.2</spark.version>
    </properties>
    <dependencies>
        <!--Scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

</project>

WordCount.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf对象,设置appName和Master地址
    val sparkconf = new SparkConf().setAppName("WordCount").setMaster("local[2]").set("spark.testing.memory","2147480000")
    //2.创建SparkContext对象,它是所有任务计算的源头,
    // 它会创建DAGScheduler和TaskScheduler
    val sparkContext = new SparkContext(sparkconf)
    //3.读取数据文件,RDD可以简单的理解为是一个集合
    // 集合中存放的元素是String类型
    val data : RDD[String] = sparkContext.textFile("/root/words.txt")
    //4.切分每一行,获取所有的单词
    val words :RDD[String] = data.flatMap(_.split(" "))
    //5.每个单词记为1,转换为(单词,1)
    val wordAndOne : RDD[(String,Int)] = words.map(x=>(x,1))
    //6.相同单词汇总,前一个下划线表示累加数据,后一个下划线表示新数据
    val result : RDD[(String,Int)] = wordAndOne.reduceByKey(_+_)
    //7.收集打印结果数据
    val finalResult :Array[(String,Int)] = result.collect()
    println(finalResult.toBuffer)
    //8.关闭sparkContext对象
    sparkContext.stop()
  }
}

在这里插入图片描述



二、集群模式运行spark程序

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sparkWord</groupId>
    <artifactId>sparkWordSpace</artifactId>
    <version>1.0-SNAPSHOT</version>

     <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
                <configuration>
                  <args>
             <arg>-dependencyfile</arg>
             <arg>${project.build.directory}/.scala_dependencies</arg>
                  </args>
                </configuration>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
                <configuration>
                  <filters>
                    <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                        <exclude>META-INF/*.SF</exclude>
                        <exclude>META-INF/*.DSA</exclude>
                        <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                    </filter>
                  </filters>
                  <transformers>
                    <transformer implementation=
     "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass></mainClass>
                    </transformer>
                  </transformers>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>



     <!--设置依赖版本号-->
       <properties>
         <scala.version>2.11.8</scala.version>
         <hadoop.version>2.7.4</hadoop.version>
         <spark.version>2.3.2</spark.version>
       </properties>
       <dependencies>
         <!--Scala-->
         <dependency>
           <groupId>org.scala-lang</groupId>
           <artifactId>scala-library</artifactId>
           <version>${scala.version}</version>
         </dependency>
         <!--Spark-->
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.11</artifactId>
           <version>${spark.version}</version>
         </dependency>
         <!--Hadoop-->
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
           <version>${hadoop.version}</version>
         </dependency>
       </dependencies>
</project>

WordCount_Online.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
  //编写单词计数程序,打成Jar包,提交到集群中运行
  object WordCount_Online {
  def main(args: Array[String]): Unit = {
      //1.创建SparkConf对象,设置appName
      val sparkconf = new SparkConf().setAppName("WordCount_Online")
      //2.创建SparkContext对象,它是所有任务计算的源头
      //它会创建DAGScheduler和TaskScheduler
      val sparkContext = new SparkContext(sparkconf)
      //3.读取数据文件,RDD可以简单的理解为是一个集合,存放的元素是String类型
      val data : RDD[String] = sparkContext.textFile(args(0))
       //4.切分每一行,获取所有的单词
      val words :RDD[String] = data.flatMap(_.split(" "))
      //5.每个单词记为1,转换为(单词,1)
      val wordAndOne :RDD[(String, Int)] = words.map(x =>(x,1))
      //6.相同单词汇总,前一个下划线表示累加数据,后一个下划线表示新数据
      val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
      //7.把结果数据保存到HDFS上
      result.saveAsTextFile(args(1))
      //8.关闭sparkContext对象
      sparkContext.stop()
       }
   }

通过使用Maven Projects,双击package选修,打成jar包。

在这里插入图片描述

在这里插入图片描述

该项目下会生成target文件夹

在这里插入图片描述

启动zookeeper、hadoop、spark集群运行

最后运行生成的sparkWordSpace-1.0-SNAPSHOT.jar包

bin/spark-submit --master spark://hadoop01:7077 \
--class WordCount_Online \
--executor-memory 1g \
--total-executor-cores 1 \
/root/sparkWordCountSpace/target/sparkWordSpace-1.0-SNAPSHOT.jar \
/spark/test/words.txt \
/spark/test/out

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



版权声明:本文为weixin_44692890原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。