初识 MQ

种类对比

  • kafka

    编程语言:java/scala

    大数据领域的主流 MQ

  • RabbitMQ

    编程语言:erlang

    基于 erlang 语言,不好修改底层,不好查找问题的原因,不建议选用。

  • RocketMQ

    编程语言:java

    适用于大型项目。适用于集群。

  • ActiveMQ

    编程语言:java

    适用于中小型项目。

产生背景

​ 微服务架构后,链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分成多个函数(或子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的 RPC 交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例。这些架构会有哪些问题

  • 系统之间接口耦合比较严重

    举个例子:如果系统 A 要发送数据给系统 B 和系统 C,发送给每个系统的数据可能有差异,因此系统 A 对要发送给每个系统的数据进行了组装,然后逐一发送;

    当代码上线后又新增了一个需求:把数据也发送给 D,新上了一个 D 系统也要接受 A 系统的数据,此时就需要修改 A 系统,让他感知到 D 系统的存在,同时把数据处理好再给 D。在这个过程你会看到,每接入一个下游系统,都要对系统 A 进行代码改造,开发联调的效率很低。

  • 面对大流量并发时,容易被冲垮

    每个接口模块的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量(洪水)来临时,容易被冲垮。

    举个例子秒杀业务:上游系统发起下单购买操作,就是下单一个操作,很快就完成。然而,下游系统要完成秒杀业务后面的所有逻辑(读取订单,库存检查,库存冻结,余额检查,余额冻结,订单生产,余额扣减,库存减少,生成流水,余额解冻,库存解冻)。

  • 等待同步存在性能问题

    RPC 接口上基本都是同步调用,整体的服务性能遵循”木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。比如 A 调用B/C/D 都是 50ms,但此时 B 又调用了 B1,花费 2000ms,那么直接就拖累了整个服务性能。

根据上述的几个问题,在设计系统时可以明确要达到的目标

  1. 要做到系统解耦,当新的模块接进来时,可以做到代码改动最小;能够解耦
  2. 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能削峰
  3. 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

大致过程

​ 发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题topic中,在合适的时候,消息服务器回将消息转发给接受者。在这个过程中,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系;尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

image-20220731165646430

安装/操作

官网地址

环境:

  • ActiveMQ:5.15.9
  • JDK:1.8
  1. 下载

    image-20220731170806773

    image-20220731170925768

  2. 解压

    将压缩包上传至 /opt 目录下,并解压到 /opt/module 目录中

    1
    [root@instance-jwzbgijw opt]# tar -zxvf apache-activemq-5.17.1-bin.tar.gz -C /opt/module
  3. 普通启动

    注意:需要 jdk 环境

    进入压缩后目录的bin路径,执行:

    1
    [root@instance-jwzbgijw bin]# ./activemq start
  4. 查看是否启动

    activemq 默认进程端口为61616

    1
    2
    3
    4
    [root@instance-jwzbgijw bin]# ./activemq status
    INFO: Loading '/opt/module/apache-activemq-5.15.9//bin/env'
    INFO: Using java '/usr/bin/java'
    ActiveMQ is running (pid '19232')
  5. 其它操作

    • 重启

      1
      ./activemq restart
    • 关闭

      1
      ./activemq stop
  6. 带运行日志的启动方式

    1
    [root@instance-jwzbgijw bin]# ./activemq start > /opt/module/apache-activemq-5.15.9/run_activemq.log

控制台

后台已经启动,这时需要前台图形化界面

  1. 访问 http://ip地址:8161/admin/
  2. 默认用户名/密码:admin/admin

image-20220802095208507

QUEUE 案例

点对点

消息生产者

  1. 创建 Maven 工程

  2. pom.xml 引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    <dependencies>
    <!-- activemq 所需要的jar包-->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
    </dependency>
    </dependencies>
  3. 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class JmsProducer {

    public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
    // 创建连接工厂
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

    // 获得连接 并 启动访问
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();

    // 创建会话
    // 两个参数:事务、签收
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // 创建目的地(队列或主题)
    Queue queue = session.createQueue(QUEUE_NAME);

    // 创建消息生产者
    MessageProducer producer = session.createProducer(queue);

    // 生产 3 条消息发送到 MQ 队列
    for (int i=1;i<=3;i++){
    // 创建消息
    TextMessage textMessage = session.createTextMessage("msg" + i);
    // 发送给 mq
    producer.send(textMessage);

    }

    // 关闭资源
    producer.close();
    session.close();
    connection.close();
    }
    }
  4. 效果

    image-20220802105922872

    image-20220802110000044

消息消费者

