三个开发入门类

SparkConf

  • SparkConfSpark 的配置类,Spark 中的每一个组件都是直接或者间接地使用着它所存储的属性。

SparkContext

  • SparkContext 是通往 Spark 集群的唯一入口,可以用来在 Spark 集群中创建 RDDs、累加器变量和广播变量等。
  • SparkContext 也是整个 Spark 应用程序( Application )中至关重要的一个对象,可以说是整个 Application 运行调度的核心(不是指资源调度)。
  • SparkContext 的核心作用是初始化 Spark 应用程序运行所需要的核心组件,包括高层调度器( DAGScheduler )、底层调度器( TaskScheduler )和调度器的通信终端( SchedulerBackend ),同时还会负责 Spark 程序向 Master 注册程序等。
  • 备注:只可以有一个 SparkContext 实例运行在一个 JVM 内存中,所以在创建新的 SparkContext 实例前,必须调用 stop 方法停止当前 JVM 唯一运行的 SparkContext 实例。

SparkSession

  • SparkSession 主要用在 SparkSQL 中,当然也可以用在其他场合,他可以代替 SparkContextSparkSession 实际上封装了 SparkContext,另外也封装了 SparkConfsqlContext
  • 备注:从这三个类所处位置我们可以发现,SparkConfSparkContext 是位于 org.apache.spark 包下,是一个通用的包;而 SparkSession 位于 org.apache.spark.sql 包下,是一个专用附加包。因此从这里我们可以看到,在使用 SparkSQL 的时候尽量使用 SparkSession ,如果发现有些 API 不在 SparkSession 中,也可以通过 SparkSession 拿到SparkContext 和其他 Context

spark-submit

作用

  • spark-submit 是在 spark 安装目录中 bin 目录下的一个 shell 脚本文件,用于在集群中启动应用程序(如*.jar、*.py 脚本等);对于 spark 支持的集群模式,spark-submit 提交应用的时候有统一的接口,不用太多的设置。

参数

--master

  • 该参数表示提交任务到哪里执行,常见的选项有:
    • local:提交到本地服务器执行,并分配单个线程
    • local[k]:提交到本地服务器执行,并分配 k 个线程
    • local[*]:提交到本地服务器执行,并分配本地 core 个数个线程
    • yarn:提交到 yarn 模式部署的集群中

--deploy-mode

  • spark on yarn 的两种启动方式,区别是 sparkdriver 是在本地 (client) 启动还是在 yarncontainer 中启动,默认是 client
    • client:本地driver
    • cluster:远端driver

--class

  • 应用程序的主类,仅针对 javascala 应用

--name

  • 指定应用程序的名称,在 yarn 调度系统下,只对 cluster 模式生效

--jar

  • 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driverexecutor 的 classpath 下。如果路径是个目录的话,--jars 的设置无法起作用,必须详细到 abc.jar
  • 备注:区别 spark-defaults.conf 配置文件中的 spark.yarn.jars
  • --jars:主要用于上传我们需要的第三方依赖
  • spark.yarn.jars:主要传入 spark 环境相关的 jar 包,例如 spark.corespark.sql

--conf prop=value

  • 指定 spark 配置属性的值,格式为 PROP=VALUE, 例如 --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"

--properties-file

  • 指定需要额外加载的配置文件,用逗号分隔,如果不指定,默认为 conf/spark-defaults.conf

--driver-memory--driver-core

  • 前者表示 driver 内存,默认 1G;后者表示 driver 的核数,默认是 1。在 yarn 或者 standalone 下使用
  • 建议:对于 driver memory 通常不用设置,若出现使用 collect 算子将 RDD 数据全部拉取到 Driver 上处理,就必须确保该值足够大,否则 OOM 内存溢出(如果设置了广播变量再设置大一点)

--num-executors

  • 启动的 executor 数量,即该作业总共需要多少 executor 进程执行,默认为 2。建议:每个作业运行一般设置 5,10,20 个左右较合适。在 yarn 下使用。

--executor-memory 和--executor-cores

  • executor-memory:设置每个 executor 进程的内存, num-executors * executor-memory 代表作业申请的总内存量(尽量不要超过最大总内存的 1/3~1/2
  • 建议:设置 5G~10G 较合适
  • executor-cores:每个 executor 进程的 CPU Core 数量,该参数决定每个 executor 进程并行执行 task 线程的能力, num-executors* executor-cores 代表作业申请总 CPU core 数(不要超过总 CPU Core 的 1/3~1/2 )
  • 建议:设置 2~4 个比较合适

--queue QUEUE_NAME

  • 将任务提交给哪个 YARN 队列,默认为 YARN 的默认队列

RDD

概念

  • RDD(Resilient Distributed Dataset) 叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
    • 弹性之一:自动的进行内存和磁盘数据存储的切换;
    • 弹性之二:基于 Lineage(血缘)的高效容错;
    • 弹性之三:Task 如果失败会自动进行特定次数的重试;
    • 弹性之四:Stage 如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
    • 弹性之五:checkpoint 和 persist(持久化和检查点);
    • 弹性之六:数据分片的高度弹性。

核心属性

  • 分区列表:RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性
  • 分区计算函数:Spark在计算时,是使用分区函数对每一个分区进行计算
  • RDD之间的依赖关系:RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系
  • 分区器(可选):当数据为KV类型数据时,可以通过设定分区器自定义数据的分区
  • 首选位置(可选):计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
  • Spark 运行时使用这 5 条信息来调度和执行用户数据的处理逻辑,其中每个属性的代码如下:
// RDD 中的依赖关系由一个 Seq 数据集来记录,使用 Seq 的原因是经常取第一个元素或者遍历。简记:获取每个 rdd 的依赖,即血缘
protected def getDependencies: Seq[Dependency[_]]
// 分区列表定义在一个数组中,这里使用 Array 的原因是随时使用下标来访问分区内容。计算分区信息,只会被调用一次,简记:获取分区信息
protected def getPartitions: Array[Partition]
// 接口定义,具体由子类实现,对输入的 RDD 分区进行计算,将某个分区的数据读成一个 Iterator。简记:获取分区信息之后对每个分区进行子计算
def compute(split: Partition, context: TaskContext): Iterator[T]
// 分区器,可选,子类可以重写以指定新的分区方式,Spark 支持 Hash 和 Range两种分区方式
@transient val partitioner: Option[Partitioner] = None
// 可选,每个 partition 数据驻留在集群中的位置,子类可以指定分区的位置,如 HadoopRDD 可以重写此方法,让分区尽可能与数据在相同的节点上
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  • Spark RDD 中,locationPrefs 是指用于执行任务的节点的位置偏好设置。具体来说,它是一个 Map,其中键是 RDD 的分区索引,值是一个序列,表示首选执行该分区任务的节点列表。这些位置偏好设置可以在 Spark 调度任务时使用,以尽可能地在节点之间分配任务,以减少数据移动和网络延迟,并提高任务执行效率。例如,如果一个节点已经拥有某个分区的数据,那么最好将该分区的任务分配给该节点,以避免将数据移动到其他节点。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地优先将计算任务分配到其所要处理的 block 的存储位置

总结

  1. RDD 可以看做是一系列的 Partition 所组成的
  2. RDD 之间存在依赖关系
  3. 算子是作用在 Partition 之上
  4. 分区器是作用在 kv 形式的 RDD
  5. Partition 提供是最佳计算位置,利于数据从本地化计算,移动计算不是移动数据

RDD创建

  • Spark中创建RDD的方式分为四种:

    1. 使用程序中的集合创建 RDD,主要用于进行测试

      • 集合中的元素类型就是 RDD 的泛型类型。可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的 spark 应用的流程。
      • 从集合中创建 RDDSpark 主要提供了两中函数:parallelizemakeRDD。我们可以先看看这两个函数的声明:
      val sparkConf =
      new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new 
      SparkContext(sparkConf)
      val rdd1 = sparkContext.parallelize( List(1,2,3,4)
      )
      val rdd2 = sparkContext.makeRDD( List(1,2,3,4)
      )
      rdd1.collect().foreach(println) rdd2.collect().foreach(println)
      sparkContext.stop()
      
      • 从底层代码实现来讲,makeRDD方法就是parallelize方法
      def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices)
      }
      
    2. 从外部存储(文件)创建RDD

      • 由外部存储系统的数据集创建RDD
      val sparkConf =
      new SparkConf().setMaster("local[*]").setAppName("spark") val sparkContext = new 
      SparkContext(sparkConf)
      val fileRDD: RDD[String] = sparkContext.textFile("input") fileRDD.collect().foreach(println)
      sparkContext.stop()
      
    3. 从其他RDD创建

      • 主要通过一个RDD运算完后,再产生新的RDD
    4. 直接创建RDD

      • 使用new的方式直接构造RDD,一般由Spark框架自身使用

RDD并行度与分区

