【RocketMQ学习】13.源码之消息过滤与重试


1 前言

RocketMQ支持消息过滤和重试。对于消息过滤,其支持两种方式:

  • TAG过滤:相当于子topic,对同一topic的消息再次进行过滤,分发至相应的消费者。
  • SQL过滤:支持SQL92标准形式,对于producer传递的属性值进行过滤。SQL中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。语法形式参见基础知识3.1.2节

消费端如果发生消息失败,没有提交成功,消息默认情况下会进入重试队列中。 重试队列的名字其实是跟消费群组有关,不是主题,因为一个主题可以有多个群组消费。

本章将从这两方面对其原理,源码进行更细致地学习。

这里先加点本地调试环境的部署过程,因为自己踩坑了,所以要记录一下。网上很多所谓的教程,不堪大用,参考即可。

1.1 源码获取

这里下载各种版本。我下的RocketMQ版本为:4.8.0。

1.2 环境变量

因为如果不是源码调试,若在本地启动打包好的东西也是需要配置ROCKETMQ_HOME的。所以我这里就采用了环境变量方式,当然也可以在NameServerBroker启动项里配置。如下图:

环境变量ROCKETMQ_HOME配置

1.3 必要目录

在RocketMQ运行主目录中,即环境变量ROCKETMQ_HOME配置的目录下,创建conflogsstore三个文件夹。

我这里配置的环境变量值为:/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release。不要以为版本冲突,只是个目录名称。

1.4 配置文件

RocketMQ distribution部署目录中将broker.conflogback_broker.xmllogback_namesrv.xml文件复制到conf目录中。

1.5 配置修改

logback_namesrv.xmllogback_broker.xml文件中的{user.home}全部修改为环境变量配置的目录。注意,是全部替换掉

然后,修改broker.conf配置,如下,主要是文件目录配置,当然还有开启SQL过滤:

brokerClusterName = DefaultCluster
brokerName = broker-zyxelva
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# nameServer地址,分号分隔
namesrvAddr=172.104.126.76:9876
#SQL过滤
enablePropertyFilter=true
# 存储路径
storePathRootDir=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store
# commitLog 存储路径
storePathCommitLog=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/consumequeue
# 消息索引存储路径
storePathIndex=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/index
# checkpoint 文件存储路径
storeCheckpoint=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/checkpoint
# abort 文件存储路径
abortFile=/Users/zyxelva/Documents/rocketmq-all-4.5.0-bin-release/store/abort

1.6 启动NameServer和Broker

NameServer直接点击启动main()即可,但对于Broker,一定要配置-c,指定broker.conf的路径。不然,就像我一样掉坑里了,始终是默认配置启动的。如1.2节图所示。

在IDEA启动日志下看到如下日志,代表NameServer启动成功:

Connected to the target VM, address: '127.0.0.1:55891', transport: 'socket'
The Name Server boot success. serializeType=JSON

同样,对于Broker启动成功后,会有如下日志输出,特别是有NameServer地址输出,代表大概率启动成功,没有的话,就要看logs下的broker.log日志了:

Connected to the target VM, address: '127.0.0.1:53061', transport: 'socket'
The broker[broker-zyxelva, 172.104.126.76:10911] boot success. serializeType=JSON and name server is 172.104.126.76:9876

2 消息过滤

RocketMQ支持上述两种类型的消息过滤,在源码包中,也有对应的样例。下面,将从这些样例开始,对源码进行追踪。

另外,关于消息过滤,producer就只是把它当作普通消息发送出去,并没有做什么额外的操作。

2.1 TAG过滤

2.1.1 Producer

/**
 * @author zyxelva
 * TAG过滤-生产者
 */
