mq是什么意思(什么是RabbitMQ)

说明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根据基础知识去理解RabbitMQ是什么、提供了什么功能。

一、MQ的简单理解

1. 什么是MQ?

  • 消息队列(Message Queue),是基础数据结构中 “先进先出” 的一种数据结构。
  • 一般用来解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。

2.MQ是怎么实现消息传递的?

  1. 生产者产生消息并把传输的数据(消息)放在队列中,用队列机制来实现消息传递。
  2. 消费者可以到指定的队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

3.MQ的几个主要特性

  • 解耦:一个业务需要多个模块共同实现,或一条消息有多个系统对应处理,只需要在主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
  • 异步:主业务执行结束后,从属业务通过MQ异步处理,减少业务的响应时间,提高用户体验。
  • 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

4.MQ的缺点

  • 系统可用性降低。依赖服务越多,服务越容易挂掉。需要考虑MQ瘫痪的情况。
  • 系统的复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。
  • 业务一致性。主业务和从属业务一致性的处理。

二、RabbitMQ的简单介绍

1. 什么是RabbitMQ?

RabbitMQ是消息代理,它接受并转发消息。

  • RabbitMQ可以理解为一个邮箱,或者一个邮局,或者是一个邮递员,保证 “张三” 的信件最终传递给 “李四”。
  • RabbitMQ与上述所描述的邮局(邮箱、邮递员)的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块消息。

2.RabbitMQ和消息传递的三个术语

  • 生产:生产只意味着发送。发送消息的程序是生产者(production)。
  • 队列:队列是位于RabbitMQ中的“邮箱”的名称。尽管消息流经RabbitMQ和应用程序,但他们只能存在于队列中。队列只受主机的内存和磁盘限制,它的本质上是一个打的消息缓冲区。许多生产者可以向一个队列发送消息,许多消费者可以尝试从一个队列接收数据。
  • 消费(接收):消费与接收具有相似的含义。一个消费者(consumer)是一个程序,主要是等待接收信息。

注意:生产者、消费者、代理不必部署在同一主机上,应用程序既可以是生产者,又可以是消费者

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

三、RabbitMQ安装

3.1环境说明(本文以RabbitMQ3.8.11为例)

RabbitMQ对Erlang版本要求(Rabbit是基于Erlang编写的)

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

RabbitMQ对JDK版本要求

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

3.2 安装Erlang步骤(本文以windows版安装为例)

3.2.1 下载Erlang,或访问如下链接进行下载:

http://erlang.org/download/otp_win64_23.2.exe

3.2.2 双击运行 otp_win64_23.2.exe ,点击下一步完成安装。

3.2.3 安装完成后配置环境变量,如下图所示

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

3.2.4 运行窗口输入cmd,在dos窗口输入 erl ,返回如图中所示,则代表erlang安装完成。

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

3.2 安装RibbitMQ步骤(本文以windows版安装为例)

3.2.1 点击下载RibbitMQ,或访问如下链接进行下载:

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.11/rabbitmq-server-3.8.11.exe

3.2.2 双击运行 rabbitmq-server-3.8.11.exe,点击下一步完成安装。

3.2.3 双击RabbitMQ Service – start 运行RabbitMQ

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

出现如下提示,则代表服务启动成功:

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

3.2.4 访问RabbitMQ控制台

控制台地址:    http://localhost:15672/

控制台用户名/密码 : guest/guest

四、RabbitMQ传递消息的方式(Java客户端)

  • Work queues(工作队列)
  • Publish/Subscribe(发布/订阅)
  • Routing(路由)
  • Topics(主题)
  • RPC(远程过程调用)
  • Publisher Confirms(发布者确认)

环境要求:

  • JDK版本为15(1.8+即可)
  • amqp-client 5.10.0

添加依赖:

<!--ribbitMq-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

4.1 Work queues(工作队列)

官方描述:

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个worker时,任务将在它们之间共享。

代码示例:

生产者:

 1 public class NewTask {
 2 
 3     private static final String TASK_QUEUE_NAME = "task_queue";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         try (Connection connection = factory.newConnection();
16              Channel channel = connection.createChannel()){
17             channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
18 
19             String message = String.join(" ", "four");
20 
21             channel.basicPublish("", TASK_QUEUE_NAME,
22                     MessageProperties.PERSISTENT_TEXT_PLAIN,
23                     message.getBytes(StandardCharsets.UTF_8));
24 
25             System.out.println(" [x] Sent '" + message + "'");
26         }
27     }
28 }

消费者:

 1 public class Worker {
 2 
 3     private static final String TASK_QUEUE_NAME = "task_queue";
 4 
 5     public static void main(String[] args )throws Exception {
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         final Connection connection = factory.newConnection();
16         final Channel channel = connection.createChannel();
17 
18         channel.queueDeclare(TASK_QUEUE_NAME, true, false,false,null);
19         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
20 
21         channel.basicQos(1);
22 
23         DeliverCallback deliverCallback = (comsumerTag, delivery) ->{
24             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25 
26             System.out.println(" [x] Received '" + message + "'");
27 
28             try {
29                 doWork(message);
30             } finally {
31                 System.out.println("[x] Done");
32                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
33             }
34         };
35         channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, comsumerTag -> {});
36     }
37 
38     private static void doWork(String task){
39         for (char ch : task.toCharArray()){
40             if(ch == '.'){
41                 try {
42                     Thread.sleep(1000);
43                 } catch (InterruptedException e) {
44                     Thread.currentThread().interrupt();
45                 }
46             }
47         }
48     }
49 }

4.3 Publish/Subscribe(发布/订阅)

官方描述:

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。

简而言之:

相当于我们关注了一个微信公众号,公众号每次推文我们都能及时的收到。我们就相当于消费者,公众号相当于消息中转站,文章作者相当于生产者。

代码示例:

生产者:

 1 public class EmitLog {
 2 
 3     private static final String ExCHANGE_NAME = "logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         try (Connection connection = factory.newConnection();
16              Channel channel = connection.createChannel()){
17             channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18 
19             String message = args.length < 1 ? "info: Hello World!" : String.join(" ", args);
20 
21             channel.basicPublish(ExCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
22 
23             System.out.println(" [x] Sent '" + message + "'");
24 
25         }
26 
27     }
28 
29 }

消费者:

 1 public class ReceiveLogs {
 2 
 3     private static final String ExCHANGE_NAME = "logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6         ConnectionFactory factory = new ConnectionFactory();
 7 
 8         // 设置IP
 9         factory.setHost("127.0.0.1");
10 
11         // 设置端口号
12         factory.setPort(5672);
13 
14         Connection connection = factory.newConnection();
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18         String queueName = channel.queueDeclare().getQueue();
19         channel.queueBind(queueName, ExCHANGE_NAME, "");
20 
21         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
22 
23         DeliverCallback deliverCallback = (sonsumerTag, delivery) -> {
24             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25             System.out.println(" [x] Received '" + message + "'");
26         };
27         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
28     }
29 

4.4 Routing(路由)

官方描述:

接上例,我们可能希望将日志消息写入磁盘的程序仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。

简而言之:

如果我们只想接收某些信息,比如日志级别有INFO、ERROR、DEBUG等,我们只愿接收INFO日志。可以使用Routing进行过滤。

代码示例:

生产者:

 1 public class EmitLogDirect {
 2 
 3     private static final String EXCHANGE_NAME = "direct_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         try (Connection connection = factory.newConnection();
16              Channel channel = connection.createChannel()){
17             channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
18 
19             String severity = getServerity(args);
20             String message = getMessage(args);
21 
22             channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
23             System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
24 
25         }
26 
27     }
28 
29     private static String getServerity(String[] strings){
30         if (strings.length < 1){
31             return "info";
32         }
33         return strings[0];
34 
35     }
36 
37     private static String getMessage(String[] strings){
38         if (strings.length < 2) {
39             return "Hello World!";
40         }
41         return joinStrings(strings, " ", 1);
42     }
43 
44     private static String joinStrings(String[] strings, String delimiter, int startIndex){
45         int length = strings.length;
46         if(length == 0){
47             return "";
48         }
49         if(length <= startIndex){
50             return "";
51         }
52         StringBuilder words = new StringBuilder(strings[startIndex]);
53         for (int i = startIndex + 1; i < length; i++){
54             words.append(delimiter).append(strings[i]);
55         }
56         return words.toString();
57 
58     }
59 }

