Kafka 概述

kafka定义

  • Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域
    • 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息
  • Kafka最新定义:Kafka是一个分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用

消息队列

  • 目前企业中比较常见的消息队列产品主要有 Kafka、ActiveMQ、RAbbitMQ、RocketMQ等
  • 在大数据场景主要采用Kafka作为消息队列

传统消息队列的应用场景

  • 传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信
    • 缓存/消峰:有助于控制和优化数据流经过系统的速度,解决生产消费和消费消息的处理速度不一致的情况
    • 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
    • 异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们

消息队列的两种模式

  1. 点对点模式
    • 消费者主动拉取数据,消息收到后清除消息
  2. 发布/订阅模式
    • 可以有多个topic主题(浏览、点赞、收藏、评论等)
    • 消费者消费数据之后,不删除数据
    • 每个消费者相互独立,都可以消费到数据

Kafka基础架构

  1. 为方便扩展,并提高吞吐量,一个topic分为多个partition
  2. 配合分区的涉及,提出消费者组的概念,组内每个消费者并行消费
  3. 为提高可用性,为每个partition增加若干副本,类似NameNode HA
  4. ZK中记录谁是leaderKafka2.8.0以后也可以配置不采用ZK

Kafka组成

  1. Producer:消息生产者,就是向 Kafka borker发消息的客户端
  2. Consumer:消息消费者,向Kafka broker取消息的客户端
  3. Consumer Group(CG):消费者组,由多个consumer组成。消费者内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
  4. Broker:一台Kafka服务器就是一个borker。一个集群由多个broker组成。一个borker可以容纳多个topic
  5. Topic:生产者和消费者面向的都是一个topic
  6. Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列
  7. Replica:副本:一个topic的每个分区都有若干个副本,一个Leader和若干个Follower
  8. Leader:每个分区多个副本的"主",生产者发送数据的对象,以及消费者消费数据的对象都是Leader
  9. Follower:每个分区多个副本中的"从",实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader

Kafka安装

集群规划

hadoop102hadoop103hadoop104
zkzkzk
kafkakafkakafka

集群部署

  1. 官方下载地址:http://kafka.apache.org/downloads.html

  2. 解压安装包

    tar  -zxvf  kafka_2.12-3.0.0.tgz  -C /opt/module/
    
  3. 修改解压后的文件名称

    mv kafka_2.12-3.0.0/ kafka
    
  4. 进入到/opt/module/kafka 目录,修改配置文件

    cd config/
    vim server.properties
    
    • 输入以下内容
    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=0
    #处理网络请求的线程数量 
    num.network.threads=3 
    #用来处理磁盘 IO 的线程数量 
    num.io.threads=8
    #发送套接字的缓冲区大小 
    socket.send.buffer.bytes=102400 
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400 
    #请求套接字的缓冲区大小 
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 
    log.dirs=/opt/module/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量 
    num.recovery.threads.per.data.dir=1 
    # 每个 topic 创建时的副本数,默认时 1 个副本 
    offsets.topic.replication.factor=1 
    #segment 文件保留的最长时间,超时将被删除 
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G 
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
    
  5. 分发安装包

    xsync kafka/
    
  6. 分别在 hadoop103hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2

  7. 配置环境变量

    1. /etc/profile.d/my_env.sh 文件中增加kafka 环境变量配置
    sudo vim /etc/profile.d/my_env.sh
    
    • 增加如下内容:
    #KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
    1. 刷新一下环境变量
    source /etc/profile
    
    1. 分发环境变量文件到其他节点,并source
    sudo    /home/atguigu/bin/xsync/etc/profile.d/my_env.sh
    source /etc/profile
    source /etc/profile
    
  8. 启动集群

    1. 先启动Zookeeper集群,然后启动Kafka
    zk.sh start
    
    1. 依次在hadoop102hadoop103hadoop104节点上启动Kafka
    bin/kafka-server-start.sh   -daemon config/server.properties
    bin/kafka-server-start.sh   -daemon config/server.properties
    bin/kafka-server-start.sh   -daemon config/server.properties
    
  9. 关闭集群

    bin/kafka-server-stop.sh
    bin/kafka-server-stop.sh
    bin/kafka-server-stop.sh
    

集群启停脚本

  1. /home/atguigu/bin目录下创建文件kf.sh脚本文件

    vim kf.sh
    
    • 脚本如下:
    #! /bin/bash
    case $1 in 
    "start"){
    	for i in hadoop102 hadoop103 hadoop104 
    	do
    		echo " --------启动 $i Kafka-------"
    		ssh   $i   "/opt/module/kafka/bin/kafka-server-start.sh   - daemon /opt/module/kafka/config/server.properties"
    	done
    };;
    "stop"){
    	for i in hadoop102 hadoop103 hadoop104 
    	do
    		echo " --------停止 $i Kafka-------"
    		ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    	done
    };;
    esac
    
  2. 添加执行权限

    chmod +x kf.sh
    
  3. 启动集群命令

    kf.sh start
    
  4. 停止集群命令

    kf.sh stop
    
    • 注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进城了。

命令行操作

主题命令行操作

  1. 查看操作主题命令参数
bin/kafka-topics.sh
参数描述
--bootstrap-server <String: server toconnect to>连接的Kafka Broker 主机名称和端口号。
--topic <String: topic>操作的topic 名称。
--create创建主题
--delete删除主题。
--alter修改主题。
--list查看所有主题
--describe查看主题详细描述
--partitions <Integer: # of partitions>设置分区数
--replication-factor<Integer: replication factor>设置分区副本
--config <String: name=value>更新系统默认的配置。
  1. 查看当前服务器中的所有topic

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
    
  2. 创建first topic

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
    
    • 选项说明:
      • topic定义topic
      • replication-factor定义副本数
      • partitions定义分区数
  3. 查看first主题详情

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
    
  4. 修改分区数(注意:分区数只能增加,不能减少)

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
    
  5. 再次查看 first 主题的详情

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
    
  6. 删除topic

    bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
    

生产者命令行操作

  1. 查看操作生产者命令参数

    bin/kafka-console-producer.sh
    
    参数描述
    --bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
    --topic <String: topic>操作的 topic 名称
  2. 发送消息

    bin/kafka-console-producer.sh   --bootstrap-server hadoop102:9092 --topic first
    >hello world
    >atguigu  atguigu
    

消费者命令行操作

  1. 查看操作消费者命令参数

    bin/kafka-console-consumer.sh
    
    参数描述
    --bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号。
    --topic <String: topic>操作的 topic 名称
    --from-beginning从头开始消费
    --group <String: consumer group id>指定消费者组名称。
  2. 消费信息

    1. 消费first主题中的数据

      bin/kafka-console-consumer.sh   --bootstrap-server hadoop102:9092 --topic first
      
    2. 把主题中所有的数据都读取出来(包括历史数据)。

      bin/kafka-console-consumer.sh   --bootstrap-server hadoop102:9092 --from-beginning --topic first