public class TagFilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
        // 指定Namesrv地址信息.
        producer.setNamesrvAddr("106.55.246.66:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                    tags[i % tags.length],
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

2.1.2 Consumer

以下消费者,根据consumer.subscribe("TagFilterTest", "TagA || TAGB || TAGC");可知,其订阅了主题为TagFilterTest,TAG为TagATAGBTAGC的消息。实际生产上,RocketMQ的最佳实践中就建议,使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。

/**
 * @author zyxelva
 * TAG过滤-消费者
 */
public class TagFilterConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("106.55.246.66:9876");
        // 设置要过滤的TAG,多个使用 || 分开
        consumer.subscribe("TagFilterTest", "TagA || TAGB || TAGC");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String msgPro = msg.getProperty("a");

                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

2.2 SQL过滤

若要使用SQL过滤,需要在broker配置中配置enablePropertyFilter=true,重启才会生效。
另外,属性值的命名也不要与内置的参数冲突。参见MessageConst.STRING_HASH_SET静态代码块部分。

2.2.1 Producer

/**
 * @author zyxelva
 * SQL过滤 -消息生产者(加入消息属性)
 */

public class SqlFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
        // 指定Namesrv地址信息.
        producer.setNamesrvAddr("106.55.246.66:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest",
                    tags[i % tags.length],
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 设置一些属性
            //不仅设置了消息的tag,同时还通过msg.putUserProperty("a", String.valueOf(i));设置了自定义消息参数
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

2.2.2 Consumer

/**
 * @author zyxelva
 * SQL过滤-消费者
 */
public class SqlFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("106.55.246.66:9876");
        // Don't forget to set enablePropertyFilter=true in broker
        //SQL过滤
        consumer.subscribe("SqlFilterTest",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                        "and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String msgPro = msg.getProperty("a");

                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;

            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

从以上代码来看,consumer会指定过滤规则,告诉broker自己能接收哪些消息,broker从而返回对应的消息。

2.3 源码追踪

生产者怎么发送消息的流程,这里不再赘述,详情可参见【RocketMQ学习】7-源码之Producer一文。

这里主要看消费端和服务端Broker是怎么处理的。

2.3.1 Consumer

2.3.1.1 订阅

Consumer会有两个地方向Broker发起订阅关系上报:

  • consumer.subscribe():路径为DefaultMQPushConsumer#subscribe()DefaultMQPushConsumerImpl#subscribe()MQClientInstance#sendHeartbeatToAllBroker()
  • consumer.start()心跳定时任务:路径为DefaultMQPushConsumer#start()DefaultMQPushConsumerImpl#start()MQClientInstance#start()MQClientInstance#startScheduledTask()
    //MQClientInstance
    private void startScheduledTask() {
    //定时任务二:定时向broker发送心跳,间隔30s
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.cleanOfflineBroker();
                        //发送心跳
                        MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                    } catch (Exception e) {
                        log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                    }
                }
            }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    }

本质上,都是通过发送心跳给broker进行的。方法为MQClientInstance#sendHeartbeatToAllBroker().

//MQClientInstance
public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                //发送心跳
                this.sendHeartbeatToAllBroker();
                //已废弃
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
        }
    }

心跳流程如下图所示:

消费者发送心跳至Broker流程

2.3.1.2 拉取

源码之Consumer2.1节 拉取中,了解了pullMessageService的作用就是拉取broker端的消息的线程服务。

这里,先不详聊,先看看Broker是怎样处理消息的。等这个跟完,再来继续消费端的消息过滤流程。

2.3.2 Broker

2.3.2.1 消息接收与存储

直接定位到消息发送的接口处,DefaultMQProducerImpl#sendKernelImpl()

//DefaultMQProducerImpl
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody();
        try {
            //省略...
            //组装请求头,包含消息过滤的属性
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setFlag(msg.getFlag());
            //设置属性信息(tag也在里边)
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            SendResult sendResult = null;
            switch (communicationMode) {
                //省略...
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            return sendResult;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

怎么发送的,这里不详细罗列了。最终,通过NettyRemotingClient发送出去,Broker接收,请求代号:SEND_MESSAGE。Broker端处理器为SendMessageProcessor。接下来就是消息存储,包括Commitlog,ConsumeQueue,IndexFile相关的了,可参考【RocketMQ学习】9.源码之Store一文。

2.3.2.2 消息订阅关系维护

这里,直接通过请求代号为心跳的码跟进下,broker处理Consumer的心跳逻辑。请求代号:HEART_BEAT.

定位到BrokerController#registerProcessor()。得到处理心跳的处理器为ClientManageProcessor,顺便也把Broker处理消息的处理器列出,方便后续消息过滤处理流程的跟进,处理器为PullMessageProcessor,请求代号:PULL_MESSAGE

/**
 * ClientManageProcessor
 */
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
/**
 * PullMessageProcessor
 */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);

这里注册了ClientManageProcessor,Consumer发送心跳时会经由其processRequest处理:

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
}

