RocketMQ producer同步发送单向发送源码解析
作者:hsfxuebao
RocketMQ生产者发送消息分为三种模式
RocketMQ生产者发送消息分为三种模式,分别是同步发送,异步发送和单向发送。
- 单向发送,这个就是发送之后不用接收结果的,就是你发出去一个消息,然后就返回了,就算有结果返回也不会接收了,这是站在消息生产者的角度;
- 同步发送的话,就是发出去一个消息,这个线程要等着它返回消息发送结果,然后你这个线程再根据这个消息发送结果再做一些业务操作等等;
- 异步发送,这个就是在你发送消息之前要给一个callback,发送的时候,你这个线程就不用等着,该干什么就干什么,然后发送结果回来的时候,是由其他线程调用你这个callback来处理的,你可以把这个callback看作是一个回调函数,回调方法,这个方法里面的业务逻辑就是你对这个消息发送结果的处理。注意,本文介绍的消息发送只是普通的消息发送,那种事务类型的消息,我们以后会有介绍。
1. 同步发送
producer同步发送消息的示例在org.apache.rocketmq.example.simple.Producer类中,代码如下:
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { // 1. 创建 DefaultMQProducer 对象 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); /* * Launch the instance. */ // todo 2. 启动 producer producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 3. 发送消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } ... } producer.shutdown(); } }
我们可以看到这个代码,你是同步消息你是需要在你自己的业务线程里面接收这个sendResult的,然后在做一些业务处理,比如我这里就是打印了一下这个sendResult。
接下来我们看下它是怎样发送的,这里是调用了这个producer的send方法。
@Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // topic 和消息长度 校验 Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); // todo return this.defaultMQProducerImpl.send(msg); }
我们可以看到,这个 DefaultMQProducer 将这个消息给了defaultMQProducerImpl 这个实现的send方法来处理了。
public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // todo 默认超时时间3s return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
defaultMQProducerImpl的send方法,加了个超时时间 ,然后有调用它的重载方法send(msg,timeout)
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // todo 同步模式 return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
这个send(msg,timeout)又调用了sendDefaultImpl 方法,然后他这里加了个通信模式是同步,CommunicationMode.SYNC。
1.1 DefaultMQProducerImpl#sendDefaultImpl
sendDefaultImpl 方法就比较长了了我们分成几部分来介绍:
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 判断状态是否是running this.makeSureStateOK(); // 检查消息合法性 Validators.checkMessage(msg, this.defaultMQProducer); // 随机的invokeID final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // todo 获取topic信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); ... }
这一小段代码其实就是做了一些准备检查工作,注意第二行的个检查消息合法性,它要检查你topic,消息长度的,你不能发空消息,消息长度也不能太长,默认是不超过4m,接下来这些就是记录一下时间了,再看最后一行,就是根据你这个消息发送的topic,然后获取topic 发送消息的这么一个信息,这里面就有这topic 有几个MessageQueue,然后每个MessageQueue对应在哪个broker上面,broker 的地址又是啥的,它这个方法会先从本地的一个缓存中获取下,没有的话就从nameserv更新下这个本地缓存,再找找,要是再找不到,它就认为你没有这个topic了,然后就去nameserv上面拉取一个默认topic的一些配置信息给你用(这个其实就是在新建一个topic)。 接着这个方法往下看,接着就是判断 这个TopicPublishInfo 是否存在了,如果不存在的话就抛出异常了,没有后续了就,如果存在的话:
... if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 重试次数 区分同步、其他 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; // 存放发送过的broker name String[] brokersSent = new String[timesTotal]; // 重试发送 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); // todo 选择message queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // todo 进行发送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // todo isolation 参数为false(看一下异常情况) this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); ...
其实下面还有许多处理异常的操作没有放上,不过不影响我们的主流程,先是判断你这个通信模式,如果是同步的话,默认重试次数就是2 ,然后加上本身这次请求,也就是最查请求3次。这个for循环就是失败重试的代码,再看下代码selectOneMessageQueue这个就是选择一个MesssageQueue的方法了,这个是比较重要的,这里我们先不说,你可以把它理解为 我们的负载均衡。接着往下走,就是判断一下时间了,计算一下剩下的时间, 如果这一堆前面的内容耗时很长,然后已经超了之前设置的默认超时时间,这个时候就会超时了,然后将这个calltimeout设置成true了。
1.2 DefaultMQProducerImpl#sendKernelImpl
接着就是进行发送了调用sendKernelImpl 方法:
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); // 根据MessageQueue获取Broker的网络地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; ...
这个sendKernelImpl 也是有点长,然后我们一部分一部分的看下,这就是根据MessageQueue里面的broker name 获取一下broker addr,他这个broker addr 选的是master的,比如说我们 broker使用的是 master/slave 高可用架构,这个时候只会选择那个master,毕竟是往里面写消息,然后只能用master,等到介绍消息消费者的时候,消息消费者是可以向slave node 获取消息消费的,前提是 master 负载比较大,然后消息消费者下次获取消费的消息已经在slave里面了,然后消息消费者获取到消息之后,它里面有个字段是告诉你下次可以去xxx 地址的broker 拉取消息,这个我们介绍到消息消费者的时候再说。
接着回来,如果没有获取到这个broker 地址的话,就是去nameserv上更新下本地缓存,然后再获取下。接着再往下就是再次判断一下这个broker addr 了,如果还没有就抛出异常,如果有的话 就执行下面的代码了:
... SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process // 给消息设置全局唯一id, 对于MessageBatch在生成过程中已设置了id if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; // 消息体是否压缩 boolean msgBodyCompressed = false; // 压缩消息 内容部分超了4k就会压缩 if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } // 判断有没有hook if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); // 执行Forbidden 钩子 this.executeCheckForbiddenHook(checkForbiddenContext); } ...
第一句,这个其实就是进行一个vip通道地址的转换,这个比较有意思,如果你这个支持vip channel的话,它会把broker addr 里面的端口改变一下,这个所谓的vip channel ,其实就是与它的另一个端口建立连接,这个端口就是当前端口-2 ;
接着,如果这个消息不是批量消息的话,我们就给这个消息设置一个唯一的消息id,再往下就是 sysflag的处理了,这个sysflag里面记录了好几个属性值,使用二进制来处理的,比如说消息是否压缩了(这个压缩,就是你消息内容超过了默认的4k之后,就会进行压缩,这个压缩的阈值你是可以配置的),是否是个事务消息等等。 接下来就是执行hook了,这个hook就是forbidenHook ,其实就是对消息进行过滤。
... if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } // 封装消息头 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); // 设置group requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); // topic requestHeader.setTopic(msg.getTopic()); // 设置默认topic requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); // 设置默认topic的队列数量 默认4个 requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // 队列id requestHeader.setQueueId(mq.getQueueId()); // 消息系统标记 requestHeader.setSysFlag(sysFlag); // 消息发送时间 requestHeader.setBornTimestamp(System.currentTimeMillis()); // 消息标记(RocketMQ对消息标记不做任何处理,供应用程序使用) requestHeader.setFlag(msg.getFlag()); // 设置扩展属性 requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); // 是否批量 requestHeader.setBatch(msg instanceof MessageBatch); // 判断消息是否是 %RETRY% 开头 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } ...
在往下就是执行一下发送消息之前的hook,再往下就是封装发送消息请求头,然后这个请求头里面就涵盖了很多的参数,比如说topic,MessageQueue 队列Id, 出生日期,flag等等。再往下就是消息发送了
... SendResult sendResult = null; // 同步 异步 单向 switch (communicationMode) { // 异步 case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } // 判断超时时间 long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // todo sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; // 单向 case ONEWAY: // 同步 case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; // 判判是否超时 if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // todo 交给 mq api去发送消息 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } // 是否注册了消息发送钩子函数 if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } ...
因为本小节主要是介绍下这个同步发送消息,然后我们就主要介绍下这个sync的代码逻辑: 首先是判断超时,然后交给 MQClientAPI层去处理,然后返回sendResult。
1.3 MQClientAPIImpl#sendMessage
我们这里接着看下MQClientAPIImpl里面的sendMessage 实现:
public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); RemotingCommand request = null; String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG); if (isReply) { if (sendSmartMsg) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader); } } else { // sendSmartMsg默认开启,也算一种优化吧 批量消息 if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { // 普通消息 request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } } // 设置消息体 request.setBody(msg.getBody()); switch (communicationMode) { case ONEWAY: // todo this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); long costTimeAsync = System.currentTimeMillis() - beginStartTime; // 判断超时时间 if (timeoutMillis < costTimeAsync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } // todo this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; // 判断超时时间 if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } // todo 同步发送 return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; } return null; }
这里先生成一个RemotingCommand 这么个实体对象,然后RequestCode就是SEND_MESSAGE,其实这里判断了一下sendSmartMsg 这个参数,把requestHeader优化了一下,然后换成了requestHeaderV2,其实这个requestHeaderV2 内容跟requestHeader一样,但是变量名是单个字母的,然后序列化,反序列化,传输内容都有所优化,其实他这个序列化使用是json形式的,然后想想就知道有些哪些好处了, 唯一的缺点就是可读性差点,但是这个玩意是对用户透明的,用户不需要关心。
接着就是判断通信类型,然后发送消息了,这里是同步发送,先是判断一下超时时间,接着就是调用sendMessageSync 进行同步发送了,我们接着来看下这个sendMessageSync 方法实现。
1.4 MQClientAPIImpl#sendMessageSync
private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { // todo 同步调用 RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; // 处理响应 return this.processSendResponse(brokerName, msg, response,addr); }
这里就调用到了client 模块(这个client其实就是直接操作netty了)来处理了,然后返回响应,调用processSendResponse 方法来处理响应。
1.5 NettyRemotingClient#invokeSync
我们再来看下client的 invokeSync 方法:
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { // 开始时间 long beginStartTime = System.currentTimeMillis(); // todo 轮询获取namesrv地址Channel final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { // 执行开始之前的rpchook doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; // 判断超时 之前有获取链接的操作,可能会出现超时的情况 if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // todo 进行同步执行,获取响应 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); // 执行之后的rpchook doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; // 远程发送请求异常 } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); // 关闭channel this.closeChannel(addr, channel); throw e; // 超时异常 } catch (RemotingTimeoutException e) { // 如果超时 就关闭cahnnel话,就关闭channel 默认是不关闭的 if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
这里有两个点需要关注下,首先是根据broker addr 这个地址获取一下对应的channel ,如果不存在的话就创建一下这个连接, 稍微看下这块的代码:
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException { // 如果地址不存在,就返回namesrv 的channel if (null == addr) { return getAndCreateNameserverChannel(); } ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } // 创建channel return this.createChannel(addr); }
如果你这个addr是空的话,这个就是默认找nameserv的addr ,然后找对应channel就可以了,如果不是null ,然后它会去这个channelTable 这个map中去找,如果没有的话就创建一个对应的channel
接着回到这个invokeSync 方法中,获得channel之后,就是执行一下rpcHook了,这东西就是你在创建MQProducer的时候设置的,在调用前执行一次,调用后执行一次,其实你就可以通过这个hook来实现很多功能,监控的功能比较多些。接着就是调用了invokeSyncImpl 这个实现方法来发送消息了,这个方法是它的一个父类里面的:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { // 获取 请求id final int opaque = request.getOpaque(); try { // 创建ResponseFuture final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 放入responseTable 表中 this.responseTable.put(opaque, responseFuture); // 获取远程地址 final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { // 成功 if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; // 失败 } else { responseFuture.setSendRequestOK(false); } // 移除response中的缓存 responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { // 成功了还是null 还是超时 if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { // 没发出去,就排除异常 throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } // 返回响应结果 return responseCommand; } finally { // 移除 this.responseTable.remove(opaque); } }
这个方法其实就是最终往 channel里面写内容的方法了,我们来看下,先是为这次request创建一个id 吧,这个id主要用来返回响应的时候用的。
接着创建一个ResposeFuture ,这个东西异步,同步都可以用,这个一会介绍一下它的原理,接着就是将这个id 与这个 ResposeFuture 关联起来放到这个 responseTable 里面的, 接着就是往channel里面发送消息了,这里它添加一个listener ,这listener的执行时机就是发送出去的时候,最后就是等待这个响应了。
我们来解释下这个ResposeFuture 原理, 当执行了responseFuture.waitResponse(timeoutMillis); 这行代码,当前线程就会wait ,然后被阻塞,然后等着响应回来的时候,netty处理响应的线程会从响应里面获取一下这个opaque这个id,就是请求之前在request生成的,broker 在响应的时候会会把这个id 放回到response 中, 然后会根据这个opaque 从responseTable中找到这个 ResposeFuture ,然后把响应设置到这个里面,最后唤醒一下wait在这个对象里面的线程就可以了,这样你这个业务线程就得到了这个RemotingResponse 了。 好了,到这我们就解释清楚了,然后我们看下他这个代码是怎样实现的:
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); // 获取对应id 的responseFuture final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { // 设置 responseFuture.setResponseCommand(cmd); // 从响应表中移除 responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { // todo 执行回调 executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
不过它这个ResposeFuture 是使用CountDownLatch 来实现这个wait与唤醒的。我们来具体看下这个 waitResponse方法与这个putResponse方法:
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }
2. 单向发送
单向发送其实这块跟同步发送的流程差不多,我们来看下它的生产者代码是怎样写的: org.apache.rocketmq.example.openmessaging.SimpleProducer:
public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); producer.startup(); System.out.printf("Producer startup OK%n"); ... { producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.printf("Send oneway message OK%n"); } ... }
可以看到我们最后发送的时候调用的是sendOneway方法,这个方法是没有返回值的。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.sendOneway(msg); }
2.1 DefaultMQProducerImpl#sendOneway
这里就是调用了defaultMQProducerImpl的 sendOneway方法
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.sendOneway(msg); }
这里需要注意的是它也是调用了sendDefaultImpl 方法,然后通信方式是oneway 。这里我们就不细说了,可以看下同步方法解析这个方法的说明,这里唯一要提一点是单向发送是没有这个重试的,然后就发送一次。下面的流程都是一样的,然后就到了这个MQClientAPIImpl 的 sendMessage 方法
... switch (communicationMode) { case ONEWAY: // todo this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; ...
然后他这个是又调用了NettyRemotingClient 的 invokeOneway 方法:
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { doBeforeRpcHooks(addr, request); this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
这里也是根据broker addr 获取channel, 如果没有的话,也是创建一个,接着就是执行这个rpc调用前的hook ,注意这里没有调用后的一个hook,因为我们并不知道它是什么情况。 接着又调用了invokeOnewayImpl 方法:
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { // 请求体, 标记是一个单向调用 request.markOnewayRPC(); // 获取凭证 boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { // 释放信号量 once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { // 释放信号量 once.release(); log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
这里使用了semaphore进行限流,然后默认的话是同时支持65535 个请求发送的,这个semaphore 限流只有单向发送与这个异步发送会有,接着就会将这个request写入channel中,然后add了一个listener ,这个listener执行时机就是消息发送出去了,这个时候就会释放 信号量。
到这我们这个单向发送就解析完成了。
参考文章
以上就是RocketMQ producer同步发送单向发送源码解析的详细内容,更多关于RocketMQ producer的资料请关注脚本之家其它相关文章!