Kafka Broker
Kafka
副本
副本 基本信息
Kafka
副本作用:提高数据可靠性Kafka
默认副本1
个,生产环境一般配置为2
个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率Kafka
中副本分为:Leader
和Follower
。Kafka
生产者只会把数据发往Leader
,然后Follower
找Leader
进行同步数据Kafka
分区中的所有副本统称为AR
(Assigned Repllicas)
AR
=ISR
+OSR
ISR
:表示和Leader
保持同步的Follower
集合。如果Follower
长时间未向Leader
发送通信请求或同步数据,则该Follower
将被踢出ISR
。该时间阀值由replica.lag.time.max.ms
参数设定,默认30s
。Leader发生故障之后,就会从ISR
中选举新的Leader
OSR
,表示Follower
与Leader
副本同步时,延迟过多的副本
Zookeeper
存储的Kafka
信息
-
启动
Zookeeper
客户端bin/zkCli.sh
-
通过
ls
命令查看kafka
相关信息ls /kafka
-
Zookeeper
中存储的Kafka
信息/kafka/brokers/ids
:[0,1,2]
:记录有哪些服务器/kafka/borkers/topics/first/partitions/0/state
:{"leader":1,"isr":[1,0,2]}
:记录谁是Leader
,有哪些服务器可用/kafka/controller
:{"brokerid":0}
:辅助选举Leader
Kafka Broker
总体工作流程
Leader选举流程
AR(Assigned Replica)
:Kafka
分区中的所有副本统称AR
是指Kafka
集群中一个分区所对应的所有副本的集合。- 一个分区可以有多个副本,其中包括一个
Leader
副本和若干个Follower
副本。 AR
包含了Leader
副本和Follower
副本。
ISR(In-Sync Replica)
:同步副本集ISR
是指分区的Leader
副本和正在与Leader
副本保持同步的 Follower 副本的集合。Follower
副本只有在与Leader
副本保持同步时,才被认为是ISR
的一部分。ISR
中的副本会参与消息的复制和读取操作。
broker
启动后在zk(/brokers/ids)
中注册controller
谁先注册,谁说了算- 由选举出来的
Controller
监听brokers
节点变化 Controller
决定Leader
选举- 选举规则:在
isr
中存活为前提,按照AR
中排在前面的优先。例如:ar[1,0,2]
,isr[1,0,2]
,那么leader
会按照1,0,2的顺序轮询
- 选举规则:在
Controller
将节点信息上传到ZK(/kafka/borkers/topics/first/partitions/0/state)
- 其他
contorller
从zk
同步相关信息 - 假设
Broker1
中Leader
挂了 Controller
监听到节点变化- 获取
ISR
- 选举新的
Leader
(在isr
中存活为前提,按照AR
中排在前面的优先) - 更新
Leader
及ISR
Leader
和Follower
故障处理细节
Follower
故障处理细节
LEO(Log End Offset)
:每个副本的最后一个offset
,LEO
是最新的offset
+1HW(High Watermark)
:所有副本中最小的LEO
Follower
故障Follower
发生故障后会被临时踢出ISR
- 这个期间
Leader
和Follower
继续接收数据 - 待该
Follower
恢复后,Follower
会读取本地磁盘记录的上次的HW
,并将log
文件高于HW
的部分截取掉,从HW
开始向Leader
进行同步 - 等该
Follower
的LEO
大于等于该Partition
的HW
,即Follower
追上Leader
之后,就可以重新加入ISR
了
Leader
故障处理细节
-
Leader
故障Leader
发生故障之后,会从ISR
中选出一个新的Leader
- 为保证多个副本之间的数据一致性,其余的
Follower
会先将各自的log
文件高于HW
的部分截掉,然后从新的Leader
同步数据
- 注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
文件存储
文件存储机制
Topic
数据的存储机制
Topic
是逻辑上的概念,而partition
是物理上的概念,每个partition
对应于一个log
文件,该log
文件中存储的就是Producer
生产的数据。Producer
生产的数据会被不断追加到该log
文件末端,为防止log
文件过大导致数据定位效率底下,Kafka
采取了分片和索引机制,将每个partition
分为多个segment
。每个segment
包括:".index文件
"、".log
"文件和".timeindex
"等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic
名称+分区序号,例如:first-0
Topic
数据存储位置
-
查看
hadoop102
的/opt/module/kafka/datas/first-1
路径上的文件cd /opt/module/kafka/datas/first-0 ls
-
直接查看
log
日志,发现是乱码cat 00000000000000000014.log
-
通过工具查看
index
和log
信息kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000014.index offset: 115 position: 4165 offset: 202 position: 8296 offset: 292 position: 12461 offset: 374 position: 16614 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000014.log
index
文件和log
文件详解
- 注意
index
为稀疏索引,大约每往log
文件写入4kb
数据,会往index
文件写入一条索引。参数log.index.interval.bytes
默认4
kbindex
文件中保存的offset
为相对offset
,这样能确保offset
的值所占空间不会过大,因此能将offset
的值控制在固定大小
- 步骤
- 根据目标
offset
定位Segment
文件 - 找到小于等于目标
offset
的最大offset
对应的索引项 - 定位到
log
文件 - 向下遍历找到目标
Record
- 根据目标
日志存储参数配置
参数 | 描述 |
---|---|
log.segment.bytes | Kafka 中log 日志是分成一块块存储的,此配置是指log 日志划分成块的大小,默认值1G |
log.index.interval.bytes | 默认4kb ,kafka里面每当写入了4kb 大小的日志(.log ),然后就往index 文件里面记录一条索引。稀疏索引 |
文件清理策略
Kafka
中默认的日志保存时间为7
天,可以通过调整如下参数修改保存时间log.retention.hours
:最低优先级小时,默认7天log.retention.minutes
:分钟log.retention.ms
:最高优先级毫秒log.retention.check.interval.ms
:负责设置检查周期,默认5
分钟
Kafka
中提供的日志清理策略有delete
和compact
两种delete
日志删除:将过期数据删除log.cleanup.policy=delete
:所有数据启用删除策略- 基于时间:默认打开。以
segment
中所有记录中的最大时间戳作为该文件时间戳 - 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的
segment
。log.retention.bytes
:默认等于-1
,表示无穷大
- 基于时间:默认打开。以
compact
日志压缩- compact日志压缩:对于相同
key
的不同value
值,只保留最后一个版本log.cleanup.policy=compact
:所有数据启用压缩策略
- 压缩后的
offset
可能是不连续的,比如上图中没有6,当从这些offset
消费消息时,将会拿到比这个offset
大的offset
对应的消息,实际上会拿到offset
为7的消息,并从这个位置开始消费 - 这种策略只适合特殊场景,比如消息的
key
是用户ID
,value
是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料
- compact日志压缩:对于相同
高效读写数据
Kafka
本身是分布式集群,可以采用分区技术,并行度高- 读数据采用稀疏索引,可以快速定位要消费的数据
- 顺序写磁盘
Kafka
的producer
生产数据,要写入到log
文件中,写的过程是一直追加到文件末端,为顺序写。
- 页缓存+零拷贝技术
- 零拷贝:
Kafka
的数据加工处理操作交由Kafka
生产者和Kafka
消费者处理。Kafka Broker
应用层不关心存储的数据,所以就不用走应用层,传输效率高 PageCache页缓存
:Kafka
重度依赖底层操作系统提供的PageCache
功能。当上层有写操作时,操作系统只是将数据写入PageCache
。当读操作发生时,先从PageCache
中查找,如果找不到,再去磁盘中读取。实际上PageCache
是把尽可能多的空闲内存都当做了磁盘缓存来使用
- 零拷贝:
参数
参数 | 描述 |
---|---|
log.flush.interval.messages | 强制页缓存刷写到磁盘的的条数,默认是long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是null 。一般不建议修改,交给系统自己管理。 |
Broker
重要参数
参数名称 | 描述 |
---|---|
replica.lag.time.max.ms | ISR 中 如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR 。 该时间阈值 默认 30s 。 |
auto.leader.rebalance.enable | 默认是 true 。 自动 Leader Partition 平衡 。 |
leader.imbalance.per.broker.percentage | 默认是 1 0% 。 每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒 。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小 默认值 1G 。 |
log.index.interval.bytes | 默认 4kb ,kafka 里面每当写入了 4kb 大小的日志(.log ) ,然后就往 index 文件里面记录一个索引 。 |
log.retention.hours | Kafka 中数据保存的时间, 默认 7 天。 |
log.retention.minutes | Kafka 中数据保存的时间, 分钟级别 ,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间, 毫秒级别 ,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟 |
log.retention.bytes | 默认等于 1 ,表示无穷大。 超过设置的 所有 日志总大小,删除最早的 segment 。 |
log.cleanup.policy | 默认是 delete ,表示所有数据启用删除策略如果设置值为 compact ,表示所有数据启用压缩策略。 |
num.io.threads | 默认是8 。 负责写磁盘的线程数。整个参数值要占总核数的 5 0% 。 |
num .replica.fetchers | 副本拉取线程数,这个参数占总核数的 50% 的 1/3 |
num.network.threads | 默认是3 。 数据传输线程数,这个参数占总核数的 5 0% 的 2 /3 。 |
log.flush<br/>.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值, 9223372036854775807 。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null 。一般不建议修改,交给系统自己管理。 |
生产经验
节点服役和退役
服役新节点
新节点准备
-
关闭
hadoop104
,并右键执行克隆操作 -
开启
hadoop105
,并修改IP
地址vim /etc/sysconfig/network-scripts/ifcfg-ens33 DEVICE=ens33 TYPE=Ethernet ONBOOT=yes BOOTPROTO=static NAME="ens33" IPADDR=192.168.10.105 PREFIX=24 GATEWAY=192.168.10.2 DNS1=192.168.10.2
-
在
hadoop 105
上,修改主机名称为hadoop 105
vim /etc/hostname hadoop105
-
重新启动
hadoop 104
、hadoop 105
。 -
修改
hadoop105
中kafka
的 broker .id 为 3 。 -
删除
hadoop 105
中kafka
下的datas
和logs
。rm rf datas/* logs/*
-
启动
hadoop 102
、hadoop 103
、hadoop 104
上的kafka
集群 。zk.sh start kf.sh start
-
单独启动
hadoop 105
中的kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
执行 负载均衡 操作
-
创建一个要均衡的主题
vim topics-to-move.json { "topics": [ {"topic": "first"} ], "version": 1 }
-
生成一个负载均衡的计划 。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate Current partition replica assignment {"version":1,"partitions":[{"topic":"first","partition":0,"replic as":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"to pic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any"," any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"first","partition":0,"replic as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
-
创建副本存储计划(所有副本存储在 broker 0 、 broker 1 、 broker 2 、 broker 3 中) 。
vim increase-replication-factor.json #输入如下内容 {"version":1,"partitions":[{"topic":"first","partition":0,"replic as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
-
执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
-
验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
退役旧节点
执行负载均衡操作
- 先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
-
创建一个要均衡的主题
vim topics-to-move.json { "topics": [ {"topic": "first"} ], "version": 1 }
-
创建执行计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate Current partition replica assignment {"version":1,"partitions":[{"topic":"first","partition":0,"replic as":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
-
创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。
vim increase-replication-factor.json {"version":1,"partitions":[{"topic":"first","partition":0,"replic as":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
-
执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
-
验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase -replication-factor.json --verify Status of partition reassignment: Reassignment of partition first-0 Reassignment of partition first-1 Reassignment of partition first-2 is complete. is complete. is complete. Clearing broker-level throttles on brokers 0,1,2,3 Clearing topic-level throttles on topic first
执行停止命令
-
在
hadoop105
上执行停止命令即可 。bin/kafka-server-stop.sh
手动调整分区副本存储
-
在生产环境中,每台服务器的配置和性能不一致,但是
Kafka
只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。 -
手动调整分区副本存储的步骤
-
创建一个新的
topic
,名称为three
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic three --partitions 4 --replication-factor 2
-
查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
-
创建副本存储计划
vim increase-replication-factor.json { "version":1, "partitions":[{"topic":"three","partition":0,"replicas":[0,1]}, {"topic":"three","partition":1,"replicas":[0,1]}, {"topic":"three","partition":2,"replicas":[1,0]}, {"topic":"three","partition":3,"replicas":[1,0]}] }
-
执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
-
验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
-
查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three
-
Leader Partition
负载平衡
- 正常情况下,
Kafka
本身会自动把Leader Partition
均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker
宕机,会导致Leader Partition
过于集中在其他少部分几台broker
上,这会导致少数几台broker
的读写请求压力过高,其他宕机的broker
重启之后都是follow partition
,读写请求很低,造成集群负载不均匀
Leader Partition
负载平衡参数配置
参数名称 | 描述 |
---|---|
auto.leader.rebalance.enable | 默认是true 。自动Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为false 关闭 |
leader.imbalance.per.broker.percentage | 默认是10% 。每个broker 允许的不平衡的leader 的比率。如果每个broker 超过了这个值,控制器会触发leader 的平衡 |
leader.imbalance.check.interval.seconds | 默认值300秒。检查leader 负载是否平衡的间隔时间 |
增加副本因子
-
在生产环境当中,由于某个主题的重要等级需要提升,需要考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行
-
创建
topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic four --partitions 3 --replication-factor 1
-
手动增加副本存储
-
创建副本存储计划
vim increase-replication-factor.json {"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}}
-
执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
-
-