Springboot使用kafka的两种方式
作者:香菜菜
1、创建实验项目
第一步创建一个Springboot项目,引入spring-kafka依赖,这是后面的基础。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
kafka配置
spring: kafka: bootstrap-servers: kafka.tyjt.com:9092 consumer: auto-offset-reset: earliest group-id: sharingan-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
2、自动档
为了方便使用kafka,Springboot提供了spring-kafka 这个包,在已开始我们已经导入了,下面直接使用吧
Spring项目里引入Kafka非常方便,使用kafkaTemplate(Producer的模版)+@KafkaListener(Consumer的监听器)即可完成生产者-消费者的代码开发
2.1 监听listener
为了使创建 kafka 监听器更加简单,Spring For Kafka 提供了 @KafkaListener 注解,
@KafkaListener 注解配置方法上,凡是此注解的方法就会被标记为是 Kafka 消息监听器,所以可以用
@KafkaListener 注解快速创建消息监听器。
@Configuration @EnableKafka public class ConsumerConfigDemo { @KafkaListener(topics = {"test"},groupId = "group1") public void kafkaListener(String topic,String message){ System.out.println("消息:"+message); } }
2.2 发布消息
发布消息通过kafkaTemplate,kafkaTemplate是spring-kafka 的封装
@Slf4j @Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String topic, String key, String message) throws Exception { kafkaTemplate.send(topic,key,message); } }
kafkaTemplate 有很多不同的发送方法,根据自己的需求使用,这里只记录最简单的状况。
3、手动档
3.1 手动创建consumer
关于consumer的主要的封装在ConcurrentKafkaListenerContainerFactory这个里头,
本身的KafkaConsumer是线程不安全的,无法并发操作,这里spring又在包装了一层,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
package com.tyjt.sharingan.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 启动kafka consumer * * @author 种鑫 * @date 2023/10/18 17:26 */ @EnableKafka @Component @Slf4j public class KafkaConsumerMgr { @Resource ConcurrentKafkaListenerContainerFactory<String, byte[]> containerFactory; Map<String, ConcurrentMessageListenerContainer<?, ?>> containerMap = new ConcurrentHashMap<>(); public void startListener(KafkaProtoConsumer kafkaConsumer) { // 停止相同的 if (containerMap.containsKey(kafkaConsumer.getTopic())) { containerMap.get(kafkaConsumer.getTopic()).stop(); } ConcurrentMessageListenerContainer<String, byte[]> container = createListenerContainer(kafkaConsumer); container.start(); containerMap.put(kafkaConsumer.getTopic(), container); } private ConcurrentMessageListenerContainer<String, byte[]> createListenerContainer(KafkaProtoConsumer consumer) { ConcurrentMessageListenerContainer<String, byte[]> container = containerFactory.createContainer(consumer.topic()); container.setBeanName(consumer.group() + "-" + consumer.topic()); container.setConcurrency(consumer.getPartitionCount()); consumer.deployContainer(container); // 防止被修改的配置 ContainerProperties containerProperties = container.getContainerProperties(); containerProperties.setMessageListener(new Listener<>(consumer)); containerProperties.setAsyncAcks(false); containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); containerProperties.setGroupId(consumer.group()); return container; } /** * 定义监听 */ private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> { private final KafkaConsumer<T> kafkaConsumer; public Listener(KafkaConsumer<T> consumer) { this.kafkaConsumer = consumer; } @Override public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { log.info("group【{}】接收到来自topic【{}】的消息", kafkaConsumer.group(), data.topic()); // 处理数据 kafkaConsumer.process(data.value()); // 提交offset log.info("group【{}】提交topic【{}】的offset", kafkaConsumer.group(), data.topic()); consumer.commitSync(); } } }
这个可以根据需要动态的启动消费者
3.2 手动创建KafkaProducer
@Bean public KafkaProducer<String, byte[]> kafkaProducer() { Properties props = new Properties(); // 这里可以配置几台broker即可,他会自动从broker去拉取元数据进行缓存 props.put("bootstrap.servers", bootstrapServers); // 这个就是负责把发送的key从字符串序列化为字节数组 props.put("key.serializer", keySerializer); // 这个就是负责把你发送的实际的message从字符串序列化为字节数组 props.put("value.serializer", valueSerializer); // 默认是32兆=33554432 props.put("buffer.memory", bufferMemory); // 一般来说是要自己手动设置的,不是纯粹依靠默认值的,16kb props.put("batch.size", batchSize); // 发送一条消息出去,100ms内还没有凑成一个batch发送,必须立即发送出去 props.put("linger.ms", lingerMs); // 这个是说你可以发送的最大的请求的大小 默认是1m=1048576 // props.put("max.request.size", 10485760); // follower有没有同步成功你就不管了 props.put("acks", acks); // 这个重试,一般来说,给个3次~5次就足够了,可以cover住一般的异常场景 props.put("retries", retries); // 每次重试间隔100ms props.put("retry.backoff.ms", retryBackOffMs); props.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection); return new KafkaProducer<>(props); }
4、总结
4.1 区别
KafkaProducer是Kafka-client提供的原生Java Kafka客户端发送消息的API。
KafkaTemplate是Spring Kafka中提供的一个高级工具类,用于可以方便地发送消息到Kafka。它封装了KafkaProducer,提供了更多的便利方法和更高级的消息发送方式。
org.apache.kafka.clients.producer.KafkaProducer
org.springframework.kafka.core.KafkaTemplate
4.2 场景选择
在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方便的一层封装spring-kafka。
不需要动态的选择时候可以使用Spring-kafka,在需要动态创建时可以使用kafka-client的api进行处理
4.3 ConsumerRecord和ProducerRecord
两者都是kafka-client的类,在Spring-kafka中依然可以使用,可以发送和接受
以上就是Springboot使用kafka的两种方式的详细内容,更多关于Springboot使用kafka的资料请关注脚本之家其它相关文章!