Spark任务中的相关的概念

  1. 什么是 job
    • Job 简单讲就是提交给 spark 的任务,严格的说一个 Action 算子就属于一个 job
  2. 什么是 stage
    • Stage 是每一个 job 处理过程要分为的几个阶段,stage 的分割线是看是否发生 shuffle,即有没有产生宽依赖,后面会详细的说明。
  3. 什么是 task
    • Task 是每一个 job 处理过程要分为几次任务。Task 是任务运行的最小单位,一个 stage 要处理的 partition 对应一个 task,最终是要以 task 为单位运行在 Executor 中,taskExecutor进程中的一个线程。
  4. Spark 官网建议的 Task 的设置原则是:设置 Task 数目为 num-executors * executor-cores 的 2~3 倍较为合适。
  5. 一个 stagetask 的数量是由谁来决定的?
    • 是由输入文件的切片个数来决定的。通过算子修改了某一个 rdd 的分区数量,task 数量也会同步修改。总结:一个分区对应一个 task
  6. 一个 job 任务的 task 数量是由谁来决定的?
    • 一个 job 任务可以有一个或多个 stage,一个 stage 又可以有一个或多个 task。所以一个 jobtask 数量是 (stage 数量 * task 数量)的总和。
  7. 每一个 stage 中的 task 最大的并行度?
    • 并行度:是指指令并行执行的最大条数。在指令流水中,同时执行多条指令称为指令并行。
    • 理论上:每一个 stage 下有多少的分区,就有多少的 tasktask 的数量就是任务的最大的并行度。(一般情况下,一个 task 运行的时候,使用一个 cores
    • 实际上:最大的并行度,取决于任务运行时使用的 executor 拥有的 core 的数量。

分区和并行度的关系

  • 分区数:是一个相对静态的概念,这个值的初始大小由数据源的分布情况决定(如果是内存数据,分区数和设置的并行度一致),比如读取 hdfs,此时有 10block 块,那么你的分区数就是 10;

  • 并行度:是一个相对动态的概念,是根据当前计算引擎可用资源来动态决定的,它的值是小于等于分区数

  • 总结:

    1. 分区数描述的是数据源,是个相对静态的概念
    2. 并行度描述的是计算引擎一次要同时处理的分区数,是根据资源情况临时决定的
    3. 并行度建议小于等于分区数

RDD 转换算子

  • transformation 操作会针对已有的 RDD 创建一个新的 RDDtransformation 的特点就是 lazy 特性。lazy 特性指的是,如果一个 spark 应用中只定义了 transformation 操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation 是不会触发 spark 程序的执行的,它们只是记录了对 RDD 所做的操作,但是不会自发的执行。只有当 transformation 之后,接着执行了一个 action 操作,那么所有的 transformation 才会执行。Spark 通过这种 lazy 特性,来进行底层的 spark 应用执行的优化,避免产生过多中间结果。

  • RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型

Value类型

map

  • 函数签名

    def map[U: ClassTag](f: T => U): RDD[U]
    
  • 函数说明

    • map 是对 RDD 中的每个元素都执行一个指定函数来产生一个新的 RDD。任何原 RDD 中的元素在新 RDD 中都有且只有一个元素与之对应,将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
  • 函数示例:

    def main(args: Array[String]) {
    	val conf = new SparkConf().setAppName("Map").setMaster("local")
    	val sc =new SparkContext(conf)
    	val dataRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    	val dataRDD1: RDD[Int] = dataRDD.map(
    		num => {num * 2}
    	)
    	val dataRDD2: RDD[String] = dataRDD1.map(
    		num => { "" + num }
    	)
    }
    

mapPartitions

  • 函数签名

    def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
    
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("mapPartition")
    	val sc = new SparkContext(conf)
    	val value: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9),1)
    	val rest = value.mapPartitions{
    		iter => {
    			var result = List[Int]()
    			var even = 0
    			var odd = 0
    			while (iter.hasNext){
    				var value = iter.next()
    				if(value % 2 == 0){
    				even += value // 2 +4 + 6 +8 = 20
    				// println(s"parID id ${TaskContext.get.partitionId()}, value is ${value}, even is ${even}")
    				}else{
    				odd += value // 1+3+5+7+9 = 25
    				// println(s"parID id ${TaskContext.get.partitionId()}, value is ${value}, even is ${even}")
    				}
    			}	
    		result = result :+ even :+ odd // list(20,25)
    		result.iterator
    		}
    	}
    rest.foreach(println)//20 25
    }
    
  • 函数说明

    • 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
  • mapmapPartitions 的区别

    • Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
  • 功能的角度

    • Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
    • MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
  • 性能的角度

    • Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。
    • 但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。

mapPartitionsWithIndex

  • 函数签名

    def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
    
  • 函数说明

    • 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
  • 函数示例:

    def main(args: Array[String]) {
    	val conf = new
    	SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local")
    	val sc =new SparkContext(conf)
    	val rdd = sc.parallelize(1 to 8,3)
    	rdd.mapPartitionsWithIndex{ //查看各个分区的数据的最好的 api
    		(partid,iter)=>{
    			var part_map = scala.collection.mutable.Map[String,List[Int]]()
    			var part_name = "part_" + partid
    			part_map(part_name) = List[Int]()
    			while(iter.hasNext){
    				part_map(part_name) :+= iter.next()//:+= 列表尾部追加元素
    			}	
    			part_map.iterator
    		}
    	}.collect
    }
    

flatMap

  • 函数签名

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
    
  • 函数说明

    • 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
  • 函数示例

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("flatMap")
        val sc = new SparkContext(sparkConf)
        val rdd = sc.makeRDD(List(
          List(1, 2), List(3, 4)
        ))
        val flatRDD = rdd.flatMap(
          list => {
            list//list是RDD中的一个元素,而list又是List类型,返回结果时会拆解为单个数字返回
          }
        )
        flatRDD.collect().foreach(println)
        sc.stop()
    }
    

glom

  • 函数签名

    def glom(): RDD[Array[T]]
    
  • 函数说明

    • 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
  • 函数示例

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new
            SparkConf().setMaster("local").setAppName("flatMap")
        val sc = new SparkContext(conf)
        val dataRDD = sc.makeRDD(List( 1,2,3,4),1)
        val glomRDD: RDD[Array[Int]] = dataRDD.glom()
        val mapRDD: RDD[Int] = glomRDD.map({
          iter =>
            iter.max
        })
        println(mapRDD.sum())
    }
    

groupBy

  • 函数签名

    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    
  • 函数说明

  • 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

filter

  • 函数签名

    def filter(f: T => Boolean): RDD[T]
    
  • 函数说明

    • 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("mapPartition")
    	val sc = new SparkContext(conf)
    	val dataRDD = sc.makeRDD(List(1, 2, 3, 4), 1)
    	dataRDD.filter(_ % 2 == 0).foreach(println)
    }
    

sample

  • 函数签名

    def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T]
    
  • 函数说明

    • 根据指定的规则从数据集中抽取数据
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("mapPartition")
    	val sc = new SparkContext(conf)
    	val dataRDD = sc.makeRDD(List( 1,2,3,4),1)
    	// 抽取数据不放回(伯努利算法)
    	// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
    	// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
    	// 第一个参数:抽取的数据是否放回,false:不放回
    	// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; 如果大于 1 或者小于零将会报错
    	// 第三个参数:随机数种子
    	val dataRDD1 = dataRDD.sample(false, 0.5)
    	// 抽取数据放回(泊松算法)
    	// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
    	// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
    	// 第三个参数:随机数种子
    	val dataRDD2 = dataRDD.sample(true, 2) // 报错
    }
    

distinct

  • 函数签名

    def distinct()(implicit ord: Ordering[T] = null): RDD[T]
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
  • 函数说明

    • 将数据集中重复的数据去重
  • 函数示例

    def main(args: Array[String]): Unit = {
    	//创建配置文件
    	val conf: SparkConf = new
    	SparkConf().setAppName("distinct").setMaster("local[*]")
    	//创建 SparkContext,该对象是提交的入口
    	val sc = new SparkContext(conf)
    	//创建 RDD
    	val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 3, 3), numSlices = 3)
    	//用 mapPartitionsWithIndex 的目的是为了证明全局去重和 shuffle
    	rdd.mapPartitionsWithIndex((index: Int, datas: Iterator[Int]) => {
    		println(index + "分区:" + datas.mkString(","))
    		datas
    	}).collect()
    	println("***********************************")
    	//去重
    	rdd.distinct().mapPartitionsWithIndex((index, datas) => {
    	println(index + "分区:" + datas.mkString(","))
    	datas
    	}).collect()
    	//释放资源
    	sc.stop()
    }
    

coalesce

  • 函数签名

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null)
    : RDD[T]
    
  • 函数说明

    • 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("coalesce")
    	val sc = new SparkContext(conf)
    	val rdd = sc.parallelize(1 to 16,4)
    	//当 suffle 的值为 false 时,不能增加分区数
    	val coalesceRDD = rdd.coalesce(3)
    	println("重新分区后的分区个数:" + coalesceRDD.partitions.size)
    	// shuffle=true,可以增加或者減少分区
    	val coalesceRDDOnShuffle = rdd.coalesce(7,true)
    	println("重新分区后的分区个数:" + coalesceRDDOnShuffle.partitions.size)
    	println("RDD 依赖关系:"+coalesceRDDOnShuffle.toDebugString)
    }
    

