【RocketMQ学习】2.玩转各种消息


1 前言

上一章节,主要介绍了RocketMQ的物理架构和逻辑概念。这一章主要列举其中的消息类型以及各自的应用场景,举例。

2 普通消息

2.1 发送

RocketMQ支持以下三种方式发送一条MQ:同步发送、异步发送、单向发送。各自优劣见下表:

发送方式发送TPS发送结果反馈可靠性适用场景
同步发送不丢失重要通知邮件、报名短信通知、营销短信系统等
异步可靠发送不丢失用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等
单向发送最快可能丢失适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
三种发送方式

2.2 消费

一般RocketMQ默认消费模式是集群消费,另外还可选广播消费模式。

2.2.1 集群消费

上一章节,提到了RocketMQ官方建议的消息订阅一致性要求。如果符合这个要求,相当于在集群消费模式下,对于同一个消费者组的相同Topic,相同Tag的消息,做到了消费端的负载均衡,该条消息只会被RocketMQ投递到一个Consumer Group下面的一个实例。且消费进度(Consumer Offset)的存储会持久化到Broker

实际上,每个 Consumer 是平均分摊 Message Queue 的去做拉取消费。例如某个 Topic3 条 Q,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 1 条 Q

集群消费模式

例:

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please rename _unique_group_name")
        producer.setNamesrvAddr("'localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message(
                    "TopicTest",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消费结果:

"C: \Program Files\Java\jdk1.8.0_101 \bin\java.exe"
15:11:52.433 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueld=1, storeSize=201, queueOffset=17
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=201, queueOffset=17
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=201, queueOffset=17
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=201, queueOffset=17

2.2.2 广播消息

消息将对一个Consumer Group下的各个Consumer 实例都投递一遍。即即使这些Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer 都消费一次

消费进度(Consumer Offset)会存储持久化到实例本地。同时,出现重复的概率稍大于集群模式。另外,广播模式下服务端不维护消费进度,所以消息队列RocketMQ 控制台不支持消息堆积查询消息堆积报警订阅关系查询功能。

广播消费模式

例:

public class BroadcastComsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("localhost: 9876");
        //Topic
        consumer.subscribe("TopicTest", "*");
        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @override
            public ConsumeConcurrentlyStatufconsumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息吉
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

3 顺序消息

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的 queue(分区队列);
而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。

  • 当发送和消费参与的 queue 只有一个,则是全局有序;
  • 如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
    全局顺序消息
    部分顺序消息
public class ProducerInOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("106.55.246.66:9876");//106.55.246.66
        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        // 订单列表
        List<Order> orderList = new ProducerInOrder().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < orderList.size(); i++) {
            // 加个时间前缀
            String body = dateStr + " Order:" + orderList.get(i);
            Message msg = new Message("PartOrder", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根据订单id选择发送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//订单id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();
    }

    /**
     * 订单
     */
    private static class Order {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模拟订单数据  3个订单   每个订单4个状态
     */
    private List<Order> buildOrders() {
        List<Order> orderList = new ArrayList<Order>();

        Order orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

结果:

SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='创建'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='创建'}
SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='付款'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='创建'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='付款'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='付款'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='推送'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='推送'}
SendResult status: SEND_OK, queueId:2, body:2021-04-06 16:21:05 Order: Order{orderId=20210406002, desc='完成'}
SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='推送'}
SendResult status: SEND_OK, queueId:3, body:2021-04-06 16:21:05 Order: Order{orderId=20210406003, desc='完成'}
SendResult status: SEND_OK, queueId:1, body:2021-04-06 16:21:05 Order: Order{orderId=20210406001, desc='完成'}

注意:消息消费失败时,不能返回 reconsume_later, 而是suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。

4 延迟消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息

4.1 适用场景

消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

注意,Apache RocketMQ 目前只支持固定精度的定时消息,分为18个等级:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,默认等级为3,即10s
阿里云 RocketMQ(ONS) 提供了任意时刻的定时消息功能。(要用啊,给钱买吧!)

4.2 举例

