java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot3 Kafka实战

SpringBoot3+Kafka实战指南

作者:neoooo

本文主要介绍了SpringBoot3+Kafka实战指南,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. 项目层级

像火锅店的分工:点单员、传菜员、食客清清楚楚。

kafka/
├── pom.xml                 # 根 POM(BOM对齐)
├── provider/               # 点单:生产者
│   ├── pom.xml             # 子模块 POM
│   └── src/main/java/org/example/provider/
│       ├── ProviderApplication.java
│       ├── conf/KafkaTopicsConfig.java
│       ├── controller/ProviderController.java
│       └── service/KafkaProducerService.java
│   └── src/main/resources/application.yaml
└── consumer/               # 上桌:消费者
    ├── pom.xml             # 子模块 POM
    └── src/main/java/org/example/consumer/
        ├── ConsumerApplication.java
        └── listener/KafkaConsumerListener.java
    └── src/main/resources/application.yaml

2. 根 POM(大厨的调料表)

<modules>
    <module>provider</module>
    <module>consumer</module>
</modules>


<properties>
    <java.version>17</java.version>
    <spring.boot.version>3.4.3</spring.boot.version>
    <spring.cloud.version>2024.0.2</spring.cloud.version>
</properties>

<!-- 关键:用 BOM 管理依赖版本(不用 parent 也行) -->
<dependencyManagement>
    <dependencies>
        <!-- Spring Boot 依赖版本对齐(含 starter、lombok 等) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

        <!-- Spring Cloud 依赖版本对齐 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>


<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    
 <!-- hutool工具类 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-ai</artifactId>
        <version>5.8.38</version>
    </dependency>

</dependencies>

👉 全局版本对齐,避免“锅底和食材不搭”。

3. 子模块 POM

3.1 provider/pom.xml

<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>org.example</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>provider</name>
<description>provider</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

3.2 consumer/pom.xml

<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>org.example</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>consumer</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.58</version>
    </dependency>
</dependencies>

4. 配置(菜单写清楚)

4.1 Provider(application.yaml - 生产者)

server:
  port: 1003  # 本模块 HTTP 端口

app:
  kafka:
    topic: demo.topic.v1         # 要发送/创建的主题名
    auto-create-topic: true      # 开启后,会注册 NewTopic bean 从而在启动时创建主题(见 KafkaTopicsConfig)

spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    # 数据网络IO 序列化方式
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer  # 发送字符串
      # 可靠性
      acks: all
      retries: 3

4.2 Consumer(application.yaml - 消费者)

server:
  port: 1004  # 本模块端口(通常只看日志)

app:
  kafka:
    topic: demo.topic.v1  # 要订阅的主题名(与 provider 保持一致)


spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 接收字符串

# 说明:
# - JsonDeserializer 的默认类型键为 spring.json.value.default.type(源码常量 VALUE_DEFAULT_TYPE)。:contentReference[oaicite:7]{index=7}
# - @KafkaListener 支持使用 ${...} 占位符读取上述配置。:contentReference[oaicite:8]{index=8}

5. 核心代码(厨师上阵)

5.1 入口

@SpringBootApplication
public class ProviderApplication {
  public static void main(String[] args) { SpringApplication.run(ProviderApplication.class, args); }
}
@SpringBootApplication
public class ConsumerApplication {
  public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
}

5.2 消息模型

public record MessagePayload(String id, String content, long ts) {}

5.3 Provider(点菜 + 上菜)

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafka;

    @Value("${app.kafka.topic}")
    private String topic;


    public void send(String content) {
        MessagePayload payload = new MessagePayload(
                UUID.randomUUID().toString(),
                content,
                System.currentTimeMillis()
        );
        //序列化
        String jsonStr = JSONUtil.toJsonStr(payload);
        kafka.send(topic, jsonStr);
    }
}

@RestController
@RequestMapping("/provider")
public class ProviderController {
  @Autowired private KafkaProducerService producer;
  @GetMapping("/done") public String done() { producer.send("done"); return "done"; }
}

5.4 Provider(创建 Topic)

@Configuration
public class KafkaTopicsConfig {

    @Value("${app.kafka.topic}")
    private String topic;

    // 只有当 app.kafka.auto-create-topic=true(或缺省并 matchIfMissing=true)才注册 NewTopic
    @Bean
    @ConditionalOnProperty(name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true)
    public NewTopic demoTopic() {
        // 分区/副本按你的集群实际调整;单 Broker 可用 (3,1)
        return new NewTopic(topic, 3, (short) 1);
    }
}

👉 有了它,就不用手动 kafka-topics.sh --create,Spring Boot 启动时就能帮你“先起锅烧水”。

5.5 Consumer(开吃)

@Slf4j
@Component
public class KafkaConsumerListener {


    @KafkaListener(
            topics = "${app.kafka.topic}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void onMessage(String msg) {
        try {
            // json反序列化成对象
            MessagePayload payload = JSON.parseObject(msg, MessagePayload.class);
            log.info("✅ received: id={}, content={}, ts={}",
                    payload.id(), payload.content(), payload.ts());
        } catch (Exception e) {
            log.error("❌ JSON解析失败,原始消息: {}", msg, e);
        }
    }
}

6. 运行流程

  1. 点火:Kafka Broker 先启动
  2. 开店:先跑 consumer,再跑 provider
  3. 点单GET http://localhost:1003/provider/done
  4. 吃菜:consumer 日志里出现 🍜 → 成功!

7. 常见坑

8. 总结

Spring Boot + Kafka 的套路:
👉 Provider 点单,Kafka 传菜,Consumer 开吃。

到此这篇关于SpringBoot3+Kafka实战指南的文章就介绍到这了,更多相关SpringBoot3 Kafka实战内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文