springboot使用@KafkaListener监听多个kafka配置实现
作者:道不平
当服务中需要监听多个kafka时, 需要配置多个kafka,本文主要介绍了springboot使用@KafkaListener监听多个kafka配置实现,具有一定的参考价值,感兴趣的可以了解一下
背景
使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用
方案
可以手动读取不同kafka配置信息, 创建不同的Kafka 监听容器工厂, 使用@KafkaListener时指定相应的容器工厂, 代码如下:
1. 导入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2. yml配置
kafka: # 默认消费者配置 default-consumer: # 自动提交已消费offset enable-auto-commit: true # 自动提交间隔时间 auto-commit-interval: 1000 # 消费的超时时间 poll-timeout: 1500 # 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量 auto.offset.reset: latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) session.timeout.ms: 120000 # 消费请求超时时间 request.timeout.ms: 180000 # 1号kafka配置 test1: bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx consumer: group-id: xxx sasl.mechanism: xxxx security.protocol: xxxx sasl.jaas.config: xxxx # 2号kafka配置 test2: bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx consumer: group-id: xxx sasl.mechanism: xxxx security.protocol: xxxx sasl.jaas.config: xxxx
3. 容器工厂配置
package com.zhdx.modules.backstage.config; import com.google.common.collect.Maps; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.Map; /** * kafka监听容器工厂配置 * <p> * 拓展其他消费者配置只需配置指定的属性和bean即可 */ @EnableKafka @Configuration @RefreshScope public class KafkaListenerContainerFactoryConfig { /** * test1 kafka配置 */ @Value("${kafka.test1.bootstrap-servers}") private String test1KafkaServerUrls; @Value("${kafka.test1.consumer.group-id}") private String test1GroupId; @Value("${kafka.test1.consumer.sasl.mechanism}") private String test1SaslMechanism; @Value("${kafka.test1.consumer.security.protocol}") private String test1SecurityProtocol; @Value("${kafka.test1.consumer.sasl.jaas.config}") private String test1SaslJaasConfig; /** * test2 kafka配置 */ @Value("${kafka.test2.bootstrap-servers}") private String test2KafkaServerUrls; @Value("${kafka.test2.consumer.group-id}") private String test2GroupId; @Value("${kafka.test2.consumer.sasl.mechanism}") private String test2SaslMechanism; @Value("${kafka.test2.consumer.security.protocol}") private String test2SecurityProtocol; @Value("${kafka.test2.consumer.sasl.jaas.config}") private String test2SaslJaasConfig; /** * 默认消费者配置 */ @Value("${kafka.default-consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.default-consumer.poll-timeout}") private int pollTimeout; @Value("${kafka.default-consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.default-consumer.session.timeout.ms}") private int sessionTimeoutMs; @Value("${kafka.default-consumer.request.timeout.ms}") private int requestTimeoutMs; /** * test1消费者配置 */ public Map<String, Object> test1ConsumerConfigs() { Map<String, Object> props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism); props.put("security.protocol", test1SecurityProtocol); // 账号密码 props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig); return props; } /** * test2消费者配置 */ public Map<String, Object> test2ConsumerConfigs() { Map<String, Object> props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism); props.put("security.protocol", test2SecurityProtocol); // 账号密码 props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig); return props; } /** * 默认消费者配置 */ private Map<String, Object> getDefaultConsumerConfigs() { Map<String, Object> props = Maps.newHashMap(); // 自动提交(按周期)已消费offset 批量消费下设置false props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); // 消费请求超时时间 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); // 序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return props; } /** * 消费者工厂类 */ public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) { return new DefaultKafkaConsumerFactory<>(consumerConfigs); } public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory( Map<String, Object> consumerConfigs) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(initConsumerFactory(consumerConfigs)); // 是否开启批量消费 factory.setBatchListener(false); // 消费的超时时间 factory.getContainerProperties().setPollTimeout(pollTimeout); return factory; } /** * 创建test1 Kafka 监听容器工厂。 * * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象 */ @Bean(name = "test1KafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() { Map<String, Object> consumerConfigs = this.test1ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } /** * 创建test2 Kafka 监听容器工厂。 * * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象 */ @Bean(name = "test2KafkaListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() { Map<String, Object> consumerConfigs = this.test2ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } }
4. @KafkaListener使用
package com.zhdx.modules.backstage.kafka; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka监听器 */ @Slf4j @Component public class test1KafkaListener { @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx") public void handleHyPm(ConsumerRecord<String, String> record) { log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value())); } }
到此这篇关于springboot使用@KafkaListener监听多个kafka配置实现的文章就介绍到这了,更多相关springboot 监听多个kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!