1 消费者与消费组

消费者(Consumer)订阅Kafka中的主题(Topic),并从主题上拉取消息。每个消费者都对应一个消费组(Consumer Group),当消息被发布到主题后,会被投递到订阅它的每一个消费组,但在每个消费组中只会被其中的一个消费者消费。
消费者只能消费所分配到的分区中的消息,而每一个分区只能被同一个消费组中的一个消费者所消费。
图:消费者、消费组、主题、分区之间的关系

如果消费组中消费者的数量大于分区的数量,将会有消费者分配不到任何分区、消费不到任何消息,从而造成资源浪费。
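
下面是一个简单的示意(类名、client.id 的取法均为假设,集群地址、消费组与主题沿用后文 2.1 节示例中的取值):在同一个消费组 group.demo 中启动两个消费者,topic-demo 的分区会在二者之间分配,每个消费者只会收到分配给自己的那部分分区的消息。

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerGroupDemo {

    private static Properties initConfig(String clientId) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "a.kolbe.com.cn:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // 启动两个属于同一消费组的消费者,topic-demo 的分区会在二者之间分配
        for (int i = 0; i < 2; i++) {
            final String clientId = "demo-consumer-" + i;
            new Thread(() -> {
                // KafkaConsumer 非线程安全,每个线程使用各自的实例
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig(clientId))) {
                    consumer.subscribe(Collections.singletonList("topic-demo"));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                        // assignment() 返回当前消费者被分配到的分区集合
                        System.out.println(clientId + ", assignment: " + consumer.assignment()
                                + ", records: " + records.count());
                    }
                }
            }, clientId).start();
        }
    }
}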

2 客户端开发

2.1 客户端示例

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerAnalysis {

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "a.kolbe.com.cn:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        return properties;
    }

    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
        kafkaConsumer.subscribe(Collections.singletonList("topic-demo"));

        while(true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                StringBuilder sb = new StringBuilder();
                sb.append("topic:")
                        .append(record.topic())
                        .append(", partition:")
                        .append(record.partition())
                        .append(", key:")
                        .append(record.key())
                        .append(", value:")
                        .append(record.value());
                System.out.println(sb.toString());
            }
        }
    }
}

2.2 必要的参数配置

  • bootstrap.servers:指定连接的Kafka集群地址清单,多个则以英文逗号分隔
  • group.id:指定消费者所属的消费组
  • key.deserializer:对应生产者的key.serializer,用来做Key的反序列化
  • value.deserializer:对应生产者的value.serializer,用来做Value的反序列化
  • client.id:用来指定客户端的id,非必要参数

2.3 订阅主题和分区

1)订阅单个主题

public static void main(String[] args) throws Exception {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
    kafkaConsumer.subscribe(Collections.singletonList("topic-demo"));
}

2)订阅多个主题

public static void main(String[] args) throws Exception {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
    kafkaConsumer.subscribe(Arrays.asList("topic-demo1", "topic-demo-2"));
}

3)订阅分区

public static void main(String[] args) throws Exception {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
    kafkaConsumer.assign(Collections.singleton(new TopicPartition("topic-demo", 0)));
}

4)正则表达式订阅

public static void main(String[] args) throws Exception {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
    kafkaConsumer.subscribe(Pattern.compile("topic-.*"));
}

2.4 获取分区信息

public static void main(String[] args) throws Exception {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
    List<PartitionInfo> partitions = kafkaConsumer.partitionsFor("topic-demo");
}
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}

PartitionInfo类中包含了以下信息

  • topic:分区的topic信息
  • partition:分区号
  • leader:分区的leader所在节点
  • replicas:分区所有的副本所在节点
  • inSyncReplicas:ISR所在节点集合
  • offlineReplicas:处于离线状态的副本所在节点集合
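
下面给出一个简单的使用示意(沿用上文 2.1 节的 initConfig(),输出格式仅为演示):遍历 partitionsFor() 的返回结果,打印每个分区的 leader 与副本分布。

public static void main(String[] args) {
    try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig())) {
        // partitionsFor() 返回指定主题下所有分区的元数据信息
        for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor("topic-demo")) {
            System.out.println("partition:" + partitionInfo.partition()
                    + ", leader:" + partitionInfo.leader()
                    + ", replicas:" + Arrays.toString(partitionInfo.replicas())
                    + ", isr:" + Arrays.toString(partitionInfo.inSyncReplicas())
                    + ", offline:" + Arrays.toString(partitionInfo.offlineReplicas()));
        }
    }
}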

2.5 取消订阅

public static void main(String[] args) throws Exception {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
    kafkaConsumer.subscribe(Collections.singletonList("topic-demo"));
    kafkaConsumer.unsubscribe();
}

3 反序列化

Kafka生产者需要用序列化器(Serializer)将对象转换成字节数组后通过网络发送给Kafka;相应地,在消费者端需要使用反序列化器(Deserializer),把从Kafka中收到的字节数组转换成相应的对象。

3.1 Deserializer

public interface Deserializer<T> extends Closeable {

    void configure(Map<String, ?> configs, boolean isKey);

    T deserialize(String topic, byte[] data);

    @Override
    void close();
}

Deserializer接口包含三个方法

  • configure:负责配置当前的类
  • deserialize:负责执行反序列化操作,将字节数组转换成对象
  • close:负责关闭当前反序列化器,需要保证幂等性,有可能被KafkaConsumer调用多次

3.2 StringDeserializer

public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

StringDeserializer负责处理字符串类型对象的反序列化,其configure方法主要用来设置字符串的字符集,其中可设置的键为

  • key.deserializer.encoding:键的反序列化编码类型
  • value.deserializer.encoding:值的反序列化编码类型
  • deserializer.encoding:当键和值均未指定时,使用的编码类型

如果上述三个配置均未设置,则使用默认的字符集:UTF-8
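
在需要反序列化自定义类型时,可以自行实现 Deserializer 接口。下面是一个简单的示意(其中的 Company 类以及“4字节长度 + 内容”的字节布局均为假设,需要与生产者端的序列化器约定一致):

public class Company {
    private final String name;

    public Company(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

public class CompanyDeserializer implements Deserializer<Company> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 无需额外配置
    }

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        // 先读取4字节的长度,再按该长度读取name的内容
        int nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        return new Company(new String(nameBytes, StandardCharsets.UTF_8));
    }

    @Override
    public void close() {
        // nothing to do
    }
}

使用时将 value.deserializer 配置为 CompanyDeserializer 的全限定类名,并确保生产者端使用与之对应的序列化器。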

4 消息消费

4.1 KafkaConsumer.poll

Kafka中的消费基于拉(poll)模式,即消费者主动向服务端发起请求来拉取消息。KafkaConsumer.poll()方法带有一个超时时间参数,用来控制方法的阻塞时间:当消费者缓冲区里没有可用数据时会发生阻塞,若到达超时时间后仍然没有可用数据,则返回空的消息集。

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));

4.2 ConsumerRecord

public class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
}
  • topic:代表消息的主题
  • partition:代表消息的分区
  • offset:代表消息的偏移
  • timestamp:代表消息的时间戳
  • timestampType:代表消息的时间戳类型(NO_TIMESTAMP_TYPE没有时间戳,CREATE_TIME消息的创建时间,LOG_APPEND_TIME追加到日志文件的时间)
  • serializedKeySize:key经过序列化后的大小
  • serializedValueSize:value经过序列化后的大小
  • headers:消息头
  • key:消息键
  • value:消息值
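
除了像 2.1 节示例那样直接迭代整个消息集,ConsumerRecords 还支持按分区获取消息。下面是一个简单的示意(沿用上文的 kafkaConsumer,仅作演示):

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// partitions() 返回本次拉取到消息的分区集合
for (TopicPartition topicPartition : records.partitions()) {
    // records(TopicPartition) 返回该分区本次拉取到的消息列表
    for (ConsumerRecord<String, String> record : records.records(topicPartition)) {
        System.out.println(topicPartition + ", offset:" + record.offset()
                + ", timestamp:" + record.timestamp()
                + ", value:" + record.value());
    }
}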

5 位移提交

5.1 位移概念

在Kafka中offset有两层意思:

  • 偏移量:代表消息在分区中的位置
  • 消费位移:代表消费者消费到了具体的位置

KafkaConsumer.poll() 方法每次调用时返回的是还没有被消费过的消息集,因此需要记录上一次消费时的消费位移;为了做到这一点,需要将消费位移持久化保存。在新的消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。把消费位移持久化的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交操作。

图:消费位移中 lastConsumedOffset、committedOffset 与 position 之间的关系

  • position:下一次要拉取的消息的位置
  • committedOffset:已经提交过的消费位移
  • lastConsumedOffset:当前消费到的位置

注:假设当前消费到了x位置,当需要提交消费位移时,提交的值为 x + 1(position),而非 x

public static void main(String[] args) throws Exception {
    TopicPartition topicPartition = new TopicPartition("topic-demo", 0);

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(initConfig());
    kafkaConsumer.assign(Collections.singletonList(topicPartition));

    OffsetAndMetadata committedOffset = kafkaConsumer.committed(topicPartition);
    long position = kafkaConsumer.position(topicPartition);
}

Kafka提供了两个方法获取position和committedOffset

  • KafkaConsumer.position() 获取 position
  • KafkaConsumer.committed() 获取 committedOffset

5.2 自动提交

Kafka提交消费位移的动作默认是自动的,可以通过以下两个配置来控制自动提交的开关和间隔

# 是否开启自动提交,默认为true,关闭的话需要修改成false
enable.auto.commit=true
# 自动提交的间隔,默认为5秒,enable.auto.commit=true时该配置才生效
auto.commit.interval.ms=5000

注:在默认情况下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交
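
如果希望改用手动提交,可以在初始化配置时关闭自动提交(以下基于上文 2.1 节的 initConfig() 方法,取值仅为示意):

// 关闭自动提交,改为手动调用 commitSync()/commitAsync() 提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 自动提交开启时,可通过该参数调整提交间隔(单位:毫秒)
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);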

5.3 同步提交

5.3.1 不带参数提交

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void commitSync() {
        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
    }   
}

不带参数的同步提交会对整个拉取到的消息集进行提交:commitSync() 方法会根据 poll() 方法拉取到的最新位移来进行提交,提交的值对应的是 position 的位置。
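
一种典型的用法是在处理完一批拉取到的消息之后再进行同步提交(前提是已将 enable.auto.commit 设置为 false,以下沿用上文的 kafkaConsumer,仅为示意):

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息的业务逻辑
    }
    // 处理完本批消息后再同步提交,提交的值为各分区的 position
    kafkaConsumer.commitSync();
}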

5.3.2 带参数提交

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
    }
}

带参数的同步提交支持按分区的粒度进行位移提交。最可靠的方式是每消费一条消息就提交一次,但性能也最低;折中的做法是每个分区的消息消费完后提交一次,不过仍存在重复消费的可能性。
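
下面是按分区粒度提交的一个示意(沿用上文的 kafkaConsumer):每处理完一个分区的消息就提交一次,注意提交的位移是该分区最后一条已消费消息的 offset + 1。

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
    for (TopicPartition topicPartition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            // 处理消息的业务逻辑
        }
        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        // 提交的位移为 lastConsumedOffset + 1,即下一条待拉取消息的位置
        kafkaConsumer.commitSync(Collections.singletonMap(topicPartition,
                new OffsetAndMetadata(lastConsumedOffset + 1)));
    }
}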

5.4 异步提交

5.4.1 不带参数提交

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void commitAsync() {
        commitAsync(null);
    }   
}

不带参数的异步提交会对整个拉取到的消息集进行提交:commitAsync() 方法会根据 poll() 方法拉取到的最新位移来进行提交,提交的值对应的是 position 的位置。

5.4.2 带参数提交

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        acquireAndEnsureOpen();
        try {
            commitAsync(subscriptions.allConsumed(), callback);
        } finally {
            release();
        }
    }   
    
    @Override
    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        acquireAndEnsureOpen();
        try {
            log.debug("Committing offsets: {}", offsets);
            coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
        } finally {
            release();
        }
    }
}

带参数的异步提交支持在提交完成(无论成功或失败)时触发回调操作,也支持按指定分区的位移进行提交并触发相应的回调。
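
下面是一个带回调的异步提交示意(沿用上文的 kafkaConsumer):回调中仅打印提交失败的信息,实际的重试或补偿策略需结合业务场景决定。

kafkaConsumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception == null) {
            // 提交成功,offsets 为本次提交的各分区位移
        } else {
            System.err.println("commit failed for offsets: " + offsets);
            exception.printStackTrace();
        }
    }
});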

5.5 重复消费

当消费者拉取并消费了部分消息后,在提交消费位移之前就崩溃了,那么重启应用后将从上一次提交的位移处开始消费,崩溃前已经消费过的那部分消息会被重新消费,这就产生了重复消费的问题。

5.6 消息丢失

当消费者拉取消息后,在消费完所有消息之前就提交了消费位移,如果应用在还没有消费完这批消息时崩溃,重启后将从已提交的位移处开始消费,之前尚未消费的那部分消息就丢失了,这就产生了消息丢失的问题。

6 控制或关闭消费

6.1 暂停指定分区拉取

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        acquireAndEnsureOpen();
        try {
            log.debug("Pausing partitions {}", partitions);
            for (TopicPartition partition: partitions) {
                subscriptions.pause(partition);
            }
        } finally {
            release();
        }
    }
}

6.2 启用指定分区拉取

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void resume(Collection<TopicPartition> partitions) {
        acquireAndEnsureOpen();
        try {
            log.debug("Resuming partitions {}", partitions);
            for (TopicPartition partition: partitions) {
                subscriptions.resume(partition);
            }
        } finally {
            release();
        }
    }
}

6.3 获取暂停的分区

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public Set<TopicPartition> paused() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(subscriptions.pausedPartitions());
        } finally {
            release();
        }
    }
}

6.4 中断消费

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    @Override
    public void wakeup() {
        this.client.wakeup();
    }
}

通过调用wakeup()方法可以中断poll()方法的执行并使其抛出WakeupException异常,借助该异常可以跳出消费循环。
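
wakeup() 是 KafkaConsumer 中唯一可以从其他线程安全调用的方法,下面是一个借助它退出消费循环并关闭消费者的简单示意(沿用上文的 kafkaConsumer):

try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息的业务逻辑
        }
    }
} catch (WakeupException e) {
    // 其他线程调用了 kafkaConsumer.wakeup(),这里将其作为正常的退出信号处理
} finally {
    kafkaConsumer.close();
}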

7 指定位移消费

7.1 默认消费位移

在Kafka中,当消费者找不到所记录的消费位移时,会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费

# 该配置用来决定消费者找不到消费位移时,从哪里开始消费
# 该配置可以取三种值:latest(最新位置),earliest(起始处),none(找不到则抛异常)
auto.offset.reset=latest
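
该参数也可以在客户端代码中设置,例如(取值仅为示意,properties 指构造消费者时使用的配置):

// 找不到消费位移时从分区起始处开始消费
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");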

7.2 指定位移消费

7.2.1 指定固定的位移

KafkaConsumer.seek()方法支持从指定的位移处开始消费。由于seek()只能重置消费者已分配到的分区的消费位置,而分区的分配是在poll()方法的调用过程中完成的,所以在调用seek()之前需要先执行poll()操作,直到分配到分区为止。

private void seekFixedOffset() {
    // 复用 2.1 节中的 initConfig() 方法来构造必要的配置
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
    consumer.subscribe(Collections.singletonList("topic-demo"));
    Set<TopicPartition> assignmentPartitions = new HashSet<>();
    while(assignmentPartitions.size() == 0) {
        consumer.poll(Duration.ofMillis(100));
        assignmentPartitions = consumer.assignment();
    }
    for (TopicPartition topicPartition : assignmentPartitions) {
        consumer.seek(topicPartition, 10);
    }
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    }
}

7.2.2 指定分区的末尾

private void seekEndOffset() {
    // 复用 2.1 节中的 initConfig() 方法来构造必要的配置
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
    consumer.subscribe(Collections.singletonList("topic-demo"));
    Set<TopicPartition> assignmentPartitions = new HashSet<>();
    while(assignmentPartitions.size() == 0) {
        consumer.poll(Duration.ofMillis(100));
        assignmentPartitions = consumer.assignment();
    }

    Map<TopicPartition, Long> partitionOffsets = consumer.endOffsets(assignmentPartitions);
    for (TopicPartition topicPartition : assignmentPartitions) {
        consumer.seek(topicPartition, partitionOffsets.get(topicPartition));
    }

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    }
}

7.2.3 指定分区的开头

private void seekBeginOffset() {
    // 复用 2.1 节中的 initConfig() 方法来构造必要的配置
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(initConfig());
    consumer.subscribe(Collections.singletonList("topic-demo"));
    Set<TopicPartition> assignmentPartitions = new HashSet<>();
    while(assignmentPartitions.size() == 0) {
        consumer.poll(Duration.ofMillis(100));
        assignmentPartitions = consumer.assignment();
    }

    Map<TopicPartition, Long> partitionOffsets = consumer.beginningOffsets(assignmentPartitions);
    for (TopicPartition topicPartition : assignmentPartitions) {
        consumer.seek(topicPartition, partitionOffsets.get(topicPartition));
    }

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    }
}

8 拦截器

8.1 接口

Kafka消费者拦截器是通过实现 ConsumerInterceptor接口来实现的

public interface ConsumerInterceptor<K, V> extends Configurable {

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    public void close();
}
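
下面是一个自定义拦截器的简单示意(类名与功能均为假设:onConsume() 中过滤掉 value 为空的消息,onCommit() 中打印提交成功的位移):

public class ConsumerInterceptorDemo implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> filtered = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> kept = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                if (record.value() != null && !record.value().isEmpty()) {
                    kept.add(record);
                }
            }
            if (!kept.isEmpty()) {
                filtered.put(tp, kept);
            }
        }
        // poll() 最终返回给用户的是过滤后的消息集
        return new ConsumerRecords<>(filtered);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offsetAndMetadata) ->
                System.out.println(tp + " committed: " + offsetAndMetadata.offset()));
    }

    @Override
    public void close() {
        // nothing to do
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 无需额外配置
    }
}

使用时通过 interceptor.classes 参数指定其全限定类名即可,例如:properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorDemo.class.getName());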

8.2 onConsume() 方法

KafkaConsumer会在poll()方法返回消息之前调用ConsumerInterceptor.onConsume()方法

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
        acquireAndEnsureOpen();
        try {
            if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            long elapsedTime = 0L;
            do {
                client.maybeTriggerWakeup();
                final long metadataEnd;
                if (includeMetadataInTimeout) {
                    final long metadataStart = time.milliseconds();
                    if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                        return ConsumerRecords.empty();
                    }
                    metadataEnd = time.milliseconds();
                    elapsedTime += metadataEnd - metadataStart;
                } else {
                    while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
                        log.warn("Still waiting for metadata");
                    }
                    metadataEnd = time.milliseconds();
                }

                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

                if (!records.isEmpty()) {
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }
                    // 在返回记录之前调用 ConsumerInterceptor.onConsume() 方法
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
                final long fetchEnd = time.milliseconds();
                elapsedTime += fetchEnd - metadataEnd;

            } while (elapsedTime < timeoutMs);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

8.3 onCommit() 方法

KafkaConsumer会在commitSync()或commitAsync()提交位移成功之后调用ConsumerInterceptor.onCommit()方法

    public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
        invokeCompletedOffsetCommitCallbacks();
        if (offsets.isEmpty())
            return true;
        long now = time.milliseconds();
        long startMs = now;
        long remainingMs = timeoutMs;
        do {
            if (coordinatorUnknown()) {
                if (!ensureCoordinatorReady(remainingMs))
                    return false;
                remainingMs = timeoutMs - (time.milliseconds() - startMs);
            }
            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
            client.poll(future, remainingMs);
            invokeCompletedOffsetCommitCallbacks();
            if (future.succeeded()) {
                if (interceptors != null)
                    // 提交位移成功后调用
                    interceptors.onCommit(offsets);
                return true;
            }

            if (future.failed() && !future.isRetriable())
                throw future.exception();

            time.sleep(retryBackoffMs);

            now = time.milliseconds();
            remainingMs = timeoutMs - (now - startMs);
        } while (remainingMs > 0);

        return false;
    }

9 重要的消费者参数

  • fetch.min.bytes:Kafka在收到Consumer的拉取请求时,如果可返回的数据量小于该值,则会等待直至数据量满足要求(默认值:1B)
  • fetch.max.bytes:Kafka在收到Consumer的拉取请求时,单次返回的最大数据量(默认值:50MB)
  • fetch.max.wait.ms:拉取时的最长等待时间,如果数据量一直不满足fetch.min.bytes的要求,等到该时间后也会直接返回(默认值:500ms)
  • max.partition.fetch.bytes:限制单个分区单次返回的最大数据量,而fetch.max.bytes用来限制总体的拉取量(默认值:1MB)
  • max.poll.records:Consumer在一次拉取请求中拉取的最大消息数(默认值:500)
  • connections.max.idle.ms:关闭闲置连接的时间(默认值:9分钟)
  • exclude.internal.topics:是否将内部主题(__consumer_offsets和__transaction_state)排除在正则订阅之外,为true时仍可通过主题名称显式订阅(默认值:true)
  • receive.buffer.bytes:Socket接收消息缓冲区的大小,如果为-1则使用操作系统的默认值(默认值:64KB)
  • send.buffer.bytes:Socket发送消息缓冲区的大小,如果为-1则使用操作系统的默认值(默认值:128KB)
  • request.timeout.ms:Consumer等待请求响应的最长时间(默认值:30秒)
  • metadata.max.age.ms:元数据的过期时间,如果元数据在限定时间内没有更新,则会被强制更新(默认值:5分钟)
  • reconnect.backoff.ms:消费者尝试重新连接主机之前的等待时间(默认值:50毫秒)
  • retry.backoff.ms:重试失败的请求之前的等待间隔(默认值:100毫秒)
  • isolation.level:事务的隔离级别,可选值为read_uncommitted和read_committed;read_uncommitted表示可以消费到High Watermark处的位置(默认值:read_uncommitted)
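
这些参数同样可以通过 ConsumerConfig 中对应的常量在代码中设置,例如(取值仅为示意,properties 指构造消费者时使用的配置):

properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);              // fetch.min.bytes
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // fetch.max.wait.ms
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);              // max.poll.records
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // isolation.level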