最终会调用ConsumerFilterManagerregister将订阅信息存储起来,这里只会保存SQL过滤的相关信息:
//ConsumerFilterManager
/**
     * 将非TAG过滤的信息缓存
     * @param topic 主题
     * @param consumerGroup 消费者组名
     * @param expression SQL表达式
     * @param type 过滤类型 TAG or SQL
     * @param clientVersion
     * @return success?
     */
    public boolean register(final String topic, final String consumerGroup, final String expression,
        final String type, final long clientVersion) {
        //TAG过滤,则不用注册消息过滤,这里主要针对SQL过滤的
        if (ExpressionType.isTagType(type)) {
            return false;
        }

        if (expression == null || expression.length() == 0) {
            return false;
        }

        FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);

        if (filterDataMapByTopic == null) {
            FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
            FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
            filterDataMapByTopic = prev != null ? prev : temp;
        }

        BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);

        return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
    }

broker接收心跳并存储订阅信息流程如下:

Broker处理心跳并保存订阅信息流程

2.3.2.3 Broker消息过滤

Consumer通过pullMessageService向Broker发起拉取消息的请求。PullMessageService在讲Consumer的时候提到过,它继承自ServiceThread,会执行消息的拉取任务。

public class PullMessageService extends ServiceThread {
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    private void pullMessage(final PullRequest pullRequest) {
        //根据消费者组名,选择一个消费者
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            //执行拉取请求:立即执行、延迟执行
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
}

接着调用DefaultMQPushConsumerImpl#pullMessage

//DefaultMQPushConsumerImpl
public void pullMessage(final PullRequest pullRequest) {
	//省略...
        // 回调逻辑
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
                        }
                        break;
                    //省略
                    default:
                        break;
                }
            }
        }
    };
    String subExpression = null;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {}
}

大致流程为:

  • 先定义了从broker拉到消息回调逻辑
  • 然后调用PullAPIWrapper#pullKernelImpl拉取消息并触发回调逻辑
  • 将远程拉到的消息放入本地队列提供消费,consumeMessageService.submitConsumeRequest会触发本地消费逻辑,最终会调用到我们定义的MessageListener

上面是Consumer发起拉取消息的请求过程,接下来就该Broker收到这个请求后的处理了。

拉取消息发送的请求代号是PULL_MESSAGE,上面我们也提到了PULL_MESSAGE对应的处理器是PullMessageProcessor

Broker处理拉取消息请求时会调用PullMessageProcessor#processRequest方法:

//PullMessageProcessor
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    //省略...
    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }
    final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    //省略...
}

构造消息过滤器并获取消息,调用的是DefaultMessageStore#getMessage(),这里过滤过程分为两步:

  • ConsumeQueue中的TAG过滤:根据offset获取对应的consumerQueue,为啥?想想consumerQueue条目的结构:
    消息条目示意图
    过滤时会调用传入的ExpressionMessageFilterisMatchedByConsumeQueue方法,该方法就是根据tagCode过滤消息的(不止这个功能,还有位图过滤):
    //ExpressionMessageFilter
        @Override
        public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
            if (null == subscriptionData) {
                return true;
            }
    
            if (subscriptionData.isClassFilterMode()) {
                return true;
            }
    
            // by tags code.
            //tag过滤,这里只判断tag的hashCode是否相等,但不同tag的hashCode可能相等,真正的tag过滤是在consumer中进行的。
            if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    
                if (tagsCode == null) {
                    return true;
                }
    
                if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
                    return true;
                }
    
                return subscriptionData.getCodeSet().contains(tagsCode.intValue());
            }
            else {
                //省略
            }
    }
    Broker端TAG过滤
  • SQL过滤
    而后,调用ExpressionMessageFilter#isMatchedByCommitLog()进行SQL过滤
    //ExpressionMessageFilter
    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
        // 省略一些内容
        ...
    
        Object ret = null;
        try {
            MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
            // 处理值,即执行SQL
            ret = realFilterData.getCompiledExpression().evaluate(context);
        } catch (Throwable e) {
            log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
        }
    
        log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
    
        if (ret == null || !(ret instanceof Boolean)) {
            return false;
        }
    
        return (Boolean) ret;
    }

realFilterData就包含Consumer样例中的SQL:

(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)

SQL过滤debug

2.3.2.4 Consumer消息过滤

