Kafka Primer, Part 2: Quick Start with Java Examples

玄武 · 2026-03-06 · 2,327 characters · ~8 min read · Tags: kafka, Kafka middleware


🎯 Target audience: readers who know the basic Kafka concepts and want to get code running fast
📖 Difficulty: ⭐⭐☆☆☆
⏱️ Estimated reading time: 30 minutes
💻 Prerequisites: Java 8+, Maven, Docker (optional)


1. Starting a Kafka Environment

Option 1: Docker Compose (recommended, fastest)

Create docker-compose.yml:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
# Start the stack
docker-compose up -d

# Verify it is running
docker ps | grep kafka

Option 2: KRaft mode (no ZooKeeper, Kafka 3.0+)

version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

Common command-line operations

# Open a shell inside the Kafka container
docker exec -it <kafka-container-id> bash

# Create a topic (in the Confluent images the tools have no .sh suffix;
# with the Apache tarball use kafka-topics.sh and so on)
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic order-events \
  --partitions 3 \
  --replication-factor 1

# List topics
kafka-topics --list --bootstrap-server localhost:9092

# Describe a topic
kafka-topics --describe --topic order-events --bootstrap-server localhost:9092

# Produce messages from the console
kafka-console-producer --topic order-events --bootstrap-server localhost:9092

# Consume messages from the console (from the beginning)
kafka-console-consumer \
  --topic order-events \
  --from-beginning \
  --bootstrap-server localhost:9092

2. Quick Start with the Native Java API

2.1 Maven Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

2.2 Producer: Sending Messages

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerDemo {

    public static void main(String[] args) throws Exception {
        // 1. Producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // key and value serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Reliability: a send succeeds only after all ISR replicas acknowledge
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries on transient failure
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Max batch size in bytes (default 16 KB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // How long (ms) to wait for more records before sending a batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);

        // 2. 创建 Producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

            // ===== Style 1: fire-and-forget =====
            ProducerRecord<String, String> record1 =
                new ProducerRecord<>("order-events", "order-001", "Order created: user A bought product X");
            producer.send(record1);  // async send, result ignored

            // ===== Style 2: synchronous send (wait for the response) =====
            ProducerRecord<String, String> record2 =
                new ProducerRecord<>("order-events", "order-002", "Order paid: user B paid 100");
            Future<RecordMetadata> future = producer.send(record2);
            RecordMetadata metadata = future.get();  // blocks until acknowledged
            System.out.printf("Message sent → Topic: %s, Partition: %d, Offset: %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());

            // ===== Style 3: async with callback (recommended) =====
            for (int i = 0; i < 10; i++) {
                String key = "order-" + String.format("%03d", i);
                String value = "order-event-" + i;
                ProducerRecord<String, String> record3 =
                    new ProducerRecord<>("order-events", key, value);

                producer.send(record3, (metadata2, exception) -> {
                    if (exception != null) {
                        // send failed
                        System.err.println("Send failed: " + exception.getMessage());
                    } else {
                        // send succeeded
                        System.out.printf("✅ Sent → Partition: %d, Offset: %d, Key: %s%n",
                            metadata2.partition(), metadata2.offset(), record3.key());
                    }
                });
            }

            // Make sure everything buffered has been sent
            producer.flush();
        }
    }
}

The Producer send path

sequenceDiagram
    participant App as Application code
    participant Interceptor as Interceptor
    participant Serializer as Serializer
    participant Partitioner as Partitioner
    participant Accumulator as RecordAccumulator
    participant Sender as Sender thread
    participant Broker as Kafka Broker

    App->>Interceptor: send(record)
    Interceptor->>Serializer: serialize key/value
    Serializer->>Partitioner: pick the target partition
    Partitioner->>Accumulator: append to that partition's batch
    Note over Accumulator: waits until batch.size is full or linger.ms expires
    Accumulator->>Sender: hand off full batches
    Sender->>Broker: network send
    Broker-->>Sender: ACK
    Sender-->>App: invoke callback
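The "batch.size full or linger.ms expired" rule in the diagram can be modeled in a few lines of plain Java. This is a toy accumulator for intuition only, not Kafka's actual RecordAccumulator (which also handles compression, memory pooling, and retries):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the producer batching rule: a batch ships when it reaches
// batchSizeBytes OR when lingerMs has elapsed since the batch was opened.
public class BatchModel {
    private final int batchSizeBytes;
    private final long lingerMs;
    private final List<byte[]> batch = new ArrayList<>();
    private int batchBytes = 0;
    private long batchOpenedAt = -1;
    public int flushes = 0;

    public BatchModel(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Appends one record; returns true if this append triggered a flush.
    public boolean append(byte[] record, long nowMs) {
        if (batchOpenedAt < 0) batchOpenedAt = nowMs;
        batch.add(record);
        batchBytes += record.length;
        if (batchBytes >= batchSizeBytes || nowMs - batchOpenedAt >= lingerMs) {
            batch.clear();
            batchBytes = 0;
            batchOpenedAt = -1;
            flushes++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BatchModel bySize = new BatchModel(16384, 5);
        // 16 KB of 1 KB records fills the batch → flush on the 16th append
        for (int i = 0; i < 16; i++) bySize.append(new byte[1024], 0);
        System.out.println("flushes by size: " + bySize.flushes);

        BatchModel byTime = new BatchModel(16384, 5);
        byTime.append(new byte[100], 0);   // batch opens at t=0
        byTime.append(new byte[100], 6);   // t=6 > linger.ms=5 → flush
        System.out.println("flushes by linger: " + byTime.flushes);
    }
}
```

Raising linger.ms trades a little latency for larger batches and higher throughput, which is why the producer above sets it to 5 ms instead of the default 0.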

2.3 Consumer: Consuming Messages

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        // 1. Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumer group ID (consumers sharing a group.id split the topic's partitions)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        // key and value deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit; we commit manually after processing (see below)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // With no initial offset, start from the earliest message
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Return at most 100 records per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        // 2. Create the consumer and subscribe to the topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("order-events"));

            System.out.println("Consuming...");
            while (true) {
                // 3. Poll for records (1-second timeout)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                if (records.isEmpty()) {
                    continue;
                }

                // 4. Process the records
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf(
                        "📨 Received → Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value()
                    );

                    // business logic...
                    processOrder(record.value());
                }

                // 5. Commit offsets manually, after processing, to avoid losing messages
                consumer.commitSync();
                // Or commit asynchronously (faster, but failures don't throw)
                // consumer.commitAsync();
            }
        }
            }
        }
    }

    private static void processOrder(String orderData) {
        // simulate business processing
        System.out.println("Processing order: " + orderData);
    }
}

⚠️ Manual vs. auto commit

  • Auto commit (enable.auto.commit=true): offsets are committed on a timer, so a crash can lose messages (offset committed before processing finished) or reprocess them
  • Manual commit (recommended in production): commit only after processing succeeds, which gives at-least-once delivery
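The loss-vs-duplicate trade-off can be demonstrated without a broker. The sketch below simulates a consumer that crashes between the commit step and the process step of one message and then restarts from the committed offset; all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Simulates consuming messages 0..total-1 with a crash in the middle of
// message `crashAt`, followed by a clean restart from the committed offset.
// The order of the two steps (commit, process) decides loss vs. duplication.
public class CommitOrderSim {

    public static List<Integer> run(boolean commitBeforeProcess, int crashAt, int total) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;

        // Run 1: crashes between the two steps of message `crashAt`.
        for (int off = committed; off < total; off++) {
            if (commitBeforeProcess) {
                committed = off + 1;          // step 1: commit
                if (off == crashAt) break;    // crash before processing → message lost
                processed.add(off);           // step 2: process
            } else {
                processed.add(off);           // step 1: process
                if (off == crashAt) break;    // crash before committing → reprocessed later
                committed = off + 1;          // step 2: commit
            }
        }

        // Run 2: restart from the committed offset, no crash this time.
        for (int off = committed; off < total; off++) {
            processed.add(off);
            committed = off + 1;
        }
        return processed;
    }

    public static void main(String[] args) {
        // commit-first (auto-commit-like): → [0, 1, 3, 4], message 2 was lost
        System.out.println("at-most-once:  " + run(true, 2, 5));
        // process-first (manual commit): → [0, 1, 2, 2, 3, 4], message 2 ran twice
        System.out.println("at-least-once: " + run(false, 2, 5));
    }
}
```

This is why manual commit pairs naturally with idempotent processing: at-least-once means duplicates, so the handler must tolerate seeing the same message twice.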

2.4 Consuming a Specific Partition

// Scenario: consume only one partition, e.g. for data repair or reprocessing
TopicPartition partition0 = new TopicPartition("order-events", 0);
consumer.assign(Collections.singletonList(partition0));

// Start from a specific offset
consumer.seek(partition0, 100L); // consume from offset 100

// Start from the earliest offset
consumer.seekToBeginning(Collections.singletonList(partition0));

// Start from the latest offset
consumer.seekToEnd(Collections.singletonList(partition0));

3. Spring Boot Integration

3.1 Maven Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2 application.yml Configuration

spring:
  kafka:
    bootstrap-servers: localhost:9092

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        linger.ms: 5
        batch.size: 16384

    consumer:
      group-id: order-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.demo.dto"
        max.poll.records: 100

    listener:
      ack-mode: MANUAL_IMMEDIATE  # commit offsets manually

3.3 The Message DTO

package com.example.demo.dto;

import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;

@Data
public class OrderEvent {
    private String orderId;
    private String userId;
    private BigDecimal amount;
    private String status;
    private LocalDateTime createTime;
}

3.4 Producer: Using KafkaTemplate

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class OrderEventProducer {

    private static final String TOPIC = "order-events";

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public OrderEventProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Send an order event (async + callback).
     * Note: since Spring Kafka 3.0, send() returns a CompletableFuture;
     * on 2.x it returns a ListenableFuture (use addCallback there instead).
     */
    public void sendOrderEvent(OrderEvent event) {
        kafkaTemplate.send(TOPIC, event.getOrderId(), event)
            .whenComplete((SendResult<String, OrderEvent> result, Throwable ex) -> {
                if (ex == null) {
                    System.out.printf("✅ Order event sent, orderId=%s, partition=%d, offset=%d%n",
                        event.getOrderId(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                } else {
                    System.err.println("❌ Failed to send order event, orderId=" + event.getOrderId() + ", error=" + ex.getMessage());
                    // In production: log the failure, persist it for retry, or raise an alert
                }
            });
    }
}

3.5 Consumer: Using @KafkaListener

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class OrderEventConsumer {

    /**
     * Basic usage: listen on a single topic.
     */
    @KafkaListener(
        topics = "order-events",
        groupId = "order-consumer-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
        ConsumerRecord<String, OrderEvent> record,
        Acknowledgment ack  // manual ack
    ) {
        try {
            OrderEvent event = record.value();
            System.out.printf("📨 Order event received: orderId=%s, partition=%d, offset=%d%n",
                event.getOrderId(), record.partition(), record.offset());

            // business logic
            processOrder(event);

            // processing succeeded → commit the offset
            ack.acknowledge();

        } catch (Exception e) {
            System.err.println("Consume failed: " + e.getMessage());
            // No ack → the message is redelivered on the next poll (at-least-once)
            // After the retry limit it can go to a dead-letter topic (requires configuration)
        }
    }

    /**
     * Batch consumption (higher throughput).
     */
    @KafkaListener(
        topics = "order-events-batch",
        groupId = "batch-consumer-group",
        containerFactory = "batchKafkaListenerContainerFactory"  // needs a batch containerFactory
    )
    public void handleBatchOrderEvents(
        List<ConsumerRecord<String, OrderEvent>> records,
        Acknowledgment ack
    ) {
        System.out.println("Batch received, record count: " + records.size());
        records.forEach(record -> processOrder(record.value()));
        ack.acknowledge();
    }

    private void processOrder(OrderEvent event) {
        // real business logic: decrement stock, send notifications, etc.
        System.out.println("Processing order: " + event.getOrderId());
    }
}

3.6 Listener Container Configuration

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConfig {

    /**
     * Single-record ContainerFactory with manual offset commit.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Concurrency (ideally = number of partitions)
        factory.setConcurrency(3);
        // Manual commit mode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Batch-consumption ContainerFactory.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> batchKafkaListenerContainerFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);  // enable batch listening
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

4. Full Example: Decoupling Order Events Asynchronously

sequenceDiagram
    participant User
    participant OrderService as Order service
    participant Kafka
    participant InventoryService as Inventory service
    participant SMSService as SMS service

    User->>OrderService: POST /order/create
    OrderService->>OrderService: write order to DB
    OrderService->>Kafka: publish OrderCreatedEvent
    OrderService-->>User: return orderId (fast response)

    Note over Kafka: Kafka persists the event
    Kafka-->>InventoryService: consume event (async stock deduction)
    Kafka-->>SMSService: consume event (async SMS)

Controller layer

@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("/create")
    public ResponseEntity<String> createOrder(@RequestBody CreateOrderRequest request) {
        String orderId = orderService.createOrder(request);
        return ResponseEntity.ok(orderId);
    }
}

Service layer

@Service
@Transactional
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private OrderEventProducer producer;

    public String createOrder(CreateOrderRequest request) {
        // 1. Write the order to the database
        Order order = new Order(request);
        orderRepository.save(order);

        // 2. Publish an order-created event to Kafka
        // (note: the Kafka send is not covered by the DB transaction;
        // if strict consistency matters, look at the transactional-outbox pattern)
        OrderEvent event = new OrderEvent();
        event.setOrderId(order.getId());
        event.setUserId(request.getUserId());
        event.setAmount(request.getAmount());
        event.setStatus("CREATED");
        event.setCreateTime(LocalDateTime.now());

        producer.sendOrderEvent(event);

        // 3. Return immediately without waiting for downstream processing
        return order.getId();
    }
}

5. Common Interview Follow-ups

Q1: Is KafkaTemplate.send() synchronous or asynchronous?

It is asynchronous: send() returns a future (a ListenableFuture on Spring Kafka 2.x, a CompletableFuture since 3.0).
To wait synchronously for the result, call .get() on it.

// Synchronous send (blocks the current thread)
SendResult<String, OrderEvent> result = kafkaTemplate.send(topic, key, value).get();

// Optionally with a timeout
kafkaTemplate.send(topic, key, value).get(5, TimeUnit.SECONDS);

⚠️ Synchronous sends are generally discouraged in production; they sharply reduce throughput.
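The sync/async distinction is plain future semantics and can be seen without a broker. Below, fakeSend is a stand-in for kafkaTemplate.send() (an illustrative name, not a real Kafka call): the callback style lets the caller move on, while .get() parks the calling thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SendStyleDemo {

    // Stand-in for kafkaTemplate.send(): completes on another thread after a delay.
    public static CompletableFuture<String> fakeSend(String msg) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulated broker round-trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack:" + msg;
        });
    }

    public static void main(String[] args) throws Exception {
        // Async style: register a callback; the current thread does not wait.
        CompletableFuture<String> f = fakeSend("a")
            .whenComplete((res, ex) -> System.out.println("callback got " + res));

        // Sync style: .get() blocks the caller until the "ack" arrives.
        String ack = fakeSend("b").get(5, TimeUnit.SECONDS);
        System.out.println("blocking got " + ack);

        f.join(); // only so the demo prints both lines before exiting
    }
}
```

With the blocking style, every send costs one broker round-trip on the calling thread, which is exactly the throughput penalty the warning above refers to.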

Q2: How should concurrency be set on @KafkaListener?

@KafkaListener(
    topics = "order-events",
    groupId = "order-consumer-group",
    concurrency = "3"  // 3 consumer threads, matching 3 partitions
)
  • concurrency < partition count: some threads consume more than one partition
  • concurrency = partition count: one partition per thread, maximum parallelism
  • concurrency > partition count: the extra threads are assigned no partitions and sit idle
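The idle-thread rule follows from the assignment math: each partition belongs to exactly one consumer in the group, so at most P threads can be busy. A round-robin-style sketch of that math (illustrative only, not Spring Kafka's actual assignor):

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {

    // Spread `partitions` partitions over `concurrency` consumer threads,
    // round-robin: partition p goes to thread p % concurrency.
    public static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> byThread = new ArrayList<>();
        for (int t = 0; t < concurrency; t++) byThread.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) byThread.get(p % concurrency).add(p);
        return byThread;
    }

    public static void main(String[] args) {
        // 3 partitions, 5 threads → threads 3 and 4 get nothing and sit idle
        System.out.println("p=3, c=5: " + assign(3, 5));  // → [[0], [1], [2], [], []]
        // 6 partitions, 3 threads → each thread owns 2 partitions
        System.out.println("p=6, c=3: " + assign(6, 3));  // → [[0, 3], [1, 4], [2, 5]]
    }
}
```

This is why scaling past the partition count buys nothing: to go wider, add partitions first.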

Q3: Are consumer threads thread-safe?

Each @KafkaListener container thread runs its own KafkaConsumer and shares no state with the others, so the listener machinery itself is thread-safe. But if the business beans your listener method calls (Services, Repositories) touch shared mutable state, you must make that access thread-safe yourself.
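A quick way to see the shared-state caveat: run a few simulated listener threads against one shared counter. The plain long update races; the AtomicLong does not (plain threads only, no Kafka involved):

```java
import java.util.concurrent.atomic.AtomicLong;

public class SharedStateDemo {

    static long unsafeCount = 0;                       // plain field: racy read-modify-write
    static final AtomicLong safeCount = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        // Each Runnable stands in for one listener container thread.
        Runnable listener = () -> {
            for (int i = 0; i < 100_000; i++) {
                unsafeCount++;               // NOT atomic: concurrent updates can be lost
                safeCount.incrementAndGet(); // atomic: never loses an update
            }
        };
        Thread t1 = new Thread(listener);
        Thread t2 = new Thread(listener);
        Thread t3 = new Thread(listener);
        t1.start(); t2.start(); t3.start();
        t1.join(); t2.join(); t3.join();

        // safeCount is always 300000; unsafeCount is usually less
        System.out.println("unsafe=" + unsafeCount + ", safe=" + safeCount.get());
    }
}
```

The same applies to HashMap vs. ConcurrentHashMap, or unsynchronized collections in general, inside any bean a listener calls.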


📚 Next up

Kafka Primer 03: Core Configuration Parameters Explained


This is part 2 of the Kafka study series.
