【RocketMQ学习】1.基础知识


1 消息中间件

属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。

1.1 作用

解耦、异步、削峰、分发

2 物理架构

RocketMQ底层架构

启动流程:

  • NameServer先启动
  • Broker 启动,向 NameServer 注册
  • 生产者先从 NameServer 获取 Broker服务地址列表(或者集群),再根据负载均衡算法选取一台Broker进行消息发送
  • NameServer每台Broker服务器保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker 宕机(使用心跳机制,如果检测超过120S),则从路由注册表中将其移除
  • 消费者先从 NameServer 获取Broker服务地址列表(或者集群),然后从 Broker 中订阅消息,规则由Broker配置决定

2.1 NameServer(服务注册中心)

NameServer 是整个 RocketMQ 的大脑,它是 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer,再启动 Rocket 中的 Broker

Broker 在启动时向所有 NameServer注册(主要是服务器地址等),生产者(或消费者)在发送(收取)消息之前先从 NameServer 获取 Broker 服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

NameServer每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除。这样就可以实现 RocketMQ 的高可用

NameServer支持集群化部署,相互之间独立,其他角色同时向多个NameServer机器上报状态信息,从而达到热备份的目的。

2.2 生产者(Producer)

也称为消息发布者,负责生产并发送消息至 RocketMQ。

生产者通常被集成在业务系统中,将业务消息按照要求封装成Apache RocketMQ的消息(Message)并发送至服务端。

生产者主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。

  • 启动流程
    • 创建消息生产者producer,并指定生产者组名(3.x/4.x版本选配,5.x不需要配置,采用了匿名发送/生产)
    • 指定Nameserver地址
    • 启动producer
    • 创建消息对象,指定TopicTag消息体
    • 发送消息
    • 关闭生产者producer

2.3 消费者(Consumer)

也称为消息订阅者,负责从 RocketMQ 接收并消费消息。

消费者通常被集成在业务系统中,从 Apache RocketMQ服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。

2.3.1 消费者分组

在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾

在消费者分组中,统一定义以下消费行为,同一分组下的多个消费者将按照分组内统一的消费行为负载均衡策略消费消息。

  • 订阅关系:Apache RocketMQ 以消费者分组的粒度管理订阅关系,实现订阅关系的管理和追溯。
  • 投递顺序性:Apache RocketMQ 的服务端将消息投递给消费者消费时,支持顺序投递并发投递,投递方式在消费者分组中统一配置。
  • 消费重试策略:消费者消费消息失败时的重试策略,包括重试次数死信队列设置等。

2.3.2 启动流程

  • 创建消费者Consumer,指定消费者组名
  • 指定Nameserver地址
  • 订阅主题TopicTag
  • 设置回调函数,处理消息
  • 启动消费者consumer

2.4 消息(Message)

生产或消费的数据,对于 RocketMQ 来说,消息就是字节数组

它是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到 Apache RocketMQ 服务端,服务端按照相关语义将消息投递到消费端进行消费。

常见的RocketMQ的消息有:

  • 普通消息
  • 顺序消息
  • 延迟消息
  • 事务消息
  • 批量消息
  • 过滤消息

2.5 主机(Broker)

Broker是RocketMQ的核心,大部分“重量级”工作都是由Broker完成的,包括接收Producer发过来的消息处理Consumer的消费消息请求消息的持久化存储消息的HA机制以及服务端过滤功能等。 支持集群化部署主从架构

3 逻辑概念

3.1 分组(Group)

3.1.1 生产者分组

  • 主要用于事务消息,若一个组内的某一生产者宕机,导致事务消息一直 prepared 或者超时,则选择这个组内的另一生产者进行消息发送。
  • 3.x/4.x版本才有生产者分组,在5.x版本官方已经淡化,“对于历史版本服务端3.x和4.x版本,已经使用的生产者分组可以废弃无需再设置,且不会对当前业务产生影响。”

3.1.2 消费者分组

是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组

