RabbitMQ 中的交换机学习

一、直接交换机(Direct Exchange)
1. 介绍
Direct 交换机将消息路由到绑定了指定 Routing Key 的队列中。每条消息都有一个 Routing Key,当队列绑定到 Direct 交换机时,它需要一个指定的 Routing Key。只有消息的 Routing Key 与队列绑定的 Routing Key 完全匹配时,消息才会路由到该队列中。

2. 代码示例
- 发送消息
public class DirectLogs {
public static final String exchange_name = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
channel.basicPublish(exchange_name, "error", null, message.getBytes("UTF-8"));
System.out.println("生产者发送消息:" + message);
}
}
}

- 接收消息
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static final String QUEUE_NAME = "console";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogsTopic01接收到的消息:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}

- 结果
消息通过 Direct 交换机路由到对应队列:

二、主题交换机(Topic Exchange)
1. 介绍
Topic 交换机是基于通配符进行路由的交换机。它允许队列绑定的 Routing Key 使用 *(匹配一个词)和 #(匹配零个或多个词)作为通配符。它适用于需要模糊匹配的场景,例如日志系统。

2. 代码示例
- 发送消息
public class EmitLog {
public static final String exchange_name = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");

for (Map.Entry<String, String> entry : bindingKeyMap.entrySet()) {
String key = entry.getKey();
String message = entry.getValue();
channel.basicPublish(exchange_name, key, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}

- 接收消息
public class ReceiveLogsTopic01 {
public static final String exchange_name = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(exchange_name, "topic");
String queue_name = "Q1";
channel.queueDeclare(queue_name, false, false, false, null);
channel.queueBind(queue_name, exchange_name, "*.orange.*");

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody(), "UTF-8"));
System.out.println("接收队列:" + queue_name + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queue_name, true, deliverCallback, consumerTag -> {});
}
}

三、扇出交换机(Fanout Exchange)
1. 介绍
Fanout 交换机是最简单的交换机类型,它会将消息广播给所有与之绑定的队列,而不考虑消息的 Routing Key。通常用于广播消息,比如多服务实例之间的消息同步。

2. 代码示例
- 发送消息
public class EmitLog {
public static final String exchange_name = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(exchange_name, "fanout");
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
channel.basicPublish(exchange_name, "", null, message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}

- 接收消息
public class ReceiveLog01 {
public static final String exchange_name = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(exchange_name, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange_name, "");

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

小结
Direct Exchange:消息通过精确匹配的 Routing Key 路由到绑定的队列。
Topic Exchange:消息通过通配符匹配的 Routing Key 路由到队列,适用于模糊匹配场景。
Fanout Exchange:消息会被广播给所有与之绑定的队列,不考虑 Routing Key,适用于广播场景。
————————————————

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/dhrmt/article/details/143099569

版权声明:
作者:SE_Yang
链接:https://www.cnesa.cn/2212.html
来源:CNESA
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
海报
RabbitMQ 中的交换机学习
一、直接交换机(Direct Exchange) 1. 介绍 Direct 交换机将消息路由到绑定了指定 Routing Key 的队列中。每条消息都有一个 Routing Key,当队列绑定到 Direct 交换机时,它需要一个指定的 Routing Key。只有消息的 Routing Key 与队列绑定的 Routing Key 完全匹配时,消息才会路由到该队列中。 2. 代码示例 - 发送消息 public class DirectLogs { public static final String exchange_name = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Scanner sc = new Scanner(System.in); while(sc.hasNext()){ String message = sc.next(); channel.basicPublish(exchange_name, "error", null, message.getBytes("UTF-8")); System.out.println("生产者发送消息:" + message); } } } - 接收消息 public class ReceiveLogsTopic01 { public static final String EXCHANGE_NAME = "direct_logs"; public static final String QUEUE_NAME = "console"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogsTopic01接收到的消息:" + new String(message.getBody(), "UTF-8")); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } } - 结果 消息通过 Direct 交换机路由到对应队列:……
<<上一篇
下一篇>>