消费者:

 1 public class ReceiveLogsDirect {
 2 
 3     private static final String EXCHANGE_NAME = "direct_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         Connection connection = factory.newConnection();
16 
17         Channel channel = connection.createChannel();
18 
19         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
20 
21         String queueName = channel.queueDeclare().getQueue();
22 
23         if(args.length < 1){
24             System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
25             System.exit(1);
26         }
27 
28         for (String severity : args){
29             channel.queueBind(queueName, EXCHANGE_NAME, severity);
30         }
31         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
32 
33         DeliverCallback deliverCallback = (consumerTag, delivery)->{
34             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
35             System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
36         };
37 
38         channel.basicConsume(queueName, true, deliverCallback, comsumerTag ->{});
39     }
40 }

4.5 Topics(主题)

官方描述:

发送到主题交换机的消息不能具有任意的 routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的路由关键示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由关键字中可以包含任意多个单词,最多255个字节。

绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换-用特定路由键发送的消息将传递到所有用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:

  • *(星)只能代替一个单词。#(散列)可以代替零个或多个单词。

简而言之:

  Topic会根据消息自身所携带的路由键(Routing Key)在所有的绑定关系中寻找,与消息相匹配的队列推送该消息。

注意:

当在绑定中不使用特殊字符“ * ”(星号)和“ # ”(哈希)时,主题交换的行为就像直接的一样。

代码示例:

生产者:

 1 public class EmitLogTopic {
 2 
 3     private static final String EXCHANGE_NAME = "topic_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8         // 设置IP
 9         factory.setHost("127.0.0.1");
10 
11         // 设置端口号
12         factory.setPort(5672);
13 
14         try(Connection connection = factory.newConnection();
15             Channel channel = connection.createChannel()){
16 
17             channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18 
19             String routingKey = getRouting(args);
20             String message = getMessage(args);
21 
22             channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
23             System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
24         }
25     }
26 
27     private static String getRouting(String[] strings){
28         if (strings.length < 1){
29             return "anonymous.info";
30         }
31         return strings[0];
32     }
33 
34     private static String getMessage(String[] strings){
35         if (strings.length < 2){
36             return "hello world";
37         }
38         return joinStrings(strings, " ", 1);
39     }
40 
41     private static String joinStrings(String[] strings, String delimiter, int startIndex){
42         int length = strings.length;
43         if(length == 0){
44             return "";
45         }
46         if(length < startIndex){
47             return "";
48         }
49         StringBuilder words = new StringBuilder(strings[startIndex]);
50         for (int i = startIndex + 1; i < length; i++){
51             words.append(delimiter).append(strings[i]);
52         }
53         return words.toString();
54     }
55 }

消费者:

 1 public class ReceiveLogTopic {
 2 
 3     private static final String EXCHANGE_NAME = "topic_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8         // 设置IP
 9         factory.setHost("127.0.0.1");
10 
11         // 设置端口号
12         factory.setPort(5672);
13 
14         Connection connection = factory.newConnection();
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18 
19         String queueName = channel.queueDeclare().getQueue();
20 
21         if(args.length < 1){
22             System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
23             System.exit(1);
24         }
25 
26         for (String bindingKey : args){
27             channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
28         }
29 
30         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
31 
32         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
33             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
34             System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
35         };
36         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
37     }
38 }

4.6 RPC(远程过程调用)

官方描述:

尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。

代码示例:

生产者

 1 public class RPCServer {
 2 
 3     private static final String RPC_QUEUE_NAME = "rpc_queue";
 4 
 5     private static int fib(int n){
 6         if(n == 0){
 7             return 0;
 8         }
 9         if(n == 1){
10             return 1;
11         }
12         return fib(n - 1) + fib(n - 2);
13     }
14 
15     public static void main(String[] args) throws Exception{
16 
17         // 创建服务器的连接
18         ConnectionFactory factory = new ConnectionFactory();
19 
20         // 设置IP
21         factory.setHost("127.0.0.1");
22 
23         // 设置端口号
24         factory.setPort(5672);
25 
26         try (Connection connection = factory.newConnection();
27              Channel channel = connection.createChannel()) {
28             channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
29             channel.queuePurge(RPC_QUEUE_NAME);
30 
31             channel.basicQos(1);
32 
33             System.out.println(" [x] Awaiting RPC requests");
34 
35             Object monitor = new Object();
36             DeliverCallback deliverCallback = (consumerTag, delivery) ->{
37                 AMQP.BasicProperties replyProps = new AMQP.BasicProperties
38                         .Builder()
39                         .correlationId(delivery.getProperties().getCorrelationId())
40                         .build();
41 
42                 String response = "";
43 
44                 try{
45                     String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
46                     int n = Integer.parseInt(message);
47 
48                     System.out.println(" [.] fib(" + message + ")");
49                     response += fib(n);
50                 }catch (RuntimeException e){
51                     System.out.println(" [.] " + e.toString());
52                 }finally {
53                     channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
54                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
55 
56                     // RabbitMq consumer worker thread notifies the RPC server owner thread
57                     // RabbitMq使用者工作线程通知RPC服务器所有者线程
58                     synchronized (monitor){
59                         monitor.notify();
60                     }
61                 }
62             };
63             channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
64             // Wait and be prepared to consume the message from RPC client.
65             // 等待并准备使用来自RPC客户端的消息。
66             while(true){
67                 synchronized (monitor){
68                     try {
69                         monitor.wait();
70                     }catch (InterruptedException e){
71                         e.printStackTrace();
72                     }
73                 }
74             }
75         }
76     }
77 }

消费者:

 1 public class RPCClient {
 2 
 3     private Connection connection;
 4     private Channel channel;
 5     private String requestQueueName = "rpc_queue";
 6 
 7     public RPCClient() throws IOException, TimeoutException {
 8         // 创建服务器的连接
 9         ConnectionFactory factory = new ConnectionFactory();
10 
11         // 设置IP
12         factory.setHost("127.0.0.1");
13 
14         // 设置端口号
15         factory.setPort(5672);
16 
17         connection = factory.newConnection();
18         channel = connection.createChannel();
19     }
20 
21     public static void main(String[] args) throws Exception{
22         RPCClient fibonacciRpc = new RPCClient();
23             for (int i = 0; i < 32; i++) {
24                 String i_str = Integer.toString(i);
25                 System.out.println(" [x] Requesting fib(" + i_str + ")");
26                 String response = fibonacciRpc.call(i_str);
27                 System.out.println(" [.] Got '" + response + "'");
28             }
29 
30     }
31 
32     public String call(String message) throws IOException, InterruptedException {
33         final String corrId = UUID.randomUUID().toString();
34 
35         String replyQueueName = channel.queueDeclare().getQueue();
36         AMQP.BasicProperties props = new AMQP.BasicProperties
37                 .Builder()
38                 .correlationId(corrId)
39                 .replyTo(replyQueueName)
40                 .build();
41 
42         channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
43 
44         final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
45 
46         String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
47             if (delivery.getProperties().getCorrelationId().equals(corrId)) {
48                 response.offer(new String(delivery.getBody(), "UTF-8"));
49             }
50         }, consumerTag -> {
51         });
52 
53         String result = response.take();
54         channel.basicCancel(ctag);
55         return result;
56     }
57 
58     public void close() throws IOException {
59         connection.close();
60     }
61 }

4.7 Publisher Confirms(发布者确认)

官方描述:

