三个开发入门类
SparkConf
SparkConf
是Spark
的配置类,Spark
中的每一个组件都是直接或者间接地使用着它所存储的属性。
SparkContext
SparkContext
是通往Spark
集群的唯一入口,可以用来在Spark
集群中创建RDDs
、累加器变量和广播变量等。SparkContext
也是整个Spark
应用程序(Application
)中至关重要的一个对象,可以说是整个Application
运行调度的核心(不是指资源调度)。SparkContext
的核心作用是初始化Spark
应用程序运行所需要的核心组件,包括高层调度器(DAGScheduler
)、底层调度器(TaskScheduler
)和调度器的通信终端(SchedulerBackend
),同时还会负责Spark
程序向Master
注册程序等。- 备注:只可以有一个
SparkContext
实例运行在一个JVM
内存中,所以在创建新的SparkContext
实例前,必须调用stop
方法停止当前JVM
唯一运行的SparkContext
实例。
SparkSession
SparkSession
主要用在SparkSQL
中,当然也可以用在其他场合,他可以代替SparkContext
;SparkSession
实际上封装了SparkContext
,另外也封装了SparkConf
、sqlContext
- 备注:从这三个类所处位置我们可以发现,
SparkConf
和SparkContext
是位于org.apache.spark
包下,是一个通用的包;而SparkSession
位于org.apache.spark.sql
包下,是一个专用附加包。因此从这里我们可以看到,在使用SparkSQL
的时候尽量使用SparkSession
,如果发现有些API
不在SparkSession
中,也可以通过SparkSession
拿到SparkContext
和其他Context
等
spark-submit
作用
spark-submit
是在spark
安装目录中bin
目录下的一个 shell 脚本文件,用于在集群中启动应用程序(如*.jar、*.py 脚本等);对于spark
支持的集群模式,spark-submit
提交应用的时候有统一的接口,不用太多的设置。
参数
--master
- 该参数表示提交任务到哪里执行,常见的选项有:
local
:提交到本地服务器执行,并分配单个线程local[k]
:提交到本地服务器执行,并分配k
个线程local[*]
:提交到本地服务器执行,并分配本地core
个数个线程yarn
:提交到yarn
模式部署的集群中
--deploy-mode
spark on yarn
的两种启动方式,区别是spark
的driver
是在本地 (client
) 启动还是在yarn
的container
中启动,默认是client
- client:本地
driver
- cluster:远端
driver
- client:本地
--class
- 应用程序的主类,仅针对
java
或scala
应用
--name
- 指定应用程序的名称,在
yarn
调度系统下,只对cluster
模式生效
--jar
- 用逗号分隔的本地
jar
包,设置后,这些jar
将包含在driver
和executor
的 classpath 下。如果路径是个目录的话,--jars
的设置无法起作用,必须详细到abc.jar
。 - 备注:区别
spark-defaults.conf
配置文件中的spark.yarn.jars
--jars
:主要用于上传我们需要的第三方依赖spark.yarn.jars
:主要传入spark
环境相关的jar
包,例如spark.core
,spark.sql
--conf prop=value
- 指定
spark
配置属性的值,格式为PROP=VALUE
, 例如--conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
--properties-file
- 指定需要额外加载的配置文件,用逗号分隔,如果不指定,默认为
conf/spark-defaults.conf
--driver-memory
和--driver-core
- 前者表示
driver
内存,默认1G
;后者表示driver
的核数,默认是1
。在yarn
或者standalone
下使用 - 建议:对于
driver memory
通常不用设置,若出现使用collect
算子将RDD
数据全部拉取到Driver
上处理,就必须确保该值足够大,否则OOM
内存溢出(如果设置了广播变量再设置大一点)
--num-executors
- 启动的
executor
数量,即该作业总共需要多少executor
进程执行,默认为2
。建议:每个作业运行一般设置5
,10
,20
个左右较合适。在yarn
下使用。
--executor-memory 和--executor-cores
executor-memory
:设置每个executor
进程的内存,num-executors * executor-memory
代表作业申请的总内存量(尽量不要超过最大总内存的1/3~1/2
)- 建议:设置 5G~10G 较合适
executor-cores
:每个executor
进程的CPU Core
数量,该参数决定每个executor
进程并行执行task
线程的能力,num-executors* executor-cores
代表作业申请总CPU core
数(不要超过总CPU Core
的 1/3~1/2 )- 建议:设置 2~4 个比较合适
--queue QUEUE_NAME
- 将任务提交给哪个
YARN
队列,默认为YARN
的默认队列
RDD
概念
RDD(Resilient Distributed Dataset)
叫做弹性分布式数据集,是Spark
中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合- 弹性之一:自动的进行内存和磁盘数据存储的切换;
- 弹性之二:基于 Lineage(血缘)的高效容错;
- 弹性之三:Task 如果失败会自动进行特定次数的重试;
- 弹性之四:Stage 如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
- 弹性之五:checkpoint 和 persist(持久化和检查点);
- 弹性之六:数据分片的高度弹性。
核心属性
- 分区列表:
RDD
数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性 - 分区计算函数:
Spark
在计算时,是使用分区函数对每一个分区进行计算 RDD
之间的依赖关系:RDD
是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD
建立依赖关系- 分区器(可选):当数据为
KV
类型数据时,可以通过设定分区器自定义数据的分区 - 首选位置(可选):计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
- Spark 运行时使用这 5 条信息来调度和执行用户数据的处理逻辑,其中每个属性的代码如下:
// RDD 中的依赖关系由一个 Seq 数据集来记录,使用 Seq 的原因是经常取第一个元素或者遍历。简记:获取每个 rdd 的依赖,即血缘
protected def getDependencies: Seq[Dependency[_]]
// 分区列表定义在一个数组中,这里使用 Array 的原因是随时使用下标来访问分区内容。计算分区信息,只会被调用一次,简记:获取分区信息
protected def getPartitions: Array[Partition]
// 接口定义,具体由子类实现,对输入的 RDD 分区进行计算,将某个分区的数据读成一个 Iterator。简记:获取分区信息之后对每个分区进行子计算
def compute(split: Partition, context: TaskContext): Iterator[T]
// 分区器,可选,子类可以重写以指定新的分区方式,Spark 支持 Hash 和 Range两种分区方式
@transient val partitioner: Option[Partitioner] = None
// 可选,每个 partition 数据驻留在集群中的位置,子类可以指定分区的位置,如 HadoopRDD 可以重写此方法,让分区尽可能与数据在相同的节点上
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- 在
Spark RDD
中,locationPrefs
是指用于执行任务的节点的位置偏好设置。具体来说,它是一个Map
,其中键是RDD
的分区索引,值是一个序列,表示首选执行该分区任务的节点列表。这些位置偏好设置可以在Spark
调度任务时使用,以尽可能地在节点之间分配任务,以减少数据移动和网络延迟,并提高任务执行效率。例如,如果一个节点已经拥有某个分区的数据,那么最好将该分区的任务分配给该节点,以避免将数据移动到其他节点。按照“移动数据不如移动计算”的理念,Spark
在进行任务调度的时候,会尽可能地优先将计算任务分配到其所要处理的block
的存储位置
总结
RDD
可以看做是一系列的Partition
所组成的RDD
之间存在依赖关系- 算子是作用在
Partition
之上 - 分区器是作用在
kv
形式的RDD
上 Partition
提供是最佳计算位置,利于数据从本地化计算,移动计算不是移动数据
RDD
创建
-
在
Spark
中创建RDD
的方式分为四种:-
使用程序中的集合创建 RDD,主要用于进行测试
- 集合中的元素类型就是
RDD
的泛型类型。可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark
应用的流程。 - 从集合中创建
RDD
,Spark
主要提供了两中函数:parallelize
和makeRDD
。我们可以先看看这两个函数的声明:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val rdd1 = sparkContext.parallelize( List(1,2,3,4) ) val rdd2 = sparkContext.makeRDD( List(1,2,3,4) ) rdd1.collect().foreach(println) rdd2.collect().foreach(println) sparkContext.stop()
- 从底层代码实现来讲,
makeRDD
方法就是parallelize
方法
def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) }
- 集合中的元素类型就是
-
从外部存储(文件)创建
RDD
- 由外部存储系统的数据集创建
RDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new SparkContext(sparkConf) val fileRDD: RDD[String] = sparkContext.textFile("input") fileRDD.collect().foreach(println) sparkContext.stop()
- 由外部存储系统的数据集创建
-
从其他
RDD
创建- 主要通过一个
RDD
运算完后,再产生新的RDD
- 主要通过一个
-
直接创建
RDD
- 使用
new
的方式直接构造RDD
,一般由Spark
框架自身使用
- 使用
-
RDD
并行度与分区
Spark任务中的相关的概念
- 什么是
job
Job
简单讲就是提交给spark
的任务,严格的说一个Action
算子就属于一个job
- 什么是
stage
Stage
是每一个job
处理过程要分为的几个阶段,stage
的分割线是看是否发生shuffle
,即有没有产生宽依赖,后面会详细的说明。
- 什么是
task
Task
是每一个job
处理过程要分为几次任务。Task
是任务运行的最小单位,一个stage
要处理的partition
对应一个task
,最终是要以task
为单位运行在Executor
中,task
是Executor
进程中的一个线程。
Spark
官网建议的Task
的设置原则是:设置Task
数目为num-executors * executor-cores
的 2~3 倍较为合适。- 一个
stage
的task
的数量是由谁来决定的?- 是由输入文件的切片个数来决定的。通过算子修改了某一个
rdd
的分区数量,task
数量也会同步修改。总结:一个分区对应一个task
- 是由输入文件的切片个数来决定的。通过算子修改了某一个
- 一个
job
任务的task
数量是由谁来决定的?- 一个
job
任务可以有一个或多个stage
,一个stage
又可以有一个或多个task
。所以一个job
的task
数量是 (stage
数量 *task
数量)的总和。
- 一个
- 每一个
stage
中的task
最大的并行度?- 并行度:是指指令并行执行的最大条数。在指令流水中,同时执行多条指令称为指令并行。
- 理论上:每一个
stage
下有多少的分区,就有多少的task
,task
的数量就是任务的最大的并行度。(一般情况下,一个task
运行的时候,使用一个cores
) - 实际上:最大的并行度,取决于任务运行时使用的
executor
拥有的core
的数量。
分区和并行度的关系
-
分区数:是一个相对静态的概念,这个值的初始大小由数据源的分布情况决定(如果是内存数据,分区数和设置的并行度一致),比如读取
hdfs
,此时有10
个block
块,那么你的分区数就是 10; -
并行度:是一个相对动态的概念,是根据当前计算引擎可用资源来动态决定的,它的值是小于等于分区数
-
总结:
- 分区数描述的是数据源,是个相对静态的概念
- 并行度描述的是计算引擎一次要同时处理的分区数,是根据资源情况临时决定的
- 并行度建议小于等于分区数
RDD
转换算子
-
transformation
操作会针对已有的RDD
创建一个新的RDD
。transformation
的特点就是lazy
特性。lazy
特性指的是,如果一个spark
应用中只定义了transformation
操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation
是不会触发spark
程序的执行的,它们只是记录了对RDD
所做的操作,但是不会自发的执行。只有当transformation
之后,接着执行了一个action
操作,那么所有的transformation
才会执行。Spark
通过这种lazy
特性,来进行底层的spark
应用执行的优化,避免产生过多中间结果。 -
RDD
根据数据处理方式的不同将算子整体上分为Value
类型、双Value
类型和Key-Value
类型
Value
类型
map
-
函数签名
def map[U: ClassTag](f: T => U): RDD[U]
-
函数说明
map
是对RDD
中的每个元素都执行一个指定函数来产生一个新的RDD
。任何原RDD
中的元素在新RDD
中都有且只有一个元素与之对应,将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
-
函数示例:
def main(args: Array[String]) { val conf = new SparkConf().setAppName("Map").setMaster("local") val sc =new SparkContext(conf) val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4)) val dataRDD1: RDD[Int] = dataRDD.map( num => {num * 2} ) val dataRDD2: RDD[String] = dataRDD1.map( num => { "" + num } ) }
mapPartitions
-
函数签名
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("mapPartition") val sc = new SparkContext(conf) val value: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9),1) val rest = value.mapPartitions{ iter => { var result = List[Int]() var even = 0 var odd = 0 while (iter.hasNext){ var value = iter.next() if(value % 2 == 0){ even += value // 2 +4 + 6 +8 = 20 // println(s"parID id ${TaskContext.get.partitionId()}, value is ${value}, even is ${even}") }else{ odd += value // 1+3+5+7+9 = 25 // println(s"parID id ${TaskContext.get.partitionId()}, value is ${value}, even is ${even}") } } result = result :+ even :+ odd // list(20,25) result.iterator } } rest.foreach(println)//20 25 }
-
函数说明
- 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
-
map
和mapPartitions
的区别Map
算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions
算子是以分区为单位进行批处理操作。
-
功能的角度
Map
算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions
算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
-
性能的角度
Map
算子因为类似于串行操作,所以性能比较低,而是mapPartitions
算子类似于批处理,所以性能较高。- 但是
mapPartitions
算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。
mapPartitionsWithIndex
-
函数签名
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
-
函数说明
- 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
-
函数示例:
def main(args: Array[String]) { val conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local") val sc =new SparkContext(conf) val rdd = sc.parallelize(1 to 8,3) rdd.mapPartitionsWithIndex{ //查看各个分区的数据的最好的 api (partid,iter)=>{ var part_map = scala.collection.mutable.Map[String,List[Int]]() var part_name = "part_" + partid part_map(part_name) = List[Int]() while(iter.hasNext){ part_map(part_name) :+= iter.next()//:+= 列表尾部追加元素 } part_map.iterator } }.collect }
flatMap
-
函数签名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
-
函数说明
- 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
-
函数示例
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("flatMap") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List( List(1, 2), List(3, 4) )) val flatRDD = rdd.flatMap( list => { list//list是RDD中的一个元素,而list又是List类型,返回结果时会拆解为单个数字返回 } ) flatRDD.collect().foreach(println) sc.stop() }
glom
-
函数签名
def glom(): RDD[Array[T]]
-
函数说明
- 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("flatMap") val sc = new SparkContext(conf) val dataRDD = sc.makeRDD(List( 1,2,3,4),1) val glomRDD: RDD[Array[Int]] = dataRDD.glom() val mapRDD: RDD[Int] = glomRDD.map({ iter => iter.max }) println(mapRDD.sum()) }
groupBy
-
函数签名
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
-
函数说明
-
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
filter
-
函数签名
def filter(f: T => Boolean): RDD[T]
-
函数说明
- 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("mapPartition") val sc = new SparkContext(conf) val dataRDD = sc.makeRDD(List(1, 2, 3, 4), 1) dataRDD.filter(_ % 2 == 0).foreach(println) }
sample
-
函数签名
def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
-
函数说明
- 根据指定的规则从数据集中抽取数据
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("mapPartition") val sc = new SparkContext(conf) val dataRDD = sc.makeRDD(List( 1,2,3,4),1) // 抽取数据不放回(伯努利算法) // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。 // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要 // 第一个参数:抽取的数据是否放回,false:不放回 // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; 如果大于 1 或者小于零将会报错 // 第三个参数:随机数种子 val dataRDD1 = dataRDD.sample(false, 0.5) // 抽取数据放回(泊松算法) // 第一个参数:抽取的数据是否放回,true:放回;false:不放回 // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数 // 第三个参数:随机数种子 val dataRDD2 = dataRDD.sample(true, 2) // 报错 }
distinct
-
函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T] def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
-
函数说明
- 将数据集中重复的数据去重
-
函数示例
def main(args: Array[String]): Unit = { //创建配置文件 val conf: SparkConf = new SparkConf().setAppName("distinct").setMaster("local[*]") //创建 SparkContext,该对象是提交的入口 val sc = new SparkContext(conf) //创建 RDD val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 3, 3), numSlices = 3) //用 mapPartitionsWithIndex 的目的是为了证明全局去重和 shuffle rdd.mapPartitionsWithIndex((index: Int, datas: Iterator[Int]) => { println(index + "分区:" + datas.mkString(",")) datas }).collect() println("***********************************") //去重 rdd.distinct().mapPartitionsWithIndex((index, datas) => { println(index + "分区:" + datas.mkString(",")) datas }).collect() //释放资源 sc.stop() }
coalesce
-
函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T]
-
函数说明
- 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("coalesce") val sc = new SparkContext(conf) val rdd = sc.parallelize(1 to 16,4) //当 suffle 的值为 false 时,不能增加分区数 val coalesceRDD = rdd.coalesce(3) println("重新分区后的分区个数:" + coalesceRDD.partitions.size) // shuffle=true,可以增加或者減少分区 val coalesceRDDOnShuffle = rdd.coalesce(7,true) println("重新分区后的分区个数:" + coalesceRDDOnShuffle.partitions.size) println("RDD 依赖关系:"+coalesceRDDOnShuffle.toDebugString) }
repartition
-
函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
-
函数说明
- 该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的RDD,还是将分区数少的 RDD 转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("coalesce") val sc = new SparkContext(conf) val rdd = sc.parallelize(1 to 16,4) // 减少分区 val repatitionRDD = rdd.repartition(3) println("重新分区后的分区个数:" + repatitionRDD.partitions.size) // 增加分区 val repatitionRDDAdd = rdd.repartition(7) println("重新分区后的分区个数:" + repatitionRDDAdd.partitions.size) }
repartition和coalesce区别
repartition
会产生shuffle
,是一个消耗比较昂贵的操作算子,Spark
出了一个优化版的repartition
叫做coalesce
(shuffle
=false
),它可以尽量避免数据迁移,但是你只能减少同等RDD
的partition
。- 1、N 小于 M(只能使用 shuffle)
- 一般情况下,
N
个分区有数据分布不均匀的状况,利用HashPartitioner
函数将数据重新分区为M
个,此时需要使用repartition
或者coalesce
(shuffle
=true
)。
- 一般情况下,
- 2、N 大于 M 且和 M 相差不多(不建议使用 shuffle)
- 假如
N
是1000
,M
是100
,那么就可以将N
个分区中的若干个分区合并成一个新的分区,最终合并为M
个分区,这时可以使用coalesce
(shuffle
=false
),不产生shuffle
,如果使用repartition
将会产生shuffle
- 假如
- 3、N 大于 M 且和 M 相差悬殊(建议使用 shuffle)
- 这时如果将
shuffle
设置为false
,父子RDD
是窄依赖关系,他们同处在一个stage
中,就可能造成spark
程序的并行度不够,从而影响性能。如果在M
为 1 的时候,为了使coalesce
之前的操作有更好的并行度,可以将shuffle
设置为true
。
- 这时如果将
sortBy
-
函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length):RDD[(K, V)] def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
-
函数说明
- 该操作用于排序数据。在排序之前,可以将数据通过
f
函数进行处理,之后按照f
函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD
的分区数与原``RDD的分区数一致。中间存在
shuffle` 的过程
- 该操作用于排序数据。在排序之前,可以将数据通过
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("sortByKey") val sc = new SparkContext(conf) val rdd: RDD[(Int, String)] = sc.makeRDD(List((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd"))) // 默认是升序 val resRDD1: RDD[(Int, String)] = rdd.sortByKey() resRDD1.collect().foreach(println) println("**************************************") // 降序 val resRDD2: RDD[(Int, String)] = rdd.sortByKey(ascending = false) resRDD2.collect().foreach(println) val products = sc.parallelize(List("屏保 20 10","支架 20 1000","酒精棉 5 2000","吸氧机 5000 1000")) val productData = products.map(x=>{ val splits = x.split(" ") val name = splits(0) val price = splits(1).toDouble val amount = splits(2).toInt (name,price,amount) }) productData.sortBy(_._2, false).collect().foreach(println) productData.sortBy(x=>(-x._2, -x._3)).collect().foreach(println) }
object CustomSortByKey { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("CustomSortByKey").setMaster("local") val sc = new SparkContext(conf) val stdList: List[(Student, Int)] = List( (new Student("zs", 18), 1), (new Student("li", 18), 1), (new Student("zs", 19), 1), (new Student("wangwu", 18), 1), (new Student("zs", 20), 1) ) //创建 val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList) val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey() //打印输出 resRDD.collect().foreach(println) } } class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable { //自定义比较规则 override def compare(that: Student): Int = { //先按照名称升序排序,如果名称相同的话,在按照年龄降序排序 var res: Int = this.name.compareTo(that.name) if (res == 0) { res = that.age - this.age } res } override def toString: String = s"Student($name, $age)" }
object SortByDemo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("sortBy demo") val sc = new SparkContext(conf) val students = List(Student("zhangsan",15), Student("lisi",10), Student("wangwu",12), Student("wangwu",14), Student("zhuqi",18) ) sc.makeRDD(students).sortBy(_.age)(Ordering[Int].reverse, ClassTag.Int) // 防止泛型擦除 .collect() .foreach(println) } case class Student(var name: String, var age: Int)
双Value类型
intersection
-
函数签名
def intersection(other: RDD[T]): RDD[T]
-
函数说明
- 对源
RDD
和参数RDD
求交集后返回一个新的RDD
- 对源
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("intersection").setMaster("local") val sc = new SparkContext(conf) val dataRDD1 = sc.makeRDD(List(1,2,3,4)) val dataRDD2 = sc.makeRDD(List(3,4,5,6)) val dataRDD = dataRDD1.intersection(dataRDD2) }
union
-
函数签名
def union(other: RDD[T]): RDD[T]
-
函数说明
- 对源
RDD
和参数RDD
求并集后返回一个新的RDD
- 对源
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("union demo") val sc = new SparkContext(conf) val rdd1 = sc.makeRDD(List(1,2,3,4,5)) val rdd2 = sc.makeRDD(List(3,4,5,6,7)) // union 是不去重的,如果想去重,再使用 distinct 算子 rdd1.union(rdd2) .distinct() .collect() .foreach(println) val rdd3 = rdd1 ++ rdd2 // rdd1.++(rdd2) rdd3.distinct().collect().foreach(println) }
subtract
-
函数签名
def subtract(other: RDD[T]): RDD[T]
-
函数说明
- 以一个
RDD
元素为主,去除两个RDD
中重复元素,将其他元素保留下来。求差集
- 以一个
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("subtract demo") val sc = new SparkContext(conf) val rdd1 = sc.makeRDD(List(1,2,3,4,5)) val rdd2 = sc.makeRDD(List(3,4,5,6,7)) // 以调用者为主 rdd1.subtract(rdd2).collect().foreach(println) rdd2.subtract(rdd1).collect().foreach(println) }
cartesian
-
函数签名
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
-
函数说明
- 笛卡尔积操作,对输入 RDD 内的所有元素计算笛卡尔积
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("cartesian").setMaster("local") val sc = new SparkContext(conf) val x = sc.parallelize(List(1,2,3,4,5)) val y = sc.parallelize(List(6,7,8,9,10)) x.cartesian(y).collect }
zip
-
函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
-
函数说明
- 将两个
RDD
中的元素,以键值对的形式进行合并。其中,键值对中的Key
为第 1 个RDD
中的元素,Value
为第 2 个RDD
中的相同位置的元素。
- 将两个
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("zip").setMaster("local") val sc = new SparkContext(conf) val dataRDD1 = sc.makeRDD(List(1,2,3,4)) val dataRDD2 = sc.makeRDD(List(3,4,5,6)) val dataRDD = dataRDD1.zip(dataRDD2) }
zipPartitions
-
函数签名
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning:Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
-
函数说明
- 该算子将多个
RDD
按照partition
组合成为新的RDD
,该算子需要组合的RDD
具有相同的分区数,如果分区数不相同,则会报错。但对于每个分区内的元素数量没有要求(如果数据不对齐,那么由用户自定义解决对齐问题)。参数preservesPartitioning
表示是否保留父RDD
的partitioner
分区信息
- 该算子将多个
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("zipPartitions").setMaster("local") val sc = new SparkContext(conf) var rdd1 = sc.makeRDD(1 to 5,2) var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2) //rdd1 两个分区中元素分布: rdd1.mapPartitionsWithIndex{ (x,iter) => { var result = List[String]() while(iter.hasNext){ result ::= ("part_" + x + "|" + iter.next()) } result.iterator } }.collect.foreach(println) //rdd2 两个分区中元素分布 rdd2.mapPartitionsWithIndex{ (x,iter) => { var result = List[String]() while(iter.hasNext){ result ::= ("part_" + x + "|" + iter.next()) } result.iterator } }.collect.foreach(println) //rdd1 和 rdd2 做 zipPartition rdd1.zipPartitions(rdd2){ (rdd1Iter,rdd2Iter) => { var result = List[String]() while(rdd1Iter.hasNext && rdd2Iter.hasNext) { result::=(rdd1Iter.next() + "_" + rdd2Iter.next()) } result.iterator } }.collect.foreach(println) }
Key - Value
类型
mapValues&flatMapValues
-
函数签名
def mapValues[U](f: V => U): RDD[(K, U)]
-
函数说明
- 输入函数应用于
RDD
中KV
(Key
-Value
)类型元素中的Value
,原RDD
中的Key
保持不变,与新的Value
一起组成新的RDD
中的元素。
- 输入函数应用于
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("mapValues demo") val sc = new SparkContext(conf) sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4),("e",5),("f",6))) .mapValues(_ * 3) .collect() .foreach(println) sc.makeRDD(List(("a","hello java"),("b","hello scala"),("c","hello hadoop"))) .flatMapValues(_.split(" ")) .collect() .foreach(println) }
partitionBy
-
函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
-
函数说明
- 将数据按照指定
Partitioner
重新进行分区。Spark
默认的分区器是HashPartitioner
- 将数据按照指定
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("partitionBy demo") val sc = new SparkContext(conf) sc.makeRDD(List((1,"A"),(2,"B"),(3,"C"),(4,"D")),2) .partitionBy(new HashPartitioner(3)) // .repartition(3) .mapPartitionsWithIndex{ (id,iter)=>{ println(id + "分区:" + iter.mkString(",")) iter } } .collect() }
reduceByKey
-
函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
-
函数说明
- 可以将数据按照相同的
Key
对Value
进行聚合
- 可以将数据按照相同的
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("reduceByKey").setMaster("local") val sc = new SparkContext(conf) val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3))) val dataRDD2 = dataRDD1.reduceByKey(_ + _) val dataRDD3 = dataRDD1.reduceByKey(_ + _, 2) }
groupByKey
-
函数签名
def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
-
函数说明
- 将数据源的数据根据
key
对value
进行分组
- 将数据源的数据根据
-
reduceByKey
和groupByKey
的区别- 从
shuffle
的角度:reduceByKey
和groupByKey
都存在shuffle
的操作,但是reduceByKey
可以在shuffle
前对分区内相同key
的数据进行预聚合(combine
)功能,这样会减少落盘的数据量,而groupByKey
只是进行分组,不存在数据量减少的问题,reduceByKey
性能比较高。 - 从功能的角度:
reduceByKey
其实包含分组和聚合的功能。GroupByKey
只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey
,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
- 从
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByKey demo") val sc = new SparkContext(conf) sc.makeRDD(List((1,"A"),(2,"B"),(3,"C"),(1,"D"),(2,"C")),2) .groupByKey() .collect() .foreach(println) sc.makeRDD(List((1,"A"),(2,"B"),(3,"C"),(1,"D"),(2,"C")),2) .groupBy(_._2) .collect() .foreach(println) }
aggregateByKey
-
函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
-
函数说明
- 将数据根据不同的规则进行分区内计算和分区间计算。
- 需要注意的是:
aggregateByKey
最终的返回数据结果应该和初始值的类型保持一致
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("aggregateByKey").setMaster("local") val sc = new SparkContext(conf) // 将数据根据不同的规则进行分区内计算和分区间计算 val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3))) // aggregateByKey 算子是函数柯里化,存在两个参数列表 // 1. 第一个参数列表中的参数表示初始值 // 2. 第二个参数列表中含有两个参数 // 2.1 第一个参数表示分区内的计算规则 // 2.2 第二个参数表示分区间的计算规则 dataRDD1.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println) // 取出每个分区内相同 key 的最大值然后分区间相加 val rdd = sc.makeRDD(List( ("a",1), ("a",2), ("c",3), ("b",4), ("c",5), ("c",6)), 2) // 分区 0:("a",1),("a",2),("c",3) => (a,10),(c,10) // 分区 1:("b",4),("c",5),("c",6) => (b,10)(c,10) // 最终的结果:分区 0 的(a,10)不动,将分区 0 的(c,10)和分区 1 的(c,10)相加结果(c,20)放在分区 1中,分区 1 中的(b,10)也保持不动 rdd.aggregateByKey(10)( (x, y) => math.max(x,y), (x, y) => x + y ) .collect().foreach(println) }
foldByKey
-
函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
-
函数说明
- 当分区内计算规则和分区间计算规则相同时,
aggregateByKey
就可以简化为foldByKey
- 当分区内计算规则和分区间计算规则相同时,
combineByKey
-
函数签名
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]
-
函数说明
- 允许对具有相同键的元素进行自定义的聚合操作。它接收三个函数参数:
- createCombiner:对于每个键的第一个值,创建一个累加器的初始值。
- mergeValue:对于每个键的后续值,将其合并到累加器中。
- mergeCombiners:在不同分区上的累加器之间进行合并
- 这个操作常用于需要对键值对进行聚合计算的场景,比如求和、求平均等。它的结果是一个新的RDD,其中每个键都对应一个聚合后的结果值。
- 允许对具有相同键的元素进行自定义的聚合操作。它接收三个函数参数:
-
函数示例
// 统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出 def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("combineByKey demo") val sc = new SparkContext(conf) val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy")) sc.parallelize(people) .combineByKey( //把不同 key 映射成另外的形式,这里决定了 V 和 C 的数据类型,一元匿名函数 (x: String) => (List(x), 1), //这里匿名函数输入参数就是上一步匿名函数的输出类型和输入类型组成的 (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //这一步主要是用来合并上一步的不同分区的输出,因是同类型合并,因此输出类型和第二步一样 (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2) ) .collect() .foreach(println) }
reduceByKey、aggregateByKey、foldByKey、combineByKey 的区别
reduceByKey
:相同key
的第一个数据不进行任何计算,分区内和分区间计算规则相同aggregateByKey
:相同key
的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同foldByKey
:相同key
的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同combineByKey
:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
sortByKey
-
函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
-
函数说明
- 在一个
(K,V)
的RDD
上调用,K
必须实现Ordered
接口(特质),返回一个按照key进行排序的
- 在一个
join
-
函数签名
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
-
函数说明
- 在类型为
(K,V)
和(K,W)
的RDD
上调用,返回一个相同key
对应的所有元素连接在一起的(K,(V,W))
的RDD
- 在类型为
-
函数详解
- 本质是对两个含有
KV
对元素的RDD
进行cogroup
算子协同划分,再通过flatMapValues
将合并的数据分散。在类型为(K, V)
和(K, W)
的RDD
上调用,返回一个相同key
对应的所有元素连接在一起的(K, (V,W))
的RDD
- 本质是对两个含有
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("join").setMaster("local") val sc = new SparkContext(conf) val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"))) val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) rdd.join(rdd1).collect().foreach(println) }
leftOuterJoin
-
函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
-
函数说明
- 类似于
SQL
语句的左外连接
- 类似于
cogroup
-
函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
-
函数说明
- 在类型为
(K,V)
和(K,W)
的RDD
上调用,返回一个(K,(Iterable<V>,Iterable<W>))
类型的RDD
- 在类型为
-
函数示例:
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("cogroup").setMaster("local") val sc = new SparkContext(conf) val dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("c",3))) val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3))) val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2) }
自定义 Transformation 算子
-
如果以上的这些算子依旧不能满足企业的业务需要,那么我们可以通过实现自定的
Transformation
算子实现。在自定义Transformation
算子之前,可以先观察系统内置的Transformation
算子是如何实现,通过模仿结构,接着只需要实现自定义的业务代码即可。object CustomTransformation { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Custom Transformation").setMaster("local") val sc = new SparkContext(conf) val inputData: RDD[Int] = sc.parallelize(Array(1,2,3,4,5)) val transformedData = new MyTransformationRDD(inputData) transformedData.collect().foreach(println) } } class MyTransformationRDD(prev: RDD[Int]) extends RDD[Int](prev) { override def compute(split: Partition, context: TaskContext): Iterator[Int] = { val inputIterator = firstParent[Int].iterator(split, context) inputIterator.map(_ * 2) } override protected def getPartitions: Array[Partition] = firstParent[Int].partitions }
RDD
行动算子
action
则主要是对RDD
进行最后的操作,或者成为触发操作,比如遍历、reduce
、保存到文件等,并可以返回结果给Driver
程序。action
操作执行,会触发一个spark job
的运行,从而触发这个action
之前所有的transformation
的执行。
reduce
-
函数签名
def reduce(f: (T, T) => T): T
-
函数说明
- 聚集
RDD
中的所有元素,先聚合分区内数据,再聚合分区间数据
- 聚集
-
函数示例:
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("reduce demo").setMaster("local") val sc = new SparkContext(conf) val reduce: Int = sc.makeRDD(Array(1, 2, 4, 5, 3)).reduce(_ + _) println(reduce) }
collect&collectAsMap
-
函数签名
def collect(): Array[T] def collectAsMap(): Map[K, V]
-
函数说明
- 将
RDD
多分区的元素转换为单机上的Scala
数组并返回,类似于toArray
功能
- 将
-
补充:
collectAsMap
与collect
类似,主要针对元素类型为key-value
对的RDD
,转换为Scala Map
并返回,保存元素的KV
结构,作用与collect
不同的是collectAsMap
函数不包含重复的key
,对于重复的key
,后面的元素覆盖前面的元素
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("collect demo") val sc = new SparkContext(conf) sc.makeRDD(List(1, 2, 3, 4),2) .map(_*2) .collect() .foreach(println) }
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("collectAsMap").setMaster("local") val sc = new SparkContext(conf) val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3)) val rdd = sc.parallelize(arr,2) rdd.collectAsMap().foreach(println) }
count
-
函数签名
def count(): Long
-
函数说明
- 返回
RDD
中元素的个数
- 返回
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("count demo") val sc = new SparkContext(conf) val rddCount: Long = sc.makeRDD(List(1, 2, 3, 4), 2) .count() println(rddCount) }
first
-
函数签名
def first(): T
-
函数说明
- 返回
RDD
中的第一个元素
- 返回
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("first demo").setMaster("local") val sc = new SparkContext(conf) val first: Int = sc.makeRDD(Array(1, 2, 4, 5, 3)).first() println(first) }
take
-
函数签名
def take(num: Int): Array[T]
-
函数说明
- 返回一个由
RDD
的前n
个元素组成的数组
- 返回一个由
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("take demo") val sc = new SparkContext(conf) sc.makeRDD(List(1, 3, 4, 5, 6),3) .take(4) .foreach(println) sc.makeRDD(List(1, 3, 4, 5, 6),3) .takeSample(false, 3) .foreach(println) }
takeOrdered
-
函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
-
函数说明
- 返回该
RDD
排序后的前n
个元素组成的数组
- 返回该
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("takeOrder demo") val sc = new SparkContext(conf) sc.parallelize(Array(1,3,2,7,5)) .takeOrdered(2) .foreach(println) }
aggregate
-
函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
-
函数说明
- 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
-
函数详解
- 允许用户对
RDD
使用两个不同的reduce
函数,第一个reduce
函数对各个分区内的数据聚集,每个分区得到一个结果;第二个reduce
函数对每个分区的结果进行聚集,最终得到一个总的结果。aggregate
相当于对RDD
内的元素数据归并聚集。如果aggregate
的分区内分区间的操作是一样的,那么就可以简化为fold
- 允许用户对
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("aggregate demo").setMaster("local") val sc = new SparkContext(conf) val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8) // 将该 RDD 所有元素相加得到结果,对于两个函数相同的情况下,aggregate 可以简化为 fold println(rdd.aggregate(0)(_ + _, _ + _)) println(rdd.fold(0)(_ + _)) }
fold
-
函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
-
函数说明
- 折叠操作,aggregate的简化版操作
countByKey&countByValue
-
函数签名
def countByKey(): Map[K, Long] def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope { map(value => (value, null)).countByKey() }
-
函数说明
countByKey
:统计每种key
的个数, 因此针对的是(key, value)
这种pair
对数据countByValue
:统计相同的value
的个数,原理就是把value
封装成(value,null)
然后再调用 countByKey
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("countByKey demo").setMaster("local") val sc = new SparkContext(conf) sc.makeRDD(List((0,"a"), (1, "a"), (1, "b"), (2, "c"), (2, "c"), (3, "d"))) .countByKey .foreach{ tuple => { println(s"key is ${tuple._1},and value is ${tuple._2}") } } }
save
-
函数签名
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit def saveAsObjectFile(path: String): Unit
-
函数说明
- 将数据保存到不同格式的文件中,常见的有一下多种形式:
- **saveAsTextFile:**用于将
RDD
以文本文件的格式存储到指定的文件系统中。从源码中可以看到,saveAsTextFile
函数是依赖于saveAsHadoopFile
函数,由于saveAsHadoopFile
函数接受PairRDD
,所以在saveAsTextFile
函数中利用rddToPairRDDFunctions
函数转化为(NullWritable,Text)
类型的RDD
,然后通过saveAsHadoopFile
函数实现相应的写操作。 - saveAsObjectFile:
saveAsObjectFile
用于将RDD
中的元素序列化成对象,存储到文件中。从源码中可以看出,saveAsObjectFile
函数是依赖于saveAsSequenceFile
函数实现的,将RDD
转化为类型为<NullWritable, BytesWritable>
的PairRDD
,然后通过saveAsSequenceFile
函数实现。
- **saveAsTextFile:**用于将
- 将数据保存到不同格式的文件中,常见的有一下多种形式:
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("save demo").setMaster("local") val sc = new SparkContext(conf) // 企业中一般会使用此种方式 val path = this.getClass.getResource("/data/aaaa.txt").getPath // 一般测试中会使用此种方式 sc.textFile("src/main/resources/data/aaaaa.txt").repartition(2) // 上一步有几个分区,就会有几个数据文件。另外下面保存的路径是会生成文件夹 // 如果指定位置已经存在,则会报错 .saveAsTextFile("src/main/resources/data/aaaa2.txt") // 如果是一个目录,会读取目录下除“.”和“_”开头的所有文件 sc.textFile("src/main/resources/data/aaaa1.txt") .collect .foreach(println) // 以“.”和“_”开头文件会被认为不存在 // sc.textFile("src/main/resources/data/abcd1.txt/_acc.txt") // .collect() // .foreach(println) // 这里之所以能够访问到 hdfs,是因为本地配置了 HADOOP_HOME,如下: // println(System.getenv("HADOOP_HOME")) // 如果本地没有配置 HADOOP_HOME,可以将 core-site.xml 和hdfs-site.xml 放在 resources 目录下, // 但是一旦放在 resourcer 目录下,这个时候默认的路径又变成了 hdfs:///,本地的话需要加 file:/// // sc.textFile("src/main/resources/data/aaaa1.txt") sc.textFile("file:///" + path) // 即使是上面报错了,下面依旧可以在 hdfs 生成文件夹,只不过是空的,这点区别于本地 .saveAsTextFile("hdfs://hadoop004:8020/spark/whj2333") // 一般在生产环境中不建议直接连接 hdfs 集群,而是通过 args 获取指定传参,然后通过 spark-submit 将指定的位置传入 // sc.textFile(args(0)).saveAsTextFile(args(1)) sc.textFile("hdfs://hadoop004:8020/spark/whj1") .collect() .foreach(println) }
foreach&foreachPartition
-
函数签名
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } def foreachPartition(f: Iterator[T] => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => cleanF(iter)) }
-
函数说明
foreach
是对RDD
中的每个元素执行无参数的f
函数,返回Unit
,而foreachPartiion
则是对每个分区
-
函数示例
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("foreach demo") val sc = new SparkContext(conf) val value = sc.makeRDD(List(1, 2, 3, 4),2) .map(_*2) value.collect().foreach(println) println("**********") value.foreach(println) }
-
函数补充
-
foreachPartition
算子可以提高模型效率,比如在使用foreach
时,将RDD
中所有数据写Mongo
中,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions
算子一次性处理一个partition
的数据,那么对于每个partition
,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
-
自定义 Action 算子
-
上面很多的 Action 算子,基本上已经足够日常的使用了,但是如果以上的这些算子依旧不能满足企业的业务需要,那么我们可以通过实现自定的 Action 算子实现。在自定义
Action
算子之前,可以先观察系统内置的Action
算子是如何实现,通过模仿结构,接着只需要实现自定义的业务代码即可。 -
通过阅读每一个
Action
算子的源码可以发现一个通用的方法:runJob
,而这个方法才是真正触发Spark
任务执行的方式,因此只要显示的调用该方法,就可以实现自定义Action
算子的实现。object CustomAction { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName) val sc = new SparkContext(conf) val rdd = sc.parallelize(Array(1,2,3,4,5,6,7),4) val res = sc.runJob(rdd, myAction _) res.foreach(println) } /*在 driver 端获取 executor 执行 task 返回的结果,比如 task 是个规则引擎,我想 知道每条规则命中了几条数据 */ def myAction(itr : Iterator[Int]) = { var count = 0 itr.foreach(each =>{ count += 1 }) (TaskContext.getPartitionId(),count) } }
Transformation 算子和 Action 算子的判别
- 在
Spark
中,可以通过以下几个方面来判断一个算子是Transformation
类型还是Action
类型:- 返回值类型:
Transformation
类型的算子返回一个新的RDD
,而Action
类型的算子返回一个非分布式的结果(通常是Scala
集合或基本数据类型)。 - 惰性执行:
Transformation
类型的算子通常是惰性执行的,即它们不会立即执行,而是在遇到Action
类型的算子时触发执行。这是因为Transformation
类型的算子只是描述了要对数据集进行的转换操作,并不会立即触发计算。 - 操作触发时机:
Transformation
类型的算子通常在数据集上定义了一系列的转换操作,而不会立即触发计算。只有当遇到Action
类型的算子时,Spark
才会根据依赖关系图(DAG
)执行之前定义的一系列Transformation
算子,并将结果返回给驱动程序。
- 返回值类型:
RDD
序列化
闭包检查
- 从计算的角度, 算子以外的代码都是在
Driver
端执行, 算子里面的代码都是在Executor
端执行。那么在scala
的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor
端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
序列化方法和属性
- 从计算的角度, 算子以外的代码都是在
Driver
端执行, 算子里面的代码都是在Executor
端执行
Kryo
序列化框架
- Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
RDD
中的 DAG 和 依赖关系
DAG
-
在
Spark
中,DAG
(Directed Acyclic Graph
,有向无环图)是执行计划的核心概念。DAG
代表了Spark
应用程序中的一系列操作步骤和它们之间的依赖关系。DAG
是一组顶点与边的组合,顶点代表RDD
,边代表对RDD
的一系列操作。DAG Sheduler
根据RDD
的不同transformation
操作,将 DAG 分为不同的stage
,每个stage
中又分为多个task
。Spark
将应用程序转换为一系列的转换操作,每个操作产生一个新的RDD
,并且这些操作形成了一个有向无环图。理解Spark
中的DAG
可以从两个方面来考虑:逻辑DAG
和物理DAG
。-
逻辑
DAG
- 逻辑
DAG
描述了应用程序中的转换操作和它们之间的依赖关系,而不考虑具体的执行计划和物理执行细节。逻辑DAG
是以逻辑操作符(如map
、filter
、reduceByKey
等)为节点,以依赖关系为边构成的有向无环图。逻辑DAG
表示了应用程序的逻辑流程和数据转换过程。
- 逻辑
-
物理
DAG
:物理
DAG
描述了Spark
应用程序的实际执行计划,它将逻辑DAG
映射到具体的执行引擎(如Spark Core
、Spark SQL
等)。物理DAG
考虑了底层计算资源、数据分片、并行度等因素,以最优的方式执行操作。
-
RDD
血缘关系
- RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的Lineage 会记录RDD 的元数据信息和转换行为,当该RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
RDD
依赖关系
- 两个相邻RDD之间的关系
RDD
窄依赖和RDD
宽依赖
Spark
应用执行过程中,会在逻辑上生成DAG
。当Action
算子被触发后,会将所有累积的算子生成有向无环图并由调度器对图上任务进行调度执行。Spark
的调度方式较传统的MapReduce
复杂许多,它会根据RDD
之间的依赖关系来划分不同的阶段(Stage
),而一个Stage
则包含一系列执行任务(TaskSet
)。Stage
划分是基于数据依赖关系的,一般分为两类:宽依赖(ShuffleDependency
)与窄依赖(NarrowDependency
)。
RDD
窄依赖
- 窄依赖表示每一个父(上游)RDD的Partition最多被子(下游)RDD的一个Partition使用
RDD
宽依赖
- 宽依赖表示同一个父(上游)RDD的Partition被多个子(下游)RDD的Partition依赖,会引起Shuffle
总结
- 区分宽窄依赖的主要判断条件是:父
RDD
的Partition
流向,要是流向单个RDD
就是窄依赖,流向多个RDD
就是宽依赖。
Stage
划分算法
- 从最后一个
RDD
往前推算,遇到窄依赖(NarrowDependency
)就将其加入该Stage
,当遇到宽依赖(ShuffleDependency
)则断开。每个Stage
里task
的数量由Stage
最后一个RDD
中的分区数决定。如果Stage
要生成Result
,则该Stage
里的Task
都是ResultTask
,否则是ShuffleMapTask
。因此spark
中task
只有这两种类型。 ShuffleMapTask
的计算结果需要shuffle
到下一个Stage
,其本质上相当于MapReduce
中的Mapper
。ResultTask
则相当于MapReduce
中的reducer
。因此整个计算过程会根据数据依赖关系自后向前建立,遇到宽依赖则形成新的Stage
。
RDD
任务划分
RDD
任务切分中间分为:Application
、Job
、Stage
和Task
Application
:初始化一个SparkContext
即生成一个Application
;Job
:一个Action
算子就会生成一个Job
;Stage:Stage
等于宽依赖(ShuffleDependency)
的个数加 1;Task
:一个Stage
阶段中,最后一个RDD
的分区个数就是Task
的个数。
RDD
依赖之间的 shuffle
为什么要排序
key
存在combiner
操作,排序之后相同的key
放到一块显然更加方便做合并操作reduce task
是按key
去处理数据的。 如果没有排序那必须从所有数据中把当前相同key
的所有value
数据拿出来,然后进行reduce
逻辑处理。显然每个key
到这个逻辑都需要做一次全量数据扫描,影响性能,有了排序很方便的得到一个key
对于的value
集合。- 同上,如果
key
按顺序排序,那么reduce task
就按key
顺序去读取,显然当读到的key
是文件末尾的key
时那么就标志数据处理完毕。如果没有排序那还得有其他逻辑来记录哪些key
处理完了,哪些key
没有处理完
为什么要文件合并
- 因为内存放不下就会溢写文件,就会发生多次溢写,形成很多小文件,如果不合并,显然会小文件泛滥,集群需要资源开销去管理这些小文件数据。
- 任务去读取文件的数增多,打开的文件句柄数(引用数)也会增多
mapreduce
是全局有序。单个文件有序,不代表全局有序,只有把小文件合并一起排序才会全局有序。
- 总结:减少系统去管理小文件所需要的开销。
spark shuffle
spark
的shuffle
是在MapReduce shuffle
基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在spark
中shuffle write
相当于MapReduce
的map
,shuffle read
相当于MapReduce
的reduce
。Spark
丰富了任务类型,有些任务之间数据流转不需要通过Shuffle
,但是有些任务之间还是需要通过Shuffle
来传递数据,比如宽依赖的groupByKey
以及各种xxxByKey
算子。
未经优化的 HashShuffleManager
- 上游
task
写文件的时候只是将数据按分区追加到文件中,并没有像MapReduce
那样先内存溢写成文件,然后再文件与文件之间进行合并,虽然节省了排序、合并的开销。但有一个弊端就是会产生大量的中间磁盘文件,进而由大量的磁盘IO
操作影响了性能。 - 问题:生成文件个数过多,生成和传输 上游
task
数量 * 下游task
数量个文件。 - 对应目前
spark
的参数:spark.shuffle.manager
=hash
spark.shuffle.consolidateFiles
=false
优化的 HashShuffleManager
- 相比第一种机制,就是在一个
Executor
中的所有的 task 是可以共用一个buffer
内存。在shuffle write
过程中,task
就不是为下游stage
的每个task
创建一个磁盘文件了,而是允许不同的task
复用同一批磁盘文件,这样就可以有效将多个task
的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write
的性能。此时的文件个数是CPU core
的数量 × 下一个stage
的task
数量。 - 为了开启优化后的
HashShuffleManager
,我们可以设置一个参数,spark.shuffle.consolidateFiles
。该参数默认值为false
,将其设置为true
即可开启优化机制。通常来说,如果我们使用HashShuffleManager
,那么都建议开启这个选项。 - 备注:在
Spark 1.2
以后的版本中,默认的ShuffleManager
改成了SortShuffleManager
SortShuffleManager
- 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的
shuffle
算子,可能选用不同的数据结构。如果是reduceByKey
这种聚合类的shuffle
算子,那么会选用Map
数据结构,一边通过Map
进行聚合,一边写入内存;如果是join
这种普通的shuffle
算子,那么会选用Array
数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。 - 在溢写到磁盘文件之前,会先根据
key
对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch
数量是10000
条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java
的BufferedOutputStream
实现的。 BufferedOutputStream
是Java
的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO
次数,提升性能。- 一个
task
将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge
过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task
就只对应一个磁盘文件,也就意味着该task
为下游stage
的task
准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task
的数据(排好序的数据)在文件中的start offset
与end offset
。 SortShuffleManager
由于有一个磁盘文件merge
的过程,因此大大减少了文件数量,由于每个task
最终只有一个磁盘文件,所以文件个数等于上游shuffle write
个数。
SortShuffle
优化之 bypass
运行机制
- 相比第 3 中少了排序,
task
会为每个下游task
都创建一个临时磁盘文件,并将数据按key
进行hash
然后根据key
的hash
值,将key
写入对应的磁盘文件之中。当然,写入磁盘文件时,也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。 - 该过程的磁盘写机制其实跟未经优化的
HashShuffleManager
是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager
来说,shuffle read
的性能会更好。bypass
机制的目的是尽量减少不必要的数据重排和磁盘IO
操作,该机制的最大好处在于,shuffle write
过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。 bypass
运行机制的触发条件如下:shuffle map task
数量小于spark.shuffle.sort.bypassMergeThreshold
(默认是 200)参数的值。- 不是聚合类的
shuffle
算子(比如groupKey
)。因为不像第 3 种机制那样会对聚合类算子以map
的数据结构存储,在写的过程中会先进行局部聚合。
- 备注:之所以要满足“不是聚合类的
shuffle
算子”的原因是聚合类的shuffle
算子通常需要按照key
进行排序和分区,以确保相同key
的数据能够被正确聚合在一起。如groupByKey
这类。虽然是ByKey
,但是没有聚合操作,所以可以通过设置spark.shuffle.sort.bypassMergeThreshold
从而跳过排序操作
RDD
持久化
RDD
数据持久化/缓存
Spark
中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个RDD
时,每个节点的其它分区都可以使用 该RDD
的缓存在内存中进行计算,在该数据上的其他action
操作将直接使用内存中的数据。- 备注:缓存针对的是某个
RDD
或者说是某个Transformation
算子的结果。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。 RDD
可以使用persist()
方法或cache()
方法进行持久化。Spark
的缓存具有容错机制,如果一个缓存的RDD
的某个分区丢失了,Spark
将按照原来的计算过程,自动重新计算并进行缓存。- 在
shuffle
操作中(例如reduceByKey
),即便是用户没有调用persist
方法,Spark
也会自动缓存部分中间数据。这么做的目的是,在shuffle
的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个RDD
,强烈推荐在该RDD
上调用persist
方法。 - 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,
RDD
的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD
的一系列转换,丢失的数据会被重算,由于RDD
的各个Partition
是相对独立的,因此只需要计算丢失的部分即可, 并不需要重算全部Partition
。 Spark
会自动对一些Shuffle
操作的中间数据做持久化操作(比如:reduceByKey
)。这样做的目的是为了当一个节点Shuffle
失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist
或cache
。
cache
和 persist
-
cache
和persist
严格来说既不是transformation
算子,也不是action
算子,因为没有生成新的RDD
,只是标记了当前RDD
需要cache
或persist
。 -
cache
和persist
是lazy
的,当第一次遇到Action
算子的时侯才会进行缓存或持久化,以后再触发Action
会读取、复用缓存的RDD
的数据再进行操作。 -
另外
cache
底层调用了persist
方法:def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
-
上述代码中涉及到了存储级别(
StorageLevel
)class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable
-
在该文件中可以看出在
Spark
中有 12 种可选的存储级别:val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
-
部分级别解析:
MEMORY_ONLY
(默认值)- 使用未序列化的
Java
对象格式,将数据保存在内存中。如果内存不够存放所有的数据 (oom
),则数据可能就不会进行持久化。那么下次对这个RDD
执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略。 - 备注:
RDD
的cache
默认方式采用MEMORY_ONLY
,DataFrame
的cache
默认采用MEMORY_AND_DISK
- 使用未序列化的
MEMORY_AND_DISK
(企业中用的最多的)- 使用未序列化的
Java
对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD
执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
- 使用未序列化的
MEMORY_ONLY_SER
- 基本含义同
MEMORY_ONLY
。唯一的区别是,会将RDD
中的数据进行序列化,RDD
的每个partition
会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC
(garbage collection
)。
- 基本含义同
MEMORY_AND_DISK_SER
- 基本含义同
MEMORY_AND_DISK
。唯一的区别是,会将RDD
中的数据进行序列化,RDD
的每个partition
会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC
。
- 基本含义同
DISK_ONLY
- 使用未序列化的
Java
对象格式,将数据全部写入磁盘文件中。
- 使用未序列化的
MEMORY_ONLY_2
,MEMORY_AND_DISK_2
- 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
OFF_HEAP
- 类似于
MEMORY_ONLY_SER
,但是将数据存储在off-heap memory
,这需要启动off-heap
内存。
- 类似于
- 推荐使用持久化级别顺序:
MEMORY_ONLY
---->MEMORY_ONLY_SER
---->MEMORY_AND_DISK_SER
---->MEMORY_AND_DISK
RDD Cache
缓存
-
RDD
通过Cache
或者Persist
方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM
的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action
算子时,该RDD
将会被缓存在计算节点的内存中,并供后面重用。// cache 操作会增加血缘关系,不改变原有的血缘关系 println(wordToOneRdd.toDebugString) // 数据缓存。 wordToOneRdd.cache() // 可以更改存储级别 //mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
RDD CheckPoint
检查点
-
检查点其实就是通过将
RDD
中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 -
cache
和checkpoint
之间有一个最大的区别,cache
将RDD
以及RDD
的血统(记录了这个RDD
如何产生)缓存到内存中,当缓存的RDD
失效的时候(如内存损坏),它们可以通过血统重新计算来进行恢复。但是checkpoint
将RDD
缓存到了HDFS
中,同时忽略了它的血统(也就是RDD
之前的那些依赖)。为什么要丢掉依赖?因为可以利用HDFS
多副本特性保证容错! -
对
RDD
进行checkpoint
操作并不会马上被执行,必须执行Action
操作才能触发。checkpoint
会等到job
结束后另外启动专门的job
去完成checkpoint
,也就是说需要checkpoint
的RDD
会被计算两次。// 设置检查点路径 sc.setCheckpointDir("./checkpoint1") // 创建一个 RDD,读取指定位置文件:hello atguigu atguigu val lineRdd: RDD[String] = sc.textFile("input/1.txt") // 业务逻辑 val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" ")) val wordToOneRdd: RDD[(String, Long)] = wordRdd.map { word => { (word, System.currentTimeMillis()) } } // 增加缓存,避免再重新跑一个 job 做 checkpoint wordToOneRdd.cache() // 数据检查点:针对 wordToOneRdd 做检查点计算 wordToOneRdd.checkpoint() // 触发执行逻辑 wordToOneRdd.collect().foreach(println)
缓存和检查点区别
-
Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
-
检查点是新建一个
job
来完成的,而缓存是job
执行过程中进行。 -
Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在HDFS 等容错、高可用的文件系统,可靠性高。
-
建议对checkpoint()的RDD 使用Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次RDD。
-
缓存可以将
RDD
的partition
持久化到磁盘,但该partition
由BlockManager
管理。一旦driver
执行结束,BlockManager
也会stop
,被cache
到磁盘上的RDD
也会被清空(整个blockManager
使用的local
文件夹被删除)。而checkpoint
将RDD
持久化到
HDFS
或本地文件夹,如果不被手动remove
掉,是一直存在的,也就是说可以被下一个driver program
使用,而cached RDD
不能被其他dirver program
使用。
RDD
分区器
- 控制好数据的分布以便获得最少的网络传输,可以极大的提升整体性能,减少网络开销。
Spark
分区器描述的是RDD
的数据在各个分区之间的分布规则,比如上游数据shuffle到下游时,分区器就决定了上游的哪些数据需要进入下游对应的哪些分区,但是只有kv
类型RDD
才有分区器,其它RDD
分区器都为none
。Spark
中实现的分区器有两种,HashPartitioner
(哈希分区器)和RangePartitioner
(范围分区器),其中最常用的分区器是HashPartitioner
。 Shuffle
和分区器之间的关系是:Shuffle
操作执行后,根据分区器的规则,将shuffle
后的数据分配到新的分区。因此,Shuffle
操作之后,RDD
的分区器通常会发生改变。在一些需要Spark Shuffle
操作的转换操作(例如groupByKey
、reduceByKey
、join
等)中,可以通过提供合适的分区器来优化Shuffle
性能,减少数据的移动和网络传输,提高作业执行效率。
HashPartitioner
HashPartitioner
采用哈希的方式对键值对数据进行分区。其数据分区规则为:$partitionId = Key.hashCode % numPartitions$,其中partitionId
代表该Key
对应的键值对数据应当分配到的Partition
标识,Key.hashCode
表示该Key
的哈希值,numPartitions
表示包含的Partition
个数。
RangePartitioner
Spark
引入RangePartitioner
的目的是为了解决HashPartitioner
所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner
采用哈希的方式将同一类型的Key
分配到同一个Partition
中,因此当某一或某几种类型数据量较多时,就会造成若干Partition
中包含的数据过大问题,而在Job
执行过程中,一个Partition
对应一个Task
,此时就会使得某几个Task
运行过慢。RangePartitioner
基于抽样的思想来对数据进行分区,简单的说就是将一定范围内的数映射到某一个分区内,分区与分区之间是有序的,也就是说一个分区中的元素肯定都比另一个分区中的元素小或者大;但是分区内的元素是不能保证顺序的。sortByKey
底层使用的数据分区器就是RangePartitioner
分区器,因此sortByKey
是可以保证全局有序的。- 该分区器的实现方式主要是通过两个步骤来实现的:
- 第一步:先从整个
RDD
中抽取样本数据,将样本数据排序,计算出每个分区的最大key
值,形成一个Array[key]
类型的数组变量rangeBounds
; - 第二步:判断
key
在rangeBounds
中所处的范围,给出该key
值在下一个RDD
中的分区id
下标。该分区器要求RDD
中的key
类型必须是可排序的。
- 第一步:先从整个
RangePartitioner 的总结:
- 可以做到分区间有序,没有办法做到分区内有序,如果希望全局有序需要借助
sortBy
算子因为需要对数据进行排序,因此会带来一些性能上的开销 - 如果数据分布不均匀,导致某个
key
数据量特别到,同样会造成数据倾斜,甚至会造成某个分区数据量特别大,某个分区甚至无数据
- 备注:将一定范围内的数映射到某一个分区内,在实现中,分界(
rangeBounds
)算法用到水塘抽样算法。
CustomPartitioner
-
当
Spark
系统提供的两个分区器没有办法满足具体的企业中的业务需要是,Spark
提供了Partitioner
分区器的抽象类,用户可以自行继承此抽象类完成自定义分区器。def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("customPartitioner demo") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(("勇士", "Curry"), ("掘金", "Jokic"), ("湖人", "LBJ"), ("火箭", "YaoM"))) // 业务:实现将湖人的信息单独分区 rdd.partitionBy(new MyPartitioner) .mapPartitionsWithIndex{ (id,iter) => { println(id + "分区号:" + iter.mkString(",")) iter } } .collect() } // 自定义分区器 class MyPartitioner extends Partitioner { // 自定义分区数量 override def numPartitions = 3 // 根据数据的 key 值返回分区索引 从 0 开始 from 0 to `numPartitions - 1` override def getPartition(key: Any) = { key match { case "湖人" => 0 case "火箭" => 1 case _ => 2 } } }
站在算子角度理解 spark 分区策略
- 在
Spark
中,RDD
的分区器不仅适用于键值对类型的RDD
,也适用于其他类型的RDD
。分区器决定了RDD
数据在集群中的分布和存储方式,并且可以在操作中提供性能优化。对于键值对类型的RDD
,确实有特定的分区器,如HashPartitioner
和RangePartitioner
。 - 这些分区器基于键的哈希值或排序规则,将数据分配到不同的分区中,从而实现数据的分布。对于其他类型的
RDD
,它们也可以具有分区器。默认情况下,这些RDD
的分区器为None
,表示没有特定的数据分布方式。这意味着数据在RDD
中的分布是不确定的,每个分区中的数据是独立的。
Source
算子
- 一般在日常的工作中,
source
算子大多数是从例如hdfs
、mysql
、本地文件等获取数据,这时候数据去往哪个分区和分区策略基本没有关系,而是取决于外部的这些系统,比如读取hdfs
文件,是根据blocksize
以及你设置的分区大小来计算分区数,然后按照block
去读取数据,这个时候数据在哪个分区取决于它属于哪个block
;再比如mysql
,spark
读取mysql
一般是单并行度读取,如果是多并行度,数据去往哪个分区取决于你设置的条件
Transformation
算子
Transformation
算子因为算子的不同分区策略也有所不同,但是总结分为以下几类:repartition
&coalease
repartition
底层是调用开启shuffle
的coalease
,当调用他们时,无论是否是key
/value
类型数据,都是遍历上游算子每个分区中的数据,轮询放入下游算子分区中,放入的时候下游算子的初始分区id
是随机的,随后依次加 1
groupBy
&groupByKey
&partitionBy(new HashPartitioner(num))
&reduceByKey
&join
...- 从
groupBy
可以看出,不仅key
/value
数据可以使用HashPartitioner
,不是key
/value
的数据也可以使用,虽然底层是将一条数据作为key
,然后调用的groupByKey
- 从
sortBy
&sortByKey
&partitionBy(new RangePartitioner(num,rdd))
- sortBy 底层是 sortByKey,但是其应用的是 RangePartitioner
共享变量
Spark
的一个核心功能是创建两种特殊类型的变量:广播变量和累加器变量
广播变量
- 广播变量顾名思义,由
Driver
端发送数据,所有Executor
端接收并保存这份数据,用于每个Executor
上的数据计算工作。
广播变量的几点特性
- 广播变量是保存在
Executor
内存中的,每个Executor
一份。如果一个Executor
上执行多个Task
,那么多个Task
将共享一份广播变量 - 广播变量是只读变量,即
Driver
端将变量发送到Executor
端后,Executor
端只能读取变量数据,而不能修改变量数据 - 当多个
Executor
上需要同一份数据时,可以考虑使用广播变量形式发送数据。尤其当该份数据在多个Stage
中使用时,通过广播变量一次性发送的方案,执行性能将明显优于当数据需要时再发送的多次发送方案。
-
广播变量是使用
SparkContext
对象调用broadcast
函数,传入要广播的变量。在Executor
端使用广播变量时,使用Broadcast
对象,调用value
函数就可以获得广播的数值了。def main(args: Array[String]): Unit = { val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("broadcastTest")) var factor = 3 val b: Broadcast[Int] = sc.broadcast(factor) factor = 5 val source = sc.makeRDD(Seq(1, 2, 3, 4, 5)) source.foreach(number => println(number * b.value)) }
def main(args: Array[String]): Unit = { val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("broadcastTest")) val map = Map(1 -> 'a', 2 -> 'b', 3 -> 'c') val user = new User(1, "zs") with Serializable val b = sc.broadcast(user) user.uName = "ls" val source = sc.makeRDD(Seq(1, 2, 3, 4, 5)) source.foreach(number => println(b.value)) } class User(idIn: Int, nameIn: String) { var uid = idIn var uName = nameIn override def toString: String = s"uid:$uid , uName:$uName" }
累加器变量
-
累加器与广播变量类似,也是定义好后由
Driver
发到各Executor
上辅助运算。累加器区别于广播变量的特性是:累加器取值accumulator.value
在Executor
端无法被读取,只能在Driver
端被读取,而Executor
端只能执行累加操作。累加器是一种比较高效的计数或者求和的工具 -
def main(args: Array[String]): Unit = { val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("broadcastTest")) val oddCnt = sc.longAccumulator("oddCnt") val rdd = sc.makeRDD(List(1, 2, 3, 4, 5)) rdd.foreach(number => if (number % 2 == 1) oddCnt.add(1)) println("奇数的数量是:" + oddCnt.value) } def main(args: Array[String]): Unit = { val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("broadcastTest")) val oddSum = sc.longAccumulator("oddCnt") val rdd = sc.makeRDD(List(1, 2, 3, 4, 5)) rdd.foreach(number => if (number % 2 == 1) oddSum.add(number)) println("所有奇数之和是:" + oddSum.value) }
-
Spark
提供了三种累加器,除了上面演示的longAccumulator
外,还有doubleAccumulator
和collectionAccumulator
,对于整型数值累加就采用longAccumulator
,浮点型数值累加采用doubleAccumulator
(整型数值都可以转化成浮点型数值,所以doubleAccumulator
也可以进行整型数值的累加),collectionAccumulator
的累加操作是将目标元素放入一个集合中。 -
留意下
collectionAccumulator
的使用,在创建时需要指定放入元素的泛型,否则无法执行累加操作: -
def main(args: Array[String]): Unit = { val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("broadcastTest")) val map = sc.collectionAccumulator[String]("map") val rdd = sc.makeRDD(List("hello", "world", "hi", "scala", "hallo", "spark")) rdd.foreach(e => if (e.startsWith("h")) map.add(e)) println(map.value) }
-
累加器是可以自定义的,官方提供了累加器自定义的方式,自定义类继承
AccumulatorV2
类并实现其抽象函数即可 -
object First { def main(args: Array[String]): Unit = { val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("broadcastTest")) // 创建自定义累加器对象 val wordAccumulator = new WordAccumulator // 注册累加器后,就就可以直接使用了 sc.register(wordAccumulator, "wc") sc.textFile("in/article.txt") // 读取源文件 .filter(e => e.length != 0) // 过滤空行后切分为单词,再处理掉空的数据 .flatMap(line => { line.trim.split("[\\s|\"|,|\\.|]+") }) .filter(!_.isEmpty) .foreach(e => wordAccumulator.add(e.trim)) // 调用累加方法就可以执行上面定义的代码了 println(wordAccumulator.value) } } class WordAccumulator extends AccumulatorV2[String, util.HashMap[String, Int]] { // 定义一个用来存储中间过程的 Map private val map = new util.HashMap[String, Int]() // 判断累加器是否是初始值,对于当前案例,执行过数据后 map 中肯定会有元素,所有累加器为初始值时 map 为空 override def isZero: Boolean = map.isEmpty // 创建一个新的累加器 override def copy(): AccumulatorV2[String, util.HashMap[String, Int]] = new WordAccumulator // 重置累加器,只需要将 map 置空就行了 override def reset(): Unit = map.clear() // 添加一条数据,如果当前有该数据就使其 value + 1,否则就新加入一条值为 1 的键值对 override def add(v: String): Unit = if (map.containsKey(v)) map.put(v, map.get(v) + 1) else map.put(v, 1) // 当前累加器的 map 与另外一个累加器的 map 合并 override def merge(other: AccumulatorV2[String, util.HashMap[String, Int]]): Unit = { other match { case o: WordAccumulator => o.value.entrySet().forEach(new Consumer[util.Map.Entry[String, Int]] { override def accept(t: util.Map.Entry[String, Int]): Unit = { if (map.containsKey(t.getKey)) { map.put(t.getKey, map.get(t.getKey) + t.getValue) } else { map.put(t.getKey, t.getValue) } } }) case _ => throw new UnsupportedOperationException( s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } } // 最终要返回的值,我们返回 map 就可以了 override def value: util.HashMap[String, Int] = map }
两种特殊变量的总结
Spark
广播变量和累加器变量是两个重要的概念,它们在Spark
分布式计算中起着不同的作用和意义。
广播变量的好处:
- 减少数据传输:通过将只读变量广播到集群中,避免了在网络上传输大量数据的开销,
- 减少了网络通信的负担。
- 提高性能:广播变量在每个节点上只有一份副本,减少了内存使用和垃圾回收的压力,提高了任务的执行效率。
- 共享数据:广播变量可以在集群中的所有任务中共享,使得每个任务都可以访问相同的数据,方便数据共享和操作。
累加器变量的好处
- 累加器变量在任务执行期间可以被多个任务并行地更新,但只能以只读方式访问其值。累加器变量通常用于收集任务执行期间的统计信息。
- 分布式累加:累加器变量可以在并行任务中进行累加操作,无论任务在集群中的哪个节点执行,都可以正确地对变量进行更新,确保了数据的一致性。
- 统计信息收集:累加器变量可以用于收集任务执行期间的统计信息,例如计数、求和、最大值、最小值等。这对于分布式计算中的调试和监控非常有用。
- 需要注意的是,广播变量和累加器变量都是在分布式环境中使用的工具,用于提高
Spark
应用程序的性能和效率,但需要谨慎使用,确保在适当的场景下使用正确。