Kafka
其他
Kafka-Eagle
监控
Kafka-Eagle
框架可以监控Kafka
集群的整体运行情况,在生产环境中经常使用
MySQL
环境准备
Kafka-Eagle
的安装依赖于MySQL
,MySQL
主要用来存储可视化展示的数据
Kafka
环境准备
-
关闭
Kafka
集群kf.sh stop
-
修改
/opt/module/kafka/bin/kafka-server-start/sh
命令中vim bin/kafka-server-start.sh
-
修改如下参数值
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
-
修改为
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" export JMX_PORT="9999" #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
-
修改后,分发其他节点
xsync kafka-server-start.sh
-
Kafka-Eagle
安装
-
官网:https://www.kafka-eagle.org/
-
上传压缩包
kafka-eagle-bin-2.0.8.tar.gz
到集群/opt/software
目录 -
解压到本地
tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
-
进入刚才解压的目录
ll tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module
-
修改名称
#一定要这么修改,不然后面启动会报错 mv efak-web-2.0.8/ efak
-
修改配置文件
/opt/module/efak/conf/system-config.properties
vim system-config.properties
###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### #配置---------------zk连接----------- efak.zk.cluster.alias=cluster1 cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=32 ###################################### # EFAK webui port ###################################### efak.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### #-------------------------offset 保存在 kafka ------------------------- cluster1.efak.offset.storage=kafka #cluster2.efak.offset.storage=zk ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enable=false cluster3.efak.ssl.protocol=SSL cluster3.efak.ssl.truststore.location= cluster3.efak.ssl.truststore.password= cluster3.efak.ssl.keystore.location= cluster3.efak.ssl.keystore.password= cluster3.efak.ssl.key.password= cluster3.efak.ssl.endpoint.identification.algorithm=https cluster3.efak.blacklist.topics= cluster3.efak.ssl.cgroup.enable=false cluster3.efak.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### #----------------------配置MySQL连接------------------------ efak.driver=com.mysql.jdbc.Driver efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=123456 ###################################### # kafka mysql jdbc driver address ###################################### #efak.driver=com.mysql.cj.jdbc.Driver #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #efak.username=root #efak.password=123456
-
添加环境变量
sudo vim /etc/profile.d/my_env.sh # kafkaEFAK export KE_HOME=/opt/module/efak export PATH=$PATH:$KE_HOME/bi
-
注意
. /etc/profile
-
-
启动
-
注意:启动之前需要先启动
ZK
以及KAFKA
kf.sh start
-
启动
efak
bin/ke.sh start
-
停止
efak
bin/ke.sh stop
-
-
登录页面查看监控数据
- http://192.168.10.102:8048/
Kafka-Kraft
模式
Kafka-Kraft
架构
Kafka
2.8.0以前,元数据在zookeeper
中,运行时动态选举controller
,由controller
进行Kafka
集群管理。Kafka-Kraft
架构不再依赖zookeeper
集群,而是用三台controller
节点代替zookeeper
,元数据保存在controller
中,由controller
直接进行Kafka
集群管理- 好处:
Kafka
不再依赖外部框架,而是能够独立运行controller
管理集群时,不再需要从zookeeper
中先读取数据,集群性能上升- 由于不依赖
zookeeper
,集群扩展时不再收到zookeeper
读写能力限制 controller
不再动态选举,而是由配置文件规定。这样可以有针对性的加强controller
节点的配置
Kafka-Kraft
集群部署
-
再次解压一份
kafka
安装包tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
-
重命名为
kafka2
mv kafka_2.12-3.0.0/ kafka2
-
在
hadoop102
上修改/opt/module/kafka2/config/kraft/server.properties
配置文件vim server.properties
#kafka 的角色(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能) process.roles=broker, controller #节点 ID node.id=2 #controller 服务协议别名 controller.listener.names=CONTROLLER #全 Controller 列表 controller.quorum.voters=2@hadoop102:9093,3@hadoop103:9093,4@hadoop104:9093 #不同服务器绑定的端口 listeners=PLAINTEXT://:9092,CONTROLLER://:9093 #broker 服务协议别名 inter.broker.listener.name=PLAINTEXT #broker 对外暴露的地址 advertised.Listeners=PLAINTEXT://hadoop102:9092 #协议别名到安全协议的映射 listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLA INTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL #kafka 数据存储目录 log.dirs=/opt/module/kafka2/data
-
分发
kafka2
xsync kafka2/
- 在
hadoop103
和hadoop104
上 需 要 对node.id
相应改变 , 值 需 要 和controller.quorum.voters
对应。 - 在
hadoop103
和hadoop104
上需要根据各自的主机名称, 修改相应的advertised.Listeners
地址。
- 在
-
初始化集群数据目录
-
首先生成存储目录唯一 ID
bin/kafka-storage.sh random-uuid J7s9e8PPTKOO47PxzI39VA
-
用该
ID
格式化kafka
存储目录(三台节点)。bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties
-
-
启动
kafka
集群bin/kafka-server-start.sh -daemon config/kraft/server.properties
-
停止
kafka
集群bin/kafka-server-stop.sh
Kafka-Kraft
集群启动停止脚本
-
在
/home/atguigu/bin
目录下 创建文件kf2.sh
脚本文件vim kf2.sh
#! /bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------启动 $i Kafka2-------" ssh $i "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties" done };; "stop"){ for i in hadoop102 hadoop103 hadoop104 do echo " --------停止 $i Kafka2-------" ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh " done };; esac
-
添加执行权限
chmod +x kf2.sh
-
启动集群命令
kf2.sh start
-
停止集群命令
kf2.sh stop