java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Rocketmq消息批量发送&消息批量消费

Rocketmq消息批量发送&消息批量消费方式

作者:拽着尾巴的鱼儿

文章介绍了RocketMQ中消息的批量发送和消费机制,包括批量发送的优点、限制及注意事项,以及消息消费模式的推、拉两种方式,重点阐述了DefaultLitePullConsumer和DefaultMQPushConsumer的使用方法和注意事项,并提供了代码示例

前言:批量发送和消费消息在一定程度上可以提高吞吐量,减少带宽,那么Rocketmq 中的消息怎么进行批量的发送和批量的消费呢;

1 消息的批量发送

1.1 批量发送的优点以及实现

批量发送消息可以提高 RocketMQ 的生产者性能和吞吐量。由于批量发送消息可以减少网络 I/O 操作和降低消息发送延迟,因此它在以下情况下特别有用:

但是,批量发送消息也存在一些注意事项,需要注意以下几点:

批量发送消息是一种提高 RocketMQ 生产者性能和吞吐量的好方法,但需要注意消息列表大小和错误处理机制,以确保生产者的可靠性和稳定性。

public class SimpleBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();

        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
    }
}

1.2 批量发送消息为什么要限制maxMessageSize 

消息列表的大小不能超过生产者设置的 maxMessageSize 参数,主要是为了避免消息发送延迟和消息过大导致 broker 出现性能问题。如果尝试发送大于 maxMessageSize 的消息,RocketMQ 会抛出 MessageTooLargeException 异常,并且消息不会被发送到 broker。

如果开发者在开发时遇到了消息列表大小超过 maxMessageSize 的情况,可以考虑以下几种处理方式:

开发者在开发时需要注意消息列表的大小限制,避免出现超出限制的情况。

2 消息的批量消费

2.1 批量消费的优点

批量消费消息可以提高 RocketMQ 的消费者性能和吞吐量,因为批量消费消息可以减少网络 I/O 操作和降低消息消费延迟。批量消费消息在以下情况下特别有用:

批量消费消息是一种提高 RocketMQ 消费者性能和吞吐量的好方法,但需要注意消息列表大小、消息顺序和事务性质等问题,以确保消费者的可靠性和稳定性。

2.2 推、拉和长轮询

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull

2.3 对pull(拉模式)的批量消费

DefaultLitePullConsumer是RocketMQ中的拉模式消息消费者,其工作流程如下:

在使用DefaultLitePullConsumer时,需要注意控制拉取消息的速率(比如使用定时任务调用poll()方法)及消息处理的并发能力。根据实际业务去实现适当的处理策略,保证在消费者速率和处理能力之间达到一个平衡。

demo:

 public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setNamesrvAddr("localhost:9876");
        litePullConsumer.subscribe("test_topic", "*");
        litePullConsumer.setPullBatchSize(20);
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }

首先还是初始化DefaultLitePullConsumer并设置ConsumerGroupName,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过setPullBatchSize可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer默认是自动提交位点。在subscribe模式下,同一个消费组下的多个LitePullConsumer会负载均衡消费。

2.4 对push(推模式)的批量消费

DefaultMQPushConsumer的工作流程如下:

请注意,在使用DefaultMQPushConsumer时,消费者的并发能力由MessageListener实现来保证。因此,在设计MessageListener实现时需要考虑到高并发处理能力。虽然RocketMQ客户端提供了设置消费线程池的配置选项,但还是推荐根据实际需求来实现合适的并发方案。‘

2.41 使用DefaultMQPushConsumer :

批量消费消息需要在消费者端设置 ConsumeMessageBatchMaxSize 参数,以指定每次批量消费的消息数量。

public static void main(String[] args) throws InterruptedException, MQClientException {
        // step1: 创建一个DefaultMQPushConsumer实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchPushConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeMessageBatchMaxSize(10); // 设置每次批量消费的消息数量

        // step2: 为消费者订阅一个Topic
        consumer.subscribe("test_topic", "*");

        // step3: 注册一个MessageListenerConcurrently,并实现批量消费逻辑
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 处理批量消息
                for (MessageExt msg : msgs) {
                    // 在此处理每条消息,例如保存到数据库等
                    System.out.println("msg : " + new String(msg.getBody()));
                }
                // 返回消费状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // step4: 启动消费者
        consumer.start();
        System.out.println("Consumer started!");

        // 让主线程等待以保持进程不退出
        TimeUnit.SECONDS.sleep(60);
    }

在上述示例中,每次批量消费的消息数量被设置为10。每次推送过来的消息数量可能并不总是达到这个数字,但是它不会超过这个数量。如果希望调整批量大小,可以通过consumer.setConsumeMessageBatchMaxSize();修改这个值。

2.4.2 :springboot 通过@RocketMQMessageListener 完成消息消费:

:默认的 RocketMQ Spring Boot Starter 并不支持直接设置批量消费模式,消息是一个个处理的。

对于RocketMQ Listener可以管理多个线程同时处理消息。可以有多个消息同时处理。通过将顺序消息设置为并发模式并设置消费线程数。

import com.example.demo.MessageProcessor;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "myConsumerGroup", consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 3)
public class MyConsumer implements RocketMQListener<MessageExt> {
    @Autowired
    private MessageProcessor messageProcessor;

    @Override
    public void onMessage(MessageExt messageExt) {
        messageProcessor.process(messageExt);
    }
}

在这个例子中,consumeMode = ConsumeMode.CONCURRENTLY表示消费者将启用并发模式,而consumeThreadMax = 3将同时处理三个消息。注意,这不是真正意义上的批量消费,而是通过多线程来同时处理多个消息。要实现批量消费,需要进一步处理MessageExt消息,这取决于的实际需求。例如,可以缓冲消息,等待足够多的消息可用后一次性处理它们。

在设置 consumeThreadMax 参数时,请确保它不要过大,以避免系统资源过载。同时,优化MessageProcessor中的相关逻辑,尽量减少处理每个消息所需的时间。通过这种机制,虽然无法实现真正意义上的批量消费,但仍然可以帮助提高消息处理的效率。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文