解决SpringCloudStream整合Kafka,两个通道对应同一个topic报错的情况
作者:加把劲骑士RideOn
文章指出通道需唯一对应topic,否则会报错,因两个通道共用一个topic导致绑定失败,通过修改配置文件,为不同通道分配不同的topic解决
总结
- 一个通道(如:evad_input)只能唯一对应一个topic,否则会报错
- 消费者组则可以被多个通道共同使用

报错日志
2022-05-25 14:46:03.697 ERROR 17108 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
。。。
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
。。。
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'Evad.consumer-group-evad.errors.recoverer' defined in null。。。
问题所在
yml配置文件中定义的两个通道:evad_input和devilvan_input,却共用了一个topic:Evad,导致绑定失败。
配置文件
spring:
application:
name: devilvan-kafka
cloud:
stream:
default-binder: kafka
bindings:
evad_input:
destination: Evad
binder: kafka
group: consumer-group-evad
content-type: text/plain
evad_output:
destination: Evad
binder: kafka
content-type: text/plain
devilvan_input:
# 一个通道只能唯一对应一个topic,否则会报binder
destination: Evad
binder: kafka
# 一个消费者组可以被多个通道使用
group: consumer-group-evad
content-type: text/plain
devilvan_output:
destination: Evad
binder: kafka
content-type: text/plain解决方法
新定义一个Topic:Evad05,使devilvan通道对应topic,区别于evad通道对应的topic
修改后
spring:
application:
name: devilvan-kafka
cloud:
stream:
default-binder: kafka
bindings:
evad_input:
destination: Evad
binder: kafka
group: consumer-group-evad
content-type: text/plain
evad_output:
destination: Evad
binder: kafka
content-type: text/plain
devilvan_input:
# 一个通道只能唯一对应一个topic,否则会报binder
destination: Evad05
binder: kafka
# 一个消费者组可以被多个通道使用
group: consumer-group-evad
content-type: text/plain
devilvan_output:
destination: Evad05
binder: kafka
content-type: text/plain代码
1. XXXController(生产消息的控制器)
@PostMapping("sendEvadMessage")
public ResultMessage<String> sendEvadMessage(@RequestBody String message) {
ResultMessage<String> resultMessage = new ResultMessage<>();
sender.sendEvadMessage(message);
resultMessage.setData(message);
return resultMessage.success();
}
@PostMapping("sendDevilvanMessage")
public ResultMessage<String> sendDevilvanMessage(@RequestBody String message) {
ResultMessage<String> resultMessage = new ResultMessage<>();
sender.sendDevilvanMessage(message);
resultMessage.setData(message);
return resultMessage.success();
}
2. 自定义通道
public interface EvadChannel {
String EVAD_INPUT = "evad_input";
String EVAD_OUTPUT = "evad_output";
String DEVILVAN_INPUT = "devilvan_input";
String DEVILVAN_OUTPUT = "devilvan_output";
/**
* 缺省接收消息通道
* @return channel 返回缺省信息接收通道
*/
@Input(EVAD_INPUT)
MessageChannel receiveEvadMessage();
/**
* 缺省发送消息通道
* @return channel 返回缺省信息发送通道
*/
@Output(EVAD_OUTPUT)
MessageChannel sendEvadMessage();
/**
* 缺省接收消息通道
* @return channel 返回缺省信息接收通道
*/
@Input(DEVILVAN_INPUT)
MessageChannel receiveDevilvanMessage();
/**
* 缺省发送消息通道
* @return channel 返回缺省信息发送通道
*/
@Output(DEVILVAN_OUTPUT)
MessageChannel sendDevilvanMessage();
}
3. EvadMessageSender(通过通道发送消息)
@Slf4j
@Component
public class EvadMessageSender {
@Autowired
private EvadChannel channel;
/**
* 消息发送到默认通道:缺省通道对应缺省主题
*
* @param message
*/
public void sendEvadMessage(String message) {
channel.sendEvadMessage().send(MessageBuilder.withPayload(message).build());
}
/**
* 消息发送到默认通道:缺省通道对应缺省主题
*
* @param message
*/
public void sendDevilvanMessage(String message) {
channel.sendDevilvanMessage().send(MessageBuilder.withPayload(message).build());
}
}
4. EvadReceiveListener(订阅/消费者)
@Slf4j
@Configuration
@EnableBinding(value = EvadChannel.class)
public class EvadReceiveListener {
@StreamListener(EvadChannel.EVAD_INPUT)
public void receiveEvadMessage(Message<String> message) {
log.info("{} 订阅消息:通道 = " + EvadChannel.EVAD_INPUT + ",data = {}",
DateUtil.now(), message.getPayload());
}
@StreamListener(EvadChannel.DEVILVAN_INPUT)
public void receiveDevilvanMessage(Message<String> message) {
log.info("{} 订阅消息:通道 = " + EvadChannel.DEVILVAN_INPUT + ",data = {}",
DateUtil.now(), message.getPayload());
}
}
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