接下来就是Consumer处理消息过滤了,就是上节发起前定义的消息回调方法:

//DefaultMQPushConsumerImpl
public void pullMessage(final PullRequest pullRequest) {
       //...省略
       // 消息拉取的回调函数,在拉取到消息后会进入这个方法处理
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    // 处理消息,将二制消息解码为java对象,也会对消息进行tag过滤
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);
        //...省略
                }
        //...省略
            }
        //...省略
        }
   }

跟进PullAPIWrapper#processPullResult方法:
//PullAPIWrapper
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        // 将二进制数据解码为对象
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        // 按 TAG 过滤
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // 根据tag过滤消息
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
        //...省略
    }
        //...省略
}

从代码来看,方法中会根据TAG是否在TagsSet中来决定该消息是否需要加入msgListFilterAgain,而msgListFilterAgain就是过滤的消息列表了。

Consumer端TAG过滤

2.4 总结

RocketMq消息过滤支持TAGSQL两种方式,

2.4.1 TAG 方式

在broker获取消息时,根据TAG的hashCode过滤一波消息,但这样得到的消息可能并不只是指定TAG的,因此需要在consumer上做进一步的过滤。

举例来说,consumer订阅了TAG为tag1的消息,tag1与tag11两者的hashCode都是100,因此在broker上过滤时,根据TAG的hashCode,这两者对应的消息都会发往consumer,因此consumer需要再进比较TAG的值,过滤出真正需要的消息。

2.4.2 SQL 方式

SQL方式的过滤方式,只在Broker中进行。

3 消息重试

3.1 Producer重试

Producer对于one-way发送的消息不会有重试或重投的动作。对于同步消息、异步消息发送,Producer有几个参数配置:

  • retryTimesWhenSendFailed同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingExceptionMQClientException和部分MQBrokerException时会重投。
  • retryTimesWhenSendAsyncFailed异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

可以在创建Producer实例后,进行配置:

DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);

3.2 Consumer重试

Apache RocketMQ 有两种消费模式:集群消费模式广播消费模式

  • 消息重试只针对集群消费模式生效
  • 广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,只会以警告日志的形式记录消费失败的消息,继续消费新的消息。

同时RocketMQ Push消费提供了两种消费方式:并发消费顺序消费

顺序消费和并发消费的重试机制并不相同

  • 顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,
  • 而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。

在哪儿订阅的?DefaultMQPushConsumerImpl#start()启动时,会调用copySubscription(),该方法的作用有:

  • 将DefaultMQPushConsumer的订阅信息copy到RebalanceService中;
  • 如果是cluster模式,订阅了topic,则自动订阅 %RETRY%+ConsumerGroupName的topic。
//DefaultMQPushConsumerImpl
    /**
     * 拷贝订阅信息至RebalanceService中,包括重试的
     *
     * @throws MQClientException 异常
     */
    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            //1.将DefaultMQPushConsumer的订阅信息copy到RebalanceService中
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString);
                    //2.订阅信息保存至RebalanceService中
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    //3.Push模式集群消费下,每个消费者组都会自动订阅一个用于重试的topic:%RETRY%+ConsumerGroupName
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL);
                    //4.重试topic相关的订阅信息保存至RebalanceService中
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

两者参数差别如下:

消费类型重试间隔最大重试次数
顺序消费间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX
并发消费间隔时间根据重试次数阶梯变化,取值范围:10秒~2小时。不支持自定义配置最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值

TCP协议并发消费(无序消息)重试间隔1如下:

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110秒97分钟
230秒108分钟
31分钟119分钟
42分钟1210分钟
53分钟1320分钟
64分钟1430分钟
75分钟151小时
86分钟162小时
  • 若最大重试次数小于等于16次,则每次重试的间隔时间会阶梯变化,具体时间,请参见上表TCP协议无序消息重试间隔
  • 若最大重试次数大于16次,则超过16次的间隔时间均为2小时

我们通过顺序消费和并发消费的状态,也可看出RocketMQ对于消息重试的处理方式:

