Kafka Primer, Part 2: Quick Start with Java Examples
🎯 Target readers: you already know the basic Kafka concepts and want to get working code running quickly
📖 Difficulty: ⭐⭐☆☆☆
⏱️ Estimated reading time: 30 minutes
💻 Prerequisites: Java 8+, Maven, Docker (optional)
1. Starting a Kafka Environment
Option 1: Docker Compose (recommended, fastest)
Create a docker-compose.yml:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
# Start the containers
docker-compose up -d
# Verify the broker is running
docker ps | grep kafka
Option 2: KRaft mode (no ZooKeeper, Kafka 3.0+)
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
Option 3: Common command-line operations
# Open a shell inside the Kafka container
docker exec -it <kafka-container-id> bash
# Create a topic (the Confluent images ship the tools without the .sh suffix; use kafka-topics.sh etc. if you installed Apache Kafka directly)
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic order-events \
  --partitions 3 \
  --replication-factor 1
# List topics
kafka-topics --list --bootstrap-server localhost:9092
# Describe a topic
kafka-topics --describe --topic order-events --bootstrap-server localhost:9092
# Produce messages from the console
kafka-console-producer --topic order-events --bootstrap-server localhost:9092
# Consume messages from the console (from the beginning)
kafka-console-consumer \
  --topic order-events \
  --from-beginning \
  --bootstrap-server localhost:9092
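If you prefer to manage topics from code rather than the console tools, the AdminClient in kafka-clients covers the same operations. A minimal sketch mirroring the commands above (topic name and partition count match the example):
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class TopicAdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create order-events with 3 partitions and replication factor 1
            admin.createTopics(Collections.singleton(new NewTopic("order-events", 3, (short) 1))).all().get();
            // List all topics
            System.out.println(admin.listTopics().names().get());
            // Describe the topic
            System.out.println(admin.describeTopics(Collections.singleton("order-events")).allTopicNames().get());
        }
    }
}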
2. Quick Start with the Native Java API
2.1 Maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
2.2 Producer: sending messages
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        // 1. Configure the producer
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializers for key and value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Reliability: a send only counts as successful once all in-sync replicas have acknowledged it
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries on failure
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Maximum batch size in bytes (default 16 KB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // How long (ms) to wait for more records to accumulate before sending a batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // 2. Create the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ===== Option 1: fire-and-forget =====
            ProducerRecord<String, String> record1 =
                    new ProducerRecord<>("order-events", "order-001", "Order created: user A bought product X");
            producer.send(record1); // asynchronous send, result is ignored
            // ===== Option 2: synchronous send (wait for the response) =====
            ProducerRecord<String, String> record2 =
                    new ProducerRecord<>("order-events", "order-002", "Order paid: user B paid 100");
            Future<RecordMetadata> future = producer.send(record2);
            RecordMetadata metadata = future.get(); // block until acknowledged
            System.out.printf("Message sent → Topic: %s, Partition: %d, Offset: %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            // ===== Option 3: asynchronous send with a callback (recommended) =====
            for (int i = 0; i < 10; i++) {
                String key = "order-" + String.format("%03d", i);
                String value = "order event " + i;
                ProducerRecord<String, String> record3 =
                        new ProducerRecord<>("order-events", key, value);
                producer.send(record3, (metadata2, exception) -> {
                    if (exception != null) {
                        // Send failed
                        System.err.println("Failed to send message: " + exception.getMessage());
                    } else {
                        // Send succeeded
                        System.out.printf("✅ Sent → Partition: %d, Offset: %d, Key: %s%n",
                                metadata2.partition(), metadata2.offset(), record3.key());
                    }
                });
            }
            // Make sure every buffered message has actually been sent
            producer.flush();
        }
    }
}
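One more producer setting worth knowing: with acks=all plus retries, a retried batch could in principle be written twice. kafka-clients 3.x enables the idempotent producer by default, which lets the broker de-duplicate such retries per partition; on older clients you can opt in explicitly. A minimal addition to the props above (with the 3.5.1 client from section 2.1 this is already the default, so the line is mostly documentation):
// Idempotent producer: the broker de-duplicates retried batches (default true since kafka-clients 3.0)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);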
The path a message takes through the producer:
sequenceDiagram
    participant App as Application code
    participant Interceptor as Interceptors
    participant Serializer as Serializer
    participant Partitioner as Partitioner
    participant Accumulator as RecordAccumulator
    participant Sender as Sender thread
    participant Broker as Kafka Broker
    App->>Interceptor: send(record)
    Interceptor->>Serializer: serialize key/value
    Serializer->>Partitioner: choose the target partition
    Partitioner->>Accumulator: append to that partition's batch
    Note over Accumulator: waits until batch.size is full or linger.ms expires
    Accumulator->>Sender: hand over full batches
    Sender->>Broker: send over the network
    Broker-->>Sender: ACK
    Sender-->>App: invoke the callback
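A note on the Partitioner step: by default the producer hashes the record key (murmur2) and takes it modulo the partition count, so records with the same key always land in the same partition, while records with a null key are batched onto one partition at a time by the sticky partitioner. If you ever need your own routing rule, here is a minimal sketch of a custom Partitioner (the VIP routing is made up purely for illustration):
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Objects;

// Hypothetical routing rule: keys starting with "VIP-" always go to partition 0
public class VipFirstPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (numPartitions <= 1) {
            return 0;
        }
        if (key instanceof String && ((String) key).startsWith("VIP-")) {
            return 0; // reserve partition 0 for VIP orders
        }
        // Spread everything else across the remaining partitions
        return 1 + Math.floorMod(Objects.hashCode(key), numPartitions - 1);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
// Register it with: props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VipFirstPartitioner.class.getName());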
2.3 Consumer: consuming messages
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        // 1. Configure the consumer
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumer group ID (consumers with the same group.id share the partitions of a topic)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        // Deserializers for key and value
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit (auto-commit is risky in production); we commit manually after processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // When there is no initial offset, start from the earliest message
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // A single poll returns at most 100 records
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // 2. Create the consumer and subscribe to the topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("order-events"));
            System.out.println("Starting to consume...");
            while (true) {
                // 3. Poll for records (1-second timeout)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    continue;
                }
                // 4. Process the records
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf(
                            "📨 Received → Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                            record.topic(), record.partition(), record.offset(),
                            record.key(), record.value()
                    );
                    // Business logic goes here...
                    processOrder(record.value());
                }
                // 5. Commit offsets manually, only after processing has finished, to avoid losing messages
                consumer.commitSync();
                // Or commit asynchronously (better throughput, but failures are not thrown; pass a callback if you need to see them)
                // consumer.commitAsync();
            }
        }
    }
    private static void processOrder(String orderData) {
        // Simulated business logic
        System.out.println("Processing order: " + orderData);
    }
}
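The loop above never exits. In a real service you usually want a clean shutdown; the standard pattern is to call consumer.wakeup() from a shutdown hook (wakeup() is the one method of KafkaConsumer that is safe to call from another thread), which makes the blocked poll() throw a WakeupException you can use to leave the loop. A sketch of the adjustments, reusing the consumer from the example above:
// 1. Register a shutdown hook that interrupts the blocking poll()
final KafkaConsumer<String, String> c = consumer;
Runtime.getRuntime().addShutdownHook(new Thread(c::wakeup));
// 2. Wrap the poll loop so the WakeupException ends it cleanly
try {
    while (true) {
        ConsumerRecords<String, String> records = c.poll(Duration.ofMillis(1000));
        // ...process and commitSync() as above...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    c.close(); // close() does not commit when auto-commit is off, so commit before this point
}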
⚠️ Manual vs. automatic offset commits:
- Automatic commit (enable.auto.commit=true): offsets can be committed before a record has actually been processed, so a crash can lose messages (and a crash after processing but before the next auto-commit can replay them).
- Manual commit (recommended for production): commit only after the business logic has finished, which gives you at-least-once delivery.
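A parameterless commitSync() commits the latest polled offset of every assigned partition. If you need finer control, you can commit an explicit offset for a single partition; a small sketch using the record and consumer from the example above (TopicPartition and OffsetAndMetadata come from org.apache.kafka.common and org.apache.kafka.clients.consumer):
// Commit "everything up to and including this record" for one partition.
// Note the +1: the committed offset is the position of the next record to read.
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));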
2.4 Consuming from a specific partition
// Scenario: consume only one partition, e.g. for data repair or reprocessing
TopicPartition partition0 = new TopicPartition("order-events", 0);
consumer.assign(Collections.singletonList(partition0));
// Start from a specific offset
consumer.seek(partition0, 100L); // resume from offset 100
// Start from the earliest offset
consumer.seekToBeginning(Collections.singletonList(partition0));
// Start from the latest offset
consumer.seekToEnd(Collections.singletonList(partition0));
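A related trick is rewinding to a point in time rather than a known offset, using offsetsForTimes. A sketch, continuing with the partition0 and consumer from above (OffsetAndTimestamp is in org.apache.kafka.clients.consumer; the "one hour ago" timestamp is just an example):
// Find the earliest offset whose timestamp is at or after one hour ago, then seek to it
long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000;
Map<TopicPartition, OffsetAndTimestamp> found =
        consumer.offsetsForTimes(Collections.singletonMap(partition0, oneHourAgo));
OffsetAndTimestamp offset = found.get(partition0);
if (offset != null) { // null when no message is that recent
    consumer.seek(partition0, offset.offset());
}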
3. Integrating Kafka with Spring Boot
3.1 Maven dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.2 application.yml configuration
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        linger.ms: 5
        batch.size: 16384
    consumer:
      group-id: order-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
        max.poll.records: 100
    listener:
      ack-mode: MANUAL_IMMEDIATE # commit offsets manually via Acknowledgment
3.3 Message DTO
package com.example.demo.dto;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
public class OrderEvent {
    private String orderId;
    private String userId;
    private BigDecimal amount;
    private String status;
    private LocalDateTime createTime;
}
3.4 Producer: using KafkaTemplate
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Service
public class OrderEventProducer {
    private static final String TOPIC = "order-events";
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public OrderEventProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Send an order event (asynchronously, with a callback).
     */
    public void sendOrderEvent(OrderEvent event) {
        kafkaTemplate.send(TOPIC, event.getOrderId(), event)
                .addCallback(new ListenableFutureCallback<SendResult<String, OrderEvent>>() {
                    @Override
                    public void onSuccess(SendResult<String, OrderEvent> result) {
                        System.out.printf("✅ Order event sent, orderId=%s, partition=%d, offset=%d%n",
                                event.getOrderId(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset());
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        System.err.println("❌ Failed to send order event, orderId=" + event.getOrderId() + ", error=" + ex.getMessage());
                        // In production: log the failure, persist it for retry, or raise an alert
                    }
                });
    }
}
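The addCallback style above is the spring-kafka 2.x API. Since spring-kafka 3.0 (Spring Boot 3), KafkaTemplate.send() returns a CompletableFuture rather than a ListenableFuture, so the equivalent callback looks like this (a sketch of the same method body):
// spring-kafka 3.x variant of sendOrderEvent
public void sendOrderEvent(OrderEvent event) {
    kafkaTemplate.send(TOPIC, event.getOrderId(), event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.printf("✅ Order event sent, orderId=%s, partition=%d, offset=%d%n",
                            event.getOrderId(),
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                } else {
                    System.err.println("❌ Failed to send order event, orderId=" + event.getOrderId());
                }
            });
}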
3.5 Consumer: using @KafkaListener
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class OrderEventConsumer {
    /**
     * Basic usage: listen to a single topic.
     */
    @KafkaListener(
            topics = "order-events",
            groupId = "order-consumer-group",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            ConsumerRecord<String, OrderEvent> record,
            Acknowledgment ack // manual acknowledgement
    ) {
        try {
            OrderEvent event = record.value();
            System.out.printf("📨 Received order event: orderId=%s, partition=%d, offset=%d%n",
                    event.getOrderId(), record.partition(), record.offset());
            // Business logic
            processOrder(event);
            // Processing succeeded: acknowledge, i.e. commit the offset
            ack.acknowledge();
        } catch (Exception e) {
            System.err.println("Consumption failed: " + e.getMessage());
            // Not acknowledging only means the offset is not committed; the record will be seen
            // again after a rebalance or restart (at-least-once). To have the container retry it
            // right away, rethrow the exception and let the error handler seek back and redeliver;
            // after the retries are exhausted it can be routed to a dead-letter topic
            // (needs configuration; see the error-handler sketch after section 3.6).
        }
    }

    /**
     * Batch consumption (higher throughput).
     */
    @KafkaListener(
            topics = "order-events-batch",
            groupId = "batch-consumer-group",
            containerFactory = "batchKafkaListenerContainerFactory" // requires the batch containerFactory from section 3.6
    )
    public void handleBatchOrderEvents(
            List<ConsumerRecord<String, OrderEvent>> records,
            Acknowledgment ack
    ) {
        System.out.println("Batch received, record count: " + records.size());
        records.forEach(record -> processOrder(record.value()));
        ack.acknowledge();
    }

    private void processOrder(OrderEvent event) {
        // Real business logic: decrement stock, send notifications, etc.
        System.out.println("Processing order: " + event.getOrderId());
    }
}
3.6 Listener container configuration class
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
    /**
     * ContainerFactory for single-record listeners with manual offset commits.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, OrderEvent> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Concurrency (ideally equal to the number of partitions)
        factory.setConcurrency(3);
        // Manual acknowledgement mode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * ContainerFactory for batch listeners.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> batchKafkaListenerContainerFactory(
            ConsumerFactory<String, OrderEvent> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // enable batch consumption
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
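The dead-letter routing mentioned in the listener above has to be configured explicitly. A minimal sketch using spring-kafka's DefaultErrorHandler with a DeadLetterPublishingRecoverer (the retry count and back-off are illustrative; by default failed records are published to <topic>.DLT). Remember that the listener must let the exception propagate, rather than swallow it, for this handler to run:
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Add to KafkaConfig: retry a failed record 3 more times, 1 second apart, then publish it to order-events.DLT
@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> dltTemplate) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dltTemplate);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}
// ...and attach it inside the factory beans above:
// factory.setCommonErrorHandler(kafkaErrorHandler);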
4. A Complete Example: Decoupling Order Events Asynchronously
sequenceDiagram
    participant User
    participant OrderService as Order service
    participant Kafka
    participant InventoryService as Inventory service
    participant SMSService as SMS service
    User->>OrderService: POST /order/create
    OrderService->>OrderService: write order to DB
    OrderService->>Kafka: publish OrderCreatedEvent
    OrderService-->>User: return orderId (fast response)
    Note over Kafka: Kafka persists the message
    Kafka-->>InventoryService: consume event (decrement stock asynchronously)
    Kafka-->>SMSService: consume event (send SMS asynchronously)
Controller layer:
@RestController
@RequestMapping("/order")
public class OrderController {
    @Autowired
    private OrderService orderService;

    @PostMapping("/create")
    public ResponseEntity<String> createOrder(@RequestBody CreateOrderRequest request) {
        String orderId = orderService.createOrder(request);
        return ResponseEntity.ok(orderId);
    }
}
Service layer:
@Service
@Transactional
public class OrderService {
    @Autowired
    private OrderRepository orderRepository;
    @Autowired
    private OrderEventProducer producer;

    public String createOrder(CreateOrderRequest request) {
        // 1. Persist the order
        Order order = new Order(request);
        orderRepository.save(order);
        // 2. Publish an order-created event to Kafka
        OrderEvent event = new OrderEvent();
        event.setOrderId(order.getId());
        event.setUserId(request.getUserId());
        event.setAmount(request.getAmount());
        event.setStatus("CREATED");
        event.setCreateTime(LocalDateTime.now());
        producer.sendOrderEvent(event);
        // 3. Return immediately without waiting for downstream processing
        return order.getId();
    }
}
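One caveat in this flow: the Kafka send runs inside the @Transactional method, so if the database transaction later rolls back, the event may already be on its way to the broker. A lightweight mitigation (short of a full transactional-outbox setup) is to publish a Spring application event from the service and only send to Kafka after the transaction commits; a sketch, where the relay class is hypothetical:
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

// In OrderService, inject ApplicationEventPublisher and call publisher.publishEvent(event)
// instead of producer.sendOrderEvent(event).

@Component
public class OrderEventRelay {
    private final OrderEventProducer producer;

    public OrderEventRelay(OrderEventProducer producer) {
        this.producer = producer;
    }

    // Runs only after the surrounding DB transaction has committed successfully
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderCreated(OrderEvent event) {
        producer.sendOrderEvent(event);
    }
}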
5. Frequent Interview Follow-Ups
Q1: Is KafkaTemplate.send() synchronous or asynchronous?
KafkaTemplate.send() is asynchronous: it returns a ListenableFuture (a CompletableFuture since spring-kafka 3.0).
If you need to wait for the result synchronously, call .get() on it.
// Synchronous send (blocks the calling thread)
SendResult<String, OrderEvent> result = kafkaTemplate.send(topic, key, value).get();
// A timeout can also be supplied
kafkaTemplate.send(topic, key, value).get(5, TimeUnit.SECONDS);
⚠️ Synchronous sending is generally discouraged in production because it sharply reduces throughput.
Q2: How should the concurrency of @KafkaListener be set?
@KafkaListener(
    topics = "order-events",
    groupId = "order-consumer-group",
    concurrency = "3" // start 3 consumer threads, one per partition of a 3-partition topic
)
- concurrency < partition count: every thread is busy, but some threads consume more than one partition
- concurrency = partition count: one thread per partition, the maximum useful parallelism
- concurrency > partition count: the extra threads are assigned no partitions and sit idle
Q3: Are consumers thread-safe?
KafkaConsumer itself is not thread-safe, but the Spring listener container takes care of that: each concurrency slot runs its own consumer on its own thread, and any given partition is only ever processed by one of those threads, so the listener method needs no extra synchronization for per-record work. However, if the business components you call from the listener (Service, Repository, caches, counters) share mutable state across threads, you must make that state thread-safe yourself, as in the sketch below.
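For example, a metric updated from several listener threads should use an atomic type rather than a plain field; a small illustrative sketch (the class is hypothetical):
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.stereotype.Service;

@Service
public class OrderMetrics {
    // Safe to update from multiple @KafkaListener threads
    private final AtomicLong processedOrders = new AtomicLong();

    public void recordProcessed() {
        processedOrders.incrementAndGet();
    }

    public long processedCount() {
        return processedOrders.get();
    }
}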
📚 Up next
This article is part 2 of the Kafka learning roadmap series.

