Spark相关个人笔记

  • Post author:
  • Post category:其他





Scala



基础

  1. 变量声明:
val str="abc"
val str:String = "abc"
  1. 列表:
val list: List[Any] = List("abc",1,true)
  1. 数据类型转换顺序:

    byte–>short–>int–>long–>float–>double

    char–>int
  2. 条件语句:
val x = if(i>0) 100 else 0
  1. 块表达式:
val result={
  val a=1
  val b=2
  a+b
}
  1. for循环:
for(i<-1 to 5) println(i) //1,2,3,4,5
for(i<-1 until 5) println(i) //1,2,3,4
//嵌套for
for(i<-1 to 5;j<-1 to 4) println(j)
while循环:
while(i<5){
  println(i)
  i=i+1
}
  1. 方法
def method(a:Int,b:Int) : Int={
  return a+b  //return可省略
}
def method(a:Int,b:Int) : Unit={
  println(100)
}

//传入多个类型相同参数
def method(a:String,b:Int*)={
  println(b)
  var res=0
  for(i<-b) res=res+i
  a+res
}
method("abc",1,2,3) //array[1,2,3] 结果:abc6
method("abc",arr:_*) //传入的本身就是数组
  1. 函数
(a:Int,b:Int) => a+b
val f1=(a:Int,b:Int) => {a+b}
f1(1,2)

*函数可以作为参数传入方法

方法–>函数:

def m2(x:Int,y:Int) = x+y //定义方法2
def m1(f:(Int,Int)=>Int):Int={
  f(2,6)
} //定义方法1
val res=m1(m2) //传入的参数为方法m2,此时m2自动变为函数

val f=m2 _ //手动转换



数组

val arr=Array(1,2,3)
val arr=new Array[Int](3) //定长数组
val arr = new ArrayBuffer[Int]() //变长数组,需要import scala.collection.mutable.ArrayBuffer

数组基本操作:

遍历数组:for(i<-arr)
添加元素:arr+=1
删除值元素:arr-=1 //一次删除一个,从前向后
合并数组:arr1++=arr2
插入元素:arr.insert(0,1,2)  //第一个参数为index
删除位置元素:arr.remove(1,2) //第二个参数为删除的元素个数
常用方法:sum, max, min, sorted, sorted.reverse



列表


(默认使用不可变)

val nums: List[Int] = List(1,2,3,4)
头部添加:val nums1=nums.+:(1)
尾部添加:val nums2=nums:+1
合并:val nums3 = nums1++:nums2



Map

类似字典、哈希表

val mp = Map(
  "key1" -> "value1",
  "key2" -> "value2"
}
或者
val mp = Map(
  ("key1" , "value1"),
  ("key2" , "value2")
}

mp.keys.foreach{i=> ...}  //迭代
for((k,v)<-mp) //简易迭代



元组

val t=(1,"abc",2.1) //数据类型可以不同

取数:t._1
迭代:t.productIterator.foreach{i=> ...}
转换为字符串:t.toString()
元组最大长度22



集合

val set = Set(1,2,3)

增加元素:val set1 = set+1
删除元素:val set2 = set-1
连接:set1++set2

常用方法:set.head(头元素),set.tail(除头元素),set.isEmpty,set.max,min,set.&或intersect(set1) (交集)




class User{
  private var age=10
  def count(){
    age+=1
  }
}
new User()

//单例对象
object Person{
  private var name="abc"
  def showInfo():Unit={
    println(name)
  }
}

//方法调用
Person.showInfo()

*当类和单例对象名字相同时,称为伴生,可以互相访问私有成员




Spark RDD



创建

  1. 从对象集合创建:
//通过列表创建
val rdd=sc.parallelize(List(1,2,3))  
sc.makeRDD(List(1,2,3))  
  1. 从文件创建(按行拆分)
val rdd=sc.textFile("/home/word.txt")

*查看rdd:rdd.collect



转化算子

map:接受函数作为参数,应用于rdd每个元素,返回每个元素的值

val rdd2=rdd1.map(x=>x+1)
val rdd2=rdd1.map(_+1)

filter:对rdd每个元素进行过滤

val rdd2=rdd1.filter(>3)

flatMap:同map,返回的元素可变

val rdd2=rdd1.flatMap(_.split(" "))

reduceByKey:元组形式RDD,把相同的key通过传入的函数进行合并成一个值