官方建议:

  • 按照业务合理拆分分组:Apache RocketMQ 的消费者主题多对多的关系,对于消费者分组的拆分设计,建议遵循以下原则:
    • 消费者的投递顺序一致:同一消费者分组下所有消费者的消费投递顺序是相同的,统一都是顺序投递或并发投递,不同业务场景不能混用消费者分组。
    • 消费者业务类型一致:一般消费者分组和主题对应,不同业务域对消息消费的要求不同,例如消息过滤属性、消费重试策略不同。因此,不同业务域主题的消费建议使用不同的消费者分组,避免一个消费者分组消费超过10个主题。
  • 消费者分组管理尽量避免自动化机制:Apache RocketMQ 虽然提供了自动创建消费者分组的功能,但是建议仅在测试环境使用,生产环境请勿打开,避免产生大量消费者分组,无法管理和回收,且浪费系统资源。
  • 订阅关系一致:同一个消费者组下所有消费者实例所订阅的TopicTag必须完全一致。如果订阅关系(消费者分组名-Topic-Tag)不一致,会导致消费消息紊乱,甚至消息丢失。

下图中,订阅的Topic一样,且过滤表达式一致:

正确订阅关系示例

不一致情况:

  • 同一ConsumerGroup下的Consumer实例订阅的Topic不同(3.x、4.x SDK适用)
    在早期3.x/4.x版本的SDK中,如下图所示,同一 ConsumerGroup 下的三个Consumer实例C1C2C3分别订阅了TopicATopicBTopicC订阅的Topic不一致,不符合订阅关系一致性原则
    不符合订阅关系一致原则示例1
  • 同一 ConsumerGroup 下的 Consumer 实例订阅的Topic相同,但订阅的Tag不一致
    如下图所示,同一 ConsumerGroup 下的三个Consumer实例C1C2C3分别都订阅了TopicA,但是C1订阅TopicATagTag1C2C3订阅的TopicATagTag2
    订阅同一Topic的Tag不一致,不符合订阅关系一致性原则**。
    不符合订阅关系一致原则示例2

    Tips:
    过滤表达式:RocketMQ支持通过Tag/SQL进行消息过滤。
    上述几个图是通过Tag进行过滤的。通过SQL过滤支持以下几种方式:

    数值比较,比如:>,>=,<,<=, BETWEEN, =;
    字符比较,比如:=<>, IN;
    IS NULLIS NOT NULL;
    逻辑符号AND, OR, NOT;
    数值, 比如:1233.1415;
    字符, 比如:’abc’,必须用单引号包裹起来;
    NULL,特殊的常量
    布尔值,TRUE或者FALSE.

举例,通过MessageSelector.bySql()方法可以实现通过SQL方式进行消息的过滤。

public class RocketMQReceiveTest {
  public static void main(String[] args) throws MQClientException {
    //1. 创建消息消费者, 指定消费者所属的组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
    //2. 指定Nameserver地址
    consumer.setNamesrvAddr("192.168.109.131:9876");
    consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', TagB')) and (a is not null and a between 0 and 3)"));
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs, msgs.size());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });

    consumer.start();
    System.out.printf("Consumer started.");
  }
}

Tips:
开启SQL过滤,需要修改 Broker.conf 配置文件:enablePropertyFilter=true,然后重启 Broker 服务。

3.2 主题(Topic)

主题是 Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。

但主题是一个逻辑概念,并不是实际的消息容器。主题内部由多个队列组成,消息的存储和水平扩展能力最终是由队列实现的;并且针对主题的所有约束和属性设置,最终也是通过主题内部的队列来实现。

3.3 标签(Tag)

RocketMQ 支持在发送的时候给 TopicTag,同一个 Topic 的消息虽然逻辑管理是一样的。但是消费 Topic 的时候,如果你消费订阅的时候指定的是 tagA,那么 tagB 的消息将不会投递。

3.4 消息队列(Message Queue)

队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是 Apache RocketMQ 消息的最小存储单元

一个 Topic 可有多个 MQ,每一条MQ均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。

3.5 偏移量(Offset)

RocketMQ 中,有很多 offset 的概念。一般我们只关心暴露到客户端的offset。不指定的话,就是指 Message Queue 下面的offset

  • Message Queue 是无限长的数组。一条消息进来下标就会+1,而这个数组的下标就是 offset,Message Queue 中的 max offset 表示消息的最大 offset;
  • Consumer offset 可以理解为标记 Consumer Group 在一条逻辑 Message Queue 上,消息消费到哪里,即消费进度。但从源码上看,这个数值是消费过的最新消费的消息 offset + 1,即实际上表示的是下次拉取的 offset 位置

文章作者: Kezade
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Kezade !
评论
  目录