Kafka其他

Kafka-Eagle监控

  • Kafka-Eagle框架可以监控Kafka集群的整体运行情况,在生产环境中经常使用

MySQL环境准备

  • Kafka-Eagle的安装依赖于MySQL,MySQL主要用来存储可视化展示的数据

Kafka环境准备

  1. 关闭Kafka集群

    kf.sh stop
    
  2. 修改/opt/module/kafka/bin/kafka-server-start/sh命令中

    vim bin/kafka-server-start.sh
    
    • 修改如下参数值

      if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then 
      	export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
      fi
      
    • 修改为

      if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
      	export KAFKA_HEAP_OPTS="-server     -Xms2G     -Xmx2G     -XX:PermSize=128m    -XX:+UseG1GC    
      -XX:MaxGCPauseMillis=200    -XX:ParallelGCThreads=8            -XX:ConcGCThreads=5            
      -XX:InitiatingHeapOccupancyPercent=70"
      	export JMX_PORT="9999"
      #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
      fi
      
    • 修改后,分发其他节点

      xsync kafka-server-start.sh
      

Kafka-Eagle安装

  1. 官网:https://www.kafka-eagle.org/

  2. 上传压缩包kafka-eagle-bin-2.0.8.tar.gz 到集群/opt/software 目录

  3. 解压到本地

    tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
    
  4. 进入刚才解压的目录

    ll
    
    tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module
    
  5. 修改名称

    #一定要这么修改,不然后面启动会报错
    mv efak-web-2.0.8/ efak
    
  6. 修改配置文件 /opt/module/efak/conf/system-config.properties

    vim system-config.properties
    
    ######################################
    # multi zookeeper & kafka cluster list
    # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
    ######################################
    #配置---------------zk连接-----------
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
    
    ######################################
    # zookeeper enable acl
    ######################################
    cluster1.zk.acl.enable=false
    cluster1.zk.acl.schema=digest
    cluster1.zk.acl.username=test
    cluster1.zk.acl.password=test123
    
    ######################################
    # broker size online list
    ######################################
    cluster1.efak.broker.size=20
    
    ######################################
    # zk client thread limit
    ######################################
    kafka.zk.limit.size=32
    
    ######################################
    # EFAK webui port
    ######################################
    efak.webui.port=8048
    
    ######################################
    # kafka jmx acl and ssl authenticate
    ######################################
    cluster1.efak.jmx.acl=false
    cluster1.efak.jmx.user=keadmin
    cluster1.efak.jmx.password=keadmin123
    cluster1.efak.jmx.ssl=false
    cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
    cluster1.efak.jmx.truststore.password=ke123456
    
    ######################################
    # kafka offset storage
    ######################################
    #-------------------------offset 保存在 kafka -------------------------
    cluster1.efak.offset.storage=kafka
    #cluster2.efak.offset.storage=zk
    
    ######################################
    # kafka jmx uri
    ######################################
    cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
    
    ######################################
    # kafka metrics, 15 days by default
    ######################################
    efak.metrics.charts=true
    efak.metrics.retain=15
    
    ######################################
    # kafka sql topic records max
    ######################################
    efak.sql.topic.records.max=5000
    efak.sql.topic.preview.records.max=10
    
    ######################################
    # delete kafka topic token
    ######################################
    efak.topic.token=keadmin
    
    ######################################
    # kafka sasl authenticate
    ######################################
    cluster1.efak.sasl.enable=false
    cluster1.efak.sasl.protocol=SASL_PLAINTEXT
    cluster1.efak.sasl.mechanism=SCRAM-SHA-256
    cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
    cluster1.efak.sasl.client.id=
    cluster1.efak.blacklist.topics=
    cluster1.efak.sasl.cgroup.enable=false
    cluster1.efak.sasl.cgroup.topics=
    
    ######################################
    # kafka ssl authenticate
    ######################################
    cluster3.efak.ssl.enable=false
    cluster3.efak.ssl.protocol=SSL
    cluster3.efak.ssl.truststore.location=
    cluster3.efak.ssl.truststore.password=
    cluster3.efak.ssl.keystore.location=
    cluster3.efak.ssl.keystore.password=
    cluster3.efak.ssl.key.password=
    cluster3.efak.ssl.endpoint.identification.algorithm=https
    cluster3.efak.blacklist.topics=
    cluster3.efak.ssl.cgroup.enable=false
    cluster3.efak.ssl.cgroup.topics=
    
    ######################################
    # kafka sqlite jdbc driver address
    ######################################
    #----------------------配置MySQL连接------------------------
    efak.driver=com.mysql.jdbc.Driver
    efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=root
    efak.password=123456
    
    ######################################
    # kafka mysql jdbc driver address
    ######################################
    #efak.driver=com.mysql.cj.jdbc.Driver
    #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    #efak.username=root
    #efak.password=123456
    
  7. 添加环境变量

    sudo vim /etc/profile.d/my_env.sh
    
    # kafkaEFAK
    export KE_HOME=/opt/module/efak
    export PATH=$PATH:$KE_HOME/bi
    
    • 注意

      . /etc/profile
      
  8. 启动

    1. 注意:启动之前需要先启动 ZK以及KAFKA

      kf.sh start
      
    2. 启动efak

      bin/ke.sh start
      
    3. 停止 efak

      bin/ke.sh stop
      
  9. 登录页面查看监控数据

    • http://192.168.10.102:8048/