同步阻塞

  1. 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {

    // 创建连接工厂
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

    // 获得连接 并 启动访问
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();

    // 创建会话
    // 两个参数:事务、签收
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // 创建目的地(队列或主题)
    Queue queue = session.createQueue(QUEUE_NAME);

    // 创建消费者
    MessageConsumer consumer = session.createConsumer(queue);

    // 接收消息
    while (true){
    // 注意:因为发送的是 TextMessage类型消息,所以需类型转换
    TextMessage msg = (TextMessage) consumer.receive();
    if (msg != null){
    System.out.println(msg.getText());
    }else{
    break;
    }
    }

    // 关闭资源
    consumer.close();
    session.close();
    connection.close();
    }
    }
  2. 效果

    image-20220802111148476

    image-20220802111217654

reveive(): 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式。

reveive(Long time):等待n毫秒之后还没有收到消息,结束同步阻塞。

异步监听

  1. 代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {

    // 创建连接工厂
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

    // 获得连接 并 启动访问
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();

    // 创建会话
    // 两个参数:事务、签收
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // 创建目的地(队列或主题)
    Queue queue = session.createQueue(QUEUE_NAME);

    // 创建消费者
    MessageConsumer consumer = session.createConsumer(queue);

    /**
    * 异步监听
    */
    consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    if(message != null && message instanceof TextMessage){
    TextMessage textMessage = (TextMessage) message;
    try {
    System.out.println(textMessage.getText());
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    });

    // 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
    // 实际开发中,我们的程序会一直运行,这句代码都会省略。
    System.in.read();

    // 关闭资源
    consumer.close();
    session.close();
    connection.close();
    }
    }
  2. 测试

    • 先运行消费者,监听队列
    • 运行生产者,发布消息
    • 消费者收到发布的消息
  3. 效果

    image-20220802123146808

总结

消费方式

  • 同步阻塞
  • 异步监听

特点

  1. 每个消息只能有一个消费者
  2. 消息生产者和消费者之间没有时间上的相关性。即不一定得同时处于运行状态。
  3. 消息被消费后,队列不再存储。

消费情况

  • 生产消息,先启动1号消费者再启动2号消费者,消息会被1号消费者消费掉,2号消费者没有消息可以消费。
  • 先启动2个消费者,再生产消息,一人一半消费。

TOPIC 案例

订阅与发布

发布主题者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class JmsTopicProducer {

public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
public static final String TOPIC_NAME = "topic01";

public static void main(String[] args) throws JMSException {

// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

// 获得连接 并 启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 创建会话
// 两个参数:事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建目的地(队列或主题)
Topic topic = session.createTopic(TOPIC_NAME);

// 创建消息生产者
MessageProducer producer = session.createProducer(topic);

// 生产 3 条消息发送到 MQ 队列
for (int i=1;i<=3;i++){
// 创建消息
TextMessage textMessage = session.createTextMessage("msg" + i);
// 发送给 mq
producer.send(textMessage);
}

// 关闭资源
producer.close();
session.close();
connection.close();
}
}

订阅主题者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class JmsTopicConsumer {

public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
public static final String TOPIC_NAME = "topic01";

public static void main(String[] args) throws JMSException, IOException {

// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

// 获得连接 并 启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 创建会话
// 两个参数:事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建目的地(队列或主题)
Topic topic = session.createTopic(TOPIC_NAME);

// 创建消费者
MessageConsumer consumer = session.createConsumer(topic);

// 异步监听(另一写法)
consumer.setMessageListener((message) -> {
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

// 避免关掉程序,导致异步监听结束
System.in.read();

// 关闭资源
consumer.close();
session.close();
connection.close();
}
}

测试

先运行消费者订阅主题,然后再运行生产者发布消息

image-20220802134518126

image-20220802134537261

特点

  1. 生产者将消息发布到 topic 中,每个消息可以有多个消费者,属于1:N的关系
  2. 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
  3. 生产者生产时,假如之前无人订阅,那生产的就是废消息,所以,一般先启动消费者再启动生产者。

默认情况下如上所述,但是 JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比微信公众号订阅。

对比

比较项目 Topic 模式 Queue 模式
工作模式 “订阅-发布”模式,如果当前没有订阅者,消息会被丢弃;如果有订阅者,那么所有订阅者都会收到消息 “负载均衡”模式,如果当前没有消费者,消息不会丢弃;如果有消费者,那么一条消息只会被其中一个消费者消费,并且要求消费者ack信息
有无状态 无状态 Queue数据默认在mq服务器上以文件形式保存,也可配置成DB存储
传递完整性 如果没有订阅者,消息会丢弃 消息不会丢弃
处理效率 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 由于一条消息只发送给一个消费者,所以消费者再多,性能也不会明显降低。当然不同消息协议的具体性能也是有差异的

JMS

简介

Java 消息服务

​ Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步/削峰的效果。

image-20220802140715488

image-20220802140730735

消息头

JMS 的消息头属性:

  • JMSDestination:消息目的地
  • JMSDeliveryMode:消息持久化模式
  • JMSExpiration:消息过期时间
  • JMSPriority:消息的优先级
  • JMSMessageID:消息的唯一标识符。后面会讲如何解决幂等性

消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 指定消息的目的地
message.setJMSDestination(topic);
// 持久模式和非持久模式
// 一条持久性的消息:意味着服务器出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
// 一条非持久的消息:意味着服务器出现故障,该消息将会永远丢失。
message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); // 持久
// 可以设置消息在一定时间后过期,默认是永不过期
message.setJMSExpiration(1000);
// 消息优先级,从0-9十个级别,0-4是普通消息 5-9是加急消息
// JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级
message.setJMSPriority(10);
// 唯一标识。MQ会默认生成一个,也可以自己指定
message.setJMSMessageID("AAA");
// 上面有些属性在send方法里也能设置

消息体

封装具体的消息数据,发送和接收的消息体类型必须一致

消息体格式:

  • TextMessage:普通字符串消息,包含一个 String
  • MapMessage:Map 类型消息,key 为字符串类型,value 为 Java 基本类型
  • BytesMessage:二进制数组消息,包含一个 byte[]
  • StreamMessage:Java 数据流消息,用标准操作来顺序的填充和读取
  • ObjectMessage:对象消息,包含一个可序列化的 Java 对象

示例:

1
2
3
4
5
6
7
8
9
// 发送MapMessage消息体。set方法: 添加,get方式:获取
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "张三"+i);
mapMessage.setInt("age", 18+i);
producer.send(mapMessage);