val rdd2=rdd1.reduceByKey((x,y)=>x+y)

groupByKey:元组形式RDD,把相同的key通过传入的函数进行合并成一起

val rdd2=rdd1.groupByKey()

union:合并两个RDD

val rdd3=rdd1.union(rdd2)

sortBy:按照某个规则排序,第一个参数是按某个属性,第二个参数是升降序

val rdd2=rdd1.sortBy(x=>x._2,false) //降序

sortByKey:元组形式RDD按照key排序,默认升序

val rdd2 = rdd1.sortByKey(False)

join:元组形式RDD按照key连接,左外连接leftOuterJoin()左边rdd必定存在,连接的数据为(key, (v, Some(w))),未连接的数据为(key, (v, None)),右外连接同,全外连接fullOuterJoin()rdd1和rdd2记录全部存在,全部为Some类型

val rdd3 = rdd1.join(rdd2) //rdd1和rdd2顺序改变有区别

intersection:交集

val rdd3 = rdd1.intersection(rdd2)

distinct:去重

val rdd2 = rdd1.distinct

cogroup:元组形式RDD,根据key进行并集,(key,(iterable1,iterable2))

val rdd3 = rdd1.cogroup(rdd2)

*

所有转化算子并非立即运算,直到遇到行动算子才会执行



行动算子

reduce:通过传入的函数对rdd聚合

rdd1.reduce(_+_) //使用例子

count:统计rdd元素量

countByKey:元组形式RDD,统计相同key的数量,返回map

collect:返回元素

first:返回第一个元素

take:返回前n个元素数组

foreach:给每个元素执行传入的函数



RDD持久化

cache或persist



检查点

sc.setCheckpointDir("...")
rdd.checkpoint()

*

检查点之前也建议先持久化



import项

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
val conf = new SparkConf()
val sc = new SparkContext(conf)



Spark结构化数据(DataFrame)

SparkSession:SparkContext的封装,允许用户调用DataFrame和Dataset

//创建sparksession
val spark = SparkSession.builder().appName("...").master("...").getOrCreate()

DataFrame是元素类型为row的Dataset

*查看dataframe数据:d1.show(n)



Spark SQL 内部数据



RDD转DataFrame

  1. 元组法
val rdd2: RDD[(String,Int)] = rdd1.map(line => {
  val arr = line.split(",")
  (arr(0), arr(1).toInt)
})

import spark.implicits._
val df = rdd.toDF("name","age")
  1. 样例法
val rdd2: RDD[User] = rdd1.map(line => {
  val arr = line.split(",")
  User(arr(0), arr(1).toInt)
})
import spark.implicits._
val df = rdd.toDF()

//位于伴生对象外建立样例
case class User(name: String, age: Int) 
  1. 函数法
val rdd2: RDD[Row] = rdd1.map(line => {
val arr = line.split(",")
  Row(arr(0), arr(1).toInt)
})
val schema = StructType{
  List(
    StructField("name", DataTypes.StringType),
    StructField("age", DataTypes.IntegerType)
  )
}
val df = spark.createDataFrame(rdd2,schema)



Dataset转DataFrame(同RDD)

val d1 = spark.read.textFile("...") //首先读取Dataset
val d2 = d1.map(line=>{
  val fields = line.split(",")
  val row1 = fields(0).toInt
  val row2 = fields(1)
  ....
  Person(row1,row2,...)
})

case class Person(row1:Int, row2:String, ... )
import spark.implicits._
val pdf = d2.toDF()



通过执行SQL语句生成DataFrame

pdf.createTempView("v_person") //临时视图
val res = spark.sql("select * from v_person") //使用sql语句



Spark SQL 外部文件



加载外部文件(通用)

val df = spark.read.load("...")

//保存
df.write.save("...")



读取parquet文件

val DF = spark.read.parquet("...")

//schema合并操作(如需要)
val mergedDF = spark.read.option("mergeSchema","true").parquet("...")



读取json

//json内容:{"name":"zhangsan","age":32}
//         {"name":"lisi","age":20}
//         ......
val DF = spark.read.json("...")



写入Hive表

//开启hive
val spark = SparkSession.builder()
                .appName("Spark Hive Demo")
                .enableHiveSupport()
                .getOrCreate()
                
//使用sparksql创建hive表
spark.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT)"+"ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'")

//使用sparksql导入数据入hive表
spark.sql("LOAD DATA LOCAL INPATH '...'"+"INTO TABLE students")

