Springboot项目消费Kafka数据的方法
作者:布朗克168
一、引入依赖
你需要在 pom.xml 中添加 spring-kafka 相关依赖:
<dependencies> <!-- Spring Boot Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Spring Boot Starter for Logging (optional but useful for debugging) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <!-- Spring Boot Starter for Testing --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
二、添加Kafka配置
在 application.yml 或 application.properties 文件中配置 Kafka 连接属性:
application.yml 示例:
spring: kafka: bootstrap-servers: localhost:9092 # Kafka服务器地址 consumer: group-id: my-consumer-group # 消费者组ID auto-offset-reset: earliest # 消费者从头开始读取(如果没有已提交的偏移量) key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串 listener: missing-topics-fatal: false # 如果主题不存在,不抛出致命错误
application.properties 示例:
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.listener.missing-topics-fatal=false spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
以上配置说明Kafka生产的数据是json字符串,那么消费接收的数据默认也是json字符串,如果接收消息想用对象接受,需要自定义序列化器,比如以下配置
spring: kafka: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer # 对 Key 使用 StringSerializer value-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer # 对 Value 使用 ErrorHandlingSerializer properties: spring.json.value.default.type: com.example.Order # 默认的 JSON 反序列化目标类型为 Order
三、创建 Kafka 消费者
创建一个 Kafka 消费者类来处理消息。你可以使用 @KafkaListener 注解来监听 Kafka 中的消息
(一)Kafka生产的消息是JSON 字符串
1、方式一
如果消息是 JSON 字符串,你可以使用 StringDeserializer 获取消息后,再使用 ObjectMapper 将其转换为
Java 对象(如 Order)。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.stereotype.Service; @Service @EnableKafka // 启用 Kafka 消费者 public class KafkaConsumer { private final ObjectMapper objectMapper = new ObjectMapper(); // 监听 Kafka 中的 order-topic 主题 @KafkaListener(topics = "order-topic", groupId = "order-consumer-group") public void consumeOrder(String message) { try { // 将 JSON 字符串反序列化为 Order 对象 Order order = objectMapper.readValue(message, Order.class); System.out.println("Received order: " + order); } catch (Exception e) { e.printStackTrace(); } } }
说明:
@KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示监听的 Kafka 主题,groupId 表示消费者所属的消费者组。
listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中,我们打印出消息内容。
2、方式二:需要直接访问消息元数据
可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据(如
topic、partition、offset)的场景,也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { // 监听 Kafka 中的 order-topic 主题 @KafkaListener(topics = "order-topic", groupId = "order-consumer-group") public void consumeOrder(ConsumerRecord<String, String> record) { // 获取消息的详细信息 String key = record.key(); // 获取消息的 key String value = record.value(); // 获取消息的 value String topic = record.topic(); // 获取消息的 topic int partition = record.partition(); // 获取消息的分区 long offset = record.offset(); // 获取消息的偏移量 long timestamp = record.timestamp(); // 获取消息的时间戳 // 处理消息(这里我们只是打印消息) System.out.println("Consumed record: "); System.out.println("Key: " + key); System.out.println("Value: " + value); System.out.println("Topic: " + topic); System.out.println("Partition: " + partition); System.out.println("Offset: " + offset); System.out.println("Timestamp: " + timestamp); } }
(二)Kafka生产的消息是对象Order
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { // 监听 Kafka 中的 order-topic 主题 @KafkaListener(topics = "order-topic", groupId = "order-consumer-group") public void consumeOrder(ConsumerRecord<String, Order> record) { // 获取消息的详细信息 String key = record.key(); // 获取消息的 key Order value = record.value(); // 获取消息的 value String topic = record.topic(); // 获取消息的 topic int partition = record.partition(); // 获取消息的分区 long offset = record.offset(); // 获取消息的偏移量 long timestamp = record.timestamp(); // 获取消息的时间戳 // 处理消息(这里我们只是打印消息) System.out.println("Consumed record: "); System.out.println("Key: " + key); System.out.println("Value: " + value); System.out.println("Topic: " + topic); System.out.println("Partition: " + partition); System.out.println("Offset: " + offset); System.out.println("Timestamp: " + timestamp); } }
四、创建 启动类
确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } }
五、配置 Kafka 生产者(可选)
(一)消息类型为json串
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; @Service @EnableKafka public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; // 发送的是 String 类型消息 private ObjectMapper objectMapper = new ObjectMapper(); // Jackson ObjectMapper 用于序列化 // 发送订单到 Kafka public void sendOrder(String topic, Order order) { try { // 将 Order 对象转换为 JSON 字符串 String orderJson = objectMapper.writeValueAsString(order); // 发送 JSON 字符串到 Kafka kafkaTemplate.send(topic, orderJson); // 发送字符串消息 System.out.println("Order JSON sent to Kafka: " + orderJson); } catch (Exception e) { e.printStackTrace(); } } }
(二)消息类型为对象Order
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.stereotype.Service; @Service @EnableKafka public class KafkaProducer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; // 发送订单到 Kafka public void sendOrder(String topic, Order order) { kafkaTemplate.send(topic, order); // 发送订单对象,Spring Kafka 会自动将 Order 转换为 JSON } }
六、启动 Kafka 服务
启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
七、测试 Kafka 消费者
你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题,可以使用 KafkaProducer 来发送消息:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @GetMapping("/sendOrder") public String sendOrder() { Order order = new Order(); order.setOrderId(1L); order.setUserId(123L); order.setProduct("Laptop"); order.setQuantity(2); order.setStatus("Created"); kafkaProducer.sendOrder("order-topic", order); return "Order sent!"; } }
当你访问 /sendOrder端点时,KafkaProducer 会将消息发送到 Kafka,KafkaConsumer 会接收到这条消息并打印出来。
九、测试和调试
你可以通过查看 Kafka 消费者日志,确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息,并确保 Kafka 生产者和消费者之间的连接正常。
十、 结语
至此,你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能,例如处理更复杂的消息类型、批量消费等。
到此这篇关于Springboot项目如何消费Kafka数据的文章就介绍到这了,更多相关Springboot消费Kafka数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!