// 接收
mapMessage.getString("name");
mapMessage.getInt("age");

消息属性

如果需要除消息头字段之外的值,那么可以使用消息属性。他是 识别/去重/重点标注 等操作非常有用的方法。

​ 他们是以属性名和属性值对的形式制定的。可以将属性视为消息头的扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。

image-20220802143407135

示例:

1
2
3
4
5
6
7
8
9
// 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。
textMessage.setStringProperty("From","ZhangSan@qq.com");
textMessage.setByteProperty("Spec", (byte) 1);
textMessage.setBooleanProperty("Invalide",true);

// 接收
textMessage.getStringProperty("From");
textMessage.getByteProperty("Spec");
textMessage.getBooleanProperty("Invalide");

消息持久化

概述

​ 在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息。虽然这样增加了开销,但却增加了可靠性。

  • Queue
    • queue 非持久,当 mq 服务器宕机,消息会被丢弃。
    • queue 持久化,当 mq 服务器宕机,消息依然存在。queue 消息默认是持久化的。
  • Topic
    • topic 非持久,topic 默认非持久化。
    • topic 持久,前提消费者已经向 MQ 服务器订阅过,订阅主题有新消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。重新连接后还是会收到消息。

示例

Queue

演示非持久化 ,因为默认为持久化。

在生产者创建后,设置非持久化即可,之后发送的消息都是非持久化的。

当消息发送到 mq 服务器后,mq 服务器宕机,重启之后,队列的消息就会清空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class JmsProducer {

public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);

// 创建消息生产者
MessageProducer producer = session.createProducer(queue);

// 非持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

for (int i=1;i<=3;i++){
TextMessage textMessage = session.createTextMessage("msg" + i);
producer.send(textMessage);
}

producer.close();
session.close();
connection.close();
}
}

Topic

默认为非持久化,这里演示持久化。

topic 持久化有两个方面:

1)在发布者设置的持久化是为了消息在 mq 服务器不会被丢弃。

2)在订阅者设置的持久化是为了掉线重连后依然能够获取期间漏掉的消息。

  • 订阅者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class JmsTopicConsumer {

    public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException, IOException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    Connection connection = activeMQConnectionFactory.createConnection();

    // 设置订阅者ID
    // 为了记录谁订阅了主题,当 ZhangSan 订阅后,如果下线了,再重新连接依然可以获取下线之后新增的消息。
    // 而非持久化下线后需重新订阅并不能获取下线重连后期间的新消息
    connection.setClientID("ZhangSan");

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(TOPIC_NAME);

    // 创建持久化订阅者
    TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"备注");
    // 启动连接
    connection.start();

    // 接收消息
    Message message = topicSubscriber.receive();
    while (message != null){
    TextMessage textMessage = (TextMessage) message;
    System.out.println(textMessage.getText());
    message = topicSubscriber.receive();
    }

    session.close();
    connection.close();
    }
    }
  • 发布者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class JmsTopicProducer {

    public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    Connection connection = activeMQConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(TOPIC_NAME);
    MessageProducer producer = session.createProducer(topic);

    // 持久化
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 连接
    connection.start();

    for (int i=1;i<=3;i++){
    TextMessage textMessage = session.createTextMessage("msg" + i);
    producer.send(textMessage);
    }
    producer.close();
    session.close();
    connection.close();
    }
    }
  • 测试

    1. 启动订阅者持久化订阅主题
    2. 关闭订阅者
    3. 打开发布者持久化存储消息到主题
    4. 重启 mq 服务器
    5. 启动订阅者(如成功会获取发布者发布的消息)
  • 效果

    可见不管是订阅者下线,还是 mq 服务器宕机,恢复连接后,依然能够获取到消息。