//将DataFrame转hive表
val DF = spark.table("students")
DF.write.mode(SaveMode.Overwrite).saveAsTable("...")



加载JDBC(mysql)

val DF = spark.read.format("jdbc")
                .option("url","jdbc:mysql://.....")
                .option("driver","com.mysql.jdbc.Driver")
                .option("dbtable","select *from student")
                .option("user","root")
                .option("password","1234")
                .load()



Spark SQL 函数

(只列举一些常用函数)

lower, select, where, filter, count, countDistinct, distinct, avg, max, min

groupBy:按类别分组,返回一个组关联dataset,通常可以和count或聚合函数一起使用,返回dataframe

agg:聚合函数,参数是新列生成方式

withColumn:添加列

自定义函数

val func = (:)=>{}
spark.udf.register("function_name",func)

开窗函数row_number()

row_number() over (partition by column_name order by column_name desc) new_column_name



SQL 相关(补)

  1. COALESCE(expression1,expression2,…)

    返回参数中第一个不是空值的expression
  2. TRIM(string)

    删除字符串头尾空格



import 项

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{...,udf}
val spark = SparkSession.builder().appName("...").master("...").getOrCreate()
import spark.implicits._ //数据转DataFrame格式需要
val df = spark.sql(...) //从hive表中直接导入DataFrame



Kafka



基本概念

数据单元:消息

独立的服务器节点:Broker

消息的类别:主题,通常分成多个分区,消息均匀分配(类似队列),允许消息超出一台服务器

生产者:前端产生的页面、服务器日志

消费者:hadoop集群、实时监控程序、数仓

zookeeper集群:管理协调broker

*

kafka的broker是生产者和消费者的中间件,生产者push,消费者pull(需要偏移值offset),消息通常保留新信息,当某个broker宕机时,其他broker会接替,生产者和消费者重新连接新broker



副本

领导者副本:只有一个,生产者和消费者都向领导者发请求

跟随者副本:复制领导者副本,保持一致(向领导者pull)

领导者崩溃,跟随者接替

领导者副本维护同步副本列表:ISR(在zookeeper中),如果跟随者没有跟上,会被移出ISR

消息写入时需要ISR所有副本同步完毕,才算写完,消费者才能读取数据

对于生产者来说,可以通过调节acks参数,如为1时领导者写入成功,生产者就认为写入成功



消费者组

同一消费组内不允许多个消费者消费同一分区,不同消费者组可以同时消费同一分区

在组内为队列模式,组间是广播模式



数据存储

每个分区一个文件夹,每个分区多个segment(包含多个消息,kafka数据存储最小单位)

segment分为索引文件和数据文件,索引文件指向数据文件消息的物理偏移地址(实际取内容用)

segment文件名字由20位数字组成,每个分区第一个文件为0,下一个文件为上一个文件最后一个消息的offset(定位消息用)




Spark Streaming



基本概念

接收实时输入数据流,以时间片为单位拆分成批数据(多个RDD,DStream),交给spark处理生成批结果(RDD序列)

输入DStream:输入数据流,会与一个Receiver对象关联,一个receiver占一个cpu内核

Executor进程:执行任务,所分配的cpu内核数必须大于receive



代码一般流程

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

//1. 创建StreamingContext
val conf = new SparkConf().setMaster("local[2]")
                .setAppName("...")
val ssc = new StreamingContext(conf, Seconds(1))   //批处理间隔1秒

//2. 创建DStream
val lines = ssc.socketTextStream("localhost", 9999)   //lines每条记录都是一行文本

//3. 算子操作(同RDD)
val words = lines.flatMap(_.split(" "))

//4. 启动
ssc.start()   //开始计算
ssc.awaitTermination()



数据源

  1. 文件
ssc.fileStream[keyclass, valueclass, formatclass]("...")
ssc.textFileStream("...")
  1. socket

    同上节
  2. RDD队列
val rddQueue = new mutable.Queue[RDD[int]]()
rddQueue += ssc.sparkContext.makeRDD(1 to 10)
rddQueue += ssc.sparkContext.makeRDD(20 to 30)
val inputDStream = ssc.queueStream(rddQueue)
  1. kafka等高级数据源

    在maven引入spark streaming依赖库、kafka第三方依赖库
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])



