java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java实现RocketMQ生产与消费

使用Java代码实现RocketMQ的生产与消费消息

作者:小威要向诸佬学习呀

这篇文章介绍一下其他的小组件以及使用Java代码实现生产者对消息的生成,消费者消费消息等知识点,并通过代码示例介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下

RocketMQ其他组件

在RocketMQ中,除了生产者,消费者,还有一些其他的小组件,接下来逐一介绍一下他们。

监听器(Listener)

定义:监听器是消费者用于处理消息的组件。在PushConsumer(推)模式下,消费者客户端必须设置消费监听器,以便在接收到消息时执行相应的处理逻辑。(比如一会儿下面的代码)

偏移量(Offset)

定义:偏移量是指在消费消息时,记录消费者已经消费到的消息位置的值。每个消息都有一个唯一的偏移量值,它代表了消息在消息队列中的位置。

偏移量具有很大的作用:它能够保证消费者在重启或宕机后能够从上次消费的位置继续消费消息,避免重复消费或漏消费。

简而言之就是它能告诉消费者已经消费到哪一条消息!!

所以,偏移量的的实现方式有两种:包括存储在本地文件(OffsetStore)和存储在Broker中这两种方式。(这样一看,清晰了吧)

命名服务器(NameServer)

定义:命名服务器是RocketMQ中的轻量级路由服务,存储生产者和消费者与Broker之间的路由信息。

它的作用:提供Broker的动态注册与发现服务,生产者和消费者通过NameServer查询Broker的路由信息,从而进行消息的投递和消费。

消息组成

实现生产与消费消息

按之前的步骤搭建完成RocketMQ集群后...

首先我们创建一个空的maven工程,在pom.xml文件中添加RocketMQ的依赖(RocketMQ的依赖版本需要与虚拟机中的保持一致,这里选择和之前一样的4.7.1版本):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

生产消息

然后编写生产者生产消息的代码:

// 1.创建一个DefaultMQProducer实例,指定生产者组名 
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");  
// 2.设置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");    
// 3.启动生产者实例  
producer.start();   
// 4.使用for循环发送10条消息  
for (int i = 0; i < 10; i++) {  
    // 创建一条消息,指定Topic为"MyTopic1",Tag标签为"TagA",消息体为"hello rocketmq"加上循环变量的值,同时把字符串转换为字节数组  
    Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));       
    // 5.发送消息并接收发送结果  
    SendResult sendResult = producer.send(message);  
    // 打印发送结果,包括消息ID、发送状态等信息  
    System.out.println(sendResult);  
}  
// 6.发送完所有消息后,关闭生产者实例,释放资源  
producer.shutdown();

生产者生产消息和消费者消费消息这块的代码都相对较为简单,已经在代码块中加了注释,这里就不再赘述了。

这个时候就可以访问虚拟机+端口号来搜索到发送的消息详情了!

消费消息

// 1.和生产者一样,创建一个DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");  
// 2.设置NameServer的地址,消费者通过这个地址与NameServer进行通信,来获取Broker的地址信息  
consumer.setNamesrvAddr("192.168.220.135:9876");  
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息。我们订阅了"MyTopic1",使用"*"来匹配此Topic下的所有Tag  
consumer.subscribe("MyTopic1", "*");  
// 3.注册消息监听器,用于处理从Broker接收到的消息。使用MessageListenerConcurrently接口的实现,表示并行消费  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
    @Override  
    // 4.当收到消息时,方法会被调用。
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
        // 5.遍历消息列表,并打印每条消息的内容(注意:这里直接打印msg对象不会得到预期的消息内容字符串)  
        for (MessageExt msg : msgs) {  
            // 所以我们打印msg.getBody()的内容,为了保留消息原样  
            System.out.println("已收到消息" + msg);  
        }  
        // 6.返回消费状态,这里表示消息已成功消费  
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    }  
});  
// 7.启动消费者实例  
consumer.start();  
// 8.打印日志消费者已启动  
System.out.println("消费者已启动");

这里需要注意,msg包含了消息的详细信息,包括消息体、标签、属性等等。如果想打印消息内容,应该使用msg.getBody()方法获取消息体的字节数组,并且把它转换为字符串(如果消息体是文本的话)。

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!

以上就是使用Java代码实现RocketMQ的生产与消费消息的详细内容,更多关于Java实现RocketMQ生产与消费的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文