Kafka 入门第三篇:核心配置参数详解
Kafka 入门第三篇:核心配置参数详解
🎯 目标读者:已能跑通 Kafka 基础代码,想深入理解各配置参数含义
📖 难度:⭐⭐⭐☆☆
⏱️ 预计阅读时间:25 分钟
一、为什么要深入理解配置参数?
很多开发者用 Kafka 出问题,根源往往不是代码逻辑错了,而是配置参数没设对。
| 错误场景 | 根因配置 |
|---|---|
| Producer 发消息丢了 | acks=0 或没有配置重试 |
| Consumer 重复消费 | enable.auto.commit=true + 消费慢 |
| 消息积压处理不过来 | max.poll.records 太小,fetch.max.bytes 太小 |
| Broker 磁盘快满了 | log.retention.hours 太长,未配置 log.retention.bytes |
| Rebalance 太频繁 | session.timeout.ms 太短,max.poll.interval.ms 太小 |
二、Producer 核心配置参数
2.1 可靠性相关
// ===== 最重要的配置 =====
props.put(ProducerConfig.ACKS_CONFIG, "all");
acks —— 消息确认机制(最核心)
| acks | 含义 | 可靠性 | 性能 | 使用场景 |
|---|---|---|---|---|
0 | 发完不等确认 | 最低(可能丢消息) | 最高 | 日志采集(允许少量丢失) |
1 | Leader 写入后确认 | 中(Leader 宕机会丢) | 中 | 一般业务 |
-1/all | 所有 ISR 副本写入后确认 | 最高(不丢消息) | 最低 | 金融/订单/核心业务 |
graph LR
subgraph acks=0
P0[Producer] -->|发送| B0[Broker]
P0 -.->|不等待| P0_done((✓))
end
subgraph acks=1
P1[Producer] -->|发送| L1[Leader]
L1 -->|写入本地日志| L1_done((✓))
L1 -->|ack| P1
end
subgraph acks=all
P2[Producer] -->|发送| L2[Leader]
L2 -->|同步| F2A[Follower A]
L2 -->|同步| F2B[Follower B]
F2A -->|确认| L2
F2B -->|确认| L2
L2 -->|ack| P2
end
retries 和 retry.backoff.ms
// 失败重试次数(Kafka 2.1+ 默认 Integer.MAX_VALUE,配合 delivery.timeout.ms 使用)
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 重试间隔(默认 100ms)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200);
// 消息发送最大超时时间(默认 120000ms = 2分钟)
// retries 用尽前,只要在这个时间内,会一直重试
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
⚠️ 注意:开启
retries可能导致消息乱序(先发的消息重试时,后发的消息已成功)。
解决方案:设置max.in.flight.requests.per.connection=1(牺牲吞吐量)或开启幂等性。
enable.idempotence —— 幂等生产者
// 开启幂等性(Kafka 3.0+ 默认开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 开启幂等性会自动设置:
// acks = all
// retries > 0
// max.in.flight.requests.per.connection <= 5
幂等性原理:Producer 启动时获得唯一 PID,每条消息附带递增的 Sequence Number,Broker 拒绝重复的 <PID, Partition, SequenceNumber> 组合。
2.2 性能相关
// 批量发送的最大字节数(默认 16KB)
// 消息积累到 batch.size 后立即发送
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB
// 等待更多消息的最长时间(默认 0ms,即不等待)
// 设置 5-10ms 可显著提升吞吐量,略微增加延迟
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
batch.size 与 linger.ms 的关系:
graph TD
msg[新消息到达] --> add[追加到 Batch]
add --> check{哪个条件先满足?}
check -->|batch.size 满了| send_batch[立即发送 Batch]
check -->|linger.ms 超时| send_linger[超时发送 Batch]
send_batch --> clear[清空 Batch]
send_linger --> clear
// 发送缓冲区总大小(默认 32MB)
// 当所有 Batch 总大小达到此值,send() 会阻塞
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB
// 缓冲区满后的最长等待时间(默认 60000ms)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
// 压缩算法(默认 none)
// 推荐 lz4(平衡压缩率和性能),大数据场景用 zstd
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
| 压缩算法 | 压缩率 | CPU 消耗 | 速度 | 推荐场景 |
|---|---|---|---|---|
| none | - | 无 | 最快 | 消息量小 |
| gzip | 高 | 高 | 慢 | 存储敏感 |
| snappy | 中 | 低 | 快 | 平衡场景 |
| lz4 | 中 | 极低 | 最快 | 生产推荐 |
| zstd | 高 | 中 | 快 | 大数据场景 |
2.3 Producer 配置速查表
// 完整的生产推荐配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 可靠性(核心业务用 all,日志用 1)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 幂等(推荐开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 性能(根据业务延迟要求调整)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5ms
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
三、Consumer 核心配置参数
3.1 最重要的配置
// 消费者组 ID(相同 group.id 共享消费)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// 没有初始 offset 时的策略
// earliest: 从最早的消息开始消费(不会遗漏)
// latest: 从最新的消息开始消费(会遗漏已有消息)
// none: 没有 offset 就抛异常
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
3.2 offset 提交配置
// ===== 核心选择:自动提交 vs 手动提交 =====
// 自动提交(不推荐生产使用)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 每1秒提交一次
// 手动提交(推荐)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 代码中通过 consumer.commitSync() 或 consumer.commitAsync() 提交
自动提交的陷阱:
sequenceDiagram
participant Consumer
participant Broker
Consumer->>Broker: poll() 拉取 offset 0-99 的消息
Note over Consumer: 自动提交线程:每1秒提交一次
Consumer->>Broker: 自动提交 offset=100(还没处理完!)
Note over Consumer: ❌ Consumer 宕机,offset已提交
Consumer->>Broker: 重启后继续从 offset=100 消费
Note over Consumer: offset 0-99 的消息永久丢失!
3.3 性能与稳定性配置
// 单次 poll 最多返回的消息数(默认 500)
// 根据每条消息处理时间调整:处理慢则调小,处理快则调大
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// 两次 poll 之间的最长间隔(默认 5 分钟)
// 如果业务处理时间超过此值,Consumer 会被踢出 Group(触发 Rebalance)
// 一定要大于批量处理最大耗时!
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5分钟
// Consumer 与 Broker 的心跳超时(默认 10 秒)
// Consumer 在此时间内没发心跳会被认为已死,触发 Rebalance
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
// 心跳间隔(默认 3 秒,必须小于 session.timeout.ms / 3)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
Rebalance 触发参数关系:
graph LR
A[heartbeat.interval.ms\n心跳间隔 = 10s]
B[session.timeout.ms\n心跳超时 = 30s]
C[max.poll.interval.ms\n最长处理时间 = 5min]
A -->|心跳超时| B
B -->|触发| R[Rebalance]
C -->|处理超时| R
note1["规则:\nheartbeat.interval.ms\n< session.timeout.ms / 3"]
// 单次 fetch 的最少字节数(默认 1 字节,即有消息就返回)
// 增大可以减少网络请求次数(提升吞吐),但会增加延迟
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB
// 等待 fetch.min.bytes 满足的最长时间(默认 500ms)
// 超过此时间后即使不满足 fetch.min.bytes 也会返回
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// 单次 fetch 的最大字节数(默认 50MB)
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 50MB
3.4 Consumer 配置速查表
// 生产推荐配置(手动提交 + 稳定不 Rebalance)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Offset 策略
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 性能与稳定性(根据业务处理速度调整)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
四、Broker 核心配置参数
Broker 配置在 server.properties 或 KRaft 配置文件中。
4.1 存储相关
# 日志存储目录(生产建议多个磁盘,逗号分隔)
log.dirs=/data/kafka/logs1,/data/kafka/logs2
# 消息保留时间(默认 168 小时 = 7 天)
log.retention.hours=168
# 消息保留最大字节数(默认 -1,不限制)
# 建议设置,防止磁盘写满
log.retention.bytes=107374182400 # 100GB
# 单个日志 Segment 文件的最大大小(默认 1GB)
# Segment 文件满了会创建新 Segment(触发索引写入)
log.segment.bytes=1073741824 # 1GB
# 检查过期日志的频率(默认 5 分钟)
log.retention.check.interval.ms=300000
4.2 Topic 默认配置
# 默认 Partition 数(新建 Topic 时未指定则用此值)
num.partitions=3
# 默认副本数
default.replication.factor=3
# 最小 ISR 同步副本数(配合 acks=all 使用)
# 如果 ISR 集合中的副本数 < min.insync.replicas,Producer 会收到异常
min.insync.replicas=2
min.insync.replicas 与 acks=all 的黄金搭配:
副本总数 = 3
min.insync.replicas = 2
acks = all
→ 允许 1 个副本宕机,仍能正常写入
→ 如果 2 个副本都宕机,写入失败(抛 NotEnoughReplicasException)
⚠️ 常见误解:
acks=all并不等于"所有副本都写入",而是"所有 ISR 副本都写入"。
配合min.insync.replicas=2,才能真正保证数据不丢。
4.3 性能相关
# 网络线程数(处理 Client 连接,默认 3)
num.network.threads=8
# IO 线程数(磁盘读写,默认 8)
num.io.threads=16
# Socket 接收缓冲区大小
socket.receive.buffer.bytes=1048576 # 1MB
# Socket 发送缓冲区大小
socket.send.buffer.bytes=1048576 # 1MB
4.4 Broker 配置速查表
# ===== 生产推荐 Broker 配置 =====
# 基础
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-host:9092
# 存储
log.dirs=/data/kafka/logs
log.retention.hours=168
log.retention.bytes=107374182400
log.segment.bytes=1073741824
# 副本
default.replication.factor=3
num.partitions=6
min.insync.replicas=2
# 性能
num.network.threads=8
num.io.threads=16
五、配置参数全局速查
Producer 参数
| 参数 | 默认值 | 最优实践 | 说明 |
|---|---|---|---|
acks | 1 | all | 核心业务必须 all |
retries | MAX_INT | 3 | 配合幂等使用 |
enable.idempotence | false | true | 防止重复发送 |
batch.size | 16384 | 32768 | 32KB 批量发送 |
linger.ms | 0 | 5-10 | 等待批量,提升吞吐 |
compression.type | none | lz4 | 推荐 lz4 |
buffer.memory | 33554432 | 按需调大 | 发送缓冲总大小 |
Consumer 参数
| 参数 | 默认值 | 最优实践 | 说明 |
|---|---|---|---|
enable.auto.commit | true | false | 生产环境必须手动提交 |
auto.offset.reset | latest | earliest | 不遗漏历史消息 |
max.poll.records | 500 | 100-500 | 按处理速度调整 |
max.poll.interval.ms | 300000 | 按业务需要 | 必须大于批量处理耗时 |
session.timeout.ms | 10000 | 30000 | 防止频繁 Rebalance |
fetch.min.bytes | 1 | 1024 | 减少空轮询 |
Broker 参数
| 参数 | 默认值 | 最优实践 | 说明 |
|---|---|---|---|
min.insync.replicas | 1 | 2 | 配合 acks=all |
default.replication.factor | 1 | 3 | 生产必须≥3 |
log.retention.hours | 168 | 按业务 | 7天一般够用 |
num.network.threads | 3 | 8 | 按 CPU 核数 |
num.io.threads | 8 | 16 | 按磁盘数 |
六、面试高频追问
Q1:acks=all 会不会导致性能很差?
不一定。影响因素:
- ISR 中副本数:只有 ISR 中的副本需要确认,不是所有副本
- 网络延迟:Broker 间同步延迟通常在 1-5ms 以内
linger.ms + batch.size:批量发送可以很大程度抵消 acks=all 的延迟
生产实践:核心业务在 acks=all 下吞吐量通常在万级/秒,满足大多数业务需求。
Q2:max.poll.interval.ms 设置太大有什么风险?
设置太大意味着 Consumer 宕机后,Kafka 要等更长时间才发现(触发 Rebalance)。在此期间,这个 Consumer 负责的 Partition 消息积压会更严重。
建议:设置为实际最大处理时间的 2 倍,并配合监控告警,而不是无限调大。
Q3:fetch.min.bytes 和 fetch.max.wait.ms 怎么配合使用?
这两个配置实现了**"长轮询"**效果:
- Consumer 发出 fetch 请求后,Broker 等待直到累积了
fetch.min.bytes的数据,或者等了fetch.max.wait.ms毫秒 - 好处:减少消息量低时的空轮询次数,降低 CPU 和网络开销
场景:消息量低(低峰期)
→ Broker 积累 1KB 数据等了 500ms 才返回
→ Consumer 少发一次请求,节省网络开销
场景:消息量高(高峰期)
→ Broker 很快积累够 1KB,立即返回
→ 不影响实时性
📚 下一篇
→ Kafka为什么那么快?(进入核心机制阶段)
本文是 Kafka 系统学习路线 的第 3 篇


💬 文章评论 (0)