java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java Kafka消费者

Java Kafka消费者实现过程

作者:lifallen

Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交则依赖框架,本文给大家介绍Java Kafka消费者实现过程,感兴趣的朋友一起看看吧

基础

Java Kafka消费者主要通过以下核心类实现:

偏移量(Offset)的含义

Kafka提供两种主要的消费方式:

(1)手动提交offset方式

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交
props.put(ConsumerConfig.GROUP_ID_CONFIG, "csdn");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("testKafka"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());
    }
    consumer.commitAsync(); // 异步提交
    // 或者使用 consumer.commitSync(); // 同步提交
}

(2)自动提交offset方式

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 开启自动提交
props.put("auto.commit.interval.ms", "1000"); // 自动提交时间间隔
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("topic = " + record.topic() + " offset = " + record.offset() + " value = " + record.value());
    }
    // 不需要手动提交offset
}

手动提交offset有两种具体实现:

消费者组(Consumer Group)

// 通过group.id配置指定消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "experiment");

消费者组的特性:

Broker连接

// 指定Kafka集群的broker地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

偏移量(Offset)管理

// 记录分区的offset信息
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
// 在处理消息时更新offset
currentOffsets.put(
    new TopicPartition(record.topic(), record.partition()), 
    new OffsetAndMetadata(record.offset() + 1, "no metadata")
);
// 提交特定的偏移量
consumer.commitAsync(currentOffsets, null);

消费者通过pull(拉)模式从broker中读取数据:

在生产环境中,一般使用手动提交offset方式,因为:

KafkaConsumer类分析

KafkaConsumer 是 Kafka 的客户端核心类之一,用于消费 Kafka 集群中的消息。它支持高可靠的消息消费、自动容错、分区分配与再均衡、消费组(Consumer Group)等机制。

主要成员

主要构造方法

主要方法

关键代码与核心算法

2.1 订阅与分区分配

2.2 拉取消息

2.3 偏移量管理

2.4 消费组与再均衡

核心数据结构

3.1 SubscriptionState

3.2 ConsumerConfig

3.3 ConsumerRecords、ConsumerRecord

3.4 TopicPartition

3.5 OffsetAndMetadata

与 Broker 的交互流程

消费组(Consumer Group)机制与关联

总结

网络连接分析

1. 消费组与 Broker 的连接与交互

1.1 消费组与 Broker 的网络连接

连接建立流程

2. 拉取数据的批量处理机制

拉取数据不是“一条一条”

这样做的原因

关键源码位置

数据拉取的底层流程

  1. poll() 方法被调用
  2. 根据分区分配情况,构造 FetchRequest
  3. 通过 NetworkClient 向每个 leader broker 发送 FetchRequest
  4. Broker 返回批量数据
  5. 反序列化为 ConsumerRecords<K, V>,返回给用户

关键代码入口

3. 多线程实现与线程安全

KafkaConsumer 并不是多线程的

源码注释说明

多线程架构推荐

4. 复杂和有意思的实现分析

4.1 消费组协调与再均衡

代码位置

4.2 批量拉取的背后——高效网络 IO

代码位置

4.3 Offset 管理的强一致性

5. 直接源码定位

功能关键类或方法
网络连接的建立KafkaConsumer → ConsumerDelegate → KafkaClient (NetworkClient)
消费组协调ConsumerCoordinator、GroupCoordinator、JoinGroup/SyncGroup
消息批量拉取KafkaConsumer.poll()、Fetcher.fetchRecords()
多线程相关说明KafkaConsumer 注释、wakeup()
Offset 管理commitSync/commitAsync、OffsetCommitRequest、__consumer_offsets
负载均衡与再均衡ConsumerPartitionAssignor、ConsumerCoordinator

小结

协作

详细说明各个组件及其关系:

Fetcher

sendFetches 实现了基本发送请求

    /**
     * Set up a fetch request for any node that we have assigned partitions for which doesn't already have
     * an in-flight fetch or pending fetch data.
     * @return number of fetches sent
     */
    public synchronized int sendFetches() {
        final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = prepareFetchRequests();
        sendFetchesInternal(
                fetchRequests,
                (fetchTarget, data, clientResponse) -> {
                    synchronized (Fetcher.this) {
                        handleFetchSuccess(fetchTarget, data, clientResponse);
                    }
                },
                (fetchTarget, data, error) -> {
                    synchronized (Fetcher.this) {
                        handleFetchFailure(fetchTarget, data, error);
                    }
                });
        return fetchRequests.size();
    }

总结一下合并的流程:

合并的本质是将发往同一个 Broker 的多个分区的拉取操作打包到一个网络请求中。 这样做的好处是:

到此这篇关于Java Kafka消费者实现过程的文章就介绍到这了,更多相关Java Kafka消费者内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文