RocketMQ的四种常用消息队列及代码演示
作者:小汤汤汤汤
这篇文章主要介绍了RocketMQ的四种常用消息队列及代码演示,普通消息队列是最基本的一种消息队列,可以按照先进先出(FIFO)的顺序存储消息,并且可以被多个消费者同时消费,可以通过在生产者端指定主题名称和标签来创建普通消息队列,需要的朋友可以参考下
消息队列
普通消息队列
普通消息队列是最基本的一种消息队列,可以按照先进先出(FIFO)的顺序存储消息,并且可以被多个消费者同时消费。可以通过在生产者端指定主题名称和标签来创建普通消息队列。
顺序消息队列
顺序消息队列可以保证相同主题和相同消息键的消息按照严格的顺序被消费,例如可以用于订单等需要保证处理顺序的场景。可以通过在创建普通消息队列时指定MessageQueueSelector对象和键来创建顺序消息队列。
延迟消息队列
延迟消息队列是一种可以在指定时间后被消费的消息队列。可以在生产者端指定消息发送的时间戳和延迟时间,RocketMQ会根据这些信息将消息存储到延迟消息队列中,并在指定的时间后投递消息到普通消息队列中。
事务消息队列
事务消息队列是一种可以保证消息投递的事务性消息队列。在生产者端发送事务消息时,会先向RocketMQ发送一条预提交消息,然后在本地事务执行成功后再提交或回滚事务。如果提交事务,则RocketMQ会将消息投递到消费者,否则将不会投递该消息。可以通过在创建事务消息队列时指定本地事务执行器来创建事务消息队列。
除此之外,RocketMQ还支持多主题(Topic)、多消息生产者(Producer)和多消费者组(Consumer Group)的概念,可以为不同的业务场景创建不同的消息队列。
代码演示
普通消息队列
@Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMessage(String message) { rocketMQTemplate.convertAndSend("myTopic", message); } }
顺序消息队列
@Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendOrderMessage(String message, int orderId) { rocketMQTemplate.setMessageQueueSelector(new OrderMessageQueueSelector()); rocketMQTemplate.convertAndSend("myTopic", message, orderId); } } class OrderMessageQueueSelector implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object orderId) { int index = (int) orderId % mqs.size(); return mqs.get(index); } }
延迟消息队列
@Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendDelayMessage(String message, long delayTime) { rocketMQTemplate.syncSend("myTopic", MessageBuilder.withPayload(message) .build(), 3000, 2, delayTime); } }
事务消息队列
@Service public class MyTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { // 执行本地事务 // 如果本地事务执行成功,则返回RocketMQLocalTransactionState.COMMIT // 如果本地事务执行失败,则返回RocketMQLocalTransactionState.ROLLBACK return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { // 检查本地事务状态 // 如果本地事务执行成功,则返回RocketMQLocalTransactionState.COMMIT // 如果本地事务执行失败,则返回RocketMQLocalTransactionState.ROLLBACK // 如果本地事务状态未知,则返回RocketMQLocalTransactionState.UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } } @Service public class MyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private MyTransactionListener transactionListener; public void sendTransactionMessage(String message) { rocketMQTemplate.setTransactionListener(transactionListener); rocketMQTemplate.sendMessageInTransaction("myTransactionGroup", "myTopic", MessageBuilder.withpayload(message).build(), null); } }
到此这篇关于RocketMQ的四种常用消息队列及代码演示的文章就介绍到这了,更多相关RocketMQ常用消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!