repartition

  • 函数签名

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
  • 函数说明

    • 该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的RDD,还是将分区数少的 RDD 转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("coalesce")
    	val sc = new SparkContext(conf)
    	val rdd = sc.parallelize(1 to 16,4)
    	// 减少分区
    	val repatitionRDD = rdd.repartition(3)
    	println("重新分区后的分区个数:" + repatitionRDD.partitions.size)
    	// 增加分区
    	val repatitionRDDAdd = rdd.repartition(7)
    	println("重新分区后的分区个数:" + repatitionRDDAdd.partitions.size)
    }
    

repartition和coalesce区别

  • repartition 会产生 shuffle,是一个消耗比较昂贵的操作算子,Spark 出了一个优化版的 repartition 叫做 coalesce(shuffle=false),它可以尽量避免数据迁移,但是你只能减少同等 RDDpartition
  • 1、N 小于 M(只能使用 shuffle)
    • 一般情况下,N 个分区有数据分布不均匀的状况,利用 HashPartitioner 函数将数据重新分区为 M 个,此时需要使用 repartition 或者 coalesce(shuffle=true)。
  • 2、N 大于 M 且和 M 相差不多(不建议使用 shuffle)
    • 假如 N1000M100,那么就可以将 N 个分区中的若干个分区合并成一个新的分区,最终合并为 M 个分区,这时可以使用 coalesce(shuffle=false),不产生 shuffle,如果使用 repartition 将会产生 shuffle
  • 3、N 大于 M 且和 M 相差悬殊(建议使用 shuffle)
    • 这时如果将 shuffle 设置为 false,父子 RDD 是窄依赖关系,他们同处在一个 stage 中,就可能造成 spark 程序的并行度不够,从而影响性能。如果在 M 为 1 的时候,为了使 coalesce 之前的操作有更好的并行度,可以将 shuffle 设置为 true

