自定义JmsListenerContainerFactory时,containerFactory字段解读
作者:土狗头子
自定义JmsListenerContainerFactory时,containerFactory字段
最近项目中利用ActiveMQ作为消息中间件,JMS进行消息传递。在对@JmsListener注解研究时对“containerFactory”字段的填值产生了一些疑问。分享一下我的心得体会。
疑问
@Component
public class TestMessageListener implements MessageListener {
@Override
@JmsListener(destination = "myQueue", containerFactory = "jmsListenerContainerFactory")
public void onMessage(Message message) {
...
业务代码
...
}
}@Configuration
@EnableJms
public class ActiveMQConfig {
/**
* 发布-订阅模式的ListenerContainer
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory factory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
configurer.configure(jmsListenerContainerFactory, factory);
jmsListenerContainerFactory.setConnectionFactory(factory);
...
return jmsListenerContainerFactory;
}
/**
* P2P模式的ListenerContainer
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory factory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
configurer.configure(jmsListenerContainerFactory, factory);
jmsListenerContainerFactory.setConnectionFactory(factory);
...
return jmsListenerContainerFactory;
}
}我查找项目中,containerFactory的对应字段“jmsListenerContainerFactory”,并不存在被该字段修饰的@Bean注解,而最终项目运行起来后,却正确使用了jmsListenerContainerQueue()定义的配置属性。
按照@Bean注解的定义,在@Bean不自定义的情况下,以方法名作为标识,而若想要jmsListenerContainerQueue()被调用,containerFactory的对应字段应填写“jmsListenerContainerQueue”而不是“jmsListenerContainerFactory”。
对此我深感疑惑,由于对springboot项目也是初上手,对很多加载机制非常陌生,在翻阅了一些资料与查看了相关源码后,终于理解了其中的奥妙。
问题解析
翻找了一圈源码后,终于发现了被“jmsListenerContainerFactory”修饰的@Bean注解
@Bean
@ConditionalOnMissingBean(
name = {"jmsListenerContainerFactory"}
)
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}这是在JMS源码的JmsAnnotationDrivenConfiguration类中的一个方法,用于生成默认的JMS监听工厂。那么为何调用默认的工厂生成方法,最后返回的却是自定义的工厂?
其中最关键的技术实现基础,就是springboot的scope属性即默认的singleton模式以及@Bean注解的方法参数依赖。
首先,jmsListenerContainerFactory()被@Bean注解,该方法又需要参数configurer,而该configurer由于@Bean注解的方法参数依赖,依赖调用了以下方法生成
@Bean
@ConditionalOnMissingBean
public DefaultJmsListenerContainerFactoryConfigurer jmsListenerContainerFactoryConfigurer() {
DefaultJmsListenerContainerFactoryConfigurer configurer = new DefaultJmsListenerContainerFactoryConfigurer();
configurer.setDestinationResolver((DestinationResolver)this.destinationResolver.getIfUnique());
configurer.setTransactionManager((JtaTransactionManager)this.transactionManager.getIfUnique());
configurer.setMessageConverter((MessageConverter)this.messageConverter.getIfUnique());
configurer.setJmsProperties(this.properties);
return configurer;
}该方法同样被@Bean修饰,最终返回一个DefaultJmsListenerContainerFactoryConfigurer。
第一个关键点来了,项目中自定义的jmsListenerContainerQueue()中的configurer参数,同样由于@Bean的参数依赖,由jmsListenerContainerFactoryConfigurer()生成。因此,无论containerFactory字段填“jmsListenerContainerFactory”或“jmsListenerContainerQueue”,最终获得的DefaultJmsListenerContainerFactoryConfigurer是由同一个方法生成的。
但是问题又来了,jmsListenerContainerFactoryConfigurer()返回的DefaultJmsListenerContainerFactoryConfigurer对象,是每次新new出来的,为何两处拿到的对象是同一个呢。
那么引出第二个关键点,springboot中Bean加载的singleton机制,由于该机制,导致被@Bean所注解的方法,在项目启动时执行,并将该返回值放入BeanFactory中保存。后续若该方法被调用,不像普通java代码一样,重新执行一遍方法获取返回值,而是直接从BeanFactory中获取。
由此,无论containerFactory使用哪个字段,最终获得的Factory配置,均是同一个。
心得
这一次的疑问,使我对springboot的加载机制及JMS消息传递都有了深入的理解,还是那句话,实践出真知。
springboot自定义JMSListener.destination
情景
项目在组内开发人员电脑上经常跑本地,activemq的队列名写在配置文件上。由于代码分支不一样,导致消息经常被不正常得消费掉。想要改进这个问题,最简单的是将注解@JMSListener 改为动态加载监听BEAN,但是大家不想为了这个事改变开发习惯,所以定为动态修改入队和监听的地址。
开始工作
从JMSListener注解入手,注释中提到了JmsListenerContainerFactory和DestinationResolver。从JmsListenerContainerFactory开始查找,项目中用到的默认factory-DefaultJmsListenerContainerFactory,生产泛型为DefaultMessageListenerContainer的container,找到了最终监听内部类scheduledInvokers,但是没方法修改该类的内容。
换个路子看DestinationResolver。找到了DynamicDestinationResolver,看到了希望,查找包内引用查到了JmsDestinationAccessor,方法是new DynamicDestinationResolver(),开始想研究这个resolver并想办法替换。找了一路十八开,原来这玩意是个池子,把初始化好的放里边,你改变这里的东西其实没啥大用处,队列已经建好了,尝试stop旧的container然后new一个新的,证明也是徒劳。期间也尝试了EmbeddedValueResolver,发现更坑,直接就是spring底层的BeanFactory。
把org.springframework.jms.listener下面的包翻了一遍没结果,就要放弃的时候,发现annotation包里有个JmsListenerAnnotationBeanPostProcessor一下豁然开朗。我不能初始化之后改,为啥不在初始化之前改呢……继续折腾。
赫然看见救命注释@see JmsListenerConfigurer。后面百度重启这些小白操作被大风吹跑了。
总之最后结果就是,继承JmsListenerEndpointRegistry,重写registerListenerContainer方法,将endpoint的destincation加上自己的suffix,然后继续走父类逻辑,然后通过JmsListenerConfigurer替换JmsListenerEndpointRegistrar中的Registry。
最后,这东西就是为了本地启动有的,就不要影响线上了,排除prd环境。
代码贴出来
@Slf4j
@Component
@Profile("!prd")
public class MyJmsListenerConfigurer implements JmsListenerConfigurer, ApplicationContextAware {
private ConfigurableApplicationContext context;
@Autowired
private MyJmsListenerEndpointRegistry myJmsListenerEndpointRegistry;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (applicationContext instanceof ConfigurableApplicationContext) {
this.context = (ConfigurableApplicationContext) applicationContext;
}
}
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
try {
InetAddress address = InetAddress.getLocalHost();
String localName = "-" + address.getHostName() + "@" + address.getHostAddress();
myJmsListenerEndpointRegistry.setSuffix(localName);
registrar.setEndpointRegistry(myJmsListenerEndpointRegistry);
} catch (UnknownHostException e) {
log.error("获取本机端口号异常", e);
if (context != null) {
context.close();
}
}
}
}@Component
@Profile("!prd")
public class MyJmsListenerEndpointRegistry extends JmsListenerEndpointRegistry {
private String suffix;
public void setSuffix(String suffix) {
this.suffix = suffix;
}
@Override
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory, boolean startImmediately) {
MethodJmsListenerEndpoint methodEndpoint = (MethodJmsListenerEndpoint) endpoint;
methodEndpoint.setDestination(methodEndpoint.getDestination() + suffix);
super.registerListenerContainer(endpoint, factory, startImmediately);
}
}入队的地址替换就很简单了,注入直接换就行:
@Component
@Order
@Profile(value = "!prd")
public class LocalQueueBeanNameChanger implements CommandLineRunner {
@Autowired
private List<Queue> queueList;
@Override
public void run(String... args) throws Exception {
InetAddress address = InetAddress.getLocalHost();
String localName = "-" + address.getHostName() + "@" + address.getHostAddress();
for (Queue queue : queueList) {
if (queue instanceof ActiveMQQueue) {
ActiveMQQueue activeMQQueue = (ActiveMQQueue) queue;
activeMQQueue.setPhysicalName(activeMQQueue.getPhysicalName() + localName);
}
}
}
}总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