消息事务

概述

image-20220802190814847

  • 生产者开启事务后,执行 commit 方法,这批消息才真正的被提交。不执行 commit 方法,这批消息不会提交。执行 rollback 方法,之前的消息会回滚。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。
  • 消费者开启事务后,执行 commit 方法,这批消息才算真正的被消费。不执行 commit 方法,这些消息不会标记已消费,下次还会被消费。执行 rollback 方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用 commit 和 rollback 方法,甚至能够违反一个消费者只能消费一次消息的原理。

消费者和生产者的事务,完全没有关联,各自是各自的事务。

代码

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class JmsProducer {

public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 1、true 开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);

try {
for (int i=1;i<=3;i++){
TextMessage textMessage = session.createTextMessage("msg" + i);
producer.send(textMessage);
if (i == 2){
throw new RuntimeException("GG...");
}
}
// 2、提交事务
session.commit();
System.out.println("消息提交成功");
}catch (Exception e){
// 3、回滚
session.rollback();
System.out.println("出现异常,消息回滚");
}

producer.close();
session.close();
connection.close();
}
}

执行上方代码后,因为出现异常,会进行回滚,结果就是 3 条消息全都回滚。

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class JmsConsumer {

public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException, IOException {

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();

// 开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
int a = 0;
@SneakyThrows
@Override
public void onMessage(Message message) {
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
if (a == 0){
System.out.println("commit");
session.commit();
}
if (a == 2){
System.out.println("rollback");
session.rollback();
}
a++;
} catch (JMSException e) {
System.out.println("出现异常");
session.rollback();
}
}
}
});

System.in.read();
consumer.close();
session.close();
connection.close();
}
}

效果:

image-20220802193050620

消息签收

签收方式

  • 自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
  • 手动签收(Session.CLIENT_ACKNOWLEDGE):该种方式,需要我们手动调用 Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。
  • 允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
  • 事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。

事务和签收的关系

  1. 在事务性会话中,当一个事务被成功提交则消息自动被签收。如果事务回滚,则消息会被再次传送。签收机制不再起任何作用。
  2. 非事务性会话中,消息何时被签收取决于创建会话时的应答模式。
  3. 消费者事务开启,只有 commit 后才能将全部消息变为已消费。
  4. 事务偏向生产者,签收偏向消费者。

代码

  1. 生产者发布消息

    image-20220803100703259

  2. 消费者设置手动签收模式但不签收消息

    1
    2
    // 手动签收
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
  3. 启动消费者

    此时消费者正常接收到消息,但可以看到 mq 控制台中的消息还处于待处理状态。

    image-20220803101105607

    image-20220803100828539

  4. 重新启动消费者

    可以再次接收到消息。

    image-20220803101108384

  5. 消费者添加签收代码

    1
    message.acknowledge();
  6. 再次重启消费者

    此时队列中的消息才真正被消费掉。

    image-20220803101620921

Broker

简介

​ 相当于一个 ActiveMQ 服务器实例。说白了,Broker 其实就是实现了用代码的形式启动 ActiveMQ 将 MQ 嵌入到 Java 代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。

启动 broker 时指定配置文件,可以帮助我们在一台服务器上启动多个 broker。实际工作中一般一台服务器只启动一个broker。

1
2
3
4
# 拷贝配置文件
[root@instance-jwzbgijw conf]# cp activemq.xml activemq02.xml
# 启动 broker
[root@instance-jwzbgijw bin]# ./activemq start xbean:file:/opt/module/apache-activemq-5.15.9/conf/activemq02.xml

嵌入式启动

  1. 添加依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.1</version>
    </dependency>
  2. broker 启动类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class EmbedBroker {

    public static void main(String[] args) throws Exception{
    BrokerService brokerService = new BrokerService();
    brokerService.setPopulateJMSXUserID(true);
    brokerService.addConnector("tcp://localhost:61616"); // 注意填写本机即可
    brokerService.start();
    }
    }
  3. 测试

    1)启动 EmbedBroker

    image-20220803122547548

    2)修改生产者以及消费者的 ACTIVEMQ_URL

    1
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    3)生产者启动发送消息

    4)消费者启动接收消息

整合 Spring

