Kafka 入门第三篇:核心配置参数详解

玄武2026-03-06字数:2625阅读:约 9 分钟kafkaKafka中间件

Kafka 入门第三篇:核心配置参数详解

🎯 目标读者:已能跑通 Kafka 基础代码,想深入理解各配置参数含义
📖 难度:⭐⭐⭐☆☆
⏱️ 预计阅读时间:25 分钟


一、为什么要深入理解配置参数?

很多开发者用 Kafka 出问题,根源往往不是代码逻辑错了,而是配置参数没设对

错误场景根因配置
Producer 发消息丢了acks=0 或没有配置重试
Consumer 重复消费enable.auto.commit=true + 消费慢
消息积压处理不过来max.poll.records 太小,fetch.max.bytes 太小
Broker 磁盘快满了log.retention.hours 太长,未配置 log.retention.bytes
Rebalance 太频繁session.timeout.ms 太短,max.poll.interval.ms 太小

二、Producer 核心配置参数

2.1 可靠性相关

// ===== 最重要的配置 =====
props.put(ProducerConfig.ACKS_CONFIG, "all");

acks —— 消息确认机制(最核心)

acks含义可靠性性能使用场景
0发完不等确认最低(可能丢消息)最高日志采集(允许少量丢失)
1Leader 写入后确认中(Leader 宕机会丢)一般业务
-1/all所有 ISR 副本写入后确认最高(不丢消息)最低金融/订单/核心业务
graph LR
    subgraph acks=0
        P0[Producer] -->|发送| B0[Broker]
        P0 -.->|不等待| P0_done((✓))
    end

    subgraph acks=1
        P1[Producer] -->|发送| L1[Leader]
        L1 -->|写入本地日志| L1_done((✓))
        L1 -->|ack| P1
    end

    subgraph acks=all
        P2[Producer] -->|发送| L2[Leader]
        L2 -->|同步| F2A[Follower A]
        L2 -->|同步| F2B[Follower B]
        F2A -->|确认| L2
        F2B -->|确认| L2
        L2 -->|ack| P2
    end

retriesretry.backoff.ms

// 失败重试次数(Kafka 2.1+ 默认 Integer.MAX_VALUE,配合 delivery.timeout.ms 使用)
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// 重试间隔(默认 100ms)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200);

// 消息发送最大超时时间(默认 120000ms = 2分钟)
// retries 用尽前,只要在这个时间内,会一直重试
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

⚠️ 注意:开启 retries 可能导致消息乱序(先发的消息重试时,后发的消息已成功)。
解决方案:设置 max.in.flight.requests.per.connection=1(牺牲吞吐量)或开启幂等性。

enable.idempotence —— 幂等生产者

// 开启幂等性(Kafka 3.0+ 默认开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 开启幂等性会自动设置:
// acks = all
// retries > 0
// max.in.flight.requests.per.connection <= 5

幂等性原理:Producer 启动时获得唯一 PID,每条消息附带递增的 Sequence Number,Broker 拒绝重复的 <PID, Partition, SequenceNumber> 组合。

2.2 性能相关

// 批量发送的最大字节数(默认 16KB)
// 消息积累到 batch.size 后立即发送
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB

// 等待更多消息的最长时间(默认 0ms,即不等待)
// 设置 5-10ms 可显著提升吞吐量,略微增加延迟
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);

batch.sizelinger.ms 的关系

graph TD
    msg[新消息到达] --> add[追加到 Batch]
    add --> check{哪个条件先满足?}
    check -->|batch.size 满了| send_batch[立即发送 Batch]
    check -->|linger.ms 超时| send_linger[超时发送 Batch]
    send_batch --> clear[清空 Batch]
    send_linger --> clear
// 发送缓冲区总大小(默认 32MB)
// 当所有 Batch 总大小达到此值,send() 会阻塞
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB

// 缓冲区满后的最长等待时间(默认 60000ms)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);

// 压缩算法(默认 none)
// 推荐 lz4(平衡压缩率和性能),大数据场景用 zstd
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
压缩算法压缩率CPU 消耗速度推荐场景
none-最快消息量小
gzip存储敏感
snappy平衡场景
lz4极低最快生产推荐
zstd大数据场景