在某些应用程序中,确保将发布的消息发送到代理非常重要。发布者确认是RabbitMQ功能,可以帮助满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有确定的方法可以实现发布者确认,这通常归结为应用程序和整个系统中的约束。典型的技术有:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批量确认:简单,合理的吞吐量,但是很难推断出什么时候出了问题。
  • 异步处理:为佳性能和资源使用,在出现错误的情况下可以很好地控制,但是可以正确实施。

代码示例:

  1 public class PublisherConfirms {
  2 
  3     static final int MESSAGE_COUNT = 50_000;
  4 
  5     static Connection createConnection() throws Exception{
  6 
  7         ConnectionFactory cf = new ConnectionFactory();
  8 
  9         // 设置IP
 10         cf.setHost("127.0.0.1");
 11 
 12         // 设置端口号
 13         cf.setPort(5672);
 14 
 15         // 设置用户名
 16         cf.setUsername("guest");
 17 
 18         // 设置密码
 19         cf.setPassword("guest");
 20 
 21         return cf.newConnection();
 22     }
 23 
 24     public static void main(String[] args) throws Exception{
 25         publishMessagesIndividually();
 26         publishMessagesInBatch();
 27         handlePublishConfirmsAsynchronously();
 28     }
 29 
 30     static void publishMessagesIndividually() throws Exception{
 31         try(Connection connection = createConnection()){
 32             Channel ch = connection.createChannel();
 33 
 34             String queue = UUID.randomUUID().toString();
 35             ch.queueDeclare(queue, false, false, true, null);
 36 
 37             ch.confirmSelect();
 38 
 39             long start = System.nanoTime();
 40             for (int i = 0; i < MESSAGE_COUNT; i++){
 41                 String body = String.valueOf(i);
 42                 ch.basicPublish("", queue, null, body.getBytes(StandardCharsets.UTF_8));
 43                 ch.waitForConfirmsOrDie(5_000);
 44             }
 45             long end = System.nanoTime();
 46             System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
 47         }
 48     }
 49 
 50     static void publishMessagesInBatch() throws Exception {
 51         try (Connection connection = createConnection()) {
 52             Channel ch = connection.createChannel();
 53 
 54             String queue = UUID.randomUUID().toString();
 55             ch.queueDeclare(queue, false, false, true, null);
 56 
 57             ch.confirmSelect();
 58 
 59             int batchSize = 100;
 60             int outstandingMessageCount = 0;
 61             long start = System.nanoTime();
 62             for (int i = 0; i < MESSAGE_COUNT; i++) {
 63                 String body = String.valueOf(i);
 64                 ch.basicPublish("", queue, null, body.getBytes());
 65                 outstandingMessageCount++;
 66 
 67                 if (outstandingMessageCount == batchSize) {
 68                     ch.waitForConfirmsOrDie(5_000);
 69                     outstandingMessageCount = 0;
 70                 }
 71             }
 72 
 73             if (outstandingMessageCount > 0) {
 74                 ch.waitForConfirmsOrDie(5_000);
 75             }
 76             long end = System.nanoTime();
 77             System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
 78 
 79         }
 80 
 81     }
 82 
 83     static void handlePublishConfirmsAsynchronously() throws Exception {
 84         try (Connection connection = createConnection()) {
 85             Channel ch = connection.createChannel();
 86 
 87             String queue = UUID.randomUUID().toString();
 88             ch.queueDeclare(queue, false, false, true, null);
 89 
 90             ch.confirmSelect();
 91 
 92             ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
 93 
 94             ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
 95                 if (multiple) {
 96                     ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
 97                             sequenceNumber, true
 98                     );
 99                     confirmed.clear();
100                 } else {
101                     outstandingConfirms.remove(sequenceNumber);
102                 }
103             };
104 
105             ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
106                 String body = outstandingConfirms.get(sequenceNumber);
107                 System.err.format(
108                         "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
109                         body, sequenceNumber, multiple
110                 );
111                 cleanOutstandingConfirms.handle(sequenceNumber, multiple);
112             });
113 
114             long start = System.nanoTime();
115             for (int i = 0; i < MESSAGE_COUNT; i++) {
116                 String body = String.valueOf(i);
117                 outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
118                 ch.basicPublish("", queue, null, body.getBytes());
119             }
120 
121             if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
122                 throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
123             }
124 
125             long end = System.nanoTime();
126             System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
127         }
128     }
129 
130     static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
131         int waited = 0;
132         while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
133             Thread.sleep(100L);
134             waited = +100;
135         }
136         return condition.getAsBoolean();
137     }
138 }