案例

  1. 引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    <dependencies>
    <!-- activemq 所需要的jar包-->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
    </dependency>
    <!-- activemq 连接池 -->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.9</version>
    </dependency>
    <!--spring 支持 jms 的包 -->
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.2.1.RELEASE</version>
    </dependency>
    <!-- 嵌入式 broker 所需包 -->
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.1</version>
    </dependency>
    <!-- Spring 相关依赖包 -->
    <dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.15</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>4.3.23.RELEASE</version>
    </dependency>
    <!-- Spring核心依赖 -->
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.3.23.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.23.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aop</artifactId>
    <version>4.3.23.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-orm</artifactId>
    <version>4.3.23.RELEASE</version>
    </dependency>
    </dependencies>
  2. 创建配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 开启扫描 -->
    <context:component-scan base-package="com.cuc.acmq.spring" />

    <!-- 配置连接工厂 -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
    <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://120.48.54.126:61616"/>
    </bean>
    </property>
    <property name="maxConnections" value="100"/>
    </bean>

    <!-- 配置目的地(队列和主题) -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="queue02"/>
    </bean>
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="topic02"/>
    </bean>

    <!-- Spring提供的JMS工具类,他可以进行消息发送,接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- 传入连接工厂 -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <!-- 传入目的地 -->
    <property name="defaultDestination" ref="destinationQueue"/>
    <!-- 消息自动转换器 -->
    <property name="messageConverter">
    <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </property>
    </bean>

    </beans>
  3. 创建生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    @Service
    public class Producer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
    ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring-activemq.xml");
    Producer producer = applicationContext.getBean(Producer.class);
    producer.jmsTemplate.send(
    new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage("Spring 和 ActiveMQ 整合");
    }
    }
    );
    // 另一写法(更简便)
    // producer.jmsTemplate.send((session) -> {
    // TextMessage textMessage = session.createTextMessage("Spring 和 ActiveMQ 整合");
    // return textMessage;
    // });
    System.out.println("send task over.");
    }
    }
  4. 创建消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Service
    public class Consumer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
    ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring-activemq.xml");
    Consumer consumer = (Consumer) applicationContext.getBean("consumer");
    String ret = (String) consumer.jmsTemplate.receiveAndConvert();
    System.out.println(ret);
    }
    }
  5. 测试

    自行更改配置文件中的目的地类型进行测试。

监听器

在 spring 里实现消费者不启动,通过配置监听完成。

  1. 创建监听类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Component
    public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
    if (message != null && message instanceof TextMessage){
    TextMessage textMessage = (TextMessage) message;
    try {
    System.out.println(textMessage.getText());
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    }
  2. xml 添加配置

    1
    2
    3
    4
    5
    6
    7
    8
    <!-- 配置监听器 -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="destinationQueue" />
    <!--public class MyMessageListener implements MessageListener-->
    <property name="messageListener" ref="myMessageListener" />
    </bean>

  3. 之后启动生产者即可

整合 SpringBoot

这里只演示 Queue,Topic 逻辑代码一样,只需将一些队列的东西改成主题即可。

  1. 创建两个项目

    image-20220803145553376

  2. 引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    <!--spring boot整合activemq的jar包-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
    <version>2.1.5.RELEASE</version>
    </dependency>
    </dependencies>
  3. 配置文件

    注意端口号不要一样。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    server:
    port: 7777

    spring:
    activemq:
    broker-url: tcp://120.48.54.126:61616
    user: admin
    password: admin
    jms:
    # 目的地是 queue 还是 topic, false(默认) = queue true 为 topic
    pub-sub-domain: false

    # 自定义队列名称
    myqueue: queue03
  4. 生产者

    1)创建配置类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Component
    @EnableJms // 开启jms适配
    public class ConfigBean {

    @Value("${myqueue}") // 注入配置文件中的 myqueue
    private String myQueue ;

    @Bean
    public ActiveMQQueue queue(){
    return new ActiveMQQueue(myQueue);
    }
    }

    2)实现类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Component
    public class QueueProducer {

    // JMS模板
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate ;

    // 配置的目的地
    @Autowired
    private Queue queue ;

    // 发送消息
    public void produceMessage(){
    // 一参是目的地,二参是消息的内容
    jmsMessagingTemplate.convertAndSend(queue,"****"+ UUID.randomUUID().toString().substring(0,6));
    }

    // 定时任务。每3秒执行一次。
    @Scheduled(fixedDelay = 3000)
    public void produceMessageScheduled(){
    produceMessage();
    }
    }

    3)启动类开启任务调度功能

    1
    @EnableScheduling // 开启定时任务调度功能

    4)编写单元测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @SpringBootTest
    class TestActiveMQ {

    @Autowired
    private QueueProducer queueProducer;

    @Test
    void contextLoads() {
    queueProducer.produceMessage();
    // queueProducer.produceMessageScheduled();
    // System.in.read();
    }

    }

    执行单元测试即可将消息发送到 mq 服务器。

  5. 消费者

    创建监听类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Component
    public class QueueConsumer {

    // 注册一个监听器。destination指定监听的主题。
    @JmsListener(destination = "${myqueue}")
    public void receive(TextMessage textMessage) throws Exception{
    System.out.println("收到的消息:"+textMessage.getText());
    }
    }

    执行启动类就可以接受到消息。

