# 安装
# Unix-Ubuntu
安装 Erlang 环境(RabbitMQ 就是这个语言开发的)
sudo apt install erlang |
安装 RabbitMq
sudo apt install rabbitmq-server |
查看 RabbitMq
状态
sudo rabbitmqctl status |
我们需要使用的端口:5672,协议为 amqp
协议。
打开 RabbitMq
的管理面板,可以在浏览器上实时访问和监控
sudo rabbitmq-plugins enable rabbitmq_management |
默认访问路径: IP:15672
,该网页需要登录,所以需要创建一个用户
sudo rabbitmqctl add_user 用户名 密码 |
将管理员权限赋给刚刚创建的用户
sudo rabbitmqctl set_user_tags admin administrator |
如果你希望这个用户可以操作原本的 /
的 virtual host
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" |
此处顺便介绍一下架构
channel
:每个客户端连接都会使用一个channel
,再通过channel
访问RabbitMQ
,通信协议为amqp
协议。exchange
:类似于交换机,根据请求转发给对应的消息队列,每个消息队列都可以绑定到exchange
,不同类型的exchange
类型可以用于实现不同消息的模式。queue
:消息队列本体routing key
:路由关键字,exchange
根据该关键字进行消息投递virtual host
:类似环境隔离,不同环境都可以单独配置一个virtual host
,每个host
又可以包含很多个exchange
和queue
,不同host
之间互不影响。
# 使用消息队列
# 控制面板
总体流程为:创建 virtual host
— 创建 queue
— 绑定 exchange
— 向 exchange
发送消息 — 消息队列消费消息。
- 创建
virtual host
:
- 创建
queue
:自动删除的选项表示如果所有消费者都与消息队列断开连接,就自动删除该队列
- 绑定
exchange
:选择交换机和路由key
。
- 从交换机发送消息:记得是刚才创建的
virtual host
,同时消息需要带上route key
。
- 去队列消费消息:可以看到消息队列已经有了一条消息,我们选择
ack
模式,Nack
模式是拒绝消息,也就是不会将消息从队列取出,并且重新排队。ack
就是确认应答,然后将消息从消息队列中移除。reject
表示拒绝此消息,但可以指定是否重新排队。
# Java 操作消息队列
Springboot
整合 RabbitMQ
:
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-amqp</artifactId> | |
</dependency> |
application.yml
配置文件
server: | |
port: 8089 | |
servlet: | |
context-path: /mq/rabbitmq | |
spring: | |
rabbitmq: | |
addresses: 192.168.92.130 | |
username: admin | |
password: admin | |
virtual-host: cyan #我自己创建了名叫cyan的virtual host,不要写错/cyan了 |
创建配置类:
import org.springframework.amqp.core.*; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
@Configuration | |
public class RabbitConfiguration { | |
// 将 exchange 和 queue 注入 bean 的操作大同小异 | |
@Bean("directExchange") | |
public Exchange exchange() { | |
//direct 类型的 exchange | |
return ExchangeBuilder.directExchange("amq.direct").build(); | |
} | |
// 创建 queue 只需要指定名字即可 | |
@Bean("cyan-queue1") | |
public Queue queue() { | |
return QueueBuilder.nonDurable("cyan").build(); | |
} | |
// @Qualifier 注解用于指定使用哪个 bean,因为 exchange 也可能有多个 bean,比如 fanout 类型 | |
@Bean("binding") | |
public Binding binding(@Qualifier("directExchange") Exchange exchange, | |
@Qualifier("cyan-queue1") Queue queue) { | |
return BindingBuilder | |
.bind(queue) | |
.to(exchange) | |
.with("cyan-key") // 指定 key | |
.noargs(); | |
} | |
} |
在测试类写一个生产者
@SpringBootTest | |
class RabbitMqDemoApplicationTests { | |
@Resource | |
RabbitTemplate template; | |
@Test | |
void publisher() { | |
// 生产者,最后一个参数可以是 Object 类型 | |
long start = System.currentTimeMillis(); | |
// 如果不想要返回数据,直接调用 convertAndSet 也可以 | |
Object res = template.convertSendAndReceive("amq.direct", | |
"cyan-key", "This message is produced by java-springboot"); | |
System.out.println("消费者响应: " + res + | |
" 消耗时长: " + (System.currentTimeMillis() - start) + "ms"); | |
} | |
} |
写一个监视器作为消费者
import org.springframework.amqp.core.Message; | |
import org.springframework.amqp.rabbit.annotation.RabbitListener; | |
import org.springframework.stereotype.Component; | |
@Component | |
public class TestListener { | |
// 定义此方法为队列 cyan-queue1 的监听器,一旦监听到新的消息,就会接受并处理 | |
@RabbitListener(queues = "cyan-queue1") | |
public String test(Message message){ | |
System.out.println(new String(message.getBody())); | |
return "已获取消息"; | |
} | |
} |
然后直接运行即可。如果希望消费者拿到的消息是一个实体类:
创建一个实体类:
@Data | |
@AllArgsConstructor | |
@NoArgsConstructor | |
public class User { | |
int id; | |
String name; | |
} |
在配置类中配置一下 json
转换的 bean
:
@Configuration | |
public class RabbitConfiguration { | |
@Bean("jacksonConverter") // 直接创建一个用于 JSON 转换的 Bean | |
public Jackson2JsonMessageConverter converter(){ | |
return new Jackson2JsonMessageConverter(); | |
} | |
} |
导入依赖:
<dependency> | |
<groupId>com.fasterxml.jackson.dataformat</groupId> | |
<artifactId>jackson-dataformat-xml</artifactId> | |
<version>2.14.2</version> | |
</dependency> |
然后在监听器中指定转换器即可:
@RabbitListener(queues = "cyan-queue1", messageConverter = "jacksonConverter") | |
// 定义此方法为队列 cyan-queue1 的监听器,一旦监听到新的消息,就会接受并处理 | |
public String test(User user){ | |
System.out.println(user); | |
return "已获取消息"; | |
} |
在生产者那改变一下消息:
@Test | |
void publisher() { | |
// 生产者,最后一个参数可以是 Object 类型 | |
long start = System.currentTimeMillis(); | |
Object res = template.convertSendAndReceive("amq.direct", "cyan-key", | |
new User(1,"cyan")); | |
System.out.println("消费者响应: " + res + | |
" 消耗时长: " + (System.currentTimeMillis() - start) + "ms"); | |
} |
# 队列类型
# 死信队列
无法被消费的消息被称为 死信
,存放 死信
的队列也就是 死信队列
。
消息成为死信的三种情况:
- 队列消息长度到达限制
- 消费者异常拒接消费消息
- 原队列存在消息过期设置,消息到达超时时间未被消费
此外,还有死信消费者,消费死信消息,本质上也就是一种异常处理机制。死信交换机、死信队列需要自行创建,本质上是创建一般性的队列和交换机,然后指定其为其他队列 / 交换机的死信队列。
// 配置死信队列 | |
@Configuration | |
public class RabbitMqDeadQueueConfig { | |
// 定义一系列队列交换机常量 | |
private static final String DEAD_QUEUE = "deadQueue"; | |
private static final String DEAD_EXCHANGE = "deadExchange"; | |
private static final String DEAD_ROUTE_KEY = "dead.key"; | |
/** | |
* 死信队列 | |
*/ | |
@Bean(DEAD_QUEUE) | |
public Queue deadQueue() { | |
return QueueBuilder.durable(DEAD_QUEUE).build(); | |
} | |
/** | |
* 死信交换机 | |
*/ | |
@Bean(DEAD_EXCHANGE) | |
public Exchange deadExchange() { | |
return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build(); | |
} | |
/** | |
* 创建死信队列和死信交换机的绑定关系 | |
*/ | |
@Bean("deadBinding") | |
public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, | |
@Qualifier(DEAD_EXCHANGE) Exchange directExchange) { | |
return | |
BindingBuilder | |
.bind(deadQueue) | |
.to(directExchange) | |
.with(DEAD_ROUTE_KEY) | |
.and(null); | |
} | |
} |
然后创建正常的业务消费队列并指定死信交换机、路由 Key
@Configuration | |
public class RabbitMqConfig { | |
private static final String DEAD_QUEUE = "deadQueue"; | |
private static final String DEAD_EXCHANGE = "deadExchange"; | |
private static final String DEAD_ROUTE_KEY = "dead.key"; | |
/** | |
* 使用 ExchangeBuilder 创建交换机 | |
*/ | |
@Bean("bootExchange") | |
public Exchange bootExchange() { | |
return ExchangeBuilder.directExchange("bootExchange").durable(true).build(); | |
} | |
/** | |
* 创建队列: | |
* 指定死信交换机、路由 KEY | |
*/ | |
@Bean("bootQueue") | |
public Queue bootQueue001() { | |
return | |
QueueBuilder | |
.durable("bootQueue") | |
.deadLetterExchange(DEAD_EXCHANGE) // 为队列指定死信交换机 | |
.deadLetterRoutingKey(DEAD_ROUTE_KEY) // 指定死信 key | |
.build(); | |
} | |
} |
然后创建一个死信消费者(相当于异常处理),消费死信。在死信消费者中还可以有将死信消息存入数据库的操作,等待人工干预处理
@RabbitListener(queues = {"deadQueue"}) // 注意是否要注入 convert | |
public void consumer(Message message) { | |
System.out.println("收到死信消息" + new String(message.tobody())); | |
System.out.println("存入数据库,等待人工干预"); | |
} |
# 延迟队列
延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
使用过期 + 死信队列的思想可以实现延迟队列,具体就是消费者发现消息过期,消息被传入死信队列,死信队列的消费者实现延迟逻辑。
- 设置消息过期:但是消息只有被消费者消费时才会检查是否过期,一个队列中消息可能出现这种情况,队首有一个消息过期时间是 20s,已经过期了但是消费者还没有检查到那去。但是又新加入了一个 10s 的消息,该消息必须等前面的所有消息都被检查了才可以判断是否过期(尽管过期时间更短),这种做法不可取。
- 设置队列过期:那么如果前面的消息没有过期,后面的消息一定没有过期。但是不是很灵活,每个队列只有一种过期时间。
插件实现延迟队列:消息到达 延迟交换机
后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。具体操作自行搜索。
# 惰性队列
惰性队列
会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中。队列有 default
和 lazy
模式,在控制台上可以直接添加惰性队列,需要在 Arguments
添加一个 x-queue-mode=lazy
。
或者使用代码来设置:
// 方案一 | |
@Bean("lazy_queue1") | |
public Queue bootQueue() { | |
Map<String, Object> args = new HashMap<>(); | |
args.put("x-queue-mode", "lazy"); | |
return | |
QueueBuilder | |
.durable("lazy_queue1") | |
.withArguments(args) | |
.deadLetterExchange(DEAD_EXCHANGE) | |
.deadLetterRoutingKey(DEAD_ROUTE_KEY) | |
.build(); | |
} | |
// 方案二 | |
@Bean("lazy_queue2") | |
public Queue lazy_queue999() { | |
return | |
QueueBuilder | |
.durable("lazy_queue2") | |
.lazy() | |
.deadLetterExchange(DEAD_EXCHANGE) | |
.deadLetterRoutingKey(DEAD_ROUTE_KEY) | |
.build(); | |
} |
# 优先队列
数据结构懂得都懂,需要做的就是对消息设置优先级取值范围为 0~255(一个 8 位无符号整数),这个范围足够适用于大部分场景,即使有特殊需求,也可以使用其他机制来设计优先级,比如使用多个队列:每增加一个队列,相当于就将上限增加了 256 个优先级。
控制台设置队列优先级的最大值,还是增加 Arguments
即可:
同样的,代码层面,使用 withArguments
也可以设置:
// 使用 withArguments | |
@Bean("priorityQueue") | |
public Queue priorityQueue() { | |
Map<String, Object> arguments = new HashMap<>(); | |
arguments.put("x-max-priority", 10); | |
return QueueBuilder.durable("priorityQueue").withArguments(arguments).build(); | |
} | |
// 使用 maxPriority | |
@Bean("priorityQueue") | |
public Queue priorityQueue() { | |
//maxPriority 设置最大优先级 | |
return QueueBuilder.durable("priorityQueue").maxPriority(10).build(); | |
} |
设置消息优先级:
MessageProperties messageProperties = new MessageProperties(); | |
messageProperties.setPriority(i); // 设置优先级 | |
System.out.println("发送消息,优先级为:"+i); | |
Message message = new Message(("消息优先级为:"+i).getBytes(), messageProperties); | |
TimeUnit.SECONDS.sleep(1); | |
rabbitTemplate.send("priorityExchange", "priority.key", message); |
如果希望将自定义的对象,比如 User 写入队列中:
Message message = rabbitmqTemplate | |
.getMessageConverter() | |
.toMessage(user, messageProperties); |
消费者直接接收 User 即可,不需要显示处理 Message
,默认的 SimpleMessageConverter
或者 Jackson2JsonMessageConverter
都会自动根据消息内容进行自动转换:
@RabbitListener(queues = "my_queue") | |
public void receiveMessage(User user) { | |
System.out.println("Received user: " + user); | |
} |
# 交换机
当消息到达交换机后,如果没有找到匹配的队列,调用退回模式将消息回退给生产者。
# 备份交换机
除了回退模式, RabbitMQ
还提供了 备份交换机
机制,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理。
注意事项:当退回模式和备份交换机一起使用的时候,备份交换机的优先级比较高,不会执行回退消息的回调。
声明备份交换机,需要将交换机类型设为 fanout
:
public static final String BACKUP_QUEUE = "backupQueue"; | |
public static final String BACKUP_EXCHANGE = "backupExchange"; | |
public static final String BACKUP_ROUTE_KEY = "backup.key"; | |
@Bean(BACKUP_QUEUE) | |
public Queue backupQueue() { | |
return QueueBuilder.durable(BACKUP_QUEUE).build(); | |
} | |
@Bean(BACKUP_EXCHANGE) | |
public FanoutExchange backupExchange() { | |
return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).durable(true).build(); | |
} | |
@Bean("backupBinding") | |
public Binding backupBinding(@Qualifier(BACKUP_QUEUE) Queue backupQueue, @Qualifier(BACKUP_EXCHANGE) FanoutExchange backupExchange) { | |
// 不需要指定 key | |
return BindingBuilder.bind(backupQueue).to(backupExchange); | |
} |
正常的业务交换机需要设置一下参数来指定备份交换机:
@Bean(BIZ_EXCHANGE) | |
public Exchange bizExchange() { | |
// 使用 alternate-exchange 设置备份交换机 | |
return ExchangeBuilder | |
.directExchange(BIZ_EXCHANGE) | |
.withArgument("alternate-exchange", MqBackupConfig.BACKUP_EXCHANGE) | |
.durable(true).build(); | |
} |
然后创建一个备份队列的消费者即可。
# 相关类
# AmqpAdmin
通过 bean
声明交换机、队列,在应用启动时自动创建,不是很灵活。通过 AmqpAdmin
可以通过接口或者业务代码去操作,该接口 SpringBoot
已经做好了自动配置,所以直接注入即可:
@Resource | |
AmqpAdmin amqpAdmin; |
相关功能为:
大致就是先通过 builder
拿到一个对象(交换机、队列、绑定关系等),然后再通过 AmqpAdmin
进行操作
- 查询队列信息:
// 获取队列名称、消息计数和消费者计数 | |
// Properties getQueueProperties (String queueName); 获取队列信息 | |
Properties properties = amqpAdmin.getQueueProperties("bootQueue001"); | |
// QueueInformation getQueueInfo (String var1); 获取队列信息 | |
QueueInformation queueInformation = amqpAdmin.getQueueInfo("bootQueue001"); |
- 声明、删除交换机:
//void declareExchange (Exchange var1); 声明一个交换机 | |
Exchange adminExchange = ExchangeBuilder | |
.directExchange("adminExchange") | |
.durable(true) | |
.build(); | |
amqpAdmin.declareExchange(adminExchange); | |
//boolean deleteExchange (String var1); 删除一个交换机 | |
boolean deleteExchange = amqpAdmin.deleteExchange("deleteExchange"); | |
System.out.println("删除一个交换机:" + deleteExchange); |
- 声明、删除队列:
// Queue declareQueue (); 声明一个随机名称队列:amq.gen-9RGhmUOsu8GbhmLfPe9-KQ | |
Queue queue = amqpAdmin.declareQueue(); | |
System.out.println("声明一个队列:" + queue.getName()); | |
// String declareQueue (Queue var1); 声明一个队列 | |
Queue declareQueue = QueueBuilder.durable("declareQueue").build(); | |
String s = amqpAdmin.declareQueue(declareQueue); | |
System.out.println("声明一个队列:" + s); |
- 声明、删除交换机 / 队列绑定关系
//void declareBinding (Binding binding); 声明交换机、队列绑定关系 | |
Binding binding = BindingBuilder | |
.bind(queue) | |
.to(adminExchange) | |
.with("declare.key") | |
.noargs(); | |
amqpAdmin.declareBinding(binding); | |
//void removeBinding (Binding var1); 删除交换机、队列绑定关系 | |
amqpAdmin.removeBinding(binding); |
# AmqpTemplate
RabbitTemplate
是 spring-amqp
提供的一个 RabbitMQ
消息操作模板类,主要提供了发送消息、接收消息以及其他附加功能,内部封装了 RabbitMQ
原生 API
,大大简化了使用 RabbitMQ
操作。而 RabbitTemplate
主要实现了 AmqpTemplate
和 RabbitOperations
接口。 AmqpTemplate
主要声明了三类方法:
public interface AmqpTemplate { | |
// 发送消息 | |
void send(Message var1) throws AmqpException; | |
// 接收消息 | |
Message receive() throws AmqpException; | |
// 发送消息并接收回复 | |
Message sendAndReceive(Message var1) throws AmqpException; | |
} |
- send 方法:创建
Message
消息对象
// 发送消息到默认交换机、默认路由 KEY | |
void send(Message message) throws AmqpException; | |
// 发送消息到默认交换机、使用指定路由 KEY | |
void send(String routingKey, Message message) throws AmqpException; | |
// 发送消息到指定交换机、使用指定路由 KEY | |
void send(String exchange, String routingKey, Message message) throws AmqpException; |
convertAndSend
方法:可以转换对象并发送,可以添加一个消息处理器MessagePostProcessor
// 将 Java 对象转换为 Amqp{@link Message}并将其发送到默认交换机、使用默认路由 KEY | |
void convertAndSend(Object message) throws AmqpException; | |
// 将 Java 对象转换为 Amqp{@link Message}并将其发送到默认交换机、使用自定义路由 KEY | |
void convertAndSend(String routingKey, Object message) throws AmqpException; | |
// 将 Java 对象转换为 Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由 KEY | |
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; | |
// 将 Java 对象转换为 Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由 KEY | |
// 在发送消息之前添加一个消息处理器 MessagePostProcessor | |
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; | |
// 将 Java 对象转换为 Amqp{@link Message}并将其发送到默认交换机、使用自定义路由 KEY | |
// 在发送消息之前添加一个消息处理器 MessagePostProcessor | |
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) | |
throws AmqpException; | |
// 将 Java 对象转换为 Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由 KEY | |
// 在发送消息之前添加一个消息处理器 MessagePostProcessor | |
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) | |
throws AmqpException; |
一般可以通过 MessagePostProcessor
设置一些 Message
的属性(过期):
MessagePostProcessor messagePostProcessor = message1 -> { | |
MessageProperties messageProperties = message1.getMessageProperties(); | |
messageProperties.setExpiration("1000"); | |
return message1; | |
}; | |
rabbitTemplate.convertAndSend("","","消息",messagePostProcessor); |
- receive 方法:获取消息分为主动拉去消息和被动获取消息(RabbitMQ 主动将消息推送给订阅队列的消费者)。而 receive 方法就是主动从队列获取消息。
Message receive(String queueName, long timeoutMillis) // 重载方法 |
- receiveAndConvert 方法:拉取消息并进行对象转换。需要注入一个
Converter
。 - receiveAndReply 方法:
receiveAndReply
支持在获取消息时传入一个回调函数ReceiveAndReplyCallback
,处理接收到消息和回复消息的业务逻辑。该模式用的比较少,实现起来也比较麻烦。 - sendAndReceive 方法:发送消息并接收回复消息
Message sendAndReceive(String exchange, String routingKey, Message message) |
- convertSendAndReceive 方法:可以直接发送对象消息,还是建议注入一个
converter
Object convertSendAndReceive(String exchange, String routingKey, Object message) |
# 对象序列化机制
使用 RabbitMQ
原生 API
,发送消息时,发送的是二进制 byte[]
数据。
void basicPublish(String var1, String var2, byte[] var4) throws IOException; |
使用 RabbtiTemplate.send()
发送 Message
对象,也是二进制 byte[]
数据。
public Message(byte[] body) { | |
this(body, new MessageProperties()); | |
} |
因为 Java 基于对象操作,所以消息一般都是对象。
# 发送对象
默认的序列化( SimpleMessageConverter
)需要对象实现 Serializable
序列化接口:
User user = new User(); | |
user.setName("张三"); | |
rabbitTemplate.convertAndSend("bbdbdbdb","aaa.key",user); |
convertAndSend
本质也是调用 send
方法,只是多了一个 convertMessageIfNecessary
,将对象转为二进制数组,并封装到 Message
对象中:
public void convertAndSend(String exchange, | |
String routingKey, | |
Object object, | |
CorrelationData correlationData) throws AmqpException { | |
//this.convertMessageIfNecessary (object) 将 JAVA 消息对象转为 `Message` | |
this.send(exchange, | |
routingKey, | |
this.convertMessageIfNecessary(object), | |
correlationData); | |
} |
convertMessageIfNecessary
会判断当前消息是否是 Message
类型,如果是直接返回,不是则调用消息转换器进行转换。然后获取消息转换器,直接通过 RabbitTemplate.getMessageConverter
获取其成员属性,也就是 SimpleMessageConverter
,这是默认值。
private MessageConverter getRequiredMessageConverter() throws IllegalStateException { | |
// private MessageConverter messageConverter = new SimpleMessageConverter(); | |
MessageConverter converter = this.getMessageConverter(); | |
if (converter == null) { | |
throw new AmqpIllegalStateException("xxx"); | |
} else { | |
return converter; | |
} | |
} |
我们可以在 config
中配置一下 converter
,常见的就是 Jackson2JsonMessageConverter
:
@Bean("jacksonConverter") // 直接创建一个用于 JSON 转换的 Bean | |
public Jackson2JsonMessageConverter converter(){ | |
return new Jackson2JsonMessageConverter(); | |
} |
生产者可以直接使用 RabbitTemplate
发送消息即可,消费者需要指定一下 converter
:
// 获取一个订单类 Order | |
@RabbitListener(queues = QUEUE_NAME, messageConverter = "jacksonConverter") | |
public String getOrder(Order order) { | |
return "已消费订单"; | |
} |
当然,配置转换器,也可以用:只需要在 RabbitTemplate
、监听容器工厂 RabbitListenerContainerFactory
中设置转换器即可。
// 不需要在消费者指定 converter | |
@Bean | |
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ | |
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); | |
factory.setConnectionFactory(connectionFactory); | |
factory.setMessageConverter(new Jackson2JsonMessageConverter()); | |
return factory; | |
} | |
@Bean | |
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) { | |
RabbitTemplate template = new RabbitTemplate(); | |
configurer.configure(template, connectionFactory); | |
template.setMessageConverter(new Jackson2JsonMessageConverter()); | |
return template; | |
} |
# 面经
如何保证消息队列的消息不丢失?
从三个角度来回答:
- 生产者:生产者生产消息后,应该收到消息队列的
ack
才能算该消息成功被接收,如果没有收到ack
应该通过回调函数来重发或者采取补救措施 - 消息队列:因为消息在内存中,如果消息队列宕机了,可能会丢失消息,所以需要开启持久化,而为了防止将消息写到磁盘中时宕机而导致消息丢失,应该开启生产者确认,当写入磁盘成功后发送
ack
给生产者。 - 消费者:如果消费者拿到了消息,但是在处理过程中发生了异常,也可能会导致消息丢失,所以消费者最好将自动应答改为手动应答。
如何保证消息队列的消息不被重复消费?
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
所以这里我们给出几个考虑保证幂等性的措施:
- 对于 MySQL 和 Redis 的插入,redis 反正都是 set,那都无所谓,如果是 MySQL,应该对表设置唯一主键等,保证第二次插入应该失败(或者插入之前先查询一下),但是该情况很特殊,局限性大。
- 对每个消息都设置一个 id,全局唯一那种,每次消费一个消息后,都在 redis 里写入这个消息的 id,之后的消息都需要先将 id 在 redis 的查一下。(但是消费者也可能消费完后就挂了,没有在 redis 里写入数据)