操作

  1. 无状态操作

    只计算当前时间批次:map, flatMap, filter, union, reduce, countByValue, join, reduceByKey, cogroup等
  2. 状态操作

    当前时间批次和历史时间批次累加计算:updateStateByKey
  3. 窗口操作

    在滑动窗口内进行操作:window(length, interval) 取滑动窗口所覆盖的DStream

    countByWindow(length, interval) 计算窗口内元素数量

    reduceByWindow(func, length, interval)
  4. 输出操作

    将DStream输出到外部

    print(), saveAsTextFiles, foreachRDD(func)



持久化和检查点

  1. 持久化

    同RDD
  2. 检查点:

    1)元数据检查点:定义流计算的信息,包括配置信息、DStream操作、未完成批次

    2)数据检查点:使用updateStateByKey等状态转换函数,必须提供检查点,定期将RDD存入可靠系统;driver从故障中恢复,也必须提供检查点

    *

    检查点默认时间间隔是批处理时间间隔倍数(5-10倍),至少10s

    dstream.checkpoint(interval)
ssc.checkpoint("...") //创建检查点

//从故障中恢复时,重新创建StreamContext
def func():StreamingContext={
  ......
  ssc.checkpoint("...")
  ssc
}
val context = StreamingContext.getOrCreate("...",func())



linux指令



文件属性

d:目录

-:文件

l:链接文件

rwx:可读可写可执行

第一组rwx:属主权限,文件所有者

第二组rwx:属组权限,文件所有者同组用户

第三组rwx:其他用户权限

例:drwxr-xr–



文件管理

ls:列出文件 -a全部包括隐藏文件,-d全部目录,-l详细属性

cd:切换目录 /根目录,./当前目录,…/相对路径,…上一个目录,~家目录

pwd:当前路径 [-P]确实路径

mkdir:创建目录 -p递归创建,-m配置权限(711:rwx–x–x)

rmdir:删除空目录 -p递归删除

cp:复制文件 -i询问覆盖,-p同时复制属性,-d复制连结,-r递归复制,-a同-pdr

rm:删除文件(谨慎) -i询问删除,-r递归删除,-f强制删除

mv:移动或更名文件 -i询问移动,-u更新,-f强制移动直接覆盖

man:help

ln:硬链接 -s软链接

cat:显示内容 -b显示行号不包括空行,-n显示行号包括空行

nl:显示内容与行号

more:翻页显示 space翻一页,enter滚动一行,/查找,q退出,b往回翻

rz:放入新文件

sz:提取文件

scp:与服务器互传文件(可用rz和sz)

*hadoop fs:hadoop文件管理命令



shell脚本相关



基本命令

vim filename:进入vim一般模式

一般中i:输入模式

esc:返回一般模式

一般中冒号:底线模式

底线中w:保存

底线中q:退出



编写内容

#!/bin/bash或#!/bin/sh开头

  1. test命令

    用途:检查语句成立,通常可以与if语句一起使用

    数值比较:等于-eq,不等于-ne,大于-gt,小于-lt

    字符串比较:等于=,不等于!=,长度为零-z,长度不为零-n

    文件检查:存在-e,存在且内容不为空-s,存在且为目录-d

    echo命令:输出字符串

    read:输入
  2. 条件语句
if condition
then
  command
else
  command
fi
#elseif语句相同写法


如果else中没有command就不写else

  1. 循环语句
#for循环
for ... in ... ... ...
do
  command
done

#while循环
while condition
do
  command
done
  1. case语句
case ... in ... ... ...
...)
  command
  ...
  ;;
...)
  command
  ...
  ;;
esac
  1. 函数
func(){
  command
  return ...
}



Git指令

  1. 本地指令(工作区到本地仓库)

    git init:初始化仓库

    git clone:拷贝仓库

    git add:添加文件至暂存区

    git status:查看仓库状态

    git commit:提交

    git reset:回退

    git rm:删除暂存区和工作区中的文件

    git mv:移动工作区的文件

    git log:查看历史提交
  2. 远程操作(远程仓库)

    git remote:操作

    git fetch:获取代码库

    git pull:下载远程代码

    git push:上传代码

常用git流程:

流程
(转自菜鸟教程)

*

一般提交流程:git add …—> git commit -m “…”—> git push

一般下载流程:git fetch

  1. 分支

    git branch:列出分支 -a列出本地和远程分支

    git merge:合并分支



版权声明:本文为weixin_44372699原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。