有bug,生产者第一次发消息总是会生成两条。不懂。

传输协议

概述

ActiveMQ 支持通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM

协议 描述
TCP 默认的协议,性能相对可以
NIO 基于TCP协议之上,进行了扩展和优化,具有更好的扩展性
UDP 性能比TCP好,但是不具有可靠性
SSL 安全链接
HTTP(S) 基于HTTP或HTTPS
VM VM本身不是协议,当客户端和代理在同一个Java虚拟机中运行时,它们之间需要通信,但不想占用网络通道,而是直接通信,可以使用该方式

其中配置 TransportConnector 的文件在 ActiveMQ 安装目录的 conf/activemq.xml 中的 <transportConnectors> 标签之内:

1
2
3
4
5
6
7
8
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

上文给出的配置信息中,URI 描述信息的头部都是采用协议名称。

例如:

描述 amqp 协议的监听端口时,采用的URI描述格式为amqp://······

描述 Stomp 协议的监听端口时,采用URI描述格式为stomp://······

唯独在进行openwire协议描述时,URI头却采用的tcp://······。这是因为ActiveMQ中默认的消息协议就是openwire

协议

除了 tcpnio 协议,其他了解就行。各种协议有各自擅长该协议的中间件,工作中一般不会使用 activemq 去实现这些协议。

TCP

  • Transmission Control Protocol(TCP) 是默认的。TCP 的 Client 监听端口 61616

  • 在网络传输数据前,必须要先序列化数据,消息是通过一个叫 wire protocol 的来序列化成字节流。

  • TCP 连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。

  • 优点

    TCP 协议传输可靠性高,稳定性强

    高效率:字节流方式传递,效率很高

    有效性、可用性:应用广泛,支持任何平台。

  • 关于 Transport 协议的可选配置参数可以参考官网

NIO

  • New I/O API Protocol(NIO)

  • NIO 协议和 TCP 协议类似,但 NIO 更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的 client 调用和服务器端有更多的负载。

  • 适合使用 NIO 协议的场景:

    ​ 可能有大量的 Client 去连接到 Broker 上,一般情况下,大量的 Client 去连接 Broker 是被操作系统的线程所限制的。因此,NIO的实现比 TCP 需要更少的线程去运行,所以建议使用 NIO 协议。

    ​ 可能对于 Broker 有一个很迟钝的网络传输,NIO 比 TCP 提供更好的性能。

  • NIO 连接的 URI 形式:nio://hostname:port?key=value&key=value

  • 关于 Transport 协议的可选配置参数可以参考官网

NIO 案例

ActiveMQ 这些协议传输的底层默认都是使用 BIO 网络的 IO 模型。只有当我们指定使用才会使用 NIO 的 IO 模型。

  1. 修改配置文件 activemq.xml

    <transportConnectors>标签内加入如下内容:

    1
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
  2. 重启 activemq

    1
    [root@instance-jwzbgijw bin]# ./activemq restart
  3. 查看控制台,可以看到页面多了nio

    image-20220803163849941

  4. 修改 ACTIVEMQ_URL

    1
    private static final String ACTIVEMQ_URL = "nio://120.48.54.126:61618";
  5. 自行测试

    记得将端口打开。

NIO 增强

URI格式头以nio开头,表示这个端口使用以tcp协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持Openwire协议。那么怎么既让这个端口支持NIO网络IO模型,又让它支持多个协议?

步骤:

  1. 修改 activemq.xml

    <transportConnectors>标签内加入如下内容:

    1
    <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>

    auto:会自动识别是什么协议。

    nio:使用 NIO 网络 IO 模型

  2. 重启 activemq

    1
    [root@instance-jwzbgijw bin]# ./activemq restart
  3. 修改 ACTIVEMQ_URL

    1
    2
    private static final String ACTIVEMQ_URL = "nio://120.48.54.126:61608";
    private static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61608";
  4. 轮流切换以上URL进行测试

    如果想测试其它协议,java代码是不一样的。

消息持久化

简介

​ ActiveMQ 的消息持久化机制有 JDBC,AMQ,KahaDB 和 LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。

​ 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

KahaDB

概述

基于日志文件,ActiveMQ5.4 后默认的持久化插件。

使用一个事务日志和仅仅用一个索引文件来存储消息。

activemq.xml 中日志文件的存储路径:

1
2
3
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

image-20220803195941155

存储原理