sortBy

  • 函数签名

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length):RDD[(K, V)]
    
    def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
    
  • 函数说明

    • 该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原``RDD的分区数一致。中间存在shuffle` 的过程
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setMaster("local").setAppName("sortByKey")
    	val sc = new SparkContext(conf)
    	val rdd: RDD[(Int, String)] = sc.makeRDD(List((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
    	// 默认是升序
    	val resRDD1: RDD[(Int, String)] = rdd.sortByKey()
    	resRDD1.collect().foreach(println)
    	println("**************************************")
    	// 降序
    	val resRDD2: RDD[(Int, String)] = rdd.sortByKey(ascending = false)
    	resRDD2.collect().foreach(println)
    	val products = sc.parallelize(List("屏保 20 10","支架 20 1000","酒精棉 5 2000","吸氧机 5000 1000"))
    	val productData = products.map(x=>{
    		val splits = x.split(" ")
    		val name = splits(0)
    		val price = splits(1).toDouble
    		val amount = splits(2).toInt
    		(name,price,amount)
    		})
    	productData.sortBy(_._2, false).collect().foreach(println)
    	productData.sortBy(x=>(-x._2, -x._3)).collect().foreach(println)
    }
    
    object CustomSortByKey {
    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("CustomSortByKey").setMaster("local")
    	val sc = new SparkContext(conf)
    	val stdList: List[(Student, Int)] = List(
    		(new Student("zs", 18), 1), (new Student("li", 18), 1), (new Student("zs", 19), 1), (new Student("wangwu", 18), 1), (new Student("zs", 20), 1)
    	)
    	//创建
    	val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList)
    	val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey()
    	//打印输出
    	resRDD.collect().foreach(println)
    	}
    }
    
    class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable {
    	//自定义比较规则
    	override def compare(that: Student): Int = {
    		//先按照名称升序排序,如果名称相同的话,在按照年龄降序排序
    		var res: Int = this.name.compareTo(that.name)
    		if (res == 0) {
    			res = that.age - this.age
    		}
    		res
    	}
    	override def toString: String = s"Student($name, $age)"
    }
    
    
    object SortByDemo {
    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("sortBy demo")
    	val sc = new SparkContext(conf)
    	val students = List(Student("zhangsan",15), Student("lisi",10), Student("wangwu",12), Student("wangwu",14), 	Student("zhuqi",18)
    	)
    	sc.makeRDD(students).sortBy(_.age)(Ordering[Int].reverse, ClassTag.Int) // 防止泛型擦除
    		.collect()
    		.foreach(println)
    }
    case class Student(var name: String, var age: Int)
    

双Value类型

intersection

  • 函数签名

    def intersection(other: RDD[T]): RDD[T]
    
  • 函数说明

    • 对源RDD 和参数RDD 求交集后返回一个新的RDD
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("intersection").setMaster("local")
    	val sc = new SparkContext(conf)
    	val dataRDD1 = sc.makeRDD(List(1,2,3,4))
    	val dataRDD2 = sc.makeRDD(List(3,4,5,6))
    	val dataRDD = dataRDD1.intersection(dataRDD2)
    }
    

union

  • 函数签名

    def union(other: RDD[T]): RDD[T]
    
  • 函数说明

    • 对源RDD和参数RDD求并集后返回一个新的RDD
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("union demo")
    	val sc = new SparkContext(conf)
    	val rdd1 = sc.makeRDD(List(1,2,3,4,5))
    	val rdd2 = sc.makeRDD(List(3,4,5,6,7))
    	// union 是不去重的,如果想去重,再使用 distinct 算子
    	rdd1.union(rdd2)
    	  .distinct()
    	  .collect()
    	  .foreach(println)
    	val rdd3 = rdd1 ++ rdd2 // rdd1.++(rdd2)
    	rdd3.distinct().collect().foreach(println)
    }
    

subtract

  • 函数签名

    def subtract(other: RDD[T]): RDD[T]
    
  • 函数说明

    • 以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setMaster("local").setAppName("subtract demo")
    	val sc = new SparkContext(conf)
    	val rdd1 = sc.makeRDD(List(1,2,3,4,5))
    	val rdd2 = sc.makeRDD(List(3,4,5,6,7))
    	// 以调用者为主
    	rdd1.subtract(rdd2).collect().foreach(println)
    	rdd2.subtract(rdd1).collect().foreach(println)
    }
    

cartesian

  • 函数签名

    def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
    
  • 函数说明

    • 笛卡尔积操作,对输入 RDD 内的所有元素计算笛卡尔积
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("cartesian").setMaster("local")
    	val sc = new SparkContext(conf)
    	val x = sc.parallelize(List(1,2,3,4,5))
    	val y = sc.parallelize(List(6,7,8,9,10))
    	x.cartesian(y).collect
    }
    

zip

  • 函数签名

    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
    
  • 函数说明

    • 将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("zip").setMaster("local")
    	val sc = new SparkContext(conf)
    	val dataRDD1 = sc.makeRDD(List(1,2,3,4))
    	val dataRDD2 = sc.makeRDD(List(3,4,5,6))
    	val dataRDD = dataRDD1.zip(dataRDD2)
    }
    

zipPartitions

  • 函数签名

    def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning:Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
    
  • 函数说明

    • 该算子将多个 RDD 按照 partition 组合成为新的 RDD,该算子需要组合的 RDD 具有相同的分区数,如果分区数不相同,则会报错。但对于每个分区内的元素数量没有要求(如果数据不对齐,那么由用户自定义解决对齐问题)。参数preservesPartitioning 表示是否保留父 RDDpartitioner 分区信息
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("zipPartitions").setMaster("local")
    	val sc = new SparkContext(conf)
    	var rdd1 = sc.makeRDD(1 to 5,2)
    	var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
    	//rdd1 两个分区中元素分布:
    	rdd1.mapPartitionsWithIndex{
    		(x,iter) => {
    			var result = List[String]()
    			while(iter.hasNext){
    			result ::= ("part_" + x + "|" + iter.next())
    			}
    		result.iterator
    		}
    	}.collect.foreach(println)
    	//rdd2 两个分区中元素分布
    	rdd2.mapPartitionsWithIndex{
    		(x,iter) => {
    		var result = List[String]()
    		while(iter.hasNext){
    			result ::= ("part_" + x + "|" + iter.next())
    		}
    		result.iterator
    		}
    	}.collect.foreach(println)
    	//rdd1 和 rdd2 做 zipPartition
    	rdd1.zipPartitions(rdd2){
    	(rdd1Iter,rdd2Iter) => {
    		var result = List[String]()
    		while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
    		result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
    	}
    	result.iterator
    }
    	}.collect.foreach(println)
    }
    

Key - Value类型

mapValues&flatMapValues

  • 函数签名

    def mapValues[U](f: V => U): RDD[(K, U)]
    
  • 函数说明

    • 输入函数应用于 RDDKVKey-Value)类型元素中的 Value,原 RDD 中的 Key 保持不变,与新的 Value 一起组成新的 RDD 中的元素。
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("mapValues demo")
    	val sc = new SparkContext(conf)
    	sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4),("e",5),("f",6)))
    		.mapValues(_ * 3)
    		.collect()
    		.foreach(println)
    		sc.makeRDD(List(("a","hello java"),("b","hello scala"),("c","hello hadoop")))
    		.flatMapValues(_.split(" "))
    		.collect()
    		.foreach(println)
    }
    

partitionBy

  • 函数签名

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    
  • 函数说明

    • 将数据按照指定Partitioner 重新进行分区。Spark 默认的分区器是HashPartitioner
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("partitionBy demo")
    	val sc = new SparkContext(conf)
    	sc.makeRDD(List((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
        	.partitionBy(new HashPartitioner(3))
    		// .repartition(3)
    		.mapPartitionsWithIndex{
    			(id,iter)=>{
    				println(id + "分区:" + iter.mkString(","))
    				iter
    				}
    			}
    		.collect()
    }
    

reduceByKey

  • 函数签名

    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    
  • 函数说明

    • 可以将数据按照相同的KeyValue 进行聚合
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("reduceByKey").setMaster("local")
    	val sc = new SparkContext(conf)
    	val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
    	val dataRDD2 = dataRDD1.reduceByKey(_ + _)
    	val dataRDD3 = dataRDD1.reduceByKey(_ + _, 2)
    }
    

groupByKey

  • 函数签名

    def groupByKey(): RDD[(K, Iterable[V])]
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
    
  • 函数说明

    • 将数据源的数据根据 keyvalue 进行分组
  • reduceByKeygroupByKey 的区别

    • shuffle的角度:reduceByKeygroupByKey 都存在 shuffle 的操作,但是reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
    • 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("groupByKey demo")
    	val sc = new SparkContext(conf)
    	sc.makeRDD(List((1,"A"),(2,"B"),(3,"C"),(1,"D"),(2,"C")),2)
    		.groupByKey()
    		.collect()
    		.foreach(println)
    	sc.makeRDD(List((1,"A"),(2,"B"),(3,"C"),(1,"D"),(2,"C")),2)
    		.groupBy(_._2)
    		.collect()
    		.foreach(println)
    }
    

aggregateByKey

  • 函数签名

    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]
    
  • 函数说明

    • 将数据根据不同的规则进行分区内计算和分区间计算。
    • 需要注意的是:aggregateByKey 最终的返回数据结果应该和初始值的类型保持一致
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setAppName("aggregateByKey").setMaster("local")
    	val sc = new SparkContext(conf)
    	// 将数据根据不同的规则进行分区内计算和分区间计算
    	val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
    	// aggregateByKey 算子是函数柯里化,存在两个参数列表
    	// 1. 第一个参数列表中的参数表示初始值
    	// 2. 第二个参数列表中含有两个参数
    	// 2.1 第一个参数表示分区内的计算规则
    	// 2.2 第二个参数表示分区间的计算规则
    	dataRDD1.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
    	// 取出每个分区内相同 key 的最大值然后分区间相加
    	val rdd = sc.makeRDD(List( ("a",1), ("a",2), ("c",3), ("b",4), ("c",5), ("c",6)), 2)
    	// 分区 0:("a",1),("a",2),("c",3) => (a,10),(c,10)
    	// 分区 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
    	// 最终的结果:分区 0 的(a,10)不动,将分区 0 的(c,10)和分区 1 的(c,10)相加结果(c,20)放在分区 1中,分区 1 中的(b,10)也保持不动
    	rdd.aggregateByKey(10)(
    		(x, y) => math.max(x,y), 
            (x, y) => x + y
    	)
    	.collect().foreach(println)
    }
    

foldByKey

  • 函数签名

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    
  • 函数说明

    • 当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为foldByKey

combineByKey

  • 函数签名

    def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)]
    
  • 函数说明

    • 允许对具有相同键的元素进行自定义的聚合操作。它接收三个函数参数:
      1. createCombiner:对于每个键的第一个值,创建一个累加器的初始值。
      2. mergeValue:对于每个键的后续值,将其合并到累加器中。
      3. mergeCombiners:在不同分区上的累加器之间进行合并
    • 这个操作常用于需要对键值对进行聚合计算的场景,比如求和、求平均等。它的结果是一个新的RDD,其中每个键都对应一个聚合后的结果值。
  • 函数示例

    // 统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出
    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("combineByKey demo")
    	val sc = new SparkContext(conf)
    	val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
    	sc.parallelize(people)
    		.combineByKey(
    		//把不同 key 映射成另外的形式,这里决定了 V 和 C 的数据类型,一元匿名函数
    			(x: String) => (List(x), 1), //这里匿名函数输入参数就是上一步匿名函数的输出类型和输入类型组成的
    			(peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), 
    			//这一步主要是用来合并上一步的不同分区的输出,因是同类型合并,因此输出类型和第二步一样
    			(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)
    		)
    		.collect()
    		.foreach(println)
    }
    

reduceByKey、aggregateByKey、foldByKey、combineByKey 的区别

  • reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • aggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
  • foldByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  • combineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

sortByKey

  • 函数签名

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)]
    
  • 函数说明

    • 在一个(K,V)RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的

join

  • 函数签名

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    
  • 函数说明

    • 在类型为(K,V)(K,W)RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD
  • 函数详解

    • 本质是对两个含有 KV 对元素的 RDD 进行 cogroup 算子协同划分,再通过 flatMapValues 将合并的数据分散。在类型为(K, V)(K, W)RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K, (V,W))RDD
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("join").setMaster("local")
    	val sc = new SparkContext(conf)
    	val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    	val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
    	rdd.join(rdd1).collect().foreach(println)
    }
    

leftOuterJoin

  • 函数签名

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    
  • 函数说明

    • 类似于SQL语句的左外连接

cogroup

  • 函数签名

    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
    
  • 函数说明

    • 在类型为(K,V)(K,W)RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
  • 函数示例:

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setAppName("cogroup").setMaster("local")
    	val sc = new SparkContext(conf)
    	val dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("c",3)))
    	val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3)))
    	val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =
    	dataRDD1.cogroup(dataRDD2)
    }
    

自定义 Transformation 算子

  • 如果以上的这些算子依旧不能满足企业的业务需要,那么我们可以通过实现自定的 Transformation 算子实现。在自定义 Transformation 算子之前,可以先观察系统内置的 Transformation 算子是如何实现,通过模仿结构,接着只需要实现自定义的业务代码即可。

    object CustomTransformation {
    	def main(args: Array[String]): Unit = {
    		val conf = new SparkConf().setAppName("Custom Transformation").setMaster("local")
    		val sc = new SparkContext(conf)
    		val inputData: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
    		val transformedData = new MyTransformationRDD(inputData)
    		transformedData.collect().foreach(println)
    	}
    }
    class MyTransformationRDD(prev: RDD[Int]) extends RDD[Int](prev) {
    	override def compute(split: Partition, context: TaskContext):
    		Iterator[Int] = {
    		val inputIterator = firstParent[Int].iterator(split, context)
    		inputIterator.map(_ * 2)
    	}	
    	override protected def getPartitions: Array[Partition] = firstParent[Int].partitions
    }
    

RDD行动算子

  • action 则主要是对 RDD 进行最后的操作,或者成为触发操作,比如遍历、reduce、保存到文件等,并可以返回结果给 Driver 程序。action 操作执行,会触发一个 spark job 的运行,从而触发这个 action 之前所有的 transformation 的执行。

reduce

  • 函数签名

    def reduce(f: (T, T) => T): T
    
  • 函数说明

    • 聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
  • 函数示例:

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setAppName("reduce demo").setMaster("local")
    	val sc = new SparkContext(conf)
    	val reduce: Int = sc.makeRDD(Array(1, 2, 4, 5, 3)).reduce(_ + _)
    	println(reduce)
    }
    

collect&collectAsMap

  • 函数签名

    def collect(): Array[T]
    def collectAsMap(): Map[K, V]
    
  • 函数说明

    • RDD 多分区的元素转换为单机上的 Scala 数组并返回,类似于 toArray 功能
  • 补充:

    • collectAsMapcollect 类似,主要针对元素类型为 key-value 对的 RDD,转换为 Scala Map 并返回,保存元素的 KV 结构,作用与 collect 不同的是 collectAsMap 函数不包含重复的 key,对于重复的 key,后面的元素覆盖前面的元素
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local[*]").setAppName("collect demo")
    	val sc = new SparkContext(conf)
    	sc.makeRDD(List(1, 2, 3, 4),2)
    		.map(_*2)
    		.collect()
    		.foreach(println)
    }
    
    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setAppName("collectAsMap").setMaster("local")
    	val sc = new SparkContext(conf)
    	val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
    	val rdd = sc.parallelize(arr,2)
    	rdd.collectAsMap().foreach(println)
    }
    

count

  • 函数签名

    def count(): Long
    
  • 函数说明

    • 返回RDD中元素的个数
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("count demo")
    	val sc = new SparkContext(conf)
    	val rddCount: Long = sc.makeRDD(List(1, 2, 3, 4), 2)
    		.count()
    	println(rddCount)
    }
    

first

  • 函数签名

    def first(): T
    
  • 函数说明

    • 返回RDD 中的第一个元素
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setAppName("first demo").setMaster("local")
    	val sc = new SparkContext(conf)
    	val first: Int = sc.makeRDD(Array(1, 2, 4, 5, 3)).first()
    	println(first)
    }
    

take

  • 函数签名

    def take(num: Int): Array[T]
    
  • 函数说明

    • 返回一个由RDD 的前 n 个元素组成的数组
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("take demo")
    	val sc = new SparkContext(conf)
    	sc.makeRDD(List(1, 3, 4, 5, 6),3)
    		.take(4)
    		.foreach(println)
    	sc.makeRDD(List(1, 3, 4, 5, 6),3)
    		.takeSample(false, 3)
    		.foreach(println)
    }
    

takeOrdered

  • 函数签名

    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
    
  • 函数说明

    • 返回该 RDD 排序后的前 n 个元素组成的数组
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local[*]").setAppName("takeOrder demo")
    	val sc = new SparkContext(conf)
    	sc.parallelize(Array(1,3,2,7,5))
    		.takeOrdered(2)
    		.foreach(println)
    }
    

aggregate

  • 函数签名

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    
  • 函数说明

    • 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
  • 函数详解

    • 允许用户对 RDD 使用两个不同的 reduce 函数,第一个 reduce 函数对各个分区内的数据聚集,每个分区得到一个结果;第二个 reduce 函数对每个分区的结果进行聚集,最终得到一个总的结果。aggregate 相当于对 RDD 内的元素数据归并聚集。如果 aggregate 的分区内分区间的操作是一样的,那么就可以简化为 fold
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setAppName("aggregate demo").setMaster("local")
    	val sc = new SparkContext(conf)
    	val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
    	// 将该 RDD 所有元素相加得到结果,对于两个函数相同的情况下,aggregate 可以简化为 fold
    	println(rdd.aggregate(0)(_ + _, _ + _))
    	println(rdd.fold(0)(_ + _))
    }
    

fold

  • 函数签名

    def fold(zeroValue: T)(op: (T, T) => T): T
    
  • 函数说明

    • 折叠操作,aggregate的简化版操作

countByKey&countByValue

  • 函数签名

    def countByKey(): Map[K, Long]
    
    def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
    	map(value => (value, null)).countByKey()
    }
    
  • 函数说明

    • countByKey:统计每种 key 的个数, 因此针对的是(key, value)这种 pair 对数据
    • countByValue:统计相同的 value 的个数,原理就是把 value 封装成(value,null)然后再调用 countByKey
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setAppName("countByKey demo").setMaster("local")
    	val sc = new SparkContext(conf)
    	sc.makeRDD(List((0,"a"), (1, "a"), (1, "b"), (2, "c"), (2, "c"), (3, "d")))
    		.countByKey
    		.foreach{
    			tuple => {
    			println(s"key is ${tuple._1},and value is ${tuple._2}")
    			}
    		}
    }
    

save

  • 函数签名

    def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit
    def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit
    def saveAsObjectFile(path: String): Unit
    
  • 函数说明

    • 将数据保存到不同格式的文件中,常见的有一下多种形式:
      • **saveAsTextFile:**用于将 RDD 以文本文件的格式存储到指定的文件系统中。从源码中可以看到,saveAsTextFile 函数是依赖于 saveAsHadoopFile 函数,由于 saveAsHadoopFile 函数接受 PairRDD,所以在 saveAsTextFile 函数中利用rddToPairRDDFunctions 函数转化为(NullWritable,Text)类型的 RDD,然后通过 saveAsHadoopFile 函数实现相应的写操作。
      • saveAsObjectFile:saveAsObjectFile 用于将 RDD 中的元素序列化成对象,存储到文件中。从源码中可以看出,saveAsObjectFile 函数是依赖于 saveAsSequenceFile 函数实现的,将 RDD 转化为类型为<NullWritable, BytesWritable>PairRDD,然后通过 saveAsSequenceFile 函数实现。
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new SparkConf().setAppName("save demo").setMaster("local")
    	val sc = new SparkContext(conf)
    	// 企业中一般会使用此种方式
    	val path = this.getClass.getResource("/data/aaaa.txt").getPath
    	// 一般测试中会使用此种方式
    	sc.textFile("src/main/resources/data/aaaaa.txt").repartition(2)
    	// 上一步有几个分区,就会有几个数据文件。另外下面保存的路径是会生成文件夹
    	// 如果指定位置已经存在,则会报错
    		.saveAsTextFile("src/main/resources/data/aaaa2.txt")
    	// 如果是一个目录,会读取目录下除“.”和“_”开头的所有文件
    	sc.textFile("src/main/resources/data/aaaa1.txt")
    		.collect
    		.foreach(println)
    	// 以“.”和“_”开头文件会被认为不存在
    	// sc.textFile("src/main/resources/data/abcd1.txt/_acc.txt")
    	// .collect()
    	// .foreach(println)
    	// 这里之所以能够访问到 hdfs,是因为本地配置了 HADOOP_HOME,如下:
    	// println(System.getenv("HADOOP_HOME"))
    	// 如果本地没有配置 HADOOP_HOME,可以将 core-site.xml 和hdfs-site.xml 放在 resources 目录下,
    	// 但是一旦放在 resourcer 目录下,这个时候默认的路径又变成了 hdfs:///,本地的话需要加 file:///
        // sc.textFile("src/main/resources/data/aaaa1.txt")
    	sc.textFile("file:///" + path)
    	// 即使是上面报错了,下面依旧可以在 hdfs 生成文件夹,只不过是空的,这点区别于本地
    		.saveAsTextFile("hdfs://hadoop004:8020/spark/whj2333")
    	// 一般在生产环境中不建议直接连接 hdfs 集群,而是通过 args 获取指定传参,然后通过 spark-submit 将指定的位置传入
    	// sc.textFile(args(0)).saveAsTextFile(args(1))
    	sc.textFile("hdfs://hadoop004:8020/spark/whj1")
    		.collect()
    		.foreach(println)
    }
    

foreach&foreachPartition

  • 函数签名

    def foreach(f: T => Unit): Unit = withScope {
    	val cleanF = sc.clean(f)
    	sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    
    def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    	val cleanF = sc.clean(f)
    	sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
    }
    
  • 函数说明

    • foreach 是对 RDD 中的每个元素执行无参数的 f 函数,返回 Unit,而 foreachPartiion 则是对每个分区
  • 函数示例

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("foreach demo")
    	val sc = new SparkContext(conf)
    	val value = sc.makeRDD(List(1, 2, 3, 4),2)
    		.map(_*2)
    	value.collect().foreach(println)
    	println("**********")
    	value.foreach(println)
    }
    
  • 函数补充

    • foreachPartition 算子可以提高模型效率,比如在使用 foreach 时,将 RDD 中所有数据写 Mongo 中,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用 foreachPartitions 算子一次性处理一个 partition 的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后

      执行批量插入操作,此时性能是比较高的。

自定义 Action 算子

  • 上面很多的 Action 算子,基本上已经足够日常的使用了,但是如果以上的这些算子依旧不能满足企业的业务需要,那么我们可以通过实现自定的 Action 算子实现。在自定义 Action 算子之前,可以先观察系统内置的 Action 算子是如何实现,通过模仿结构,接着只需要实现自定义的业务代码即可。

  • 通过阅读每一个 Action 算子的源码可以发现一个通用的方法:runJob,而这个方法才是真正触发 Spark 任务执行的方式,因此只要显示的调用该方法,就可以实现自定义 Action 算子的实现。

    object CustomAction {
    def main(args: Array[String]): Unit = {
    	val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
    	val sc = new SparkContext(conf)
    	val rdd = sc.parallelize(Array(1,2,3,4,5,6,7),4)
    	val res = sc.runJob(rdd, myAction _)
    	res.foreach(println)
    }
    
    /*在 driver 端获取 executor 执行 task 返回的结果,比如 task 是个规则引擎,我想
    知道每条规则命中了几条数据
    */
    def myAction(itr : Iterator[Int]) = {
    	var count = 0
    	itr.foreach(each =>{
    		count += 1
    	})
    	(TaskContext.getPartitionId(),count)
    }
    }
    

Transformation 算子和 Action 算子的判别

  • Spark 中,可以通过以下几个方面来判断一个算子是 Transformation 类型还是 Action类型:
    1. 返回值类型:Transformation 类型的算子返回一个新的 RDD,而 Action 类型的算子返回一个非分布式的结果(通常是 Scala 集合或基本数据类型)。
    2. 惰性执行:Transformation 类型的算子通常是惰性执行的,即它们不会立即执行,而是在遇到 Action 类型的算子时触发执行。这是因为 Transformation 类型的算子只是描述了要对数据集进行的转换操作,并不会立即触发计算。
    3. 操作触发时机:Transformation 类型的算子通常在数据集上定义了一系列的转换操作,而不会立即触发计算。只有当遇到 Action 类型的算子时,Spark 才会根据依赖关系图(DAG)执行之前定义的一系列 Transformation 算子,并将结果返回给驱动程序。

RDD 序列化

闭包检查

  • 从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

序列化方法和属性

  • 从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行

Kryo序列化框架

  • Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

RDD中的 DAG 和 依赖关系

DAG

  • Spark 中,DAGDirected Acyclic Graph,有向无环图)是执行计划的核心概念。DAG代表了 Spark 应用程序中的一系列操作步骤和它们之间的依赖关系。DAG 是一组顶点与边的组合,顶点代表 RDD,边代表对 RDD 的一系列操作。DAG Sheduler 根据 RDD 的不同 transformation 操作,将 DAG 分为不同的 stage,每个 stage 中又分为多个 taskSpark 将应用程序转换为一系列的转换操作,每个操作产生一个新的 RDD,并且这些操作形成了一个有向无环图。理解 Spark 中的 DAG 可以从两个方面来考虑:逻辑 DAG 和物理 DAG

    1. 逻辑 DAG

      • 逻辑 DAG 描述了应用程序中的转换操作和它们之间的依赖关系,而不考虑具体的执行计划和物理执行细节。逻辑 DAG 是以逻辑操作符(如 mapfilterreduceByKey 等)为节点,以依赖关系为边构成的有向无环图。逻辑 DAG 表示了应用程序的逻辑流程和数据转换过程。
    2. 物理 DAG

      物理 DAG 描述了 Spark 应用程序的实际执行计划,它将逻辑 DAG 映射到具体的执行引擎(如 Spark CoreSpark SQL 等)。物理 DAG 考虑了底层计算资源、数据分片、并行度等因素,以最优的方式执行操作。

RDD 血缘关系

  • RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的Lineage 会记录RDD 的元数据信息和转换行为,当该RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

RDD 依赖关系

  • 两个相邻RDD之间的关系

RDD 窄依赖和RDD 宽依赖

  • Spark 应用执行过程中,会在逻辑上生成 DAG。当 Action 算子被触发后,会将所有累积的算子生成有向无环图并由调度器对图上任务进行调度执行。Spark 的调度方式较传统的 MapReduce 复杂许多,它会根据 RDD 之间的依赖关系来划分不同的阶段(Stage),而一个 Stage 则包含一系列执行任务(TaskSet)。
  • Stage 划分是基于数据依赖关系的,一般分为两类:宽依赖(ShuffleDependency)与窄依赖(NarrowDependency)。

RDD 窄依赖

  • 窄依赖表示每一个父(上游)RDD的Partition最多被子(下游)RDD的一个Partition使用

RDD 宽依赖

  • 宽依赖表示同一个父(上游)RDD的Partition被多个子(下游)RDD的Partition依赖,会引起Shuffle

总结

  • 区分宽窄依赖的主要判断条件是:父 RDDPartition 流向,要是流向单个 RDD 就是窄依赖,流向多个 RDD 就是宽依赖。

Stage 划分算法

  • 从最后一个 RDD 往前推算,遇到窄依赖(NarrowDependency)就将其加入该 Stage,当遇到宽依赖(ShuffleDependency)则断开。每个 Stagetask 的数量由 Stage 最后一个 RDD 中的分区数决定。如果 Stage 要生成 Result,则该 Stage 里的 Task都是 ResultTask,否则是 ShuffleMapTask。因此 sparktask 只有这两种类型。
  • ShuffleMapTask 的计算结果需要 shuffle 到下一个 Stage,其本质上相当于 MapReduce中的 MapperResultTask 则相当于 MapReduce 中的 reducer。因此整个计算过程会根据数据依赖关系自后向前建立,遇到宽依赖则形成新的 Stage

RDD 任务划分

  • RDD任务切分中间分为:ApplicationJobStageTask
    • Application:初始化一个 SparkContext 即生成一个Application
    • Job:一个Action 算子就会生成一个Job
    • Stage:Stage等于宽依赖(ShuffleDependency)的个数加 1;
    • Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。

RDD 依赖之间的 shuffle

为什么要排序

  1. key 存在 combiner 操作,排序之后相同的 key 放到一块显然更加方便做合并操作
  2. reduce task 是按 key 去处理数据的。 如果没有排序那必须从所有数据中把当前相同 key 的所有 value 数据拿出来,然后进行 reduce 逻辑处理。显然每个 key 到这个逻辑都需要做一次全量数据扫描,影响性能,有了排序很方便的得到一个 key 对于的 value 集合。
  3. 同上,如果 key 按顺序排序,那么 reduce task 就按 key 顺序去读取,显然当读到的 key 是文件末尾的 key 时那么就标志数据处理完毕。如果没有排序那还得有其他逻辑来记录哪些 key处理完了,哪些 key 没有处理完

为什么要文件合并

  1. 因为内存放不下就会溢写文件,就会发生多次溢写,形成很多小文件,如果不合并,显然会小文件泛滥,集群需要资源开销去管理这些小文件数据。
  2. 任务去读取文件的数增多,打开的文件句柄数(引用数)也会增多
  3. mapreduce 是全局有序。单个文件有序,不代表全局有序,只有把小文件合并一起排序才会全局有序。
  • 总结:减少系统去管理小文件所需要的开销。

spark shuffle

  • sparkshuffle 是在 MapReduce shuffle 基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在 sparkshuffle write 相当于 MapReducemapshuffle read 相当于 MapReducereduceSpark 丰富了任务类型,有些任务之间数据流转不需要通过 Shuffle,但是有些任务之间还是需要通过 Shuffle 来传递数据,比如宽依赖的 groupByKey 以及各种 xxxByKey 算子。

未经优化的 HashShuffleManager

  • 上游 task 写文件的时候只是将数据按分区追加到文件中,并没有像 MapReduce 那样先内存溢写成文件,然后再文件与文件之间进行合并,虽然节省了排序、合并的开销。但有一个弊端就是会产生大量的中间磁盘文件,进而由大量的磁盘 IO操作影响了性能。
  • 问题:生成文件个数过多,生成和传输 上游 task 数量 * 下游 task 数量个文件。
  • 对应目前 spark 的参数:
    • spark.shuffle.manager=hash
    • spark.shuffle.consolidateFiles=false

优化的 HashShuffleManager

  • 相比第一种机制,就是在一个 Executor 中的所有的 task 是可以共用一个 buffer 内存。在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了,而是允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。此时的文件个数是 CPU core的数量 × 下一个 stagetask 数量。
  • 为了开启优化后的 HashShuffleManager,我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。
  • 备注:在 Spark 1.2 以后的版本中,默认的 ShuffleManager 改成了 SortShuffleManager

SortShuffleManager

  • 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
  • 在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 JavaBufferedOutputStream实现的。
  • BufferedOutputStreamJava 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。
  • 一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是 merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stagetask 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据(排好序的数据)在文件中的 start offsetend offset
  • SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量,由于每个 task 最终只有一个磁盘文件,所以文件个数等于上游 shuffle write 个数。

SortShuffle 优化之 bypass 运行机制

  • 相比第 3 中少了排序,task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 keyhash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时,也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
  • 该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。bypass 机制的目的是尽量减少不必要的数据重排和磁盘 IO 操作,该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
  • bypass 运行机制的触发条件如下:
    1. shuffle map task 数量小于 spark.shuffle.sort.bypassMergeThreshold(默认是 200)参数的值。
    2. 不是聚合类的 shuffle 算子(比如 groupKey)。因为不像第 3 种机制那样会对聚合类算子以 map 的数据结构存储,在写的过程中会先进行局部聚合。
  • 备注:之所以要满足“不是聚合类的 shuffle 算子”的原因是聚合类的 shuffle 算子通常需要按照 key 进行排序和分区,以确保相同 key 的数据能够被正确聚合在一起。如 groupByKey 这类。虽然是 ByKey,但是没有聚合操作,所以可以通过设置spark.shuffle.sort.bypassMergeThreshold 从而跳过排序操作

RDD 持久化

RDD 数据持久化/缓存

  • Spark 中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 该 RDD 的缓存在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。
  • 备注:缓存针对的是某个 RDD 或者说是某个 Transformation 算子的结果。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。
  • RDD 可以使用 persist() 方法或 cache() 方法进行持久化。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。
  • shuffle 操作中(例如 reduceByKey),即便是用户没有调用 persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法。
  • 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可, 并不需要重算全部 Partition
  • Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persistcache

cachepersist

  • cachepersist 严格来说既不是 transformation 算子,也不是 action 算子,因为没有生成新的 RDD,只是标记了当前 RDD 需要 cachepersist

  • cachepersistlazy 的,当第一次遇到 Action 算子的时侯才会进行缓存或持久化,以后再触发 Action 会读取、复用缓存的 RDD 的数据再进行操作。

  • 另外 cache 底层调用了 persist 方法:

    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
    
  • 上述代码中涉及到了存储级别(StorageLevel

    class StorageLevel private(
    	private var _useDisk: Boolean,
        private var _useMemory: Boolean, 
        private var _useOffHeap: Boolean, 
        private var _deserialized: Boolean, 
        private var _replication: Int = 1)
    extends Externalizable
    
  • 在该文件中可以看出在 Spark 中有 12 种可选的存储级别:

    val NONE = new StorageLevel(false, false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
    val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
    val MEMORY_ONLY = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
    val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
    
  • 部分级别解析:

    1. MEMORY_ONLY(默认值)
      • 使用未序列化的 Java 对象格式,将数据保存在内存中。如果内存不够存放所有的数据 (oom),则数据可能就不会进行持久化。那么下次对这个 RDD 执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略。
      • 备注:RDDcache 默认方式采用 MEMORY_ONLYDataFramecache 默认采用 MEMORY_AND_DISK
    2. MEMORY_AND_DISK(企业中用的最多的)
      • 使用未序列化的 Java 对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个 RDD 执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
    3. MEMORY_ONLY_SER
      • 基本含义同 MEMORY_ONLY。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC(garbage collection)。
    4. MEMORY_AND_DISK_SER
      • 基本含义同 MEMORY_AND_DISK。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC
    5. DISK_ONLY
      • 使用未序列化的 Java 对象格式,将数据全部写入磁盘文件中。
    6. MEMORY_ONLY_2MEMORY_AND_DISK_2
      • 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
    7. OFF_HEAP
      • 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。
    • 推荐使用持久化级别顺序:
    • MEMORY_ONLY ----> MEMORY_ONLY_SER ----> MEMORY_AND_DISK_SER ----> MEMORY_AND_DISK

RDD Cache缓存

  • RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

    // cache 操作会增加血缘关系,不改变原有的血缘关系
    println(wordToOneRdd.toDebugString)
    
    // 数据缓存。
    wordToOneRdd.cache()
    
    // 可以更改存储级别
    //mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
    

RDD CheckPoint检查点

  • 检查点其实就是通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

  • cachecheckpoint 之间有一个最大的区别,cacheRDD 以及 RDD 的血统(记录了这个 RDD 如何产生)缓存到内存中,当缓存的 RDD 失效的时候(如内存损坏),它们可以通过血统重新计算来进行恢复。但是 checkpointRDD 缓存到了 HDFS 中,同时忽略了它的血统(也就是 RDD 之前的那些依赖)。为什么要丢掉依赖?因为可以利用 HDFS 多副本特性保证容错!

  • RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。checkpoint 会等到 job 结束后另外启动专门的 job 去完成 checkpoint,也就是说需要 checkpointRDD 会被计算两次。

    // 设置检查点路径
    sc.setCheckpointDir("./checkpoint1")
    
    // 创建一个 RDD,读取指定位置文件:hello atguigu atguigu val lineRdd: RDD[String] = sc.textFile("input/1.txt")
    
    // 业务逻辑
    val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
    val wordToOneRdd: RDD[(String, Long)] = wordRdd.map { word => {
    (word, System.currentTimeMillis())
    }
    }
    // 增加缓存,避免再重新跑一个 job 做 checkpoint
    wordToOneRdd.cache()
    // 数据检查点:针对 wordToOneRdd 做检查点计算
    wordToOneRdd.checkpoint()
    
    // 触发执行逻辑
    wordToOneRdd.collect().foreach(println)
    

缓存和检查点区别

  1. Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。

  2. 检查点是新建一个 job 来完成的,而缓存是 job 执行过程中进行。

  3. Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在HDFS 等容错、高可用的文件系统,可靠性高。

  4. 建议对checkpoint()的RDD 使用Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次RDD。

  5. 缓存可以将 RDDpartition 持久化到磁盘,但该 partitionBlockManager 管理。一旦 driver 执行结束,BlockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpointRDD

    持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

RDD 分区器

  • 控制好数据的分布以便获得最少的网络传输,可以极大的提升整体性能,减少网络开销。Spark分区器描述的是RDD的数据在各个分区之间的分布规则,比如上游数据shuffle到下游时,分区器就决定了上游的哪些数据需要进入下游对应的哪些分区,但是只有 kv 类型 RDD 才有分区器,其它 RDD 分区器都为 noneSpark 中实现的分区器有两种,HashPartitioner(哈希分区器)和RangePartitioner(范围分区器),其中最常用的分区器是 HashPartitioner
  • Shuffle 和分区器之间的关系是:Shuffle 操作执行后,根据分区器的规则,将 shuffle 后的数据分配到新的分区。因此,Shuffle 操作之后,RDD 的分区器通常会发生改变。在一些需要 Spark Shuffle 操作的转换操作(例如 groupByKeyreduceByKeyjoin 等)中,可以通过提供合适的分区器来优化 Shuffle 性能,减少数据的移动和网络传输,提高作业执行效率。

HashPartitioner

  • HashPartitioner 采用哈希的方式对键值对数据进行分区。其数据分区规则为:$partitionId = Key.hashCode % numPartitions$,其中 partitionId 代表该 Key 对应的键值对数据应当分配到的 Partition 标识,Key.hashCode 表示该 Key 的哈希值,numPartitions 表示包含的 Partition 个数。

RangePartitioner

  • Spark 引入 RangePartitioner 的目的是为了解决 HashPartitioner 所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner 采用哈希的方式将同一类型的 Key 分配到同一个 Partition 中,因此当某一或某几种类型数据量较多时,就会造成若干 Partition 中包含的数据过大问题,而在 Job 执行过程中,一个 Partition 对应一个 Task,此时就会使得某几个 Task 运行过慢。
  • RangePartitioner 基于抽样的思想来对数据进行分区,简单的说就是将一定范围内的数映射到某一个分区内,分区与分区之间是有序的,也就是说一个分区中的元素肯定都比另一个分区中的元素小或者大;但是分区内的元素是不能保证顺序的。sortByKey 底层使用的数据分区器就是 RangePartitioner 分区器,因此 sortByKey 是可以保证全局有序的。
  • 该分区器的实现方式主要是通过两个步骤来实现的:
    • 第一步:先从整个 RDD 中抽取样本数据,将样本数据排序,计算出每个分区的最大 key 值,形成一个 Array[key] 类型的数组变量 rangeBounds
    • 第二步:判断 keyrangeBounds 中所处的范围,给出该 key 值在下一个 RDD 中的分区 id 下标。该分区器要求 RDD 中的 key 类型必须是可排序的。

RangePartitioner 的总结:

  1. 可以做到分区间有序,没有办法做到分区内有序,如果希望全局有序需要借助 sortBy 算子因为需要对数据进行排序,因此会带来一些性能上的开销
  2. 如果数据分布不均匀,导致某个 key 数据量特别到,同样会造成数据倾斜,甚至会造成某个分区数据量特别大,某个分区甚至无数据
  • 备注:将一定范围内的数映射到某一个分区内,在实现中,分界(rangeBounds)算法用到水塘抽样算法。

CustomPartitioner

  • Spark 系统提供的两个分区器没有办法满足具体的企业中的业务需要是,Spark 提供了 Partitioner 分区器的抽象类,用户可以自行继承此抽象类完成自定义分区器。

    def main(args: Array[String]): Unit = {
    	val conf: SparkConf = new
    	SparkConf().setMaster("local").setAppName("customPartitioner demo")
    	val sc = new SparkContext(conf)
    	val rdd = sc.makeRDD(List(("勇士", "Curry"), ("掘金", "Jokic"), ("湖人", "LBJ"), ("火箭", "YaoM")))
    	// 业务:实现将湖人的信息单独分区
    	rdd.partitionBy(new MyPartitioner)
    		.mapPartitionsWithIndex{
    			(id,iter) => {
    				println(id + "分区号:" + iter.mkString(","))
    				iter
    			}
    		}
    		.collect()
    }
    
    // 自定义分区器
    class MyPartitioner extends Partitioner {
    	// 自定义分区数量
    	override def numPartitions = 3
    	// 根据数据的 key 值返回分区索引 从 0 开始 from 0 to `numPartitions - 1`
    	override def getPartition(key: Any) = 		{
    		key match {
    			case "湖人" => 0
    			case "火箭" => 1
    			case _ => 2
    		}
    	}
    }
    

站在算子角度理解 spark 分区策略

  • Spark 中,RDD 的分区器不仅适用于键值对类型的 RDD,也适用于其他类型的 RDD。分区器决定了 RDD 数据在集群中的分布和存储方式,并且可以在操作中提供性能优化。对于键值对类型的 RDD,确实有特定的分区器,如 HashPartitionerRangePartitioner
  • 这些分区器基于键的哈希值或排序规则,将数据分配到不同的分区中,从而实现数据的分布。对于其他类型的 RDD,它们也可以具有分区器。默认情况下,这些 RDD 的分区器为 None,表示没有特定的数据分布方式。这意味着数据在 RDD 中的分布是不确定的,每个分区中的数据是独立的。

Source 算子

  • 一般在日常的工作中,source 算子大多数是从例如 hdfsmysql、本地文件等获取数据,这时候数据去往哪个分区和分区策略基本没有关系,而是取决于外部的这些系统,比如读取 hdfs文件,是根据 blocksize 以及你设置的分区大小来计算分区数,然后按照 block 去读取数据,这个时候数据在哪个分区取决于它属于哪个 block;再比如 mysqlspark 读取 mysql 一般是单并行度读取,如果是多并行度,数据去往哪个分区取决于你设置的条件

Transformation算子

  • Transformation 算子因为算子的不同分区策略也有所不同,但是总结分为以下几类:
    1. repartition&coalease
      • repartition 底层是调用开启 shufflecoalease,当调用他们时,无论是否是 key/value 类型数据,都是遍历上游算子每个分区中的数据,轮询放入下游算子分区中,放入的时候下游算子的初始分区 id 是随机的,随后依次加 1
    2. groupBy & groupByKey & partitionBy(new HashPartitioner(num)) & reduceByKey & join...
      • groupBy 可以看出,不仅 key/value 数据可以使用 HashPartitioner,不是 key/value的数据也可以使用,虽然底层是将一条数据作为 key,然后调用的 groupByKey
    3. sortBy & sortByKey & partitionBy(new RangePartitioner(num,rdd))
      • sortBy 底层是 sortByKey,但是其应用的是 RangePartitioner

共享变量

  • Spark 的一个核心功能是创建两种特殊类型的变量:广播变量和累加器变量

广播变量

  • 广播变量顾名思义,由 Driver 端发送数据,所有 Executor 端接收并保存这份数据,用于每个 Executor 上的数据计算工作。

广播变量的几点特性

  1. 广播变量是保存在 Executor 内存中的,每个 Executor 一份。如果一个 Executor 上执行多个 Task,那么多个 Task 将共享一份广播变量
  2. 广播变量是只读变量,即 Driver 端将变量发送到 Executor 端后,Executor 端只能读取变量数据,而不能修改变量数据
  3. 当多个 Executor 上需要同一份数据时,可以考虑使用广播变量形式发送数据。尤其当该份数据在多个 Stage 中使用时,通过广播变量一次性发送的方案,执行性能将明显优于当数据需要时再发送的多次发送方案。
  • 广播变量是使用 SparkContext 对象调用 broadcast 函数,传入要广播的变量。在 Executor端使用广播变量时,使用 Broadcast 对象,调用 value 函数就可以获得广播的数值了。

    def main(args: Array[String]): Unit = {
    	val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("broadcastTest"))
    	var factor = 3
    	val b: Broadcast[Int] = sc.broadcast(factor)
        factor = 5
    	val source = sc.makeRDD(Seq(1, 2, 3, 4, 5))
    	source.foreach(number => println(number * b.value))
    }
    
    def main(args: Array[String]): Unit = {
    	val sc = SparkContext.getOrCreate(new
    	SparkConf().setMaster("local[*]").setAppName("broadcastTest"))
    	val map = Map(1 -> 'a', 2 -> 'b', 3 -> 'c')
    	val user = new User(1, "zs") with Serializable
    	val b = sc.broadcast(user)
    	user.uName = "ls" 
        val source = sc.makeRDD(Seq(1, 2, 3, 4, 5))
    	source.foreach(number => println(b.value))
    }
    
    class User(idIn: Int, nameIn: String) {
    	var uid = idIn
    	var uName = nameIn
    	override def toString: String = s"uid:$uid , uName:$uName"
    }
    

累加器变量

  • 累加器与广播变量类似,也是定义好后由 Driver 发到各 Executor 上辅助运算。累加器区别于广播变量的特性是:累加器取值 accumulator.valueExecutor 端无法被读取,只能在 Driver 端被读取,而 Executor 端只能执行累加操作。累加器是一种比较高效的计数或者求和的工具

  • def main(args: Array[String]): Unit = {
    	val sc = SparkContext.getOrCreate(new
    	SparkConf().setMaster("local[*]").setAppName("broadcastTest"))
    	val oddCnt = sc.longAccumulator("oddCnt")
    	val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
    	rdd.foreach(number => if (number % 2 == 1) oddCnt.add(1))
    	println("奇数的数量是:" + oddCnt.value)
    }
    
    def main(args: Array[String]): Unit = {
    	val sc = SparkContext.getOrCreate(new
    	SparkConf().setMaster("local[*]").setAppName("broadcastTest"))
    	val oddSum = sc.longAccumulator("oddCnt")
    	val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
    	rdd.foreach(number => if (number % 2 == 1) oddSum.add(number))
    	println("所有奇数之和是:" + oddSum.value)
    }
    
  • Spark 提供了三种累加器,除了上面演示的 longAccumulator 外,还有doubleAccumulatorcollectionAccumulator,对于整型数值累加就采用 longAccumulator,浮点型数值累加采用 doubleAccumulator(整型数值都可以转化成浮点型数值,所以 doubleAccumulator 也可以进行整型数值的累加),collectionAccumulator 的累加操作是将目标元素放入一个集合中。

  • 留意下 collectionAccumulator 的使用,在创建时需要指定放入元素的泛型,否则无法执行累加操作:

  • def main(args: Array[String]): Unit = {
    	val sc = SparkContext.getOrCreate(new
    	SparkConf().setMaster("local[*]").setAppName("broadcastTest"))
    	val map = sc.collectionAccumulator[String]("map")
    	val rdd = sc.makeRDD(List("hello", "world", "hi", "scala", "hallo", "spark"))
    	rdd.foreach(e => if (e.startsWith("h")) map.add(e))
    	println(map.value)
    }
    
  • 累加器是可以自定义的,官方提供了累加器自定义的方式,自定义类继承 AccumulatorV2 类并实现其抽象函数即可

  • object First {
    	def main(args: Array[String]): Unit = {
    		val sc = SparkContext.getOrCreate(new
    		SparkConf().setMaster("local[*]").setAppName("broadcastTest"))
    		// 创建自定义累加器对象
    		val wordAccumulator = new WordAccumulator
    		// 注册累加器后,就就可以直接使用了
    		sc.register(wordAccumulator, "wc")
    		sc.textFile("in/article.txt") // 读取源文件
    			.filter(e => e.length != 0) // 过滤空行后切分为单词,再处理掉空的数据
    			.flatMap(line => {
    				line.trim.split("[\\s|\"|,|\\.|]+")
    			})
    			.filter(!_.isEmpty)
    			.foreach(e => wordAccumulator.add(e.trim)) // 调用累加方法就可以执行上面定义的代码了
    		println(wordAccumulator.value)
    	}
    }
    
    class WordAccumulator extends AccumulatorV2[String, util.HashMap[String, Int]] {
    	// 定义一个用来存储中间过程的 Map
    	private val map = new util.HashMap[String, Int]()
    	// 判断累加器是否是初始值,对于当前案例,执行过数据后 map 中肯定会有元素,所有累加器为初始值时 map 为空
    	override def isZero: Boolean = map.isEmpty
    	// 创建一个新的累加器
    	override def copy(): AccumulatorV2[String, util.HashMap[String, Int]] = new WordAccumulator
    	// 重置累加器,只需要将 map 置空就行了
    	override def reset(): Unit = map.clear()
    	// 添加一条数据,如果当前有该数据就使其 value + 1,否则就新加入一条值为 1 的键值对
    	override def add(v: String): Unit = if (map.containsKey(v))
    		map.put(v, map.get(v) + 1) else map.put(v, 1)
    	// 当前累加器的 map 与另外一个累加器的 map 合并
    	override def merge(other: AccumulatorV2[String, util.HashMap[String, Int]]): Unit = {
    		other match {
    			case o: WordAccumulator =>
    				o.value.entrySet().forEach(new
    					Consumer[util.Map.Entry[String, Int]] {
    	override def accept(t: util.Map.Entry[String, Int]): Unit = {
    		if (map.containsKey(t.getKey)) {
    			map.put(t.getKey, map.get(t.getKey) + t.getValue)
    		} else {
    			map.put(t.getKey, t.getValue)
    		}
    	}
    })
    		case _ => throw new UnsupportedOperationException(
    	s"Cannot merge ${this.getClass.getName} with
    	${other.getClass.getName}")
    	}
    }
    	// 最终要返回的值,我们返回 map 就可以了
    	override def value: util.HashMap[String, Int] = map
    }
    

两种特殊变量的总结

  • Spark 广播变量和累加器变量是两个重要的概念,它们在 Spark 分布式计算中起着不同的作用和意义。

广播变量的好处:

  1. 减少数据传输:通过将只读变量广播到集群中,避免了在网络上传输大量数据的开销,
  2. 减少了网络通信的负担。
  3. 提高性能:广播变量在每个节点上只有一份副本,减少了内存使用和垃圾回收的压力,提高了任务的执行效率。
  4. 共享数据:广播变量可以在集群中的所有任务中共享,使得每个任务都可以访问相同的数据,方便数据共享和操作。

累加器变量的好处

  1. 累加器变量在任务执行期间可以被多个任务并行地更新,但只能以只读方式访问其值。累加器变量通常用于收集任务执行期间的统计信息。
  2. 分布式累加:累加器变量可以在并行任务中进行累加操作,无论任务在集群中的哪个节点执行,都可以正确地对变量进行更新,确保了数据的一致性。
  3. 统计信息收集:累加器变量可以用于收集任务执行期间的统计信息,例如计数、求和、最大值、最小值等。这对于分布式计算中的调试和监控非常有用。
  • 需要注意的是,广播变量和累加器变量都是在分布式环境中使用的工具,用于提高 Spark 应用程序的性能和效率,但需要谨慎使用,确保在适当的场景下使用正确。