Spring Boot 3 + Kafka: A Hands-On Guide
Author: neoooo
This article walks through a Spring Boot 3 + Kafka setup using a two-module sample project (one producer, one consumer), with example code for each step.
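1. Project layout
Think of the division of labor in a hot pot restaurant: the order taker, the food runner, and the diner each have a clearly defined job.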
kafka/
├── pom.xml                                   # root POM (BOM alignment)
├── provider/                                 # taking orders: the producer
│   ├── pom.xml                               # submodule POM
│   ├── src/main/java/org/example/provider/
│   │   ├── ProviderApplication.java
│   │   ├── conf/KafkaTopicsConfig.java
│   │   ├── controller/ProviderController.java
│   │   └── service/KafkaProducerService.java
│   └── src/main/resources/application.yaml
└── consumer/                                 # serving the table: the consumer
    ├── pom.xml                               # submodule POM
    ├── src/main/java/org/example/consumer/
    │   ├── ConsumerApplication.java
    │   └── listener/KafkaConsumerListener.java
    └── src/main/resources/application.yaml
2. Root POM (the chef's seasoning list)
<modules>
    <module>provider</module>
    <module>consumer</module>
</modules>

<properties>
    <java.version>17</java.version>
    <!-- If the root POM does not inherit from spring-boot-starter-parent, java.version alone has no effect; set the compiler level explicitly -->
    <maven.compiler.release>17</maven.compiler.release>
    <spring.boot.version>3.4.3</spring.boot.version>
    <spring.cloud.version>2024.0.2</spring.cloud.version>
</properties>

<!-- Key point: manage dependency versions through BOMs (this works without a parent as well) -->
<dependencyManagement>
    <dependencies>
        <!-- Align Spring Boot dependency versions (starters, Lombok, etc.) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Align Spring Cloud dependency versions -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Hutool utilities. JSONUtil (used by the provider) lives in hutool-json; if it is not resolved transitively, depend on hutool-json or hutool-all instead -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-ai</artifactId>
        <version>5.8.38</version>
    </dependency>
</dependencies>
👉 Versions are aligned globally, so the "broth and ingredients" never clash.
3. Submodule POMs
3.1 provider/pom.xml
<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <!-- No empty relativePath override here: the default ../pom.xml points at the root POM of this multi-module build -->
</parent>

<groupId>org.example</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>provider</name>
<description>provider</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
3.2 consumer/pom.xml
<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <!-- No empty relativePath override here: the default ../pom.xml points at the root POM of this multi-module build -->
</parent>

<groupId>org.example</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>consumer</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- fastjson, used by the listener to parse the JSON string -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.58</version>
    </dependency>
</dependencies>
4. Configuration (write the menu clearly)
4.1 Provider (application.yaml, producer side)
server:
  port: 1003                      # HTTP port of this module

app:
  kafka:
    topic: demo.topic.v1          # name of the topic to send to / create
    auto-create-topic: true       # when enabled, a NewTopic bean is registered so the topic is created at startup (see KafkaTopicsConfig)

spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    producer:
      # How keys and values are serialized for network I/O
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer   # values are sent as strings
      # Reliability
      acks: all
      retries: 3
4.2 Consumer (application.yaml, consumer side)
server:
  port: 1004                      # port of this module (normally you only watch its logs)

app:
  kafka:
    topic: demo.topic.v1          # topic to subscribe to (must match the provider)

spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # values are received as strings

# Notes:
# - JsonDeserializer's default-type key is spring.json.value.default.type (source constant VALUE_DEFAULT_TYPE); it only matters if you switch to JsonDeserializer.
# - @KafkaListener can read the settings above through ${...} placeholders.
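The note about JsonDeserializer only comes into play if the consumer moves from plain strings to typed payloads. As a rough sketch of what that switch involves, the map below lists the raw Kafka consumer properties; in application.yaml the spring.json.* entries would sit under spring.kafka.consumer.properties. The class name JsonConsumerProps and the payload location org.example.consumer.MessagePayload are assumptions made for illustration, not part of the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects the consumer properties needed for JSON payloads.
class JsonConsumerProps {

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Spring Kafka's JSON deserializer takes over from StringDeserializer for values.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Target type used when a record carries no type headers (assumed package).
        props.put("spring.json.value.default.type",
                "org.example.consumer.MessagePayload");
        // Packages the deserializer is allowed to instantiate (assumed package).
        props.put("spring.json.trusted.packages", "org.example.consumer");
        return props;
    }

    public static void main(String[] args) {
        build("localhost:9092", "demo-group")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With a setup like this, the listener method would take a MessagePayload parameter instead of a String, and the producer would need a matching JSON serializer.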
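5. Core code (the chefs get to work)
5.1 Entry points
// provider module: ProviderApplication.java
package org.example.provider;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}

// consumer module: ConsumerApplication.java
package org.example.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}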
5.2 Message model
public record MessagePayload(String id, String content, long ts) {}
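5.3 Provider (taking the order and sending it out)
// service/KafkaProducerService.java
package org.example.provider.service;

import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;
// Also import MessagePayload (section 5.2) from wherever you place it in this module.

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafka;

    @Value("${app.kafka.topic}")
    private String topic;

    public void send(String content) {
        MessagePayload payload = new MessagePayload(
                UUID.randomUUID().toString(),
                content,
                System.currentTimeMillis()
        );
        // Serialize the payload to a JSON string
        String jsonStr = JSONUtil.toJsonStr(payload);
        kafka.send(topic, jsonStr);
    }
}

// controller/ProviderController.java
package org.example.provider.controller;

import org.example.provider.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/provider")
public class ProviderController {

    @Autowired
    private KafkaProducerService producer;

    // GET /provider/done publishes one message with the content "done"
    @GetMapping("/done")
    public String done() {
        producer.send("done");
        return "done";
    }
}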
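The send method above is fire-and-forget: the value returned by kafka.send is never inspected, so a failed delivery would go unnoticed even with acks: all and retries: 3. In Spring Kafka 3.x, KafkaTemplate.send returns a CompletableFuture, so the outcome can be observed without blocking. The sketch below shows only the callback pattern, with a plain CompletableFuture<String> standing in for the real future; SendOutcomeDemo, describe, and the string results are made-up names for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: a CompletableFuture<String> stands in for the future
// returned by KafkaTemplate.send(...).
class SendOutcomeDemo {

    // Turns the outcome of a send attempt into a log line without blocking the caller.
    static CompletableFuture<String> describe(CompletableFuture<String> sendFuture) {
        return sendFuture.handle((result, ex) ->
                ex == null
                        ? "sent: " + result
                        : "send failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok =
                CompletableFuture.completedFuture("demo.topic.v1");
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IllegalStateException("broker unreachable"));

        System.out.println(describe(ok).join());
        System.out.println(describe(failed).join());
    }
}
```

In the service itself, the same kind of callback would be attached to the future returned by kafka.send(topic, jsonStr), typically logging the failure rather than returning a string.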
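5.4 Provider (creating the topic)
// conf/KafkaTopicsConfig.java
package org.example.provider.conf;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicsConfig {

    @Value("${app.kafka.topic}")
    private String topic;

    // NewTopic is registered only when app.kafka.auto-create-topic=true
    // (or when the property is missing, because matchIfMissing = true)
    @Bean
    @ConditionalOnProperty(name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true)
    public NewTopic demoTopic() {
        // Adjust partitions/replicas to your cluster; on a single broker, (3, 1) works
        return new NewTopic(topic, 3, (short) 1);
    }
}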
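👉 With this bean in place you no longer need to run kafka-topics.sh --create by hand; Spring Boot creates the topic at startup, "putting the pot on to boil" for you.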
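5.5 Consumer (time to eat)
// listener/KafkaConsumerListener.java
package org.example.consumer.listener;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
// Also import MessagePayload (section 5.2) from wherever you place it in this module.

@Slf4j
@Component
public class KafkaConsumerListener {

    @KafkaListener(
            topics = "${app.kafka.topic}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void onMessage(String msg) {
        try {
            // Deserialize the JSON string into an object
            MessagePayload payload = JSON.parseObject(msg, MessagePayload.class);
            log.info("✅ received: id={}, content={}, ts={}",
                    payload.id(), payload.content(), payload.ts());
        } catch (Exception e) {
            log.error("❌ JSON parsing failed, raw message: {}", msg, e);
        }
    }
}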
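6. Run sequence
- Light the fire: start the Kafka broker first.
- Open the restaurant: start consumer first, then provider.
- Place an order: GET http://localhost:1003/provider/done
- Eat: a "✅ received" line shows up in the consumer log → success!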
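The order step can also be issued from Java rather than from a browser or curl. Here is a minimal sketch using the JDK's built-in HttpClient, assuming the provider runs locally on port 1003 as configured earlier; the class name TriggerOrder and the helper buildRequest are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper for triggering the provider endpoint from Java.
class TriggerOrder {

    // Builds the GET request for the /provider/done endpoint.
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/provider/done"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest("http://localhost:1003"),
                HttpResponse.BodyHandlers.ofString());
        // The controller returns the literal string "done" once the message has been handed to Kafka.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

A 200 response only confirms that the provider accepted the request; the consumer log is still the place to confirm that the message arrived.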
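7. Common pitfalls
- The pot won't light: bootstrap-servers is unreachable; check the network first.
- No food: does the topic exist? If not, turn on auto-create-topic.
- Nothing reaches your plate: change the group-id, or add auto-offset-reset=earliest.
- Mixed-up flavors: a serialization mismatch. The producer defaults to StringSerializer but the consumer uses JsonDeserializer; the two sides are cooking at different heat, and the message comes out "half-raw".
  - Recommendations:
    - If you send strings, use StringSerializer / StringDeserializer on both sides.
    - If you send objects, use JsonSerializer / JsonDeserializer on both sides, and explicitly declare spring.json.value.default.type in application.yaml.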
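For the first pitfall, a quick way to "check the network" is to try opening a TCP connection to the address in bootstrap-servers. The sketch below does that with plain JDK sockets; the class name BrokerReachability and the 3-second timeout are assumptions for illustration. A successful connection only shows that the port is open from this machine, not that the broker is otherwise configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic: checks whether a TCP connection to the broker can be opened.
class BrokerReachability {

    // Returns true if host:port accepts a TCP connection within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port taken from bootstrap-servers in application.yaml.
        System.out.println(canConnect("yiqiquhuxi.cn", 9092, 3000));
    }
}
```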
8. Summary
The Spring Boot + Kafka routine in one line:
👉 Provider takes the order, Kafka carries the dish, Consumer eats.