[RocketMQ Study Notes] 7 - Producer Source Code


1 Producer Example

public class TestProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                {
                    Message msg = new Message("TopicTest1", "TagA", "key113", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);

                    QueryResult queryMessage = producer.queryMessage("TopicTest1", "key113", 10, 0, System.currentTimeMillis());
                    for (MessageExt m : queryMessage.getMessageList()) {
                        System.out.printf("%s%n", m);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

2 Startup Flow

Startup begins with producer.start().

//DefaultMQProducer#start
@Override
    public void start() throws MQClientException {
        // Wrap the producer group with the namespace
        this.setProducerGroup(withNamespace(this.producerGroup));
        // Delegate startup to the default DefaultMQProducerImpl
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

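The start() method in DefaultMQProducerImpl is the core of producer startup. It consists of three steps: checking the configuration, obtaining the MQClientInstance, and starting it.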

public class DefaultMQProducerImpl implements MQProducerInner {
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            // 1. The first call always takes this branch, because serviceState defaults to CREATE_JUST
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 2. Check that the producer group is valid
                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
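                // 3. Get or create the MQClientInstance
                // MQClientManager is a singleton
                // One clientId maps to exactly one MQClientInstance
                // MQClientInstance wraps RocketMQ's network API; it is the only path through which producers and consumers talk to the NameServer and brokers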
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 4. Register this producer with the MQClientInstance
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                }

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // Start
                if (startFactory) {
                    // Call the MQClientInstance API
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
}

2.1 Configuration Check

private void checkConfig() throws MQClientException {
    // 1. The producer group name must not be blank, must not exceed 255 characters, and may contain only %, letters, digits, hyphens and underscores
    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());

    // 2. Null check; this repeats what checkGroup above already verified, so it is redundant
    if (null == this.defaultMQProducer.getProducerGroup()) {
        throw new MQClientException("producerGroup is null", null);
    }

    // 3. The group must not equal the default name
    if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
                null);
    }
}
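
The rules listed in the comments above can be tried out without any RocketMQ dependency. The following is only a sketch: the class name GroupNameCheckSketch, the regular expression, the exception type, and the value used for the reserved default group are assumptions made for this example and are not copied from Validators or MixAll.

```java
import java.util.regex.Pattern;

class GroupNameCheckSketch {
    // Limits taken from the comment in checkConfig(): at most 255 characters,
    // and only '%', letters, digits, '-' and '_'.
    static final int MAX_LENGTH = 255;
    static final Pattern ALLOWED = Pattern.compile("^[%a-zA-Z0-9_-]+$");
    // Assumed value standing in for MixAll.DEFAULT_PRODUCER_GROUP.
    static final String RESERVED_DEFAULT_GROUP = "DEFAULT_PRODUCER";

    // Throws IllegalArgumentException for the first rule that is violated.
    static void check(String group) {
        if (group == null || group.trim().isEmpty()) {
            throw new IllegalArgumentException("group is blank");
        }
        if (group.length() > MAX_LENGTH) {
            throw new IllegalArgumentException("group is longer than " + MAX_LENGTH);
        }
        if (!ALLOWED.matcher(group).matches()) {
            throw new IllegalArgumentException("group contains illegal characters: " + group);
        }
        if (group.equals(RESERVED_DEFAULT_GROUP)) {
            throw new IllegalArgumentException("group must not equal " + RESERVED_DEFAULT_GROUP);
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    static boolean isValid(String group) {
        try {
            check(group);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("ProducerGroupName")); // true
        System.out.println(isValid("bad group!"));        // false
        System.out.println(isValid("DEFAULT_PRODUCER"));  // false
    }
}
```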

2.2 Obtaining the MQClientInstance

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // 1. Build the clientId
    String clientId = clientConfig.buildMQClientId();
    // 2. Reuse the instance if one already exists for this clientId; otherwise create one. The key is the clientId
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

Here, clientId and MQClientInstance correspond one to one, and the mapping is stored in a ConcurrentHashMap.

//MQClientManager
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();
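
The get-then-putIfAbsent pattern used by getOrCreateMQClientInstance can be shown in isolation. This is a minimal sketch: ClientRegistrySketch and its nested Client class are hypothetical stand-ins for MQClientManager and MQClientInstance, and the clientId strings are made up.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ClientRegistrySketch {
    // Hypothetical stand-in for MQClientInstance.
    static final class Client {
        final String clientId;

        Client(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    Client getOrCreate(String clientId) {
        Client instance = table.get(clientId);
        if (instance == null) {
            instance = new Client(clientId);
            // putIfAbsent returns the existing value if another thread stored one first.
            Client prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // lost the race: reuse the instance that is already registered
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        Client a = registry.getOrCreate("192.168.0.1@1234");
        Client b = registry.getOrCreate("192.168.0.1@1234");
        Client c = registry.getOrCreate("192.168.0.1@5678");
        System.out.println(a == b); // true: same clientId, same instance
        System.out.println(a == c); // false: different clientId
    }
}
```

Because putIfAbsent is atomic, two threads that both miss on get() still end up sharing one instance; the extra object created by the losing thread is simply discarded.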

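In addition, after the MQClientInstance has been created, the producer must be registered with it, keyed by the producer group name.

// 4. Register this producer with the MQClientInstance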
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

2.3 Starting

Startup goes through MQClientInstance.

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                // mQClientAPIImpl is essentially a proxy for NettyRemotingClient
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

3 Message Send Flow

sendDefaultImpl() in DefaultMQProducerImpl is the core method for sending messages.

We start from the send() call in the example at the beginning.

SendResult sendResult = producer.send(msg);

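The call passes through, in order: DefaultMQProducer#send() -> DefaultMQProducerImpl#send() -> DefaultMQProducerImpl#send() (an overload) -> DefaultMQProducerImpl#sendDefaultImpl()

There are four main steps:

  • Validate the message
  • Look up the route
  • Select a queue
  • Send the message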
public class DefaultMQProducerImpl  implements MQProducerInner {
    private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        // 1. Validate the message
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 2. Look up the route
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 3. Select a queue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        // 4. Send the message
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (Exception e) {
                        // ... omitted ...
                    }
                } else {
                    break;
                }
            }
            // ... omitted ...
        }
        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
}
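
The retry loop above can be reduced to a small simulation. This is a sketch under simplifying assumptions: brokers are plain strings, the send itself is a caller-supplied predicate, and the timeout budget is left out. SendRetrySketch and sendWithRetry are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class SendRetrySketch {
    /**
     * Makes up to 1 + retries attempts. Each retry prefers a broker different
     * from the one used in the previous attempt. Returns the brokers tried, in
     * order; if a send succeeded, the last element is the broker that accepted it.
     */
    static List<String> sendWithRetry(List<String> brokers, int retries, Predicate<String> sendOk) {
        List<String> attempted = new ArrayList<>();
        int timesTotal = 1 + retries;
        String lastBroker = null;
        int index = 0;
        for (int times = 0; times < timesTotal; times++) {
            String chosen = null;
            // Walk the list round-robin and skip the broker that just failed.
            for (int i = 0; i < brokers.size(); i++) {
                String candidate = brokers.get(index++ % brokers.size());
                if (!candidate.equals(lastBroker)) {
                    chosen = candidate;
                    break;
                }
            }
            if (chosen == null) {
                // Only one broker exists: fall back to plain round-robin.
                chosen = brokers.get(index++ % brokers.size());
            }
            attempted.add(chosen);
            if (sendOk.test(chosen)) {
                return attempted;
            }
            lastBroker = chosen;
        }
        return attempted;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b", "broker-c");
        // broker-a is down, so the first attempt fails and the retry goes to broker-b.
        System.out.println(sendWithRetry(brokers, 2, b -> !b.equals("broker-a"))); // [broker-a, broker-b]
    }
}
```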

3.1 Validating the Message

public class Validators {
    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
        // 1. Reject a null message
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // 2. Validate the topic
        Validators.checkTopic(msg.getTopic());
        // 3. Reject topics that clients are not allowed to send to
        Validators.isNotAllowedSendTopic(msg.getTopic());

        // 4. Validate the body: it must not be null, must not be empty, and must not exceed the maximum size
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }

        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }

        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                    "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }
}

3.2 Looking Up the Route

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 1. Check whether the local cache already holds route info for this topic
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 2. If not, fetch it from the NameServer and cache it
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // This call updates the TopicPublishInfo
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3.3 Selecting a Queue

public class TopicPublishInfo {
    // The broker fault-delay mechanism is disabled by default, in which case this method is used
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // First selection (not a retry)
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // Retry: advance sendWhichQueue and walk the queue list
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                // Map the counter onto a position in the queue list
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0) {
                    pos = 0;
                }
                // Skip queues on the broker used in the previous attempt
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // No queue on a different broker was found; fall back to plain round-robin
            return selectOneMessageQueue();
        }
    }

    // First selection
    public MessageQueue selectOneMessageQueue() {
        // Increment sendWhichQueue
        int index = this.sendWhichQueue.getAndIncrement();
        // Take it modulo the number of queues
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0) {
            pos = 0;
        }
        // Return the queue at that position
        return this.messageQueueList.get(pos);
    }
}
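
The `if (pos < 0)` guard in both methods can look redundant after Math.abs, but it covers one edge case: Math.abs(Integer.MIN_VALUE) overflows and stays negative. A small worked example, where QueueIndexSketch and position are hypothetical names used only for illustration:

```java
class QueueIndexSketch {
    // Maps a counter value onto a valid list position, using the same
    // Math.abs / modulo / clamp sequence as selectOneMessageQueue().
    static int position(int index, int queueCount) {
        int pos = Math.abs(index) % queueCount;
        if (pos < 0) {
            pos = 0; // only reachable when index == Integer.MIN_VALUE
        }
        return pos;
    }

    public static void main(String[] args) {
        System.out.println(position(5, 4));                 // 1
        System.out.println(position(-7, 4));                // 3
        System.out.println(Math.abs(Integer.MIN_VALUE));    // -2147483648
        System.out.println(position(Integer.MIN_VALUE, 3)); // 0 (clamped from -2)
    }
}
```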

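3.3.1 Selection Strategies

3.3.1.1 Default Strategy: Round-Robin

The default strategy is plain round-robin. Its main benefit is that messages are spread across the queues as evenly as possible.

For the code, see selectOneMessageQueue() in section 3.3.

One detail worth noting is that the counter is kept in a ThreadLocal.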

private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        //..............
    }

    @Override
    public String toString() {
        //..............
    }
}

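Since messages can be produced from multiple threads, each thread keeps its own counter in thread-local storage and increments it independently.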
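
To illustrate the idea of a per-thread counter, here is a minimal sketch. It is not the body of ThreadLocalIndex, which is elided above; PerThreadCounterSketch is a hypothetical class, and the random starting value is an assumption of this example.

```java
import java.util.concurrent.ThreadLocalRandom;

class PerThreadCounterSketch {
    // Each thread lazily gets its own counter. The random start (an assumption of
    // this sketch) keeps threads from all beginning at the same queue.
    private final ThreadLocal<Integer> counter =
            ThreadLocal.withInitial(() -> ThreadLocalRandom.current().nextInt(1000));

    // Returns this thread's current value and advances it by one.
    int getAndIncrement() {
        int current = counter.get();
        counter.set(current + 1);
        return current;
    }

    public static void main(String[] args) throws InterruptedException {
        PerThreadCounterSketch index = new PerThreadCounterSketch();
        Runnable task = () -> {
            int first = index.getAndIncrement();
            int second = index.getAndIncrement();
            // Within one thread the values are consecutive, whatever other threads do.
            System.out.println(Thread.currentThread().getName() + ": " + first + " -> " + second);
        };
        Thread t1 = new Thread(task, "sender-1");
        Thread t2 = new Thread(task, "sender-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
```

Keeping the counter thread-local means no contention on a shared atomic; the trade-off is that the distribution is even per thread rather than globally.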

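3.3.1.2 Strategy with Fault Avoidance

The default strategy is simple, but it exposes a problem:

Some queues may take longer to accept messages, for example because of a backlog, and continuing to send to them slows down later deliveries.

To handle this, RocketMQ records the delivery latency after every message it sends. From that latency it can tell which queues are currently fast to deliver to.

With this strategy enabled, the queues with the lowest delivery latency are preferred; if that does not yield a queue, selection falls back to round-robin.

The latency is recorded in org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem.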

//MQFaultStrategy
public class MQFaultStrategy {
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // If isolation is false, the actual latency is used; otherwise a fixed 30 s is used
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

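    /**
     * @param currentLatency the send latency
     * @return the duration for which the broker is treated as unavailable
     * @description send latency thresholds latencyMax: 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L
     * fault-avoidance durations notAvailableDuration: 0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L
     * Example: a send that takes 100 ms marks the broker unavailable for 0 ms;
     * Example: a send that takes 600 ms marks the broker unavailable for 30000 ms;
     * Example: a send that takes 4000 ms marks the broker unavailable for 180000 ms;
     */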
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i]) {
                return this.notAvailableDuration[i];
            }
        }
        return 0;
    }
}

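Updating the delivery latency

The steps are roughly as follows:

  • From the send latency (currentLatency), compute how long the broker is treated as unavailable (duration). The longer the send took, the longer the broker is avoided; the durations are empirical values. If isolation is true, the latency is taken to be 30000 ms, which gives an unavailable duration of 600000 ms.

    Why is the broker unavailable for 600 s?
    Send latency thresholds (latencyMax array): 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L
    Fault-avoidance durations (notAvailableDuration array): 0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L. From the code:

    • A send latency of 100 ms gives an unavailable duration of 0 ms;
    • A send latency of 600 ms gives an unavailable duration of 30000 ms;
    • A send latency of 4000 ms gives an unavailable duration of 180000 ms;
    • Here the send latency is 30000 ms, so the unavailable duration is 600000 ms, that is 600 s or 10 minutes.
  • Call latencyFaultTolerance.updateFaultItem to update the broker's fault-tolerance record.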
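
The table lookup described above can be run on its own to check the worked examples. The two arrays use the values quoted in the text; NotAvailableDurationSketch is a hypothetical class name for this sketch.

```java
class NotAvailableDurationSketch {
    // Values as listed in the text above.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scans from the largest threshold down and returns the duration paired
    // with the first threshold that the latency reaches.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(computeNotAvailableDuration(100));   // 0
        System.out.println(computeNotAvailableDuration(600));   // 30000
        System.out.println(computeNotAvailableDuration(4000));  // 180000
        System.out.println(computeNotAvailableDuration(30000)); // 600000 (the isolation case)
    }
}
```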

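This method ends up writing each broker's latency into a ConcurrentHashMap. The key is the brokerName and the value is a FaultItem, which holds the broker's send latency and the time at which it becomes available again.

The broker's estimated recovery time is the current time plus the unavailable duration, i.e. System.currentTimeMillis() + notAvailableDuration.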

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    // Caches each broker's send latency and the time at which it becomes available again
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            // Time at which the broker becomes available again
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // Store the broker's record: the key is brokerName, the value is the FaultItem (latency and recovery time)
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
}

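With that, the implementation of LatencyFaultToleranceImpl#isAvailable, which decides whether a broker is expected to be usable, is straightforward:

Once the current time has reached startTimestamp, the broker is considered healthy again (healthy in a logical sense: the broker is expected to have recovered by that point).

The availability check used by queue selection when fault avoidance is enabled: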
//LatencyFaultToleranceImpl
@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

//FaultItem
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}
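
The bookkeeping behind updateFaultItem and isAvailable boils down to storing an "available again from" timestamp per broker. The sketch below passes the clock in as a parameter so the behaviour is deterministic; FaultTableSketch and its method names are hypothetical and it keeps only the timestamp, not the latency.

```java
import java.util.concurrent.ConcurrentHashMap;

class FaultTableSketch {
    // brokerName -> timestamp (ms) from which the broker is considered usable again
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    // Records that the broker should be avoided until nowMillis + notAvailableDuration.
    void markUnavailable(String broker, long nowMillis, long notAvailableDuration) {
        availableFrom.put(broker, nowMillis + notAvailableDuration);
    }

    // A broker with no record is available; otherwise it is available once the
    // recorded timestamp has been reached.
    boolean isAvailable(String broker, long nowMillis) {
        Long from = availableFrom.get(broker);
        return from == null || nowMillis - from >= 0;
    }

    public static void main(String[] args) {
        FaultTableSketch table = new FaultTableSketch();
        table.markUnavailable("broker-a", 1_000L, 30_000L);
        System.out.println(table.isAvailable("broker-a", 2_000L));  // false: still inside the window
        System.out.println(table.isAvailable("broker-a", 31_000L)); // true: window has elapsed
        System.out.println(table.isAvailable("broker-b", 2_000L));  // true: no record
    }
}
```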

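To summarize the overall approach of the fault-avoidance strategy:

  • After a send completes or fails, the client uses the send latency to predict how long the broker will be unavailable, and stores the broker name, send latency and estimated recovery time in faultItemTable, a ConcurrentHashMap;
  • With fault tolerance enabled, queue selection compares the current time with the broker's estimated recovery time in its FaultItem. If (System.currentTimeMillis() - startTimestamp) >= 0, the broker is expected to have recovered and one of its queues is selected;
  • If every broker is expected to be unavailable, one of the unavailable brokers is picked anyway, the next message queue is taken from the route info and its brokerName and queueId are reset, and the message is sent.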
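
The selection idea in this summary can be sketched as a filter with a fallback. This is a simplification: brokers are strings, availability is a caller-supplied predicate, and the fallback just takes the starting position rather than reproducing how RocketMQ picks among unavailable brokers. FaultAwareSelectionSketch and select are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class FaultAwareSelectionSketch {
    // Walks the list round-robin from startIndex and returns the first broker
    // that is considered available; if none is, returns the broker at
    // startIndex so that a send is still attempted.
    static String select(List<String> brokersByQueue, int startIndex, Predicate<String> isAvailable) {
        int size = brokersByQueue.size();
        for (int i = 0; i < size; i++) {
            String broker = brokersByQueue.get(Math.floorMod(startIndex + i, size));
            if (isAvailable.test(broker)) {
                return broker;
            }
        }
        return brokersByQueue.get(Math.floorMod(startIndex, size));
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b", "broker-c");
        System.out.println(select(brokers, 0, b -> !b.equals("broker-a"))); // broker-b
        System.out.println(select(brokers, 0, b -> false));                 // broker-a
    }
}
```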

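3.3.1.3 Comparing the Queue Selection Strategies

Under the default mechanism, a MessageQueue is picked at random. If the send fails, the producer walks the queue list and retries, skipping the broker that failed within that single send call. In synchronous mode the default is 2 retries on failure. The drawback is that the avoidance lasts only for one send call, so later sends are quite likely to hit the same failing broker again, causing repeated failures and unnecessary overhead.

With the fault-delay mechanism enabled, queue selection filters out, for a period of time, the brokers that RocketMQ considers unavailable. This avoids repeatedly sending to a broker that is down and keeps message sending highly available.

Neither strategy is strictly better. In my view, the choice in practice depends on the network environment and the server environment:

  • If the network and servers are in good shape, I recommend the default strategy, since retries are few and unlikely.
  • If the network and servers are under heavy load, I recommend the fault-delay mechanism.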
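
For reference, the fault-delay mechanism is switched on per producer. A minimal configuration sketch, assuming the 4.x DefaultMQProducer setters setSendLatencyFaultEnable and setRetryTimesWhenSendFailed; the class name FaultTolerantProducerConfig and the NameServer address are placeholders, and running it requires the rocketmq-client dependency and a reachable NameServer.

```java
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class FaultTolerantProducerConfig {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // Placeholder address; replace with the real NameServer.
        producer.setNamesrvAddr("localhost:9876");
        // Enable the latency-based fault avoidance described above (off by default).
        producer.setSendLatencyFaultEnable(true);
        // Retries for synchronous sends; 2 is the default mentioned in the text.
        producer.setRetryTimesWhenSendFailed(2);
        producer.start();
        // ... send messages ...
        producer.shutdown();
    }
}
```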

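3.4 Sending the Message

The call goes through many methods, but in the end it invokes a send method in NettyRemotingClient, choosing one of synchronous, asynchronous, or one-way sending. As mentioned in section 2.3, mQClientAPIImpl is essentially a proxy for NettyRemotingClient; it wraps many APIs for producers and consumers to call.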
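
From the caller's side, the three modes correspond to three producer methods. The following is a usage sketch rather than part of the walkthrough above: it assumes the 4.x client API (send(Message), send(Message, SendCallback), sendOneway(Message)), a placeholder NameServer address, and a hypothetical class name SendModesSketch. It needs the rocketmq-client dependency and a running broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SendModesSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876"); // placeholder address
        producer.start();
        Message msg = new Message("TopicTest1", "TagA", "Hello world".getBytes("UTF-8"));

        // Synchronous: blocks until the broker replies or the timeout expires.
        SendResult result = producer.send(msg);
        System.out.printf("%s%n", result);

        // Asynchronous: returns immediately; the callback receives the outcome.
        CountDownLatch done = new CountDownLatch(1);
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("%s%n", sendResult);
                done.countDown();
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                done.countDown();
            }
        });
        // Wait for the callback before shutting the producer down.
        done.await(5, TimeUnit.SECONDS);

        // One-way: fire and forget, no result is returned.
        producer.sendOneway(msg);

        producer.shutdown();
    }
}
```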


Author: Kezade
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit Kezade when reposting!