Spark
概念
- 用于大规模数据分析的统一引擎,支持批/流处理,支持纯
SQL
开发等。
特点
- 简单:支持
Scala
、Java
、Python
、R
的API
,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark
支持交互式的Python
和Scala
的shell
,可以非常方便地在这些shell
中使用Spak
集群来验证解决问题的方法 - 速度快:与
MapReduce
相比,Spark
基于内存的运算要快100
倍以上,基于硬盘的运算也要快10
倍以上。Spark
实现了高效的DAG
执行引擎,可以通过基于内存来高效处理数据流; - 通用:
Spark
提供了统一的解决方案。Spark
可以用于批处理、交互式查询 (Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算 (GraphX)。 - 兼容好:
Spark
可以非常方便地与其他的开源产品进行融合。Spark
可以使用YARN
、Mesos
作为它的资源管理和调度器;可以处理所有Hadoop
支持的数据,包括HDFS
、HBase
和Cassandra
等。对于已经部署Hadoop
集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark
的强大处理能力。
架构
最上层
- Spark SQL:
Spark SQL
是Spark
用来处理结构化数据的一个模块,它提供了2
个编程抽象:DataFrame
和DataSet
,并且作为分布式SQL
查询引擎的作用。将Spark SQL
转换成RDD
,然后提交到集群执行. - Spark Streaming:
Spark Streaming
是Spark Core
的扩展应用,它具有可扩展,高吞吐量,对于流数据的可容错性等特点。Spark Streaming
是个粗粒度的伪实时流程序 - MLlib:
Spark MLlib
是Spark
的重要组成部分,是最初提供的一个机器学习库。 - GraphX:
Spark GraphX
是一个分布式图处理框架,它是基于Spark
平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求。
第二层:Spark Core
Spark Core
是Spark
的核心与基础,实现了Spark
的基本功能,包含任务调度,内存管理,错误恢复与存储系统交互等模块。
第三层:spark 的部署模式
- **Local:**是我们平时开发测试中最常用的一种手段,可以直接在 IDEA 上就可以运行 spark代码,也是我们之后学习中使用比较多的一种方式
- **Standalone:**被称为集群单机模式。本身都自带了完整的资源调度管理服务,可以独立部署到一个集群中,无需依赖任何其他的资源管理系统
- Yarn:
Yarn
模式被称为Spark on Yarn
模式,即把Spark
作为一个客户端,将作业提交给Yarn
服务,由于在生产环境中,很多时候都要与Hadoop
使用同一个集群,因此采用Yarn
来管理资源调度,可以有效提高资源利用率,Yarn
模式又分为Yarn Cluster
模式和Yarn Client
模式。
最底层:存储系统
Spark
提供多存储系统的接口,比如HDFS
、Amazon S3
、HBase
等
Spark 集群架构
Cluster Manager
:在standalone
模式中即为Master
主节点,控制整个集群,监控worker
。在YARN
模式中为资源管理器。Worker
:Woker
是Spark
集群中的一台具体的机器。一台Worker
机器会运行很多Executor
,每一个Executor
都是一个JVM
进程,这些进程才是Spark
真正进行计算的地方,而每个Executor
的内部又会有很多的Task
,而每个Task
都是一个线程,提高整个计算的并行度。- **Driver:**创建
spark
上下文对象环境的应用程序就称为Driver
驱动器。Driver
在Spark
作业执行时主要负责:- 负责将用户程序解析为具体的
spark
作业(job
) - 负责了应用程序的整体的资源调度
- 跟踪
Executor
的执行情况 - 通过
UI
展示查询运行情况
- 负责将用户程序解析为具体的
- Executor:
Spark Executor
是集群中工作节点(Worker
)中的一个JVM
进程,负责在Spark
作业中运行具体任务(Task
),任务彼此之间相互独立。Spark
应用启动时,Executor
节点被同时启动,并且始终伴随着整个Spark
应用的生命周期而存在。如果有Executor
节点发生了故障或崩溃,Spark
应用也可以继续执行,会将出错节点上的任务调度到其他Executor
节点上继续运行。Executor
有以下两个核心功能:- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
- 它们通过自身的块管理器(
Block Manager
)为用户程序中要求缓存的RDD
提供内存式存储。RDD
是直接缓存在Executor
进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
Spark on yarn 架构
- 当在
YARN
上运行Spark
作业,每个Spark executor
作为一个YARN
容器运行。Spark
可以使得多个Tasks
在同一个容器里面运行。Spark on yarn
通常有以下两种运行模式。
client 模式
- 在
YARN Client
模式下,spark-submit
提交Spark Job
之后,就会在提交的本地机器上启动一个对应的Driver
; Driver
启动后会与ResourceManager
建立通讯并发起启动ApplicationMaster
请求;ResourceManage
接收到这个Job
时,会在集群中选一个合适的NodeManager
并分配一个Container
,及启动 ApplicationMaster(初始化SparkContext
);ApplicationMaster
的功能相当于一个ExecutorLaucher
,负责向ResourceManager
申请Container
资源;ResourceManage
便会与NodeManager
通信,并启动Container
;ApplicationMaster
对指定NodeManager
分配的Container
发出启动Executor
进程请求;Executor
进程启动后会向Driver
反向注册,Executor
全部注册完成后Driver
开始执行执行Job
任务;Driver
中的SparkContext
分配Task
给Executor
执行,Executor
运行Task
并向Driver
汇报运行的状态、进度、以及最终的计算结果;让Driver
随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;应用程序运行完成后,ApplicationMaster
向ResourceManager
申请注销并关闭自己。
cluster 模式
- 在
YARN Cluster
模式下,Spark
任务提交之后会与ResourceManager
建立通讯,并发出申请启动ApplicationMaster
请求; ResourceManage
接收到这个Job
时,会在集群中选一个合适的NodeManager
并分配一个Container
;以及启动ApplicationMaster
,此时的ApplicationMaster
就是Driver
;ApplicationMaster
启动后向ResourceManager
申请资源,ResourceManager
接到ApplicationMaster
的资源申请后会在合适(有资源的情况下)的NodeManager
中分配Container
;ApplicationMaster
对指定NodeManager
分配的Container
发出启动Executor
进程请求;Executor
进程启动后会向Driver
反向注册,Executor
全部注册完成后Driver
开始执行Job
任务;ApplicationMaster
中的SparkContext
分配Task
给Executor
执行,Executor
运行Task
并向ApplicationMaster
汇报运行的状态、进度、以及最终的计算结果;让ApplicationMaster
随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;- 应用程序运行完成后,
ApplicationMaster
向ResourceManager
申请注销并关闭自己;
两种模式的区别
Yarn Client
与Yarn Cluster
任务提交方式两者区别,可以通过上面的Spark
任务提交流程图可以看出来;主要区别在于 Driver 的创建的位置不一样,Client
方式是直接在本地机器上创建一个Driver
进程,而Cluster
方式在通过ResourceManager
在某一个NodeManager
中创建一个Driver
。- 在使用场景当中,
Yarn Client
方式一般适用于进行Job
的调试(Debug
),因为Driver
是在本地可以直接远程断点调试,而且Driver
会与Executor
进行大量的通信就会造成占用大量IO
;Yarn Cluster
方式一般适用于生产环境,因为Driver
运行在某一个NodeManager
中就不会出现某一台机器出现网卡激增的情况,缺点就是运行的Job
日志不能在机器本地实时查看而是需要通过Job Web
界面查看。
运行环境
Local
模式
Local
模式,就是不需要其他任何节点资源就可以在本地执行Spark
代码的环境
安装步骤
-
将
spark-3.0.0-bin-hadoop3.2.tgz
文件上传到Linux
并解压缩,放置在指定位置tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module cd /opt/module mv spark-3.0.0-bin-hadoop3.2 spark-local
-
启动
Local
环境bin/spark-shell
-
启动成功后,可以输入网址进行Web UI监控页面访问
http://虚拟机地址:4040
-
退出本地模式
:quit
-
提交应用
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master local[2] \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
- --class表示要执行程序的主类,此处可以更换为咱们自己写的应用程序
- --master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量
- spark-examples_2.12-3.0.0.jar 运行的应用类所在的jar包,实际使用时,可以设定为咱们自己打的jar包
- 数字10表示程序的入口参数,用于设定当前应用的任务数量
Standalone
模式
-
只使用Spark自身节点运行的集群模式,也就是所谓的独立部署(Standalone)模式。
-
集群规划
Linux1 Linux2 Linux3 Spark Worker
MasterWorker Worker
安装步骤
-
解压缩文件
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module cd /opt/module mv spark-3.0.0-bin-hadoop3.2 spark-standalone
-
修改配置文件
-
进入解压缩后路径的conf目录,修改slaves.template文件名为slaves
mv slaves.template slaves
-
修改slaves文件,添加work节点
hadoop102 hadoop103 hadoop104
-
修改spark-env.sh.template文件名为spark-env.sh
mv spark-env.sh.template spark-env.sh
-
修改spark-env.sh文件,添加JAVA_HOME环境变量和集群对应的master节点
export JAVA_HOME=/opt/module/jdk1.8.0_144 SPARK_MASTER_HOST=hadoop102 SPARK_MASTER_PORT=7077
-
分发spark-standalone目录
xsync spark-standalone
-
-
启动集群
-
执行脚本命令
sbin/start-all.sh
-
查看Master资源监控Web UI界面:
http://linux1:8080
-
提交应用
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://linux1:7077 \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
- --class表示要执行程序的主类
- --master spark://linux1:7077 独立部署模式,连接到Spark集群
- spark-examples_2.12-3.0.0.jar 运行类所在的jar包
- 数字10表示程序的入口参数,用于设定当前应用的任务数量
-
-
提交参数说明
-
在提交应用中,一般会同时一些提交参数
bin/spark-submit \ --class <main-class> --master <master-url> \ ... # other options <application-jar> \ [application-arguments]
参数 解释 可选值举例 --class Spark
程序中包含主函数的类--master Spark
程序运行的模式(环境)模式:local[*]、spark://linux1:7077、Yarn --executor memory 1G 指定每个 executor 可用内存为 1G --total executor cores 2 指定所有executor使用的cpu核数为2个 --executor cores 指定每个executor使用的cpu核数 application jar 打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar application arguments 传给 main() 方法的参数
-
-
配置历史服务
-
修改spark-defaults.conf.template文件名为spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
-
修改 spark default.conf 文件,配置日志存储路径
spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/directory ##注意:需要启动hadoop集群,HDFS上的directory目录需要提前存在。 sbin/start dfs.sh hadoop fs mkdir /directory
-
修改 spark env sh 文件 , 添加日志配置
export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory -Dspark.history.retainedApplications=30"
- 参数1含义:WEB UI访问的端口号为18080
- 参数2含义:指定历史服务器日志存储路径
- 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
-
分发配置文件
xsync conf
-
重新启动集群和历史服务
sbin/start-all.sh sbin/start-history-server.sh
-
重新执行任务
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://linux1:7077 \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
-
-
配置高可用(HA)
-
高可用是因为当前集群中的Master节点只有一个,所以会存在单点故障问题。所以为了解决单点故障问题,需要在集群中配置多个Master节点,一旦处于活动状态的Master发生故障时,由备用Master提供服务,保证作业可以继续执行。这里的高可用一般采用Zookeeper设置
-
集群规划
Linux1 Linux2 Linux3 Spark Master
Zookeeper
WorkerMaster
Zookeeper
WorkerZookeeper
Worker -
停止集群
sbin/stop all.sh
-
启动Zookeeper
zk.sh start
-
修改
spark-env.sh
文件添加如下配置注释如下内容: SPARK_MASTER_HOST linux1 SPARK_MASTER_PORT=7077 添加如下内容 #Master 监控页面默认访问端口为 8080 ,但是可能会和 Zookeeper 冲突,所以改成 8989 ,也可以自 定义,访问 UI 监控页面时请注意 SPARK_MASTER_WEBUI_PORT= 8989 export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104 -Dspark.deploy.zookeeper.dir=/spark"
-
分发配置文件
xsync conf/
-
启动集群
sbin/start all.sh
-
启动linux2的单独Master节点,此时linux2节点Master状态处于备用状态
sbin/start master.sh
-
提交应用到高可用集群
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://linux1:7077,linux2:7077 \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
-
Yarn
模式
安装步骤
-
解压缩文件
tar zxvf spark 3.0.0 bin hadoop3.2.tgz C /opt/module cd /opt/module mv spark 3.0.0 bin hadoop3.2 spark yarn
-
修改配置文件
- 修改hadoop配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认 是 true --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是 true --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> roperty>
-
修改 conf/spark-env.sh,添加 JAVA_HOME 和YARN_CONF_DIR 配置
mv spark-env.sh.template spark-env.sh 。。。 export JAVA_HOME=/opt/module/jdk1.8.0_144 YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
-
启动 HDFS 以及 YARN 集群
-
提交应用
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
-
配置历史服务器
-
修改spark-defaults.conf.template文件名为spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
-
修改 spark-default.conf 文件,配置日志存储路径
spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/directory ##注意:需要启动hadoop集群,HDFS上的目录需要提前存在。 sbin/start -dfs.sh hadoop fs -mkdir /directory
-
修改
spark-env.sh
文件 , 添加日志配置export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://linux1:8020/directory -Dspark.history.retainedApplications=30"
- 参数 1 含义:WEB UI 访问的端口号为 18080
- 参数 2 含义:指定历史服务器日志存储路径
- 参数 3 含义:指定保存Application 历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
-
修改
spark-defaults.conf
spark.yarn.historyServer.address=linux1:18080 spark.history.ui.port=18080
-
启动历史服务
sbin/start-history-server.sh
-
重新提交应用
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
-
端口号
- Spark 查看当前 Spark-shell 运行任务情况端口号:4040(计算)
- Spark Master 内部通信服务端口号:7077
- Standalone 模式下,Spark Master Web 端口号:8080(资源)
- Spark 历史服务器端口号:18080
- Hadoop YARN 任务运行情况查看端口号:8088