4.2.1 生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * @author zyx
 * 延时消息-生产者
 */
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 实例化一个生产者来产生延时消息
        DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("106.55.246.66:9876");
        // 启动Producer实例
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
            // delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            message.setDelayTimeLevel(4);
            // 发送消息
            producer.send(message);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

4.2.2 消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author zyx
 * 延时消息-消费者
 */
public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("106.55.246.66:9876");
        // 订阅Topics
        consumer.subscribe("ScheduledTopic", "*");
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

结果:

"C:\Program Files\Java\jdk1.8.0_101\bin\java.exe"...
16:53:00.493 [main] DEBUG i.n.u.i.l. InternalLoggerFactory - Using SLF4J as the default logging framework
Receive message[msgId=7F0000010F6C18B4AAC21D5F55620001] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557C0012] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557D0013] 10001ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557B0011] 10001ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F557A0010] 10000ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5572000B] 10000ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5575000D] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5573000C] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F556B0006] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5577000E] 10002ms later
Receive message[msgId=7F0000010F6C18B4AAC21D5F5578000F] 10001ms later

5 事务消息

RocketMQ事务消息流程

其中分为两个流程:正常事务消息的发送及提交事务消息的补偿流程

5.1 正常事务流程

  • 发送消息(half 消息):图中步骤 1。
  • 服务端响应消息写入结果:图中步骤 2。
  • 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。
  • 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4。

5.2 事务补偿流程

  • 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。
  • Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。
  • 根据本地事务状态,重新 Commit 或者 Rollback:图中步骤 6。
    其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

5.3 举例

5.3.1 生产者

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author zyx
 * 事务消息-消息发送方
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //创建消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("106.55.246.66:9876");
        //创建线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        //设置生产者回查线程池
        producer.setExecutorService(executorService);
        //生产者设置监听器
        producer.setTransactionListener(transactionListener);
        //启动消息生产者
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            try {
                Message msg =
                        new Message("TransactionTopic",
                                tags[i % tags.length],
                                "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //1,2步  半事务的发送,确认。
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(1000);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割。比如:

public class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;//1M
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日志的开销20字节
            if (tmpSize > sizeLimit) {
                //单个消息超过了最大的限制(1M)
                //忽略,否则会阻塞分裂的进程
                if (nextIndex - currIndex == 0) {
                    //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

运用此切分类,新的生产者如下:

public class SplitBatchProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("106.55.246.66:9876");
        // 启动Producer实例
        producer.start();

        //large batch
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        //10万元素的数组
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }

        //把大的消息分裂成若干个小的消息(1M左右)
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
            Thread.sleep(100);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
        System.out.printf("Consumer Started.%n");
    }
}

5.3.2 事务监听

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionListenerImpl implements TransactionListener {
    //事务状态记录
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 执行本地事务 3
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务");
        int value = transactionIndex.getAndIncrement();
        //0,1,2
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        //这里模拟的不进行步骤4  A系统不知道的--UNKNOWN
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 检查本地事务状态  默认是60s,一分钟检查一次
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        //打印每次回查的时间
        //设置日期格式
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // new Date()为获取当前系统时间
        System.out.println("checkLocalTransaction:" + df.format(new Date()));
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【中间状态】");
                    return LocalTransactionState.UNKNOW;
                case 2:
                    System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【回滚状态】");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    System.out.println("MQ检查消息【" + msg.getTransactionId() + "】事务状态【提交状态】");
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

6 批量消息

批量发送消息能显著提高传递小消息性能限制是这些批量消息应该有相同的 topic相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB

6.1 生产者

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
 * @author zyx
 * 批量消息-生产者  list不要超过4m
 */
public class BatchProducer {

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("106.55.246.66:9876");
        // 启动Producer实例
        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
        try {
            producer.send(messages);
        } catch (Exception e) {
            producer.shutdown();
            e.printStackTrace();
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

6.2 消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;
/**
 * @author zyx
 * 批量消息-消费者
 */
public class BatchConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("106.55.246.66:9876");
        // 订阅Topic
        consumer.subscribe("BatchTest", "*");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

7 过滤消息

上一章节3.1.2节有介绍,参阅3.1.2节。

注意:上述所有例子均基于版本4.8.0,5.x版本可能会有所不同,请知悉。


文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录