
RocketMQ Message Failure Retry Explained: Diagrams and Source-Level Analysis

admin 10-30 10:47 447 views



Last updated: October 24, 2022

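About the author: bachelor's and master's degrees in communications engineering, now a Java programmer. Has written research papers and filed patents. A good programmer should do more than CRUD.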



Contents

  • Handling Failed Messages
  • Broker Processing Flow
  • Dead-Letter Queue

Handling Failed Messages


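When you use a Consumer you register a MessageListener, and its consume method returns a processing status:

  • ConsumeConcurrentlyStatus.CONSUME_SUCCESS: the message was consumed successfully
  • ConsumeConcurrentlyStatus.RECONSUME_LATER: consume the message again after a delay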
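As a rough illustration of this contract, the sketch below models a listener that reports one status for a whole batch. It is not the RocketMQ client API: the `Status` enum, `handle`, and `consume` are hypothetical stand-ins for `ConsumeConcurrentlyStatus` and the listener callback, chosen so the example runs without the RocketMQ client on the classpath.

```java
import java.util.Arrays;
import java.util.List;

final class ListenerStatusSketch {
    // Hypothetical stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Hypothetical business handler: rejects null or blank payloads.
    static void handle(String body) {
        if (body == null || body.trim().isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }

    // Mirrors the shape of a concurrent listener: one status for the whole batch.
    static Status consume(List<String> bodies) {
        try {
            for (String body : bodies) {
                handle(body);
            }
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            // Any failure asks for the batch to be delivered again later.
            return Status.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        System.out.println(consume(Arrays.asList("a", "b")));
        System.out.println(consume(Arrays.asList("a", " ")));
    }
}
```

In a real listener the same pattern applies: catch the failure inside the callback and return the retry status explicitly rather than letting the exception escape.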

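The MessageListener is invoked in ConsumeMessageConcurrentlyService, and the two statuses above map onto the enum values defined in CMResult:

  • CMResult.CR_SUCCESS: consumed successfully
  • CMResult.CR_LATER: consume again after a delay
  • CMResult.CR_ROLLBACK: transaction rolled back
  • CMResult.CR_COMMIT: transaction committed
  • CMResult.CR_THROW_EXCEPTION: an exception was thrown during consumption
  • CMResult.CR_RETURN_NULL: the consume result was null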

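For CMResult.CR_LATER, the strategy is to send the message back to the Broker and carry on with subsequent messages. The sent-back message is assigned a retry Topic named "%RETRY%" + the Consumer group name. The message's original Topic is preserved in the message properties, and delayLevel and reconsumeTimes are set as well.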
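The naming rule and the bookkeeping described above can be sketched in a few lines. This is only an illustration: `RetryTopicSketch`, `describeSendBack`, and the map keys are hypothetical names invented for this example, not RocketMQ classes or property keys.

```java
import java.util.HashMap;
import java.util.Map;

final class RetryTopicSketch {
    static final String RETRY_PREFIX = "%RETRY%";

    // Builds the retry topic name for a consumer group: "%RETRY%" + group.
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Hypothetical flat view of a sent-back message: where it goes now,
    // where it originally came from, and its retry bookkeeping.
    static Map<String, String> describeSendBack(String originalTopic, String consumerGroup,
                                                int delayLevel, int reconsumeTimes) {
        Map<String, String> view = new HashMap<>();
        view.put("topic", retryTopic(consumerGroup));
        view.put("originalTopic", originalTopic);
        view.put("delayLevel", String.valueOf(delayLevel));
        view.put("reconsumeTimes", String.valueOf(reconsumeTimes));
        return view;
    }

    public static void main(String[] args) {
        System.out.println(describeSendBack("OrderTopic", "order-group", 3, 1));
    }
}
```

Because the retry topic is derived from the group name alone, every topic a group subscribes to shares one retry topic, which is why the original Topic has to travel with the message.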

The consume result is handled in ConsumeMessageConcurrentlyService.processConsumeResult:

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        // Nothing to do if the batch is empty
        if (consumeRequest.getMsgs().isEmpty())
            return;

        // Messages consumeRequest.msgs[0] through consumeRequest.msgs[ackIndex] count as consumed successfully
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                // Record the success and failure counts
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                // Record the failure count: the whole batch counts as failed
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        // Handle the messages that failed
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            // In broadcasting mode, failed messages are never sent back to the Broker; they are only logged
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            // In clustering mode, send the failed messages back to the Broker
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // The method that actually sends the message back to the Broker
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        // Reconsume count + 1
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                // If sending back to the Broker also fails, submit a delayed consume request (retry on the client later)
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        // Remove the consumed messages and get the latest consume offset
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // Update the consume offset; the offset may only increase
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

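A Consumer can set consumeMessageBatchMaxSize to control how many messages are passed to the MessageListener in one call. RocketMQ treats the batch as a unit: if any message in it fails and the listener returns RECONSUME_LATER, every message in the batch is sent back to the Broker. Choose consumeMessageBatchMaxSize with care, because the messages in that batch that had already been processed will be consumed again.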
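To make the batch behaviour concrete, the sketch below reproduces only the ackIndex arithmetic from processConsumeResult shown earlier. The `Status` enum and `failedCount` method are hypothetical names for this example; the real method also updates statistics and sends the messages back.

```java
final class AckIndexSketch {
    // Hypothetical stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Returns how many messages at the tail of the batch are treated as failed,
    // i.e. the messages at indexes ackIndex + 1 .. batchSize - 1.
    static int failedCount(Status status, int ackIndex, int batchSize) {
        switch (status) {
            case CONSUME_SUCCESS:
                // Clamp ackIndex to the last message of the batch.
                if (ackIndex >= batchSize) {
                    ackIndex = batchSize - 1;
                }
                break;
            case RECONSUME_LATER:
                // The whole batch is considered failed.
                ackIndex = -1;
                break;
            default:
                break;
        }
        return batchSize - (ackIndex + 1);
    }

    public static void main(String[] args) {
        System.out.println(failedCount(Status.CONSUME_SUCCESS, Integer.MAX_VALUE, 4));
        System.out.println(failedCount(Status.CONSUME_SUCCESS, 1, 4));
        System.out.println(failedCount(Status.RECONSUME_LATER, 1, 4));
    }
}
```

With a batch of four, returning RECONSUME_LATER marks all four messages as failed regardless of ackIndex, whereas returning CONSUME_SUCCESS with ackIndex set to 1 marks only the last two.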
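
Broker Processing Flow

The Broker-side logic lives in SendMessageProcessor.consumerSendMsgBack (asyncConsumerSendMsgBack in the version listed here). Messages that a Consumer failed to consume and sent back are placed into the retry Topic by the Broker.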
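One step in the Broker source that follows is choosing which queue of the retry Topic receives the message. The sketch below isolates that expression. `RetryQueueSketch` and `pickQueueId` are hypothetical names, and the exception for a non-positive queue count is a choice made for this example only: the Broker source instead returns a SUCCESS response without storing the message.

```java
import java.util.Random;

final class RetryQueueSketch {
    // Picks a queue id in [0, retryQueueNums) using the same expression as the Broker source.
    static int pickQueueId(Random random, int retryQueueNums) {
        if (retryQueueNums <= 0) {
            // Assumption for this sketch only; see the lead-in.
            throw new IllegalArgumentException("retryQueueNums must be positive");
        }
        // The inner modulo keeps the value strictly inside the int range,
        // so Math.abs can never receive Integer.MIN_VALUE.
        return Math.abs(random.nextInt() % 99999999) % retryQueueNums;
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(pickQueueId(random, 4));
        }
    }
}
```

The selection is random rather than tied to the original queue, so retried messages are spread across the retry queues without regard to where they were first stored.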
  precode class="prism language-java"span class="token comment"/**
     * 消费者将消息发回给Broker,可以指定多久后重新消费该消息
     *
     * @param ctx
     * @param request
     * @return
     * @throws RemotingCommandException
     *//span
    span class="token keyword"private/span span class="token class-name"CompletableFuture/spanspan class="token generics"span class="token punctuation"/spanspan class="token class-name"RemotingCommand/spanspan class="token punctuation"/span/span span class="token function"asyncConsumerSendMsgBack/spanspan class="token punctuation"(/spanspan class="token class-name"ChannelHandlerContext/span ctxspan class="token punctuation",/span
                                                                        span class="token class-name"RemotingCommand/span requestspan class="token punctuation")/span span class="token keyword"throws/span span class="token class-name"RemotingCommandException/span span class="token punctuation"{/span
        span class="token comment"// 初始化响应/span
        span class="token keyword"final/span span class="token class-name"RemotingCommand/span response span class="token operator"=/span span class="token class-name"RemotingCommand/spanspan class="token punctuation"./spanspan class="token function"createResponseCommand/spanspan class="token punctuation"(/spanspan class="token keyword"null/spanspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token keyword"final/span span class="token class-name"ConsumerSendMsgBackRequestHeader/span requestHeader span class="token operator"=/span
                span class="token punctuation"(/spanspan class="token class-name"ConsumerSendMsgBackRequestHeader/spanspan class="token punctuation")/spanrequestspan class="token punctuation"./spanspan class="token function"decodeCommandCustomHeader/spanspan class="token punctuation"(/spanspan class="token class-name"ConsumerSendMsgBackRequestHeader/spanspan class="token punctuation"./spanspan class="token keyword"class/spanspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token class-name"String/span namespace span class="token operator"=/span span class="token class-name"NamespaceUtil/spanspan class="token punctuation"./spanspan class="token function"getNamespaceFromResource/spanspan class="token punctuation"(/spanrequestHeaderspan class="token punctuation"./spanspan class="token function"getGroup/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token keyword"if/span span class="token punctuation"(/spanspan class="token keyword"this/spanspan class="token punctuation"./spanspan class="token function"hasConsumeMessageHook/spanspan class="token punctuation"(/spanspan class="token punctuation")/span span class="token operator"&&/span span class="token operator"!/spanspan class="token class-name"UtilAll/spanspan class="token punctuation"./spanspan class="token function"isBlank/spanspan class="token punctuation"(/spanrequestHeaderspan class="token punctuation"./spanspan class="token function"getOriginMsgId/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation")/span span class="token punctuation"{/span
            span class="token class-name"ConsumeMessageContext/span context span class="token operator"=/span span class="token function"buildConsumeMessageContext/spanspan class="token punctuation"(/spannamespacespan class="token punctuation",/span requestHeaderspan class="token punctuation",/span requestspan class="token punctuation")/spanspan class="token punctuation";/span
            span class="token keyword"this/spanspan class="token punctuation"./spanspan class="token function"executeConsumeMessageHookAfter/spanspan class="token punctuation"(/spancontextspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span
        span class="token class-name"SubscriptionGroupConfig/span subscriptionGroupConfig span class="token operator"=/span
                span class="token keyword"this/spanspan class="token punctuation"./spanbrokerControllerspan class="token punctuation"./spanspan class="token function"getSubscriptionGroupManager/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation"./spanspan class="token function"findSubscriptionGroupConfig/spanspan class="token punctuation"(/spanrequestHeaderspan class="token punctuation"./spanspan class="token function"getGroup/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token keyword"if/span span class="token punctuation"(/spanspan class="token keyword"null/span span class="token operator"==/span subscriptionGroupConfigspan class="token punctuation")/span span class="token punctuation"{/span
            responsespan class="token punctuation"./spanspan class="token function"setCode/spanspan class="token punctuation"(/spanspan class="token class-name"ResponseCode/spanspan class="token punctuation"./spanSUBSCRIPTION_GROUP_NOT_EXISTspan class="token punctuation")/spanspan class="token punctuation";/span
            responsespan class="token punctuation"./spanspan class="token function"setRemark/spanspan class="token punctuation"(/spanspan class="token string""subscription group not exist, "/span span class="token operator"+/span requestHeaderspan class="token punctuation"./spanspan class="token function"getGroup/spanspan class="token punctuation"(/spanspan class="token punctuation")/span span class="token operator"+/span span class="token string"" "/span
                    span class="token operator"+/span span class="token class-name"FAQUrl/spanspan class="token punctuation"./spanspan class="token function"suggestTodo/spanspan class="token punctuation"(/spanspan class="token class-name"FAQUrl/spanspan class="token punctuation"./spanSUBSCRIPTION_GROUP_NOT_EXISTspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation";/span
            span class="token keyword"return/span span class="token class-name"CompletableFuture/spanspan class="token punctuation"./spanspan class="token function"completedFuture/spanspan class="token punctuation"(/spanresponsespan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span

        span class="token comment"// 检查Broker是否有写入权限/span
        span class="token keyword"if/span span class="token punctuation"(/spanspan class="token operator"!/spanspan class="token class-name"PermName/spanspan class="token punctuation"./spanspan class="token function"isWriteable/spanspan class="token punctuation"(/spanspan class="token keyword"this/spanspan class="token punctuation"./spanbrokerControllerspan class="token punctuation"./spanspan class="token function"getBrokerConfig/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation"./spanspan class="token function"getBrokerPermission/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation")/span span class="token punctuation"{/span
            responsespan class="token punctuation"./spanspan class="token function"setCode/spanspan class="token punctuation"(/spanspan class="token class-name"ResponseCode/spanspan class="token punctuation"./spanNO_PERMISSIONspan class="token punctuation")/spanspan class="token punctuation";/span
            responsespan class="token punctuation"./spanspan class="token function"setRemark/spanspan class="token punctuation"(/spanspan class="token string""the broker["/span span class="token operator"+/span span class="token keyword"this/spanspan class="token punctuation"./spanbrokerControllerspan class="token punctuation"./spanspan class="token function"getBrokerConfig/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation"./spanspan class="token function"getBrokerIP1/spanspan class="token punctuation"(/spanspan class="token punctuation")/span span class="token operator"+/span span class="token string""] sending message is forbidden"/spanspan class="token punctuation")/spanspan class="token punctuation";/span
            span class="token keyword"return/span span class="token class-name"CompletableFuture/spanspan class="token punctuation"./spanspan class="token function"completedFuture/spanspan class="token punctuation"(/spanresponsespan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span

        span class="token comment"// 检查重试队列个数是否大于0/span
        span class="token keyword"if/span span class="token punctuation"(/spansubscriptionGroupConfigspan class="token punctuation"./spanspan class="token function"getRetryQueueNums/spanspan class="token punctuation"(/spanspan class="token punctuation")/span span class="token operator"=/span span class="token number"0/spanspan class="token punctuation")/span span class="token punctuation"{/span
            responsespan class="token punctuation"./spanspan class="token function"setCode/spanspan class="token punctuation"(/spanspan class="token class-name"ResponseCode/spanspan class="token punctuation"./spanSUCCESSspan class="token punctuation")/spanspan class="token punctuation";/span
            responsespan class="token punctuation"./spanspan class="token function"setRemark/spanspan class="token punctuation"(/spanspan class="token keyword"null/spanspan class="token punctuation")/spanspan class="token punctuation";/span
            span class="token keyword"return/span span class="token class-name"CompletableFuture/spanspan class="token punctuation"./spanspan class="token function"completedFuture/spanspan class="token punctuation"(/spanresponsespan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span

        span class="token comment"// 计算retry Topic = "%RETRY% + consumeGroup"/span
        span class="token class-name"String/span newTopic span class="token operator"=/span span class="token class-name"MixAll/spanspan class="token punctuation"./spanspan class="token function"getRetryTopic/spanspan class="token punctuation"(/spanrequestHeaderspan class="token punctuation"./spanspan class="token function"getGroup/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation";/span

        span class="token comment"// 计算队列编号/span
        span class="token keyword"int/span queueIdInt span class="token operator"=/span span class="token class-name"Math/spanspan class="token punctuation"./spanspan class="token function"abs/spanspan class="token punctuation"(/spanspan class="token keyword"this/spanspan class="token punctuation"./spanrandomspan class="token punctuation"./spanspan class="token function"nextInt/spanspan class="token punctuation"(/spanspan class="token punctuation")/span span class="token operator"%/span span class="token number"99999999/spanspan class="token punctuation")/span span class="token operator"%/span subscriptionGroupConfigspan class="token punctuation"./spanspan class="token function"getRetryQueueNums/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token keyword"int/span topicSysFlag span class="token operator"=/span span class="token number"0/spanspan class="token punctuation";/span
        span class="token keyword"if/span span class="token punctuation"(/spanrequestHeaderspan class="token punctuation"./spanspan class="token function"isUnitMode/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/span span class="token punctuation"{/span
            topicSysFlag span class="token operator"=/span span class="token class-name"TopicSysFlag/spanspan class="token punctuation"./spanspan class="token function"buildSysFlag/spanspan class="token punctuation"(/spanspan class="token boolean"false/spanspan class="token punctuation",/span span class="token boolean"true/spanspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span

        span class="token comment"// 获取topicConfig,如果获取不到,则在response里进行相应设置/span
        span class="token class-name"TopicConfig/span topicConfig span class="token operator"=/span span class="token keyword"this/spanspan class="token punctuation"./spanbrokerControllerspan class="token punctuation"./spanspan class="token function"getTopicConfigManager/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation"./spanspan class="token function"createTopicInSendMessageBackMethod/spanspan class="token punctuation"(/span
                newTopicspan class="token punctuation",/span
                subscriptionGroupConfigspan class="token punctuation"./spanspan class="token function"getRetryQueueNums/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation",/span
                span class="token class-name"PermName/spanspan class="token punctuation"./spanPERM_WRITE span class="token operator"|/span span class="token class-name"PermName/spanspan class="token punctuation"./spanPERM_READspan class="token punctuation",/span topicSysFlagspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token keyword"if/span span class="token punctuation"(/spanspan class="token keyword"null/span span class="token operator"==/span topicConfigspan class="token punctuation")/span span class="token punctuation"{/span
            responsespan class="token punctuation"./spanspan class="token function"setCode/spanspan class="token punctuation"(/spanspan class="token class-name"ResponseCode/spanspan class="token punctuation"./spanSYSTEM_ERRORspan class="token punctuation")/spanspan class="token punctuation";/span
            responsespan class="token punctuation"./spanspan class="token function"setRemark/spanspan class="token punctuation"(/spanspan class="token string""topic["/span span class="token operator"+/span newTopic span class="token operator"+/span span class="token string""] not exist"/spanspan class="token punctuation")/spanspan class="token punctuation";/span
            span class="token keyword"return/span span class="token class-name"CompletableFuture/spanspan class="token punctuation"./spanspan class="token function"completedFuture/spanspan class="token punctuation"(/spanresponsespan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span

        span class="token comment"// 不允许写入/span
        span class="token keyword"if/span span class="token punctuation"(/spanspan class="token operator"!/spanspan class="token class-name"PermName/spanspan class="token punctuation"./spanspan class="token function"isWriteable/spanspan class="token punctuation"(/spantopicConfigspan class="token punctuation"./spanspan class="token function"getPerm/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation")/span span class="token punctuation"{/span
            responsespan class="token punctuation"./spanspan class="token function"setCode/spanspan class="token punctuation"(/spanspan class="token class-name"ResponseCode/spanspan class="token punctuation"./spanNO_PERMISSIONspan class="token punctuation")/spanspan class="token punctuation";/span
            responsespan class="token punctuation"./spanspan class="token function"setRemark/spanspan class="token punctuation"(/spanspan class="token class-name"String/spanspan class="token punctuation"./spanspan class="token function"format/spanspan class="token punctuation"(/spanspan class="token string""the topic[%s] sending message is forbidden"/spanspan class="token punctuation",/span newTopicspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation";/span
            span class="token keyword"return/span span class="token class-name"CompletableFuture/spanspan class="token punctuation"./spanspan class="token function"completedFuture/spanspan class="token punctuation"(/spanresponsespan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span

        span class="token comment"// 根据消息的commitLog Offset查询实际的MessageExt(消费失败的实际消息)/span
        span class="token class-name"MessageExt/span msgExt span class="token operator"=/span span class="token keyword"this/spanspan class="token punctuation"./spanbrokerControllerspan class="token punctuation"./spanspan class="token function"getMessageStore/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation"./spanspan class="token function"lookMessageByOffset/spanspan class="token punctuation"(/spanrequestHeaderspan class="token punctuation"./spanspan class="token function"getOffset/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token keyword"if/span span class="token punctuation"(/spanspan class="token keyword"null/span span class="token operator"==/span msgExtspan class="token punctuation")/span span class="token punctuation"{/span
            responsespan class="token punctuation"./spanspan class="token function"setCode/spanspan class="token punctuation"(/spanspan class="token class-name"ResponseCode/spanspan class="token punctuation"./spanSYSTEM_ERRORspan class="token punctuation")/spanspan class="token punctuation";/span
            responsespan class="token punctuation"./spanspan class="token function"setRemark/spanspan class="token punctuation"(/spanspan class="token string""look message by offset failed, "/span span class="token operator"+/span requestHeaderspan class="token punctuation"./spanspan class="token function"getOffset/spanspan class="token punctuation"(/spanspan class="token punctuation")/spanspan class="token punctuation")/spanspan class="token punctuation";/span
            span class="token keyword"return/span span class="token class-name"CompletableFuture/spanspan class="token punctuation"./spanspan class="token function"completedFuture/spanspan class="token punctuation"(/spanresponsespan class="token punctuation")/spanspan class="token punctuation";/span
        span class="token punctuation"}/span

        // Set PROPERTY_RETRY_TOPIC to the original message's Topic; msgInner later copies the original message's properties via setProperties()
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        // Do not wait for the message to finish being stored
        msgExt.setWaitStoreMsgOK(false);

        int delayLevel = requestHeader.getDelayLevel();

        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        // Since version 3.4.9 a custom maximum number of consume attempts is supported; if unspecified, the default is 16
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            Integer times = requestHeader.getMaxReconsumeTimes();
            if (times != null) {
                maxReconsumeTimes = times;
            }
        }

        // If the maximum number of consume attempts has been reached, or delayLevel < 0, route the message to the dead-letter queue
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return CompletableFuture.completedFuture(response);
            }
        } else {
            if (0 == delayLevel) {
                // Set the delay level to the number of retries so far + 3
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }

        // Create a MessageExtBrokerInner; apart from Topic and QueueId, everything is copied from the original message
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        
        // Send the message
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return putMessageResult.thenApply((r) -> {
            if (r != null) {
                switch (r.getPutMessageStatus()) {
                    case PUT_OK:
                        String backTopic = msgExt.getTopic();
                        String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                        if (correctTopic != null) {
                            backTopic = correctTopic;
                        }
                        this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                        response.setCode(ResponseCode.SUCCESS);
                        response.setRemark(null);
                        return response;
                    default:
                        break;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(r.getPutMessageStatus().name());
                return response;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        });
    }

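Retry messages are redelivered by the same mechanism as delayed messages: once the delay that corresponds to the message's delay level has elapsed, the Broker attempts to deliver the message again.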

The delay levels are configured in MessageStoreConfig.messageDelayLevel. The default configuration is:

this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

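You can change the time assigned to each delay level, but the mechanism still has some shortcomings:

  1. The time precision is coarse: the smallest granularity is 1s
  2. The number of delay levels is fixed by the configuration and cannot be adjusted per message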
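
As a rough illustration of how the retry count maps to an actual delay, the sketch below parses the default level string quoted above and applies the Broker's `3 + reconsumeTimes` rule. The class `RetryDelaySketch` and its helper methods are hypothetical names, not RocketMQ APIs, and clamping to the highest configured level is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class RetryDelaySketch {
    // Default value quoted in the article from MessageStoreConfig
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Hypothetical helper: turns "1s 5s ... 2h" into seconds; index 0 holds level 1
    static List<Long> parseLevels(String config) {
        List<Long> seconds = new ArrayList<>();
        for (String token : config.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': seconds.add(value); break;
                case 'm': seconds.add(value * 60); break;
                case 'h': seconds.add(value * 3600); break;
                default: throw new IllegalArgumentException("unknown unit in " + token);
            }
        }
        return seconds;
    }

    // Applies the rule from the Broker code: delayLevel = 3 + reconsumeTimes.
    // Assumption: a level above the highest configured one is clamped to the highest.
    static long delaySecondsForRetry(int reconsumeTimes, List<Long> levels) {
        int level = Math.min(3 + reconsumeTimes, levels.size());
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        List<Long> levels = parseLevels(DEFAULT_LEVELS);
        for (int times : new int[] {0, 1, 15}) {
            System.out.println("reconsumeTimes=" + times
                + " -> delay " + delaySecondsForRetry(times, levels) + "s");
        }
    }
}
```

With the default levels, reconsumeTimes = 0 maps to level 3 (10s) and reconsumeTimes = 15 maps to level 18 (2h), which is why the default limit of 16 retries lines up with the 18 configured levels.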



Dead-letter queue

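Messages in RocketMQ cannot be re-consumed indefinitely. Once the number of retries reaches the maximum (16 by default, at which point the delay levels are also exhausted), the message is moved to the dead-letter queue. The dead-letter Topic is named "%DLQ%" + the Consumer group name.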

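        // If the maximum number of consume attempts has been reached, or delayLevel < 0, route the message to the dead-letter queue
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        }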
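
The routing rule can be condensed into a small standalone sketch. `RetryRoutingSketch` and `targetTopic` are hypothetical names used only for illustration; the topic prefixes and the condition are taken from the text and the Broker code above, and nothing here calls RocketMQ itself.

```java
class RetryRoutingSketch {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    // Hypothetical helper: picks the Topic a sent-back message would be written to
    static String targetTopic(String consumerGroup, int reconsumeTimes,
                              int maxReconsumeTimes, int delayLevel) {
        // Same condition as the Broker code: retries exhausted or a negative delay level
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return DLQ_PREFIX + consumerGroup;
        }
        return RETRY_PREFIX + consumerGroup;
    }

    public static void main(String[] args) {
        // "orderGroup" is a made-up consumer group name
        System.out.println(targetTopic("orderGroup", 3, 16, 0));
        System.out.println(targetTopic("orderGroup", 16, 16, 0));
        System.out.println(targetTopic("orderGroup", 0, 16, -1));
    }
}
```

A negative delay level lets a consumer send a message to the dead-letter Topic without waiting for the retries to run out, which is why the condition checks it separately from the retry count.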