Kafka
概述
kafka定义
- Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域
- 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息
- Kafka最新定义:Kafka是一个分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用
消息队列
- 目前企业中比较常见的消息队列产品主要有 Kafka、ActiveMQ、RAbbitMQ、RocketMQ等
- 在大数据场景主要采用Kafka作为消息队列
传统消息队列的应用场景
- 传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信
- 缓存/消峰:有助于控制和优化数据流经过系统的速度,解决生产消费和消费消息的处理速度不一致的情况
- 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
- 异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们
消息队列的两种模式
- 点对点模式
- 消费者主动拉取数据,消息收到后清除消息
- 发布/订阅模式
- 可以有多个topic主题(浏览、点赞、收藏、评论等)
- 消费者消费数据之后,不删除数据
- 每个消费者相互独立,都可以消费到数据
Kafka基础架构
- 为方便扩展,并提高吞吐量,一个
topic
分为多个partition
- 配合分区的涉及,提出消费者组的概念,组内每个消费者并行消费
- 为提高可用性,为每个
partition
增加若干副本,类似NameNode HA
- ZK中记录谁是
leader
,Kafka2.8.0
以后也可以配置不采用ZK
Kafka组成
Producer
:消息生产者,就是向Kafka borker
发消息的客户端Consumer
:消息消费者,向Kafka broker
取消息的客户端Consumer Group(CG)
:消费者组,由多个consumer
组成。消费者内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者Broker
:一台Kafka
服务器就是一个borker
。一个集群由多个broker
组成。一个borker
可以容纳多个topic
Topic
:生产者和消费者面向的都是一个topic
Partition
:为了实现扩展性,一个非常大的topic
可以分布到多个broker
上,一个topic
可以分为多个partition
,每个partition
是一个有序的队列Replica
:副本:一个topic
的每个分区都有若干个副本,一个Leader
和若干个Follower
Leader
:每个分区多个副本的"主",生产者发送数据的对象,以及消费者消费数据的对象都是Leader
Follower
:每个分区多个副本中的"从",实时从Leader
中同步数据,保持和Leader
数据的同步。Leader
发生故障时,某个Follower
会成为新的Leader
Kafka
安装
集群规划
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
集群部署
-
官方下载地址:http://kafka.apache.org/downloads.html
-
解压安装包
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
-
修改解压后的文件名称
mv kafka_2.12-3.0.0/ kafka
-
进入到
/opt/module/kafka
目录,修改配置文件cd config/ vim server.properties
- 输入以下内容
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=0 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/opt/module/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理) zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
-
分发安装包
xsync kafka/
-
分别在
hadoop103
和hadoop104
上修改配置文件/opt/module/kafka/config/server.properties
中的broker.id=1、broker.id=2
-
配置环境变量
- 在
/etc/profile.d/my_env.sh
文件中增加kafka
环境变量配置
sudo vim /etc/profile.d/my_env.sh
- 增加如下内容:
#KAFKA_HOME export KAFKA_HOME=/opt/module/kafka export PATH=$PATH:$KAFKA_HOME/bin
- 刷新一下环境变量
source /etc/profile
- 分发环境变量文件到其他节点,并
source
sudo /home/atguigu/bin/xsync/etc/profile.d/my_env.sh source /etc/profile source /etc/profile
- 在
-
启动集群
- 先启动
Zookeeper
集群,然后启动Kafka
zk.sh start
- 依次在
hadoop102
、hadoop103
、hadoop104
节点上启动Kafka
bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-start.sh -daemon config/server.properties
- 先启动
-
关闭集群
bin/kafka-server-stop.sh bin/kafka-server-stop.sh bin/kafka-server-stop.sh
集群启停脚本
-
在
/home/atguigu/bin
目录下创建文件kf.sh
脚本文件vim kf.sh
- 脚本如下:
#! /bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------启动 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh - daemon /opt/module/kafka/config/server.properties" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh " done };; esac
-
添加执行权限
chmod +x kf.sh
-
启动集群命令
kf.sh start
-
停止集群命令
kf.sh stop
- 注意:停止
Kafka
集群时,一定要等Kafka
所有节点进程全部停止后再停止Zookeeper
集群。因为Zookeeper
集群当中记录着Kafka
集群相关信息,Zookeeper
集群一旦先停止,Kafka
集群就没有办法再获取停止进程的信息,只能手动杀死Kafka
进城了。
- 注意:停止
命令行操作
主题命令行操作
- 查看操作主题命令参数
bin/kafka-topics.sh
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 连接的Kafka Broker 主机名称和端口号。 |
--topic <String: topic> | 操作的topic 名称。 |
--create | 创建主题 |
--delete | 删除主题。 |
--alter | 修改主题。 |
--list | 查看所有主题 |
--describe | 查看主题详细描述 |
--partitions <Integer: # of partitions> | 设置分区数 |
--replication-factor<Integer: replication factor> | 设置分区副本 |
--config <String: name=value> | 更新系统默认的配置。 |
-
查看当前服务器中的所有
topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
-
创建
first topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
- 选项说明:
topic
定义topic
名replication-factor
定义副本数partitions
定义分区数
- 选项说明:
-
查看
first
主题详情bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
-
修改分区数(注意:分区数只能增加,不能减少)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
-
再次查看
first
主题的详情bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
-
删除
topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
生产者命令行操作
-
查看操作生产者命令参数
bin/kafka-console-producer.sh
参数 描述 --bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号 --topic <String: topic> 操作的 topic 名称 -
发送消息
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first >hello world >atguigu atguigu
消费者命令行操作
-
查看操作消费者命令参数
bin/kafka-console-consumer.sh
参数 描述 --bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。 --topic <String: topic> 操作的 topic 名称 --from-beginning 从头开始消费 --group <String: consumer group id> 指定消费者组名称。 -
消费信息
-
消费
first
主题中的数据bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
-
把主题中所有的数据都读取出来(包括历史数据)。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
-