image-20220803200750477

  • db-[Number].log

    预定大小的数据存储文件,当文件数据存满时,一个新的文件随之创建,number 值递增。当不再引用到数据文件中的任何消息时,文件会被删除或归档。

  • db.data

    文件包含了持久化的 BTree 索引,是消息的索引文件,指向 db-[Number].log 里面存储的消息。

  • db.free(图片里没有不重要,会有的)

    当前 db.data 文件里哪些页面是空闲的,文件具体内容是所有空闲页的 ID。

  • db.redo

    用来进行消息恢复,如果 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引。

  • lock

    文件锁,表示当前获得 kahadb 读写权限的 broker。

JDBC

概述

与 KahaDB 不同,JDBC 可将数据与MQ服务器分开,而 KahaDB 则是将数据存储在本地 MQ 上。

image-20220803203841037

配置步骤

  1. 添加 mysql 驱动包到 lib 文件夹下

    image-20220803210240011

  2. jdbcPersistenceAdapter 配置

    /opt/module/apache-activemq-5.15.9/conf下修改配置文件 activemq.xml

    1
    2
    3
    4
    <persistenceAdapter>
    - <kahaDB directory="${activemq.data}/kahadb"/>
    + <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
    </persistenceAdapter>

    dataSource:指定引用的持久化数据库的 bean 名称,#相当于ref。应该懂了吧✌。

    createTableOnStartup:是否在启动的时候创建数据表,默认为 true,每次启动都会创建,所以一般创建后就改成 false。

  3. 数据库连接池配置

    需要准备一个 mysql 数据库,并创建一个名为 activemq 的数据库且要采用latin1或者ASCII编码。

    默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池 jar 包,也放到 lib 目录下。

    image-20220803212408563

    1
    2
    3
    4
    5
    6
    7
    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://ip地址:3306/activemq?relaxAutoCommit=true"/>
    <property name="username" value="用户名"/>
    <property name="password" value="密码"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>
  4. 之后自行测试

    没有那么多台服务器,要连 windows 还是麻烦。

表说明

重启 activemq。会自动生成如下3张表,如果没有自动生成,需要我们手动执行SQL。

  • ACTIVEMQ_MSGS(消息表)

    1
    2
    3
    4
    5
    6
    7
    ID:自增的数据库主键
    CONTAINER:消息的Destination
    MSGID_PROD:消息发送者客户端的主键
    MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
    EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
    MSG:消息本体的Java序列化对象的二进制数据
    PRIORITY:优先级,从0-9,数值越大优先级越高
  • ACTIVEMQ_ACKS(存储订阅关系)

    用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存

    主要的数据库字段如下:

    1
    2
    3
    4
    5
    6
    CONTAINER:消息的Destination
    SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
    CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
    SUB_NAME:订阅者名称
    SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
    LAST_ACKED_ID:记录消费过的消息的ID。
  • ACTIVEMQ_LOCK

    在集群环境中才有用,只有一个 Broker 可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个 Broker 是当前的Master Broker。

JDBC Message Store with ActiveMQ Journal

概述

​ 这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal 文件能够大大减少需要写入到DB中的消息。

​ 举个例子:生产者生产了1000条消息,这1000条消息会保存到 journal 文件,如果消费者的消费速度很快的情况下,在 journal 文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候 journal 文件可以使消息以批量方式写到DB。

​ 为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比 JDBC 性能要高。

配置

基于上面JDBC配置,再做一点修改:

修改 activemq.xml 配置文件,之后启动就好使了。

1
2
3
4
5
6
7
8
9
10
<!--消息存储持久化JDBC message store with activemq journal-->
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"/>
</persistenceFactory>

集群

不乱学。

高级特性

异步投递

​ ActiveMQ 支持同步异步两种发送的模式将消息发送到 broker,默认使用异步发送的模式,除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化消息,这两种情况是同步发送到。

​ 如果没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞 producer 直到 broker 返回一个确认,表示消息已经被安全的持久化到了磁盘。确认机制提供了消息安全的保障,但同时阻塞客户端也带来了很大的延迟。所以允许在失败的情况下丢失少量的数据,可以使用异步发送来提高生产率。

​ 异步发送可以最大化 producer 的发送效率。通常在发送消息量比较密集的情况下使用异步发送,可以提高producer的性能,但需要消耗较多的 client 内存,也会使 broker 端性能消耗增加,且不能有效的保证消息的发送成功。所以在userAsyncSend=true的情况下客户端需要容忍消息丢失的可能。

配置方式(3种):

1
tcp://106.13.187.36:61616?jms.useAsyncSend=true
1
activeMQConnectionFactory.setUseAsyncSend(true);
1
((ActiveMQConnection) connection).setUseAsyncSend(true);

异步发送消息丢失的场景:

​ 由于消息是不阻塞的,生产者会认为所有 send 的消息都是发送成功到 mq 的。但是如果 mq 突然宕机后,此时在生产者端内存中未发送的消息将会丢失。所以,正确的异步发送是需要接收回调的。

异步发送如何确认发送成功?

