Kafka 生产者

生产者消息发送流程

发送原理

  • 在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccmulatormain线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker

生产者重要参数列表

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地址清单。 例如
hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker
里查找到其他 broker 信息。
key.serializer
value.serializer
指定发送消息的 keyvalue 的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可
以提高吞吐量,但是如果该值设置太大,会导致数据
传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,``sender等待linger.time<br/>之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms` 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leaderisr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性
要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries
表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送
成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是
100 m s
enable.idempotence是否开启
幂等性 默认 true ,开启 幂等性 。
compression.type生产者发送的所有数据的压缩方式。
默认是 none ,也就是不压缩。
支持压缩类型 none 、 gzip 、 snappy 、 lz4 和 zstd

异步发送API

普通异步发送

  1. 导入依赖

    <dependencies>
    	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>3.0.0</version>
    	</dependency>
    </dependencies>
    
  2. 编写不带回调函数的API 代码

    public class CustomProducer {
    
        public static void main(String[] args) {
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    
            //1.创建 kafka 生产者对象
            // hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord("first", "atguigu" + i));
            }
    
            //3 关闭资源
            kafkaProducer.close();
        }
    }
    

带回调函数的 异步发送

  • 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果Exception不为 null,说明消息发送失败。
  • 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
  1. 编写代码

    public class CustomProducerCallback {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    
            //1.创建 kafka 生产者对象
            // hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord("first", "atguigu" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition());
                        }
                    }
                });
            }
    
            //3 关闭资源
            kafkaProducer.close();
    
        }
    }
    

同步发送API

  • 只需在异步发送的基础上,再调用一下get()方法即可

    public class CustomProducerSync {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
    
            //1.创建 kafka 生产者对象
            // hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord("first", "atguigu" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition());
                        }
                    }
                }).get();
            }
    
            //3 关闭资源
            kafkaProducer.close();
    
        }
    
    }
    
    

生产者分区

分区好处

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数组存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  2. 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据

生产者发送消息的分区策略

  1. 默认的分区器DefaultPartitioner

    • IDEActrl+n,全局查找DefaultPartitioner
  2. IDEA中全局查找(ctrl+n)ProducerRecord类,在类中可以看到三个大类构造方法

    1. 指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0

    2. 没有指明partition值但有key的情况下,将keyhash值与topicpartition数进行取余得到partition的值

    3. 即没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)

      例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到,Kafka再随机一个分区进行使用(如果还是0会继续随机)

  • 案例一

    • 将数据发往指定 partition的情况下
    public class CustomProducerCallbackPartitions {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            //关联自定义分区其
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
    
            //1.创建 kafka 生产者对象
            // hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //2 发送数据
            for (int i = 0; i < 50; i++) {
                //指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
                kafkaProducer.send(new ProducerRecord("first",1,"","atguigu" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition());
                        }
                    }
                });
    
                Thread.sleep(2);
            }
    
            //3 关闭资源
            kafkaProducer.close();
    
        }
    }
    
  • 案例二

    • 没有指明 partition值但有key的情况下,将keyhash值与topicpartition数进行取余得到partition
    public class CustomProducerCallbackPartitions {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            //关联自定义分区其
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
    
            //1.创建 kafka 生产者对象
            // hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //2 发送数据
            for (int i = 0; i < 50; i++) {
                //依次指定 key 值为 a,b,f,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
                kafkaProducer.send(new ProducerRecord("first","a","atguigu" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition());
                        }
                    }
                });
    
                Thread.sleep(2);
            }
    
            //3 关闭资源
            kafkaProducer.close();
    
        }
    }
    

自定义分区器

  1. 定义类实现 Partition接口

  2. 重写 partition()方法

    public class MyPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
            //获取数据 atguigu hello
            String msgValues = value.toString();
    
            int partition;
            if(msgValues.contains("atguigu")){
                partition = 0;
            } else {
                partition = 1;
            }
            return partition;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    
  3. 使用分区器的方法,在生产者的配置中添加分区器参数

    public class CustomProducerCallbackPartitions {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            //关联自定义分区其
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
    
            //1.创建 kafka 生产者对象
            // hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //2 发送数据
            for (int i = 0; i < 50; i++) {
                kafkaProducer.send(new ProducerRecord("first","atguigu" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition());
                        }
                    }
                });
    
                Thread.sleep(2);
            }
    
            //3 关闭资源
            kafkaProducer.close();
    
        }
    }
    

生产经验

生产者如何提高吞吐量

  • batch.size:批次大小,默认16k
  • linger.ms:等待时间,修改为5-100ms
  • compression.type:压缩snappy
  • RecordAccumulator:缓冲区大小,修改为64m

数据可靠性

  1. ack应答原理

    • 0:生产者发送过来的数据,不需要等数据落盘应答
    • 1:生产者发送过来的数据,Leader收到数据后应答
    • -1(all):生产者发送过来的数据,LeaderISR队列里面的所有节点收齐数据后应答
      • Replicas队列里面有一个Follower迟迟不能与Leader进行同步,这种情况如何解决?
        • Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)
        • 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阀值由replica.lag.time.max.ms参数设定,默认30s
        • 这样,就不用等待长期联系不上或者已经故障的节点
  2. 数据可靠性分析:

    • 如果分区副本设置为 1 个,或者ISR里应答的最小副本数量(min.insync.replicas默认为 1)设置为 1 ,和 ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)
    • 数据完全可靠条件 = ACK级别设置为 -1 + 分区副本大于等于 2 + ISR里应答的最小副本数量大于等于 2
  3. 可靠性总结

    • ack = 0,生产者发送过来数据就不管了,可靠性差,效率高
    • ack = 1, 生产者发送过来数据 Leader应答,可靠性中等,效率中等
    • ack = -1,生产者发送过来数据LeaderISR队列里面所有Follower应答,可靠性高,效率低
    • 在生产环境中,ack = 0 很少使用;ack = 1,一般用于传输普通日志,允许丢个别数据;acks = -1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景
  4. 代码

    public class CustomProducerParameters {
    
        public static void main(String[] args) {
    
            Properties properties = new Properties();
    
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 设置 ack s
    	   properties.put( ProducerConfig.ACKS_CONFIG , "all");
    
    	   // 重试次数 retries ,默认是 int 最大值, 2147483647
           properties.put( ProducerConfig.RETRIES_CONFIG ,3);
    
            //1 创建生产者
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //
    
            for(int i = 0;i < 5;i++){
                kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
            }
    
            kafkaProducer.close();
        }
    }
    

数据重复分析

数据传递语义

  • 至少一次(At Least Once) = ACK级别设置为 -1 + 分区副本大于等于 2 + ISR里应答的最小副本数量大于等于 2
  • 至多一次(At Most Once) = ACK级别设置为 0
  • 总结:
    • At Least Once可以保证数据不丢失,但是不能保证数据不重复
    • At Most Once可以保证数据不重复,但是不能保证数据不丢失
  • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失
    • Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务

幂等性

  • 幂等性是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复

  • 精确一次(Exactly Once) = 幂等性 + 至少一次 (ack = -1 + 分区副本数 >= 2 + ISR最小副本数量 >= 2 )

  • 重复数据的判断标准:具有 <PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PIDKafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的

  • 所以幂等性只能保证的是在单分区单会话内不重复

  1. 如何使用幂等性
    • 开启参数 enable.idempotence默认为true,false关闭

生产者事务

  • 说明:开启事务,必须开启幂等性

  • transaction_state-分区-Leader:存储事务信息的特殊主题

    • 默认有50个分区,每个分区负责一部分事务。事务划分是根据transactional.idhashcode值%50,计算出该事务属于哪个分区。该分区Leader副本所在的broker节点即为这个transactional.id对应的Transaction Coordinator节点
  • 事务流程

    1. Kafka ProducerTransaction Coordinator请求producer id(幂等性需要)
    2. Transaction Coordinator 返回 producer idKafka Producer
    3. Kafka Producer发送消息到TopicA
    4. Kafka ProducerTransaction Coordinator发送commit请求
    5. Transaction Coordinatortransaction_state发送持久化commit请求
    6. Transaction Coordinator返回成功给Kafka Producer
    7. Transaction CoordinatorTopicA-Partition()Leader后台发送commit请求
    8. TopicA-Partition()Leader返回给Transaction Coordinator成功
    9. Transaction Coordinatortransaction_state持久化事务成功信息
  • tips:

    • Producer在使用事务功能钱,1必须先自定义一个唯一的transactional.id。有了transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事物
  • 代码

    public class CustomProducerTransactions {
    
        public static void main(String[] args) {
            // 0 配置
            Properties properties = new Properties();
    
            // 连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            //指定事务id
            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01");
    
    
            //1.创建 kafka 生产者对象
            // hello
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            kafkaProducer.initTransactions();
    
            kafkaProducer.beginTransaction();
    
    
            try {
                //2 发送数据
                for (int i = 0; i < 5; i++) {
                    kafkaProducer.send(new ProducerRecord("first", "atguigu" + i));
                }
    
                int i = 1 / 0;
    
                kafkaProducer.commitTransaction();
            }catch (Exception e){
                kafkaProducer.abortTransaction();
            }finally {
                //3 关闭资源
                kafkaProducer.close();
            }
        }
    }
    

数据乱序

  • 单分区内,有序
  • 多分区,分区与分区间无序
  • kafka1.x及以后版本保证数据单分区有序
    1. 未开启幂等性
      • max.in.flight.requests.per.connection需要设置为1
    2. 开启幂等性
      • max.in.flight.requests.per.connection需要设置小于等于5
      • 原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的