2.3 Producer 配置速查表

// 完整的生产推荐配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 可靠性(核心业务用 all,日志用 1)
props.put(ProducerConfig.ACKS_CONFIG, "all");

// 幂等(推荐开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 性能(根据业务延迟要求调整)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);      // 32KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);           // 5ms
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

三、Consumer 核心配置参数

3.1 最重要的配置

// 消费者组 ID(相同 group.id 共享消费)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");

// 没有初始 offset 时的策略
// earliest: 从最早的消息开始消费(不会遗漏)
// latest: 从最新的消息开始消费(会遗漏已有消息)
// none: 没有 offset 就抛异常
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

3.2 offset 提交配置

// ===== 核心选择:自动提交 vs 手动提交 =====

// 自动提交(不推荐生产使用)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 每1秒提交一次

// 手动提交(推荐)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 代码中通过 consumer.commitSync() 或 consumer.commitAsync() 提交

自动提交的陷阱

sequenceDiagram
    participant Consumer
    participant Broker

    Consumer->>Broker: poll() 拉取 offset 0-99 的消息
    Note over Consumer: 自动提交线程:每1秒提交一次
    Consumer->>Broker: 自动提交 offset=100(还没处理完!)
    Note over Consumer: ❌ Consumer 宕机,offset已提交
    Consumer->>Broker: 重启后继续从 offset=100 消费
    Note over Consumer: offset 0-99 的消息永久丢失!

3.3 性能与稳定性配置

// 单次 poll 最多返回的消息数(默认 500)
// 根据每条消息处理时间调整:处理慢则调小,处理快则调大
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

// 两次 poll 之间的最长间隔(默认 5 分钟)
// 如果业务处理时间超过此值,Consumer 会被踢出 Group(触发 Rebalance)
// 一定要大于批量处理最大耗时!
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5分钟

// Consumer 与 Broker 的心跳超时(默认 10 秒)
// Consumer 在此时间内没发心跳会被认为已死,触发 Rebalance
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

// 心跳间隔(默认 3 秒,必须小于 session.timeout.ms / 3)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

Rebalance 触发参数关系

graph LR
    A[heartbeat.interval.ms\n心跳间隔 = 10s] 
    B[session.timeout.ms\n心跳超时 = 30s]
    C[max.poll.interval.ms\n最长处理时间 = 5min]

    A -->|心跳超时| B
    B -->|触发| R[Rebalance]
    C -->|处理超时| R
    
    note1["规则:\nheartbeat.interval.ms\n< session.timeout.ms / 3"] 
// 单次 fetch 的最少字节数(默认 1 字节,即有消息就返回)
// 增大可以减少网络请求次数(提升吞吐),但会增加延迟
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB

// 等待 fetch.min.bytes 满足的最长时间(默认 500ms)
// 超过此时间后即使不满足 fetch.min.bytes 也会返回
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

// 单次 fetch 的最大字节数(默认 50MB)
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50MB

3.4 Consumer 配置速查表

// 生产推荐配置(手动提交 + 稳定不 Rebalance)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Offset 策略
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// 性能与稳定性(根据业务处理速度调整)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);

四、Broker 核心配置参数

Broker 配置在 server.properties 或 KRaft 配置文件中。

4.1 存储相关

# 日志存储目录(生产建议多个磁盘,逗号分隔)
log.dirs=/data/kafka/logs1,/data/kafka/logs2

# 消息保留时间(默认 168 小时 = 7 天)
log.retention.hours=168

# 消息保留最大字节数(默认 -1,不限制)
# 建议设置,防止磁盘写满
log.retention.bytes=107374182400  # 100GB

# 单个日志 Segment 文件的最大大小(默认 1GB)
# Segment 文件满了会创建新 Segment(触发索引写入)
log.segment.bytes=1073741824  # 1GB

# 检查过期日志的频率(默认 5 分钟)
log.retention.check.interval.ms=300000

4.2 Topic 默认配置

# 默认 Partition 数(新建 Topic 时未指定则用此值)
num.partitions=3

# 默认副本数
default.replication.factor=3