同步发送完之后,send 不阻塞了就表示发送成功了

异步发送完之后,需要接收回调并由客户端再次判断是否发送成功

通过 ActiveMQMessageProducer 实现的回调,之前的代码使用 MessageProducer 发送的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class JmsProducerAsyncSend {

public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
public static final String QUEUE_NAME = "queue01";

public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

// 开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);

// 通过ActiveMQMessageProducer实现回调
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(queue);

for (int i=1;i<=3;i++){
TextMessage textMessage = session.createTextMessage("msg" + i);
// 设置消息唯一标识
textMessage.setJMSMessageID(UUID.randomUUID().toString());
String msgID = textMessage.getJMSMessageID();
// 回调
producer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(msgID+" has been send success");
}

@Override
public void onException(JMSException e) {
System.out.println(msgID+" fail to send");
}
});
}

producer.close();
session.close();
connection.close();
}
}
1
2
3
ID:8b4730b9-531e-48e1-9ee2-8fbcfd48ffe2 has been send success
ID:764d4b78-c4e4-407f-9e25-3748ed547ed0 has been send success
ID:9c81fc52-32e4-487f-a92f-da9d49c2454f has been send success

延迟投递和定时投递

image-20220804083221905

  1. 修改配置文件并重启

    添加 schedulerSupport="true"

    1
    2
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
    <destinationPolicy>
  2. 代码实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class JmsProducer {

    public static final String ACTIVEMQ_URL = "tcp://120.48.54.126:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue(QUEUE_NAME);
    MessageProducer producer = session.createProducer(queue);

    long delay = 4*1000;
    long period = 3*1000;
    int repeat = 3;


    for (int i=1;i<=3;i++){
    TextMessage textMessage = session.createTextMessage("msg" + i);
    // 延迟时间
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
    // 重复投递的时间间隔
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);
    // 重复投递的次数
    textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
    // 该条消息,等待4秒,之后每3秒发送一次,重复发送3次。
    producer.send(textMessage);
    }

    producer.close();
    session.close();
    connection.close();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    msg1
    msg2
    msg3
    msg1
    msg2
    msg3
    msg1
    msg2
    msg3
    msg1
    msg2
    msg3

消费重试机制

​ 消费者收到消息,之后出现异常了,没有告诉 broker 确认收到该消息,broker 会尝试再将该消息发送给消费者。尝试 n 次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列,之后 broker 不会再将该消息发送给消费者。默认每秒发6次

引发消息重发的情况:

  • Client 用了 transactions 且调用了 rollback
  • Client 用了 transactions 且 在调用 commit 之前关闭 或者 没有 commit
  • Client 在 CLIENT_ACKNOWLEDGE 的传递模式下,session 中调用了 recover

有毒消息Poison ACK:

​ 一个消息被 redelivedred 超过默认的最大重发次数(默认6次)时,消费者会给 MQ 发一个"poison ack"表示这个消息有毒,告诉broker 不要再发了。这个时候 broker 会把这个消息放到 DLQ(死信队列)。

属性说明:

image-20220804094737237

代码验证:

  1. 生产者发送数据

  2. 消费者开启事务,却没有 commit 。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。

  3. activemq 管理后台。多了一个名为 ActiveMQ.DLQ 队列。

    image-20220804095318639

  4. 也可修改默认参数

    1
    2
    3
    4
    // 修改默认参数,设置消费重试 3 次
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(3);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
  5. 整合Spring

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!--定义重发机制-->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="useExponentialBackOff" value="true" />
    <property name="maximumRedeliveries" value="3" />
    <property name="initialRedeliveryDelay" value="1000" />
    <property name="backOffMultiplier" value="2" />
    <property name="maximumRedeliveryDelay" value="1000" />
    </bean>

    <!-- 配置连接工厂 -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
    <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://120.48.54.126:61616"/>
    <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
    </bean>
    </property>
    <property name="maxConnections" value="100"/>
    </bean>

死信队列

异常消息规避处理的集合,主要处理失败的消息。不管是 queue 还是 topic,失败的消息都放到这个队列中。

死信队列的配置(一般采用默认):

  • sharedDeadLetterStrategy

    修改 activemq.xml 的配置,可以修改队列的名字。默认ActiveMQ.DLQ

  • individualDeadLetterStrategy

    可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。

  • 自动删除过期消息

    过期消息是指生产者指定的过期时间,超过这个时间的消息。

  • 存放非持久消息到死信队列中

    默认不会把非持久的死消息发送到死信队列中。

防止重复消费

由于网络延迟,会造成MQ的重试,重试过程中,可能会造成重复消费。

解决:

​ 准备第三方服务做消费记录。以 redis 为例,给消息分配 id,消息只要被消费,以k-v形式<id,message>写入 redis。消费者消费前,先去 redis 查询有没有消费记录即可。