1 Sending Messages (KafkaProducer)

First, add the kafka-clients dependency to the project:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
With the dependency in place, the following producer demo exercises the three send modes detailed in sections 1.1–1.3 below:

package net.kolbe.kafka.client;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerSendDemo {

    public static final String brokerList = "a.kolbe.com.cn:9092,b.kolbe.com.cn:9092,c.kolbe.com.cn:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        ProducerSendDemo producerDemo = new ProducerSendDemo();
        producerDemo.fireAndForgetSend(producer);
        producerDemo.syncSend(producer);
        producerDemo.asyncSend(producer);
        
        producer.close();
    }
}
To verify the sends, a matching quick-start consumer subscribes to the same topic and prints every value it receives:

package net.kolbe.kafka.client;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastStart {

    public static final String brokerList = "a.kolbe.com.cn:9092,b.kolbe.com.cn:9092,c.kolbe.com.cn:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }

    }
}

1.1 Fire-and-Forget

    /**
     * Fire And Forget
     */
    private void fireAndForgetSend(KafkaProducer<String, String> producer) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Fire And Forget Send Message");
        try {
            // send() returns immediately and the result is never inspected,
            // so only synchronous errors (e.g. serialization failures) are caught here
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

1.2 Synchronous Send

    /**
     * Sync Send
     */
    private void syncSend(KafkaProducer<String, String> producer) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Send Sync Message");
        try {
            Future<RecordMetadata> future = producer.send(record);
            
            // block until the send result (partition, offset, etc.) is available
            RecordMetadata metadata = future.get();
            System.out.println(metadata);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

1.3 Asynchronous Send

    /**
     * Async Send
     */
    private void asyncSend(KafkaProducer<String, String> producer) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Send Async Message");
        try {
            producer.send(record, new AsyncCallback());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * Async Send Callback
     */
    private static class AsyncCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println("Async send message callback");
        }
    }

2 Producer Records (ProducerRecord)

2.1 Specifying Topic and Value

public ProducerRecord(String topic, V value) {}

2.2 Specifying Topic, Key, and Value

public ProducerRecord(String topic, K key, V value) {}

2.3 Specifying Topic, Partition, Key, and Value

public ProducerRecord(String topic, Integer partition, K key, V value) {}

2.4 Specifying Topic, Partition, Timestamp, Key, and Value

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}

Note: if the timestamp is null, System.currentTimeMillis() is used.

2.5 Specifying Topic, Partition, Timestamp, Key, Value, and Headers

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}

Note: if the timestamp is null, System.currentTimeMillis() is used.

2.6 Specifying Topic, Partition, Key, Value, and Headers

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
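
A quick sketch of a few of these constructors in use, reusing the topic constant from the demo above (the keys, values, and partition number are illustrative):

ProducerRecord<String, String> r1 = new ProducerRecord<>(topic, "value only");
ProducerRecord<String, String> r2 = new ProducerRecord<>(topic, "key-1", "keyed value");
// pin the record to partition 0, bypassing the partitioner
ProducerRecord<String, String> r3 = new ProducerRecord<>(topic, 0, "key-1", "pinned value");
// explicit timestamp; passing null here falls back to System.currentTimeMillis()
ProducerRecord<String, String> r4 = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), "key-1", "timestamped value");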

3 Serializers (Serializer)

The Kafka producer uses a serializer to convert objects into byte arrays before sending them over the network to Kafka; on the consumer side, a deserializer converts the byte arrays received from Kafka back into the corresponding objects.

3.1 Serializer

public interface Serializer<T> extends Closeable {

    void configure(Map<String, ?> configs, boolean isKey);

    byte[] serialize(String topic, T data);

    @Override
    void close();
}

The Serializer interface has three methods:

  • configure: configures the serializer, typically from the producer's configuration
  • serialize: performs the serialization, turning an object into a byte array
  • close: closes the serializer; it must be idempotent, since KafkaProducer may call it more than once

3.2 StringSerializer

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

StringSerializer handles serialization of String objects. Its configure method determines the charset used to encode strings, reading it from the following keys:

  • key.serializer.encoding: the encoding used for keys
  • value.serializer.encoding: the encoding used for values
  • serializer.encoding: the encoding used when neither of the above is set

If none of the three is configured, the default charset UTF-8 is used.
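
To illustrate implementing the interface for a custom type, here is a sketch of a serializer for a Company POJO; the Company class and its fields are invented for this example, and in practice a JSON or Avro serializer is more common:

package net.kolbe.kafka.client;

import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CompanySerializer implements Serializer<Company> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        // encode each field as a length-prefixed UTF-8 block
        byte[] name = data.getName() == null ? new byte[0] : data.getName().getBytes(StandardCharsets.UTF_8);
        byte[] address = data.getAddress() == null ? new byte[0] : data.getAddress().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(address.length);
        buffer.put(address);
        return buffer.array();
    }

    @Override
    public void close() {
        // nothing to do; must stay idempotent
    }
}

// hypothetical POJO used only for this example
class Company {
    private String name;
    private String address;
    public String getName() { return name; }
    public String getAddress() { return address; }
}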

4 Partitioners (Partitioner)

On its way to Kafka, a message passes through the serializer and then the partitioner. The partitioner determines which partition the message is sent to: if a partition was specified when the message was created, that value is used; otherwise the partitioner computes the partition number.

4.1 The Partitioner Interface (Partitioner)

public interface Partitioner extends Configurable, Closeable {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();
}

The Partitioner interface has three methods in total: partition and close, defined on the interface itself, plus configure, inherited from Configurable:

  • partition: computes the partition number from the topic, key, value, and cluster metadata
  • close: reclaims resources when the partitioner is closed
  • configure: reads configuration and initializes data

4.2 The Default Partitioner (DefaultPartitioner)

public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}

Kafka provides a default partitioner implementation, DefaultPartitioner, whose partition method derives the partition number from the topic, key, and value:

  • When the key is null: messages are sent round-robin across the topic's available partitions
  • When the key is not null: the key is hashed (MurmurHash2), and the partition number is the hash taken modulo the total number of partitions, available or not, so messages with the same key are always assigned to the same partition

Note: the difference between an available and an unavailable partition is whether the partition's leader is alive: if the leader is available, so is the partition; otherwise it is not.

4.3 Custom Partitioners

package net.kolbe.kafka.client;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class MyPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // toPositive guards against the counter overflowing to a negative value
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

A custom partitioner implements the Partitioner interface and supplies its own partition method; in this example, when the key is null the messages are likewise round-robined, but across all of the topic's partitions rather than only the available ones.

Register the custom partitioner as follows:

    public static void main(String[] args) {
        Properties properties = new Properties();
        // connection and serializer settings as in section 1
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    }

5 Interceptors (Interceptor)

5.1 The Interceptor Interface (ProducerInterceptor)

public interface ProducerInterceptor<K, V> extends Configurable {

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    public void close();
}

The producer interceptor interface ProducerInterceptor declares three methods of its own, onSend, onAcknowledgement, and close, plus the configure method inherited from its parent interface:

  • onSend: lets you customize the outgoing record, including its topic, key, value, and partition; modifying the topic, key, or partition is generally discouraged, because it affects partition computation and log compaction
  • onAcknowledgement: invoked after the message is acknowledged or after the send fails, before any user-supplied callback; it runs on the producer's I/O thread, so keep the logic lightweight or sending throughput will suffer
  • close: performs resource cleanup when the interceptor is closed
  • configure: performs initialization and reads configuration

5.2 Custom Interceptors

package net.kolbe.kafka.client;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String modifyValue = "prefix-" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifyValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.out.println("Send message error");
        } else {
            System.out.println("Send message success");
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}

}

Register the custom interceptor as follows:

    public static void main(String[] args) {
        Properties properties = new Properties();
        // plus the bootstrap.servers and serializer settings from section 1
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    }

Note: to chain multiple interceptors, separate the class names with commas.
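
For example (AnotherProducerInterceptor stands in for a hypothetical second implementation):

// the interceptors run in the order they are listed
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        MyProducerInterceptor.class.getName() + "," + AnotherProducerInterceptor.class.getName());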

6 How It Works

6.1 Overall Architecture

[Figure: overall architecture of the Kafka producer client]

The Kafka producer client runs two cooperating threads:

  • Main thread: KafkaProducer creates messages, which pass through the interceptors, the serializer, and the partitioner before being buffered in the record accumulator
  • Sender thread: fetches messages from the record accumulator and sends them to Kafka

6.2 The Record Accumulator (RecordAccumulator)

RecordAccumulator buffers the messages produced by KafkaProducer so that the Sender can transmit them in batches, reducing network overhead and improving performance.

The size of the buffer is set by the producer client parameter buffer.memory, which defaults to 32MB. If messages are produced faster than they can be sent to the server, KafkaProducer's send method blocks; once the configured timeout is exceeded, it throws an exception. That timeout is max.block.ms, which defaults to 60 seconds.

Internally, RecordAccumulator maintains one double-ended queue per partition, of the form <Partition, Deque<ProducerBatch>>: producer threads append new messages at the tail of the queue, and the Sender thread reads from the head. A ProducerBatch is a batch containing multiple ProducerRecord messages.

Messages travel over the network as bytes, so a memory region must be allocated to hold each message before it is sent, and frequently allocating and releasing these regions is expensive. Kafka holds messages in ByteBuffers and uses a BufferPool to reuse them; only ByteBuffers of one specific size are pooled, and that size is set by the batch.size parameter, 16KB by default. When a ProducerRecord reaches the RecordAccumulator, the accumulator checks whether a ProducerBatch already exists for its partition: if so, the record is appended to it; if not, a new batch is created. When creating a batch, if the message is no larger than batch.size, the batch is allocated at batch.size so that its ByteBuffer can be reused through the BufferPool; otherwise the batch is allocated at the message's actual size, and that memory region is not reused.
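
The three parameters above can be tuned when building the producer; a minimal sketch, with each one spelled out at its default value:

Properties properties = new Properties();
// total memory available for buffering unsent messages (default 32MB)
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L);
// how long send() may block on a full buffer before throwing (default 60 seconds)
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60 * 1000L);
// ProducerBatch / pooled ByteBuffer size (default 16KB)
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);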

6.3 The Sender Thread (Sender)

The Sender fetches the cached messages from the RecordAccumulator and regroups them into the form <Node, List<ProducerBatch>>: the application logic in KafkaProducer only cares about which partition a message goes to, whereas the network layer cares about which node (broker) it is sent to.

The Sender then wraps <Node, List<ProducerBatch>> further into <Node, Request>, so that each Request can be sent to its node; here Request refers to one of Kafka's protocol requests, in this case ProduceRequest.

Before sending a message to Kafka, the Sender also records the request in InFlightRequests, concretely a Map<NodeId, Deque<Request>>, which caches requests that have been sent but not yet acknowledged. The maximum number of cached requests per connection is bounded by the max.in.flight.requests.per.connection parameter, which defaults to 5; once a connection has 5 unacknowledged requests, no further requests are sent over it.
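
The in-flight limit is set the same way, continuing the sketch above:

// at most 5 unacknowledged requests per connection (the default)
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);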

7 Important Producer Configurations

  • acks — how many replicas must receive a message before the producer considers the write successful: 1 means the leader replica alone must write it, 0 means the producer does not wait for any server response, and -1 or all means all ISR replicas must write it (default: 1)
  • max.request.size — maximum size of a message the producer will send (default: 1MB)
  • retries — number of times the producer retries a failed send (default: 0)
  • retry.backoff.ms — interval between send retries (default: 100ms)
  • compression.type — message compression codec: gzip, snappy, lz4, or none (default: none)
  • connections.max.idle.ms — how long before idle connections are closed (default: 9 minutes)
  • linger.ms — how long the producer waits before sending a batch; used together with the ProducerBatch size: a batch is sent once it exceeds batch.size or once linger.ms elapses (default: 0)
  • receive.buffer.bytes — size of the socket receive buffer; -1 means use the OS default (default: 32KB)
  • send.buffer.bytes — size of the socket send buffer; -1 means use the OS default (default: 128KB)
  • request.timeout.ms — producer request timeout; requests that time out are retried (default: 30 seconds)
  • bootstrap.servers — list of addresses used to connect to the Kafka cluster
  • key.serializer — serializer class for keys
  • value.serializer — serializer class for values
  • buffer.memory — size of the buffer the producer client uses to cache messages (default: 32MB)
  • batch.size — size of the reusable memory region for a ProducerBatch (default: 16KB)
  • client.id — client id of the KafkaProducer
  • partitioner.class — the partitioner to use (default: DefaultPartitioner)
  • interceptor.classes — the producer interceptors to apply
  • max.in.flight.requests.per.connection — maximum number of unacknowledged requests cached per connection (default: 5)
  • metadata.max.age.ms — metadata not refreshed within this period is forcibly updated (default: 5 minutes)
  • transactional.id — transactional id; must be unique
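
To close, a sketch of a producer that sets several of the configurations from this table; the chosen values are illustrative, not recommendations:

Properties properties = new Properties();
// brokerList as defined in section 1
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// wait for every ISR replica before a write counts as successful
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// retry transient failures up to 3 times, 100ms apart
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
// compress batches with lz4
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// wait up to 10ms for a batch to fill before sending
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10L);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);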