五、总结

总的来说,RabbitMQ还是比较简单的。目前文章只是简单记录一下,后期会更深入学习。

声明:所有白马号原创内容,未经允许禁止任何网站及个人转载、采集等一切非法引用。本站已启用原创保护,有法律保护作用,否则白马号保留一切追究的权利。发布者:白马号,转转请注明出处:https://www.bmhysw.com/article/27394.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
白马号白马号

相关推荐

  • Win10固态硬盘分区不见了怎么办?恢复Win10硬盘分区教程

    Win10固态硬盘分区不见了怎么办?恢复Win10硬盘分区教程 Win10固态硬盘分区不见的原因 Win10固态硬盘分区不见的解决方案 恢复Win10硬盘分区教程 Win10固态硬盘分区不见的原因 Win10固态硬盘分区不见的原因可能有以下几种: 硬盘故障或损坏 操作系统错误或文件系统损坏 病毒或恶意软件感染 人为误操作或删除分区 Win10固态硬盘分区不见…

    2023-07-27
    00
  • FPS之前好好的突然变低?解决方案揭秘,让你的游戏恢复顺畅

    大纲 什么是FPS 为什么FPS会突然变低 解决方案 更新显卡驱动程序 关闭不必要的后台程序 降低游戏画质设置 清理游戏缓存 升级硬件 FPS之前好好的突然变低?解决方案揭秘,让你的游戏恢复顺畅 FPS是游戏中常见的一个术语,指的是每秒钟帧数(Frames Per Second),也就是游戏画面每秒钟刷新的次数。FPS越高,画面越流畅,游戏体验也越好。 为什…

    2023-05-29
    00
  • Win10自动播放设置里的设备怎么删除?

    Win10自动播放设置里的设备怎么删除? 了解Win10自动播放设置 如何删除设备 了解Win10自动播放设置 Win10自动播放设置是一个非常方便的功能,可以在插入设备时自动弹出相关的应用或文件。例如,插入U盘后自动弹出文件夹或音乐播放器。但是,当你插入过多的设备时,这些设备会出现在自动播放设置中,占用了过多的空间,影响了使用体验。因此,了解如何删除设备是…

    2023-09-01
    00
  • 英伟达静悄悄的通知AIC合作伙伴告知GPU成本将下调8~12%(价格有望稳步下调)

    在芯片供应持续短缺的情况下,英伟达一度将成本转嫁给了 AIC 合作伙伴和系统集成商。不过一份新爆料称,这种情况正在迎来逆转。WCCFTech 援引消息人士的话称,英伟达已向 AIC 合作伙伴发去通知,告知其 GPU 成本将下调 8~12% 。考虑到成本变动向终端传递需要一段时间(代理商库存缓冲),我们预计零售价格会在未来几周内迎来显著下调。 此前受 COVI…

    2022-03-26
    00
  • 彻底关掉Win10自动更新方法详解:Win10关闭自动更新的方法和注意事项

    彻底关掉Win10自动更新方法详解:Win10关闭自动更新的方法和注意事项 一、为什么要关闭Win10自动更新 更新可能会影响电脑性能和稳定性 更新可能会导致软件或硬件不兼容 更新可能会消耗网络流量和时间 二、如何关闭Win10自动更新 方法一:使用组策略编辑器 步骤: 按Win+R打开运行窗口,输入gpedit.msc并回车打开组策略编辑器 依次展开计算机…

    2023-05-20
    00

联系我们

QQ:183718318

在线咨询: QQ交谈

邮件:183718318@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信