Spark

概念

  • 用于大规模数据分析的统一引擎,支持批/流处理,支持纯SQL开发等。

特点

  • 简单:支持ScalaJavaPythonRAPI,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的PythonScalashell,可以非常方便地在这些shell中使用Spak集群来验证解决问题的方法
  • 速度快:与 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快 10 倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过基于内存来高效处理数据流;
  • 通用:Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询 (Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算 (GraphX)。
  • 兼容好:Spark 可以非常方便地与其他的开源产品进行融合。Spark 可以使用 YARNMesos 作为它的资源管理和调度器;可以处理所有 Hadoop 支持的数据,包括 HDFSHBaseCassandra 等。对于已经部署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。

架构

最上层

  • Spark SQL:Spark SQLSpark 用来处理结构化数据的一个模块,它提供了 2 个编程抽象:DataFrameDataSet,并且作为分布式 SQL 查询引擎的作用。将 Spark SQL 转换成 RDD,然后提交到集群执行.
  • Spark Streaming:Spark StreamingSpark Core 的扩展应用,它具有可扩展,高吞吐量,对于流数据的可容错性等特点。Spark Streaming 是个粗粒度的伪实时流程序
  • MLlib:Spark MLlibSpark 的重要组成部分,是最初提供的一个机器学习库。
  • GraphX:Spark GraphX 是一个分布式图处理框架,它是基于 Spark 平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求。

第二层:Spark Core

  • Spark CoreSpark 的核心与基础,实现了 Spark 的基本功能,包含任务调度,内存管理,错误恢复与存储系统交互等模块。

第三层:spark 的部署模式

  • **Local:**是我们平时开发测试中最常用的一种手段,可以直接在 IDEA 上就可以运行 spark代码,也是我们之后学习中使用比较多的一种方式
  • **Standalone:**被称为集群单机模式。本身都自带了完整的资源调度管理服务,可以独立部署到一个集群中,无需依赖任何其他的资源管理系统
  • Yarn:Yarn 模式被称为 Spark on Yarn 模式,即把 Spark 作为一个客户端,将作业提交给Yarn 服务,由于在生产环境中,很多时候都要与 Hadoop 使用同一个集群,因此采用 Yarn 来管理资源调度,可以有效提高资源利用率,Yarn 模式又分为 Yarn Cluster模式和 Yarn Client模式。

最底层:存储系统

  • Spark 提供多存储系统的接口,比如 HDFSAmazon S3HBase

Spark 集群架构

  • Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器。
  • WorkerWokerSpark集群中的一台具体的机器。一台 Worker 机器会运行很多 Executor,每一个 Executor 都是一个 JVM 进程,这些进程才是 Spark 真正进行计算的地方,而每个 Executor 的内部又会有很多的 Task,而每个 Task 都是一个线程,提高整个计算的并行度。
  • **Driver:**创建 spark 上下文对象环境的应用程序就称为 Driver 驱动器。DriverSpark 作业执行时主要负责:
    1. 负责将用户程序解析为具体的 spark 作业(job
    2. 负责了应用程序的整体的资源调度
    3. 跟踪 Executor 的执行情况
    4. 通过 UI 展示查询运行情况
  • Executor:Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。Executor 有以下两个核心功能:
    1. 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
    2. 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

Spark on yarn 架构

  • 当在 YARN 上运行 Spark 作业,每个 Spark executor 作为一个 YARN 容器运行。Spark 可以使得多个 Tasks 在同一个容器里面运行。Spark on yarn 通常有以下两种运行模式。

client 模式

  1. YARN Client 模式下,spark-submit 提交 Spark Job 之后,就会在提交的本地机器上启动一个对应的 Driver
  2. Driver 启动后会与 ResourceManager 建立通讯并发起启动 ApplicationMaster 请求;
  3. ResourceManage 接收到这个 Job 时,会在集群中选一个合适的 NodeManager 并分配一个 Container,及启动 ApplicationMaster(初始化 SparkContext);
  4. ApplicationMaster 的功能相当于一个 ExecutorLaucher ,负责向 ResourceManager申请 Container 资源; ResourceManage 便会与 NodeManager 通信,并启动 Container
  5. ApplicationMaster 对指定 NodeManager分配的 Container 发出启动 Executor 进程请求;
  6. Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行执行 Job 任务;
  7. Driver 中的 SparkContext 分配 TaskExecutor 执行,Executor运行 Task 并向 Driver 汇报运行的状态、进度、以及最终的计算结果;让 Driver 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;应用程序运行完成后,ApplicationMasterResourceManager 申请注销并关闭自己。

cluster 模式

  1. YARN Cluster 模式下,Spark 任务提交之后会与 ResourceManager 建立通讯,并发出申请启动 ApplicationMaster 请求;
  2. ResourceManage 接收到这个 Job 时,会在集群中选一个合适的 NodeManager 并分配一个 Container;以及启动 ApplicationMaster ,此时的 ApplicationMaster 就是 Driver
  3. ApplicationMaster 启动后向 ResourceManager 申请资源,ResourceManager 接到 ApplicationMaster 的资源申请后会在合适(有资源的情况下)的 NodeManager 中分配 Container
  4. ApplicationMaster 对指定 NodeManager 分配的 Container 发出启动 Executor 进程请求;
  5. Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 Job 任务;
  6. ApplicationMaster 中的 SparkContext 分配 TaskExecutor 执行,Executor 运行 Task 并向 ApplicationMaster 汇报运行的状态、进度、以及最终的计算结果;让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
  7. 应用程序运行完成后,ApplicationMasterResourceManager 申请注销并关闭自己;

两种模式的区别

  • Yarn ClientYarn Cluster 任务提交方式两者区别,可以通过上面的 Spark 任务提交流程图可以看出来;主要区别在于 Driver 的创建的位置不一样,Client 方式是直接在本地机器上创建一个 Driver 进程,而 Cluster 方式在通过 ResourceManager 在某一个 NodeManager 中创建一个 Driver
  • 在使用场景当中,Yarn Client 方式一般适用于进行 Job 的调试(Debug),因为 Driver是在本地可以直接远程断点调试,而且 Driver 会与 Executor 进行大量的通信就会造成占用大量 IOYarn Cluster 方式一般适用于生产环境,因为 Driver 运行在某一个 NodeManager 中就不会出现某一台机器出现网卡激增的情况,缺点就是运行的 Job 日志不能在机器本地实时查看而是需要通过 Job Web 界面查看。

运行环境

Local模式

  • Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境

安装步骤

  • spark-3.0.0-bin-hadoop3.2.tgz文件上传到Linux并解压缩,放置在指定位置

    tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
    cd /opt/module
    mv spark-3.0.0-bin-hadoop3.2 spark-local
    
  • 启动Local环境

    bin/spark-shell
    
  • 启动成功后,可以输入网址进行Web UI监控页面访问

    http://虚拟机地址:4040
    
  • 退出本地模式

    :quit
    
  • 提交应用

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master local[2] \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    
    • --class表示要执行程序的主类,此处可以更换为咱们自己写的应用程序
    1. --master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量
    2. spark-examples_2.12-3.0.0.jar 运行的应用类所在的jar包,实际使用时,可以设定为咱们自己打的jar包
    3. 数字10表示程序的入口参数,用于设定当前应用的任务数量

Standalone 模式

  • 只使用Spark自身节点运行的集群模式,也就是所谓的独立部署(Standalone)模式。

  • 集群规划

    Linux1Linux2Linux3
    SparkWorker
    Master
    WorkerWorker

安装步骤

  • 解压缩文件

    tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
    cd /opt/module
    mv spark-3.0.0-bin-hadoop3.2 spark-standalone
    
  • 修改配置文件

    • 进入解压缩后路径的conf目录,修改slaves.template文件名为slaves

      mv slaves.template slaves
      
    • 修改slaves文件,添加work节点

      hadoop102 
      hadoop103
      hadoop104
      
    • 修改spark-env.sh.template文件名为spark-env.sh

      mv spark-env.sh.template spark-env.sh
      
    • 修改spark-env.sh文件,添加JAVA_HOME环境变量和集群对应的master节点

      export JAVA_HOME=/opt/module/jdk1.8.0_144
      SPARK_MASTER_HOST=hadoop102
      SPARK_MASTER_PORT=7077
      
    • 分发spark-standalone目录

      xsync spark-standalone
      
  • 启动集群

    • 执行脚本命令

      sbin/start-all.sh
      
    • 查看Master资源监控Web UI界面:

      http://linux1:8080
      
    • 提交应用

      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://linux1:7077 \
      ./examples/jars/spark-examples_2.12-3.0.0.jar \
      10
      
      • --class表示要执行程序的主类
      • --master spark://linux1:7077 独立部署模式,连接到Spark集群
      • spark-examples_2.12-3.0.0.jar 运行类所在的jar包
      • 数字10表示程序的入口参数,用于设定当前应用的任务数量
  • 提交参数说明

    • 在提交应用中,一般会同时一些提交参数

      bin/spark-submit \
      --class <main-class>
      --master <master-url> \
      ... # other options
      <application-jar> \ [application-arguments]
      
      参数解释可选值举例
      --classSpark程序中包含主函数的类
      --masterSpark程序运行的模式(环境)模式:local[*]、spark://linux1:7077、Yarn
      --executor memory 1G指定每个 executor 可用内存为 1G
      --total executor cores 2指定所有executor使用的cpu核数为2个
      --executor cores指定每个executor使用的cpu核数
      application jar打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar
      application arguments传给 main() 方法的参数
  • 配置历史服务

    • 修改spark-defaults.conf.template文件名为spark-defaults.conf

      mv spark-defaults.conf.template spark-defaults.conf
      
    • 修改 spark default.conf 文件,配置日志存储路径

      spark.eventLog.enabled        true
      spark.eventLog.dir            hdfs://hadoop102:8020/directory
      ##注意:需要启动hadoop集群,HDFS上的directory目录需要提前存在。
      sbin/start dfs.sh
      hadoop fs mkdir /directory
      
    • 修改 spark env sh 文件 , 添加日志配置

      export SPARK_HISTORY_OPTS="
      -Dspark.history.ui.port=18080
      -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
      -Dspark.history.retainedApplications=30"
      
      • 参数1含义:WEB UI访问的端口号为18080
      • 参数2含义:指定历史服务器日志存储路径
      • 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
    • 分发配置文件

      xsync conf
      
    • 重新启动集群和历史服务

      sbin/start-all.sh
      sbin/start-history-server.sh
      
    • 重新执行任务

      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://linux1:7077 \
      ./examples/jars/spark-examples_2.12-3.0.0.jar \
      10
      
  • 配置高可用(HA)

    • 高可用是因为当前集群中的Master节点只有一个,所以会存在单点故障问题。所以为了解决单点故障问题,需要在集群中配置多个Master节点,一旦处于活动状态的Master发生故障时,由备用Master提供服务,保证作业可以继续执行。这里的高可用一般采用Zookeeper设置

    • 集群规划

      Linux1Linux2Linux3
      SparkMaster
      Zookeeper
      Worker
      Master
      Zookeeper
      Worker
      Zookeeper
      Worker
    • 停止集群

      sbin/stop all.sh
      
    • 启动Zookeeper

      zk.sh start
      
    • 修改spark-env.sh文件添加如下配置

      注释如下内容:
      SPARK_MASTER_HOST linux1
      SPARK_MASTER_PORT=7077
      添加如下内容
      #Master 监控页面默认访问端口为 8080 ,但是可能会和 Zookeeper 冲突,所以改成 8989 ,也可以自
      定义,访问 UI 监控页面时请注意
      SPARK_MASTER_WEBUI_PORT= 8989
      export SPARK_DAEMON_JAVA_OPTS="
      -Dspark.deploy.recoveryMode=ZOOKEEPER
      -Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104
      -Dspark.deploy.zookeeper.dir=/spark"
      
    • 分发配置文件

      xsync conf/
      
    • 启动集群

      sbin/start all.sh
      
    • 启动linux2的单独Master节点,此时linux2节点Master状态处于备用状态

      sbin/start master.sh
      
    • 提交应用到高可用集群

      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://linux1:7077,linux2:7077 \
      ./examples/jars/spark-examples_2.12-3.0.0.jar \
      10
      

Yarn 模式

安装步骤

  • 解压缩文件

    tar zxvf spark 3.0.0 bin hadoop3.2.tgz C /opt/module
    cd /opt/module
    mv spark 3.0.0 bin hadoop3.2 spark yarn
    
  • 修改配置文件

    • 修改hadoop配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发
    <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认
    是 true -->
    <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
    </property>
    
    
    <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是 true -->
    <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value> roperty>
    
    
  • 修改 conf/spark-env.sh,添加 JAVA_HOME 和YARN_CONF_DIR 配置

    mv spark-env.sh.template spark-env.sh
    。。。
    export JAVA_HOME=/opt/module/jdk1.8.0_144 
    YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
    
  • 启动 HDFS 以及 YARN 集群

  • 提交应用

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    
  • 配置历史服务器

    • 修改spark-defaults.conf.template文件名为spark-defaults.conf

      mv spark-defaults.conf.template spark-defaults.conf
      
    • 修改 spark-default.conf 文件,配置日志存储路径

      spark.eventLog.enabled        true
      spark.eventLog.dir            hdfs://hadoop102:8020/directory
      ##注意:需要启动hadoop集群,HDFS上的目录需要提前存在。
      sbin/start -dfs.sh
      hadoop fs -mkdir /directory
      
    • 修改 spark-env.sh 文件 , 添加日志配置

      export SPARK_HISTORY_OPTS="
      -Dspark.history.ui.port=18080
      -Dspark.history.fs.logDirectory=hdfs://linux1:8020/directory
      -Dspark.history.retainedApplications=30"
      
      • 参数 1 含义:WEB UI 访问的端口号为 18080
      • 参数 2 含义:指定历史服务器日志存储路径
      • 参数 3 含义:指定保存Application 历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
    • 修改spark-defaults.conf

      spark.yarn.historyServer.address=linux1:18080
      spark.history.ui.port=18080
      
    • 启动历史服务

      sbin/start-history-server.sh
      
    • 重新提交应用

      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master yarn \
      --deploy-mode client \
      ./examples/jars/spark-examples_2.12-3.0.0.jar \
      10
      

端口号

  • Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)
  • Spark Master 内部通信服务端口号:7077
  • Standalone 模式下,Spark Master Web 端口号:8080(资源)
  • Spark 历史服务器端口号:18080
  • Hadoop YARN 任务运行情况查看端口号:8088