java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Activemq死信队列

Springboot实现Activemq死信队列详解

作者:demon7552003

这篇文章主要介绍了Springboot实现Activemq死信队列详解,Activemq服务端配置重新投递次数超过 MaximumRedeliveries ,则会进入死信队列,默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息,需要的朋友可以参考下

死信队列是什么

当消息不能重投递或者消息过期,会被移到死信队列中,由管理员消费。

可以进行以下操作:

image-20221021142625060

什么情况下消息会重投递

消息重投递的常用场景:

重新投递策略

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

死信队列

重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

Spring整合

Activemq服务端配置

重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

配置个性化死信队列。

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">">
                <deadLetterStrategy>
                  <individualDeadLetterStrategy queuePrefix="DLQ."
                  useQueueForQueueMessages="true"
                  processExpired="false"
                  processNonPersistent="false"/>
                </deadLetterStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

可以单独设置重投递策略

    @Bean
    public ActiveMQConnectionFactoryCustomizer myCustomizer2() {
        return new ActiveMQConnectionFactoryCustomizer() {

            /**
             * Customize the {@link ActiveMQConnectionFactory}.
             *
             * @param factory the factory to customize
             */
            @Override
            public void customize(ActiveMQConnectionFactory factory) {

                //设置了队列的 policy。
                ActiveMQQueue q = new ActiveMQQueue("queue.dlq.test");
                RedeliveryPolicy policy = new RedeliveryPolicy();
                policy.setMaximumRedeliveries(3);
                factory.getRedeliveryPolicyMap().put(q,policy);

                System.out.println("Customizer 2");
            }
        };
    }

触发重投递

代码回滚

 @JmsListener(destination = "queue.dlq.test" ,id = "test")
    public void consume(String param, Session session) {

        try {
            System.out.println(session.getAcknowledgeMode());
            System.out.println(param);
            //回滚,则重投递
            session.rollback();
        }catch (Exception ex){

        }
    }

抛出异常自动回滚

由于默认是开启事务的,因此抛出异常,会自动触发回滚。

@JmsListener(destination = "queue.dlq.test2",id="test2")
    public void consume2(String param, Session session) {
        System.out.println(param);
        throw new RuntimeException("xxxx");
    }
//AbstractMessageListenerContainer
protected void doExecuteListener(Session session, Message message) throws JMSException {
		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
			if (logger.isWarnEnabled()) {
				logger.warn("Rejecting received message because of the listener container " +
						"having been stopped in the meantime: " + message);
			}
			rollbackIfNecessary(session);
			throw new MessageRejectedWhileStoppingException();
		}

		try {
			invokeListener(session, message);
		}
     //JMSException,RuntimeException,Error  这3类异常会回滚。
		catch (JMSException | RuntimeException | Error ex) {
      //自动回滚
			rollbackOnExceptionIfNecessary(session, ex);
			throw ex;
		}
     //自动提交
		commitIfNecessary(session, message);
	}

其他的Exception 都会被包装成ListenerExecutionFailedException,它是JMSException的子类,所以所有异常都会导致回滚。

到此这篇关于Springboot实现Activemq死信队列详解的文章就介绍到这了,更多相关Activemq死信队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文