# 最小 ISR 同步副本数(配合 acks=all 使用)
# 如果 ISR 集合中的副本数 < min.insync.replicas,Producer 会收到异常
min.insync.replicas=2

min.insync.replicasacks=all 的黄金搭配

副本总数 = 3
min.insync.replicas = 2
acks = all

→ 允许 1 个副本宕机,仍能正常写入
→ 如果 2 个副本都宕机,写入失败(抛 NotEnoughReplicasException)

⚠️ 常见误解acks=all 并不等于"所有副本都写入",而是"所有 ISR 副本都写入"。
配合 min.insync.replicas=2,才能真正保证数据不丢。

4.3 性能相关

# 网络线程数(处理 Client 连接,默认 3)
num.network.threads=8

# IO 线程数(磁盘读写,默认 8)
num.io.threads=16

# Socket 接收缓冲区大小
socket.receive.buffer.bytes=1048576  # 1MB

# Socket 发送缓冲区大小
socket.send.buffer.bytes=1048576    # 1MB

4.4 Broker 配置速查表

# ===== 生产推荐 Broker 配置 =====

# 基础
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-host:9092

# 存储
log.dirs=/data/kafka/logs
log.retention.hours=168
log.retention.bytes=107374182400
log.segment.bytes=1073741824

# 副本
default.replication.factor=3
num.partitions=6
min.insync.replicas=2

# 性能
num.network.threads=8
num.io.threads=16

五、配置参数全局速查

Producer 参数

参数默认值最优实践说明
acks1all核心业务必须 all
retriesMAX_INT3配合幂等使用
enable.idempotencefalsetrue防止重复发送
batch.size163843276832KB 批量发送
linger.ms05-10等待批量,提升吞吐
compression.typenonelz4推荐 lz4
buffer.memory33554432按需调大发送缓冲总大小

Consumer 参数

参数默认值最优实践说明
enable.auto.committruefalse生产环境必须手动提交
auto.offset.resetlatestearliest不遗漏历史消息
max.poll.records500100-500按处理速度调整
max.poll.interval.ms300000按业务需要必须大于批量处理耗时
session.timeout.ms1000030000防止频繁 Rebalance
fetch.min.bytes11024减少空轮询

Broker 参数

参数默认值最优实践说明
min.insync.replicas12配合 acks=all
default.replication.factor13生产必须≥3
log.retention.hours168按业务7天一般够用
num.network.threads38按 CPU 核数
num.io.threads816按磁盘数

六、面试高频追问

Q1:acks=all 会不会导致性能很差?

不一定。影响因素:

  1. ISR 中副本数:只有 ISR 中的副本需要确认,不是所有副本
  2. 网络延迟:Broker 间同步延迟通常在 1-5ms 以内
  3. linger.ms + batch.size:批量发送可以很大程度抵消 acks=all 的延迟

生产实践:核心业务在 acks=all 下吞吐量通常在万级/秒,满足大多数业务需求。

Q2:max.poll.interval.ms 设置太大有什么风险?

设置太大意味着 Consumer 宕机后,Kafka 要等更长时间才发现(触发 Rebalance)。在此期间,这个 Consumer 负责的 Partition 消息积压会更严重。

建议:设置为实际最大处理时间的 2 倍,并配合监控告警,而不是无限调大。

Q3:fetch.min.bytesfetch.max.wait.ms 怎么配合使用?

这两个配置实现了**"长轮询"**效果:

  • Consumer 发出 fetch 请求后,Broker 等待直到累积了 fetch.min.bytes 的数据,或者等了 fetch.max.wait.ms 毫秒
  • 好处:减少消息量低时的空轮询次数,降低 CPU 和网络开销
场景:消息量低(低峰期)
→ Broker 积累 1KB 数据等了 500ms 才返回
→ Consumer 少发一次请求,节省网络开销

场景:消息量高(高峰期)
→ Broker 很快积累够 1KB,立即返回
→ 不影响实时性

📚 下一篇

Kafka为什么那么快?(进入核心机制阶段)


本文是 Kafka 系统学习路线 的第 3 篇

💬 文章评论 (0)

支持 Markdown,请友善交流
正在加载评论...