RocketMQMessageListener注解对rocketmq消息的消费实现机制
作者:codecraft
序
本文主要研究一下RocketMQMessageListener的实现机制
示例
@Service @RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer") public class UserConsumer implements RocketMQListener<User> { @Override public void onMessage(User message) { System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n", message, message.getUserAge(), message.getUserName()); } }
实现了RocketMQListener接口的类,再配合@RocketMQMessageListener注解就可以实现对rocketmq消息的消费
RocketMQListener
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQListener.java
public interface RocketMQListener<T> { void onMessage(T message); }
RocketMQListener接口定义了onMessage方法
RocketMQMessageListener
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RocketMQMessageListener { String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}"; String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}"; String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}"; String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; /** * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve * load balance. It's required and needs to be globally unique. * * * See <a href="http://rocketmq.apache.org/docs/core-concept/" rel="external nofollow" >here</a> for further discussion. */ String consumerGroup(); /** * Topic name. */ String topic(); /** * Control how to selector message. * * @see SelectorType */ SelectorType selectorType() default SelectorType.TAG; /** * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} */ String selectorExpression() default "*"; /** * Control consume mode, you can choice receive message concurrently or orderly. */ ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; /** * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. */ MessageModel messageModel() default MessageModel.CLUSTERING; /** * Max consumer thread number. * This property control consumer thread pool executor maximumPoolSize see * {@link ConsumeMessageService#updateCorePoolSize(int)} * @see <a href="https://github.com/apache/rocketmq-spring/issues/546" rel="external nofollow" >issues#546</a> */ int consumeThreadMax() default 64; /** * consumer thread number. */ int consumeThreadNumber() default 20; /** * Max re-consume times. * * In concurrently mode, -1 means 16; * In orderly mode, -1 means Integer.MAX_VALUE. */ int maxReconsumeTimes() default -1; /** * Maximum amount of time in minutes a message may block the consuming thread. */ long consumeTimeout() default 15L; /** * Timeout for sending reply messages. */ int replyTimeout() default 3000; /** * The property of "access-key". */ String accessKey() default ACCESS_KEY_PLACEHOLDER; /** * The property of "secret-key". */ String secretKey() default SECRET_KEY_PLACEHOLDER; /** * Switch flag instance for message trace. */ boolean enableMsgTrace() default false; /** * The name value of message trace topic.If you don't config,you can use the default trace topic name. */ String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER; /** * The property of "name-server". */ String nameServer() default NAME_SERVER_PLACEHOLDER; /** * The property of "access-channel". */ String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER; /** * The property of "tlsEnable" default false. */ String tlsEnable() default "false"; /** * The namespace of consumer. */ String namespace() default ""; /** * Message consume retry strategy in concurrently mode. * * -1,no retry,put into DLQ directly * 0,broker control retry frequency * >0,client control retry frequency */ int delayLevelWhenNextConsume() default 0; /** * The interval of suspending the pull in orderly mode, in milliseconds. * * The minimum value is 10 and the maximum is 30000. */ int suspendCurrentQueueTimeMillis() default 1000; /** * Maximum time to await message consuming when shutdown consumer, in milliseconds. * The minimum value is 0 */ int awaitTerminationMillisWhenShutdown() default 1000; /** * The property of "instanceName". */ String instanceName() default "DEFAULT"; }
RocketMQMessageListener注解定义了consumerGroup、topic、selectorType、selectorExpression、consumeMode、messageModel、consumeThreadMax、consumeThreadNumber、maxReconsumeTimes、consumeTimeout、replyTimeout、accessKey、secretKey、enableMsgTrace、customizedTraceTopic、nameServer、accessChannel、tlsEnable、namespace、delayLevelWhenNextConsume、suspendCurrentQueueTimeMillis、awaitTerminationMillisWhenShutdown、instanceName属性
RocketMQMessageListenerBeanPostProcessor
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean { private ApplicationContext applicationContext; private AnnotationEnhancer enhancer; private ListenerContainerConfiguration listenerContainerConfiguration; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopUtils.getTargetClass(bean); RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class); if (ann != null) { RocketMQMessageListener enhance = enhance(targetClass, ann); if (listenerContainerConfiguration != null) { listenerContainerConfiguration.registerContainer(beanName, bean, enhance); } } return bean; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public void afterPropertiesSet() throws Exception { buildEnhancer(); this.listenerContainerConfiguration = this.applicationContext.getBean(ListenerContainerConfiguration.class); } private void buildEnhancer() { if (this.applicationContext != null) { Map<String, AnnotationEnhancer> enhancersMap = this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false); if (enhancersMap.size() > 0) { List<AnnotationEnhancer> enhancers = enhancersMap.values() .stream() .sorted(new OrderComparator()) .collect(Collectors.toList()); this.enhancer = (attrs, element) -> { Map<String, Object> newAttrs = attrs; for (AnnotationEnhancer enh : enhancers) { newAttrs = enh.apply(newAttrs, element); } return attrs; }; } } } private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) { if (this.enhancer == null) { return ann; } else { return AnnotationUtils.synthesizeAnnotation( this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null); } } public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>> { } }
RocketMQMessageListenerBeanPostProcessor实现了ApplicationContextAware、BeanPostProcessor、InitializingBean接口,其中postProcessAfterInitialization方法判断bean有没有RocketMQMessageListener注解,如果有的话则通过enhance进行增强,另外会通过listenerContainerConfiguration的registerContainer进行注册
其afterPropertiesSet方法会执行buildEnhancer方法,该方法会获取AnnotationEnhancer的bean实例,然后排序好,最后构造AnnotationEnhancer,其作用就是把这些enhancers挨个apply上去
registerContainer
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName()); } if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName()); } String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); }
ListenerContainerConfiguration的registerContainer方法会根据注解信息及对应的bean构造DefaultRocketMQListenerContainer并注册到genericApplicationContext,同时执行其start方法
DefaultRocketMQListenerContainer
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware { private DefaultMQPushConsumer consumer; private RocketMQListener rocketMQListener; @Override public void start() { if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); } @Override public void afterPropertiesSet() throws Exception { initRocketMQPushConsumer(); this.messageType = getMessageType(); this.methodParameter = getMethodParameter(); log.debug("RocketMQ messageType: {}", messageType); } private void initRocketMQPushConsumer() throws MQClientException { if (rocketMQListener == null && rocketMQReplyListener == null) { throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required"); } Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } consumer.setNamespace(namespace); String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); consumer.setConsumeThreadMin(consumeThreadNumber); consumer.setConsumeTimeout(consumeTimeout); consumer.setMaxReconsumeTimes(maxReconsumeTimes); consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown); consumer.setInstanceName(instanceName); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } //if String is not is equal "true" TLS mode will represent the as default value false consumer.setUseTLS(new Boolean(tlsEnable)); if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); } } //...... }
DefaultRocketMQListenerContainer的start方法(SmartLifecycle)执行consumer的start方法;其afterPropertiesSet方法(InitializingBean)会执行initRocketMQPushConsumer方法来创建consumer
initRocketMQPushConsumer方法主要是创建DefaultMQPushConsumer,设置messageModel,根据selectorType执行subscribe方法,根据consumeMode来设置messageListener(DefaultMessageListenerOrderly或者DefaultMessageListenerConcurrently),最后针对RocketMQPushConsumerLifecycleListener执行prepareStart方法
DefaultMessageListenerOrderly
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @SuppressWarnings("unchecked") @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } }
DefaultMessageListenerOrderly实现了MessageListenerOrderly的consumeMessage接口,它内部遍历msgs,挨个执行handleMessage,有异常的话则设置context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
DefaultMessageListenerConcurrently
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); handleMessage(messageExt); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
DefaultMessageListenerConcurrently实现了MessageListenerConcurrently接口的consumeMessage方法,它内部遍历msgs,挨个执行handleMessage,有异常的话设置context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume),然后返回ConsumeConcurrentlyStatus.RECONSUME_LATER
handleMessage
private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { if (rocketMQListener != null) { rocketMQListener.onMessage(doConvertMessage(messageExt)); } else if (rocketMQReplyListener != null) { Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt)); Message<?> message = MessageBuilder.withPayload(replyContent).build(); org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message)); DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer(); producer.setSendMsgTimeout(replyTimeout); producer.send(replyMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { if (sendResult.getSendStatus() != SendStatus.SEND_OK) { log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus()); } else { log.debug("Consumer replies message success."); } } @Override public void onException(Throwable e) { log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage()); } }); } }
handleMessage方法则是委托给了rocketMQListener.onMessage(doConvertMessage(messageExt)),即回调业务自定义的RocketMQListener
start
org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
/** * This method gets internal infrastructure readily to serve. Instances must call this method after configuration. * * @throws MQClientException if there is any client error. */ @Override public void start() throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
DefaultMQPushConsumer的start方法先执行setConsumerGroup,然后委托给了defaultMQPushConsumerImpl.start(),如果有traceDispatcher的话,则执行traceDispatcher.start方法
defaultMQPushConsumerImpl.start()方法会触发MQClientInstance.start(),后者会触发pullMessageService.start()以及rebalanceService.start()(会在doRebalance的时候触发defaultMQPushConsumerImpl.executePullRequestImmediately,往pullRequestQueue放pullRequest),前者会从pullRequestQueue获取pullRequest然后执行DefaultMQPushConsumerImpl.pullMessage方法,里头是执行pullAPIWrapper.pullKernelImpl,然后通过pullCallback往processQueue.putMessage,再触发consumeMessageService.submitConsumeRequest,它会回调listener.consumeMessage来消费消息
小结
RocketMQMessageListenerBeanPostProcessor实现了ApplicationContextAware、BeanPostProcessor、InitializingBean接口,其中postProcessAfterInitialization方法判断bean有没有RocketMQMessageListener注解,如果有的话则通过enhance进行增强,另外会通过listenerContainerConfiguration的registerContainer进行注册
ListenerContainerConfiguration的registerContainer方法会根据注解信息及对应的bean构造DefaultRocketMQListenerContainer并注册到genericApplicationContext,同时执行其start方法
DefaultRocketMQListenerContainer的start方法(SmartLifecycle)执行consumer的start方法;其afterPropertiesSet方法(InitializingBean)会执行initRocketMQPushConsumer方法来创建consumer
start方法主要是执行pullMessageService.start()以及rebalanceService.start(),前者负责从pullRequestQueue获取pullRequest然后拉取消息放到processQueue,然后触发回调listener.consumeMessage来消费消息;后者负责rebalance,一开始会触发defaultMQPushConsumerImpl.executePullRequestImmediately,即往pullRequestQueue放pullRequest
pushConsumer本质上还是基于pull的模式来的,从RocketMQMessageListenerBeanPostProcessor --> DefaultRocketMQListenerContainer.start --> DefaultMQPushConsumer.start --> defaultMQPushConsumerImpl.start() --> pullMessageService.start()以及rebalanceService.start()
以上就是RocketMQMessageListener注解对rocketmq消息的消费实现机制的详细内容,更多关于RocketMQMessageListener rocketmq消费的资料请关注脚本之家其它相关文章!