//顺序消费,监听器的返回状态码
public enum ConsumeOrderlyStatus {
    /**
     * Success consumption
     */
    SUCCESS,
    /**
     * Rollback consumption(only for binlog consumption)
     */
    @Deprecated
    ROLLBACK,
    /**
     * Commit offset(only for binlog consumption)
     */
    @Deprecated
    COMMIT,
    /**
     * Suspend current queue a moment
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

//并发消费,监听器的返回状态码
public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}

3.2.1 并发消费失败重试

我们从消费者监听往回看,Consumer从哪里处理这种失败的逻辑。以2.1.2节的Consumer为例。

监听调用者

从图中可以看出,调用监听处理消息消费结果的地方有两处:

  • consumeMessageDirectly:这个是被DefaultMQAdminExtImpl调用的,从名字我猜是给监控平台调用的,比如Console。读者有知道的可以告知留言。
  • ConsumeRequest#run()ConsumeMessageConcurrentlyService内部线程类,专用于处理消息结果,最终要发回Broker。

跟踪下ConsumeRequest怎么处理消息结果的,当然向后肯定是给Broker发信息处理,这里先看看是哪里调用的。

既然ConsumeRequest是个线程类,那就要看是哪里提交了线程任务,要么是定时处理,要么是提交任务队列。RocketMQ是通过提交线程队列处理的。

还记得之前的消费者回调吗?对的,就是2.3.2.3节的那个PullCallback。这个方法老长了。我们截图把重要的消费保留下来:

消费回调

消费回调是在消费者拉取到消息之后,对结果的一种处理。我们可以追踪下,PullCallback真正调用的地方,其传递路径为:
PullAPIWrapper#pullKernelImpl()MQClientAPIImpl#pullMessage()MQClientAPIImpl#pullMessageAsync()

消费回调调用处

回到提交线程队列的话题,最终ConsumeRequest是提交到一个任务队列中,这里当然要成功拉取到消息。对于拉取不成功的情况,RocketMQ会进行重新拉取。

拉取失败重新发起拉取消息请求

对于拉取成功的消息,进行消费、结果反馈。

提交消费结果

ConsumeRequest既然是个线程类,那就看看其run()方法咯:

class ConsumeRequest implements Runnable {
    //...........省略

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }
        //获取消费者定义的监听器
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

        ConsumeMessageContext consumeMessageContext = null;
        //钩子
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }

        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            //消费,这里会返回消费结果码,ConsumeConcurrentlyStatus
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }

        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

        if (!processQueue.isDropped()) {
            //向Broker反馈消费结果
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }
    //.........省略
}

首先就是获取消费者定义的监听器,就是2.1.2节的Consumer注册的那个监听。然后执行完消费,就是把重要的消费结果status返回。这个消费结果status会给到Broker。这个方法就是processConsumeResult

//向Broker反馈消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty()) {
            return;
        }
        switch (status) {
            // 1、消费成功,注意这个ackIndex,对于是否重试至关重要,其实还是消费结果status引起的
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            //2.消费重试,ackIndex = -1
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            //3.broadcast模式,处理失败,不做处理,只会打印日志
            case BROADCASTING:
                //4.广播模式:如果消费结果是 ackIndex=-1就会执行循环,可以看到只是打印日志,没有其它多余的操作
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                //5.集群消费模式:RECONSUME_LATER 时,消费失败,ackIndex = -1,下面的循环才会执行,因为CONSUME_SUCCESS的结果中,ackIndex = consumeRequest.getMsgs().size() - 1
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    //6.Cluster模式,将消息发回broker重新发送(重试)
                    //能到这里说明是 RECONSUME_LATER 状态:回退Msg到Broker,也就是ACK(重试)
                    boolean result = this.sendMessageBack(msg, context);
                    //7.ACK 可能会失败,需要记录失败的ACK
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                //8.存在ACK 失败的消息,将消息丢到线程池延迟 5s 重新消费
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    //9.发回broker失败,则再次尝试本地消费
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
        //将消费前缓存的消息清除
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 10.更新消费的偏移量:注意这里 CONSUME_SUCCESS 和 RECONSUME_LATER 都会更新
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

根据上面源码我们可以得出以下结论:

  • 由第4步我们可知:广播模式就算消费者消费失败,也不会进行重试,只是打印警告日志
  • 只有消费失败(没有返回 CONSUME_SUCCESS 都成为失败)的消息才需要发送ACK重试;
  • 如果ACK失败,(《RocketMQ技术内幕》中称为ACK失败),即重试失败,就会继续被延迟5s重新消费(又会回调到Consumer中的回调方法);
  • 消息被消费成功、失败,都会更新Consumer 的偏移量
  • ConsumeMessageConcurrentlyService.sendMessageBack:准备将结果发回Broker。
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
        //延迟级别,默认为0
        int delayLevel = context.getDelayLevelWhenNextConsume();

        // Wrap topic with namespace before sending back message.
        msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
        try {
            //先发CONSUMER_SEND_MSG_BACK命令,让broker重发;失败,则通过topic为%RETRY%+consumerGroup的消息发给broker处理
            log.info("=======delayLevel: {}", delayLevel);
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
            return true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
        }

        return false;
    }

sendMessageBack会有两个动作:

  • 先向Broker发送请求代号为CONSUMER_SEND_MSG_BACK的请求,让Broker去处理,后面会详细说明具体干了啥事。
  • 如果这个请求发送异常,则将消息重新组装,向topic为%RETRY%ConsumerGroupName发送消息,让Broker延迟处理重试。

这里就是3.2节开头说的:

并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。

提交Broker重试

3.2.2 顺序消费失败重试

对于顺序消息消费失败,需要返回的状态是SUSPEND_CURRENT_QUEUE_A_MOMENT,意思就是暂停一下,等会儿再试。

与并发消费下的消息处理不同主要在于ConsumeRequest的处理。这里不列出源码了。读者自己可以去跟一跟。

3.3 Broker重试处理

以并发消费,即无序消息处理为例。broker在收到消费者发出的请求代号为CONSUMER_SEND_MSG_BACK后,着手处理消息重试逻辑。

处理这个请求的处理器为SendMessageProcessor

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            //处理客户端对于失败的消息进行重发
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

SendMessageProcessor#asyncConsumerSendMsgBack():

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        //........省略
        //1.新topic:%RETRY%+consumerGroupName
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        // 2.重试队列:1
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        //........省略
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return CompletableFuture.completedFuture(response);
        }

        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);
        //3.延迟级别,初始为0
        int delayLevel = requestHeader.getDelayLevel();
        //4.最大重试次数,最大值 16次
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        //5.超过重试次数,死信队列
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
            //死信队列topic:%DLQ%+consumerGroupName
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return CompletableFuture.completedFuture(response);
            }
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        //重试次数+1。新消息被消费者消费时就会传上来,到第5步进行比较
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        //6.作为新消息存到CommitLog中,originMsgId会落地的
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return putMessageResult.thenApply((r) -> {
            if (r != null) {
                switch (r.getPutMessageStatus()) {
                    case PUT_OK:
                        String backTopic = msgExt.getTopic();
                        String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                        if (correctTopic != null) {
                            backTopic = correctTopic;
                        }
                        this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                        response.setCode(ResponseCode.SUCCESS);
                        response.setRemark(null);
                        return response;
                    default:
                        break;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(r.getPutMessageStatus().name());
                return response;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        });
    }
}

主要做了以下几个事:

  • 更消息的 Topic 为 "%RETRY%"+ ConsumerGroupName,计算queueId(重试队列,队列数为1);
  • 如果消息重试 >= 16次(默认)。继续更改消息的Topic 为死信队列的Topic:"%DLQ%" + ConsumerGroupName,消费队列为1(死信队列只有一个消费队列);
  • 如果没有变成死信,计算消息的延迟级别;
  • 复制原来Msg,重新生成一个Msg,将新Msg丢给BrokerController中,然后存到CommitLog中进行存储,这里原始的消息ID(originMsgId)也会落地。

3.4 死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。

此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%+ConsumerGroupName死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。

总结就是:

  • Broker中单独的一个队列(DLQ),该队列存储了Consumer端重试16次后都没成功消费的消息;
  • DLQ队列:只有写权限,没有读权限。所以是不能被Consumer重新消费的,只能进行人工干预,重新投递(Rocket-MQ-Console 中可以操作);
  • DLQ队列中,该消息的TOPIC重新被命名为: "%DLQ%" + ConsumerGroupName
  • DLQ队列其实就是(consumequeue文件夹的"%DLQ%" + ConsumerGroupName 命名的Topic文件夹下的队列)。

4 注意事项

  • 一条消息无论重试多少次,这些重试消息的Message ID都不会改变。
  • 消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
  • 死信消息不能被Consumer重新消费,只能进行人工干预,重新投递。

5 参考

1 消息重试间隔


文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录