Kafka-Kraft模式

Kafka-Kraft架构

  • Kafka2.8.0以前,元数据在zookeeper中,运行时动态选举controller,由controller进行Kafka集群管理。
  • Kafka-Kraft架构不再依赖zookeeper集群,而是用三台controller节点代替zookeeper,元数据保存在controller中,由controller直接进行Kafka集群管理
  • 好处:
    • Kafka不再依赖外部框架,而是能够独立运行
    • controller管理集群时,不再需要从zookeeper中先读取数据,集群性能上升
    • 由于不依赖zookeeper,集群扩展时不再收到zookeeper读写能力限制
    • controller不再动态选举,而是由配置文件规定。这样可以有针对性的加强controller节点的配置

Kafka-Kraft集群部署

  1. 再次解压一份kafka安装包

    tar  -zxvf  kafka_2.12-3.0.0.tgz  -C /opt/module/
    
  2. 重命名为kafka2

    mv kafka_2.12-3.0.0/ kafka2
    
  3. hadoop102 上修改/opt/module/kafka2/config/kraft/server.properties 配置文件

    vim server.properties
    
    #kafka 的角色(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能)
    process.roles=broker, controller #节点 ID
    node.id=2
    #controller 服务协议别名
    controller.listener.names=CONTROLLER #全 Controller 列表
    controller.quorum.voters=2@hadoop102:9093,3@hadoop103:9093,4@hadoop104:9093
    #不同服务器绑定的端口 listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    #broker 服务协议别名 inter.broker.listener.name=PLAINTEXT #broker 对外暴露的地址
    advertised.Listeners=PLAINTEXT://hadoop102:9092 #协议别名到安全协议的映射
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLA 
    INTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    #kafka 数据存储目录
    log.dirs=/opt/module/kafka2/data
    
  4. 分发 kafka2

    xsync kafka2/
    
    • hadoop103hadoop104 上 需 要 对 node.id 相应改变 , 值 需 要 和controller.quorum.voters 对应。
    • hadoop103hadoop104 上需要根据各自的主机名称, 修改相应的 advertised.Listeners 地址。
  5. 初始化集群数据目录

    1. 首先生成存储目录唯一 ID

      bin/kafka-storage.sh random-uuid
      J7s9e8PPTKOO47PxzI39VA
      
    2. 用该 ID 格式化 kafka 存储目录(三台节点)。

      bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties
      
  6. 启动 kafka 集群

    bin/kafka-server-start.sh -daemon config/kraft/server.properties
    
  7. 停止 kafka 集群

    bin/kafka-server-stop.sh
    

Kafka-Kraft 集群启动停止脚本

  1. /home/atguigu/bin 目录下 创建文件 kf2.sh 脚本文件

    vim kf2.sh
    
    #! /bin/bash
    case $1 in
    "start"){
        for i in hadoop102 hadoop103 hadoop104
        do
            echo " --------启动 $i Kafka2-------"
            ssh  $i  "/opt/module/kafka2/bin/kafka-server-start.sh  -daemon /opt/module/kafka2/config/kraft/server.properties"
        done
    };;
    "stop"){
        for i in hadoop102 hadoop103 hadoop104
        do
            echo " --------停止 $i Kafka2-------"
            ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "
        done
    };;
    esac
    
  2. 添加执行权限

    chmod +x kf2.sh
    
  3. 启动集群命令

    kf2.sh start
    
  4. 停止集群命令

    kf2.sh stop