# 安装

# Unix-Ubuntu

安装 Erlang 环境(RabbitMQ 就是这个语言开发的)

sudo apt install erlang

安装 RabbitMq

sudo apt install rabbitmq-server

查看 RabbitMq 状态

sudo rabbitmqctl status

我们需要使用的端口:5672,协议为 amqp 协议。

打开 RabbitMq 的管理面板,可以在浏览器上实时访问和监控

sudo rabbitmq-plugins enable rabbitmq_management

默认访问路径: IP:15672 ,该网页需要登录,所以需要创建一个用户

sudo rabbitmqctl add_user 用户名 密码

将管理员权限赋给刚刚创建的用户

sudo rabbitmqctl set_user_tags admin administrator

如果你希望这个用户可以操作原本的 /virtual host

sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

此处顺便介绍一下架构

  • channel :每个客户端连接都会使用一个 channel ,再通过 channel 访问 RabbitMQ ,通信协议为 amqp 协议。
  • exchange :类似于交换机,根据请求转发给对应的消息队列,每个消息队列都可以绑定到 exchange ,不同类型的 exchange 类型可以用于实现不同消息的模式。
  • queue :消息队列本体
  • routing key :路由关键字, exchange 根据该关键字进行消息投递
  • virtual host :类似环境隔离,不同环境都可以单独配置一个 virtual host ,每个 host 又可以包含很多个 exchangequeue ,不同 host 之间互不影响。

# 使用消息队列

# 控制面板

总体流程为:创建 virtual host — 创建 queue — 绑定 exchange — 向 exchange 发送消息 — 消息队列消费消息。

  • 创建 virtual host

  • 创建 queue :自动删除的选项表示如果所有消费者都与消息队列断开连接,就自动删除该队列

  • 绑定 exchange :选择交换机和路由 key

  • 从交换机发送消息:记得是刚才创建的 virtual host ,同时消息需要带上 route key

  • 去队列消费消息:可以看到消息队列已经有了一条消息,我们选择 ack 模式, Nack 模式是拒绝消息,也就是不会将消息从队列取出,并且重新排队。 ack 就是确认应答,然后将消息从消息队列中移除。 reject 表示拒绝此消息,但可以指定是否重新排队。

# Java 操作消息队列

Springboot 整合 RabbitMQ

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml 配置文件

server:
  port: 8089
  servlet:
    context-path: /mq/rabbitmq
spring:
  rabbitmq:
    addresses: 192.168.92.130
    username: admin
    password: admin
    virtual-host: cyan	#我自己创建了名叫cyan的virtual host,不要写错/cyan了

创建配置类:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
    // 将 exchange 和 queue 注入 bean 的操作大同小异
    @Bean("directExchange")
    public Exchange exchange() {
        //direct 类型的 exchange
        return ExchangeBuilder.directExchange("amq.direct").build();
    }
    // 创建 queue 只需要指定名字即可
    @Bean("cyan-queue1")
    public Queue queue() {
        return QueueBuilder.nonDurable("cyan").build();
    }
    // @Qualifier 注解用于指定使用哪个 bean,因为 exchange 也可能有多个 bean,比如 fanout 类型
    @Bean("binding")
    public Binding binding(@Qualifier("directExchange") Exchange exchange,
                           @Qualifier("cyan-queue1") Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("cyan-key") // 指定 key
                .noargs();
    }
}

在测试类写一个生产者

@SpringBootTest
class RabbitMqDemoApplicationTests {
    @Resource
    RabbitTemplate template;
    @Test
    void publisher() {
        // 生产者,最后一个参数可以是 Object 类型
        long start = System.currentTimeMillis();
        // 如果不想要返回数据,直接调用 convertAndSet 也可以
        Object res = template.convertSendAndReceive("amq.direct",
                     "cyan-key", "This message is produced by java-springboot");
        System.out.println("消费者响应: " + res + 
                           "  消耗时长: " + (System.currentTimeMillis() - start) + "ms");
    }
}

写一个监视器作为消费者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TestListener {
	// 定义此方法为队列 cyan-queue1 的监听器,一旦监听到新的消息,就会接受并处理	
    @RabbitListener(queues = "cyan-queue1")   
    public String test(Message message){
        System.out.println(new String(message.getBody()));
        return "已获取消息";
    }
}

然后直接运行即可。如果希望消费者拿到的消息是一个实体类:

创建一个实体类:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    int id;
    String name;
}

在配置类中配置一下 json 转换的 bean

@Configuration
public class RabbitConfiguration {
    @Bean("jacksonConverter")   // 直接创建一个用于 JSON 转换的 Bean
    public Jackson2JsonMessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }
}

导入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.14.2</version>
</dependency>

然后在监听器中指定转换器即可:

@RabbitListener(queues = "cyan-queue1", messageConverter = "jacksonConverter")
// 定义此方法为队列 cyan-queue1 的监听器,一旦监听到新的消息,就会接受并处理
public String test(User user){
    System.out.println(user);
    return "已获取消息";
}

在生产者那改变一下消息:

@Test
void publisher() {
    // 生产者,最后一个参数可以是 Object 类型
    long start = System.currentTimeMillis();
    Object res = template.convertSendAndReceive("amq.direct", "cyan-key",
                                                new User(1,"cyan"));
    System.out.println("消费者响应: " + res + 
                       "  消耗时长: " + (System.currentTimeMillis() - start) + "ms");
}

# 队列类型

# 死信队列

无法被消费的消息被称为 死信 ,存放 死信 的队列也就是 死信队列

消息成为死信的三种情况:

  1. 队列消息长度到达限制
  2. 消费者异常拒接消费消息
  3. 原队列存在消息过期设置,消息到达超时时间未被消费

此外,还有死信消费者,消费死信消息,本质上也就是一种异常处理机制。死信交换机、死信队列需要自行创建,本质上是创建一般性的队列和交换机,然后指定其为其他队列 / 交换机的死信队列。

// 配置死信队列
@Configuration
public class RabbitMqDeadQueueConfig {
    // 定义一系列队列交换机常量
    private static final String DEAD_QUEUE = "deadQueue";
    private static final String DEAD_EXCHANGE = "deadExchange";
    private static final String DEAD_ROUTE_KEY = "dead.key";
    /**
     * 死信队列
     */
    @Bean(DEAD_QUEUE)
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }
    /**
     * 死信交换机
     */
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }
    /**
     * 创建死信队列和死信交换机的绑定关系
     */
    @Bean("deadBinding")
    public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue,
                               @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
        return 
            BindingBuilder
            .bind(deadQueue)
            .to(directExchange)
            .with(DEAD_ROUTE_KEY)
            .and(null);
    }
}

然后创建正常的业务消费队列并指定死信交换机、路由 Key

@Configuration
public class RabbitMqConfig {
    private static final String DEAD_QUEUE = "deadQueue";
    private static final String DEAD_EXCHANGE = "deadExchange";
    private static final String DEAD_ROUTE_KEY = "dead.key";
    /**
     * 使用 ExchangeBuilder 创建交换机
     */
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
    }
    /**
     * 创建队列:
     * 指定死信交换机、路由 KEY
     */
    @Bean("bootQueue")
    public Queue bootQueue001() {
        return 
            QueueBuilder
            .durable("bootQueue")
            .deadLetterExchange(DEAD_EXCHANGE) // 为队列指定死信交换机
            .deadLetterRoutingKey(DEAD_ROUTE_KEY) // 指定死信 key
            .build();
    }
}

然后创建一个死信消费者(相当于异常处理),消费死信。在死信消费者中还可以有将死信消息存入数据库的操作,等待人工干预处理

@RabbitListener(queues = {"deadQueue"}) // 注意是否要注入 convert
public void consumer(Message message) {
    System.out.println("收到死信消息" + new String(message.tobody()));
    System.out.println("存入数据库,等待人工干预");
}

# 延迟队列

延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

使用过期 + 死信队列的思想可以实现延迟队列,具体就是消费者发现消息过期,消息被传入死信队列,死信队列的消费者实现延迟逻辑。

  • 设置消息过期:但是消息只有被消费者消费时才会检查是否过期,一个队列中消息可能出现这种情况,队首有一个消息过期时间是 20s,已经过期了但是消费者还没有检查到那去。但是又新加入了一个 10s 的消息,该消息必须等前面的所有消息都被检查了才可以判断是否过期(尽管过期时间更短),这种做法不可取。
  • 设置队列过期:那么如果前面的消息没有过期,后面的消息一定没有过期。但是不是很灵活,每个队列只有一种过期时间。

插件实现延迟队列:消息到达 延迟交换机 后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。具体操作自行搜索。

# 惰性队列

惰性队列 会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中。队列有 defaultlazy 模式,在控制台上可以直接添加惰性队列,需要在 Arguments 添加一个 x-queue-mode=lazy

或者使用代码来设置:

// 方案一
@Bean("lazy_queue1")
public Queue bootQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-queue-mode", "lazy");
    return
        QueueBuilder
        .durable("lazy_queue1")
        .withArguments(args)
        .deadLetterExchange(DEAD_EXCHANGE)
        .deadLetterRoutingKey(DEAD_ROUTE_KEY)
        .build();
}
// 方案二
@Bean("lazy_queue2")
public Queue lazy_queue999() {
    return 
        QueueBuilder
        .durable("lazy_queue2")
        .lazy()
        .deadLetterExchange(DEAD_EXCHANGE)
        .deadLetterRoutingKey(DEAD_ROUTE_KEY)
        .build();
}

# 优先队列

数据结构懂得都懂,需要做的就是对消息设置优先级取值范围为 0~255(一个 8 位无符号整数),这个范围足够适用于大部分场景,即使有特殊需求,也可以使用其他机制来设计优先级,比如使用多个队列:每增加一个队列,相当于就将上限增加了 256 个优先级。

控制台设置队列优先级的最大值,还是增加 Arguments 即可:

同样的,代码层面,使用 withArguments 也可以设置:

// 使用 withArguments
@Bean("priorityQueue")
public Queue priorityQueue() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-max-priority", 10);
    return QueueBuilder.durable("priorityQueue").withArguments(arguments).build();
}
// 使用 maxPriority
@Bean("priorityQueue")
public Queue priorityQueue() {
    //maxPriority 设置最大优先级
    return QueueBuilder.durable("priorityQueue").maxPriority(10).build();
}

设置消息优先级:

MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(i); // 设置优先级
System.out.println("发送消息,优先级为:"+i);
Message message = new Message(("消息优先级为:"+i).getBytes(), messageProperties);
TimeUnit.SECONDS.sleep(1);
rabbitTemplate.send("priorityExchange", "priority.key", message);

如果希望将自定义的对象,比如 User 写入队列中:

Message message = rabbitmqTemplate
    			.getMessageConverter()
    			.toMessage(user, messageProperties);

消费者直接接收 User 即可,不需要显示处理 Message ,默认的 SimpleMessageConverter 或者 Jackson2JsonMessageConverter 都会自动根据消息内容进行自动转换:

@RabbitListener(queues = "my_queue")
public void receiveMessage(User user) {
    System.out.println("Received user: " + user);
}

# 交换机

当消息到达交换机后,如果没有找到匹配的队列,调用退回模式将消息回退给生产者。

# 备份交换机

除了回退模式, RabbitMQ 还提供了 备份交换机 机制,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理。

注意事项:当退回模式和备份交换机一起使用的时候,备份交换机的优先级比较高,不会执行回退消息的回调。

声明备份交换机,需要将交换机类型设为 fanout

public static final String BACKUP_QUEUE = "backupQueue";
public static final String BACKUP_EXCHANGE = "backupExchange";
public static final String BACKUP_ROUTE_KEY = "backup.key";
@Bean(BACKUP_QUEUE)
public Queue backupQueue() {
    return QueueBuilder.durable(BACKUP_QUEUE).build();
}
@Bean(BACKUP_EXCHANGE)
public FanoutExchange backupExchange() {
    return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).durable(true).build();
}
@Bean("backupBinding")
public Binding backupBinding(@Qualifier(BACKUP_QUEUE) Queue backupQueue, @Qualifier(BACKUP_EXCHANGE) FanoutExchange backupExchange) {
    // 不需要指定 key
    return BindingBuilder.bind(backupQueue).to(backupExchange);
}

正常的业务交换机需要设置一下参数来指定备份交换机:

@Bean(BIZ_EXCHANGE)
public Exchange bizExchange() {
    // 使用 alternate-exchange 设置备份交换机
    return ExchangeBuilder
        .directExchange(BIZ_EXCHANGE)
        .withArgument("alternate-exchange", MqBackupConfig.BACKUP_EXCHANGE)
        .durable(true).build();
}

然后创建一个备份队列的消费者即可。

# 相关类

# AmqpAdmin

通过 bean 声明交换机、队列,在应用启动时自动创建,不是很灵活。通过 AmqpAdmin 可以通过接口或者业务代码去操作,该接口 SpringBoot 已经做好了自动配置,所以直接注入即可:

@Resource
AmqpAdmin amqpAdmin;

相关功能为:

大致就是先通过 builder 拿到一个对象(交换机、队列、绑定关系等),然后再通过 AmqpAdmin 进行操作

  • 查询队列信息:
// 获取队列名称、消息计数和消费者计数
// Properties getQueueProperties (String queueName); 获取队列信息
Properties properties = amqpAdmin.getQueueProperties("bootQueue001");
// QueueInformation getQueueInfo (String var1); 获取队列信息
QueueInformation queueInformation = amqpAdmin.getQueueInfo("bootQueue001");
  • 声明、删除交换机:
//void declareExchange (Exchange var1); 声明一个交换机
Exchange adminExchange = ExchangeBuilder
                        .directExchange("adminExchange")
                        .durable(true)
                        .build();
amqpAdmin.declareExchange(adminExchange);
//boolean deleteExchange (String var1); 删除一个交换机
boolean deleteExchange = amqpAdmin.deleteExchange("deleteExchange");
System.out.println("删除一个交换机:" + deleteExchange);
  • 声明、删除队列:
// Queue declareQueue (); 声明一个随机名称队列:amq.gen-9RGhmUOsu8GbhmLfPe9-KQ
Queue queue = amqpAdmin.declareQueue();
System.out.println("声明一个队列:" + queue.getName());
// String declareQueue (Queue var1); 声明一个队列
Queue declareQueue = QueueBuilder.durable("declareQueue").build();
String s = amqpAdmin.declareQueue(declareQueue);
System.out.println("声明一个队列:" + s);
  • 声明、删除交换机 / 队列绑定关系
//void declareBinding (Binding binding); 声明交换机、队列绑定关系
Binding binding = BindingBuilder
                .bind(queue)
                .to(adminExchange)
                .with("declare.key")
                .noargs();
amqpAdmin.declareBinding(binding);
//void removeBinding (Binding var1); 删除交换机、队列绑定关系
amqpAdmin.removeBinding(binding);

# AmqpTemplate

RabbitTemplatespring-amqp 提供的一个 RabbitMQ 消息操作模板类,主要提供了发送消息、接收消息以及其他附加功能,内部封装了 RabbitMQ 原生 API ,大大简化了使用 RabbitMQ 操作。而 RabbitTemplate 主要实现了 AmqpTemplateRabbitOperations 接口。 AmqpTemplate 主要声明了三类方法:

public interface AmqpTemplate {
	// 发送消息
    void send(Message var1) throws AmqpException;
    // 接收消息
    Message receive() throws AmqpException;
	
    // 发送消息并接收回复
    Message sendAndReceive(Message var1) throws AmqpException;
}
  • send 方法:创建 Message 消息对象
// 发送消息到默认交换机、默认路由 KEY
void send(Message message) throws AmqpException;
// 发送消息到默认交换机、使用指定路由 KEY
void send(String routingKey, Message message) throws AmqpException;
// 发送消息到指定交换机、使用指定路由 KEY
void send(String exchange, String routingKey, Message message) throws AmqpException;
  • convertAndSend 方法:可以转换对象并发送,可以添加一个消息处理器 MessagePostProcessor
// 将 Java 对象转换为 Amqp{@link Message}并将其发送到默认交换机、使用默认路由 KEY
	void convertAndSend(Object message) throws AmqpException;
	// 将 Java 对象转换为 Amqp{@link Message}并将其发送到默认交换机、使用自定义路由 KEY
	void convertAndSend(String routingKey, Object message) throws AmqpException;
	// 将 Java 对象转换为 Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由 KEY
	void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
	 // 将 Java 对象转换为 Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由 KEY
	 // 在发送消息之前添加一个消息处理器 MessagePostProcessor 
	void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
	// 将 Java 对象转换为 Amqp{@link Message}并将其发送到默认交换机、使用自定义路由 KEY
	// 在发送消息之前添加一个消息处理器 MessagePostProcessor 
	void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
			throws AmqpException;
	// 将 Java 对象转换为 Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由 KEY
	// 在发送消息之前添加一个消息处理器 MessagePostProcessor
	void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
			throws AmqpException;

一般可以通过 MessagePostProcessor 设置一些 Message 的属性(过期):

MessagePostProcessor messagePostProcessor = message1 -> {
    MessageProperties messageProperties = message1.getMessageProperties();
    messageProperties.setExpiration("1000");
    return message1;
};
rabbitTemplate.convertAndSend("","","消息",messagePostProcessor);
  • receive 方法:获取消息分为主动拉去消息和被动获取消息(RabbitMQ 主动将消息推送给订阅队列的消费者)。而 receive 方法就是主动从队列获取消息。
Message receive(String queueName, long timeoutMillis) // 重载方法
  • receiveAndConvert 方法:拉取消息并进行对象转换。需要注入一个 Converter
  • receiveAndReply 方法: receiveAndReply 支持在获取消息时传入一个回调函数 ReceiveAndReplyCallback ,处理接收到消息和回复消息的业务逻辑。该模式用的比较少,实现起来也比较麻烦。
  • sendAndReceive 方法:发送消息并接收回复消息
Message sendAndReceive(String exchange, String routingKey, Message message)
  • convertSendAndReceive 方法:可以直接发送对象消息,还是建议注入一个 converter
Object convertSendAndReceive(String exchange, String routingKey, Object message)

# 对象序列化机制

使用 RabbitMQ 原生 API ,发送消息时,发送的是二进制 byte[] 数据。

void basicPublish(String var1, String var2, byte[] var4) throws IOException;

使用 RabbtiTemplate.send() 发送 Message 对象,也是二进制 byte[] 数据。

public Message(byte[] body) {
        this(body, new MessageProperties());
    }

因为 Java 基于对象操作,所以消息一般都是对象。

# 发送对象

默认的序列化( SimpleMessageConverter )需要对象实现 Serializable 序列化接口:

User user = new User();
user.setName("张三");
rabbitTemplate.convertAndSend("bbdbdbdb","aaa.key",user);

convertAndSend 本质也是调用 send 方法,只是多了一个 convertMessageIfNecessary ,将对象转为二进制数组,并封装到 Message 对象中:

public void convertAndSend(String exchange,
                           String routingKey,
                           Object object,
                           CorrelationData correlationData) throws AmqpException {
    //this.convertMessageIfNecessary (object) 将 JAVA 消息对象转为 `Message`
    this.send(exchange,
              routingKey, 
              this.convertMessageIfNecessary(object), 
              correlationData);
}

convertMessageIfNecessary 会判断当前消息是否是 Message 类型,如果是直接返回,不是则调用消息转换器进行转换。然后获取消息转换器,直接通过 RabbitTemplate.getMessageConverter 获取其成员属性,也就是 SimpleMessageConverter ,这是默认值。

private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
    //  private MessageConverter messageConverter = new SimpleMessageConverter();
    MessageConverter converter = this.getMessageConverter();
    if (converter == null) {
        throw new AmqpIllegalStateException("xxx");
    } else {
        return converter;
    }
}

我们可以在 config 中配置一下 converter ,常见的就是 Jackson2JsonMessageConverter

@Bean("jacksonConverter")   // 直接创建一个用于 JSON 转换的 Bean
public Jackson2JsonMessageConverter converter(){
    return new Jackson2JsonMessageConverter();
}

生产者可以直接使用 RabbitTemplate 发送消息即可,消费者需要指定一下 converter

// 获取一个订单类 Order
@RabbitListener(queues = QUEUE_NAME, messageConverter = "jacksonConverter")
public String getOrder(Order order) {
    return "已消费订单";
}

当然,配置转换器,也可以用:只需要在 RabbitTemplate 、监听容器工厂 RabbitListenerContainerFactory 中设置转换器即可。

// 不需要在消费者指定 converter
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}

# 面经

如何保证消息队列的消息不丢失?

从三个角度来回答:

  • 生产者:生产者生产消息后,应该收到消息队列的 ack 才能算该消息成功被接收,如果没有收到 ack 应该通过回调函数来重发或者采取补救措施
  • 消息队列:因为消息在内存中,如果消息队列宕机了,可能会丢失消息,所以需要开启持久化,而为了防止将消息写到磁盘中时宕机而导致消息丢失,应该开启生产者确认,当写入磁盘成功后发送 ack 给生产者。
  • 消费者:如果消费者拿到了消息,但是在处理过程中发生了异常,也可能会导致消息丢失,所以消费者最好将自动应答改为手动应答。

如何保证消息队列的消息不被重复消费?

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

所以这里我们给出几个考虑保证幂等性的措施:

  • 对于 MySQL 和 Redis 的插入,redis 反正都是 set,那都无所谓,如果是 MySQL,应该对表设置唯一主键等,保证第二次插入应该失败(或者插入之前先查询一下),但是该情况很特殊,局限性大。
  • 对每个消息都设置一个 id,全局唯一那种,每次消费一个消息后,都在 redis 里写入这个消息的 id,之后的消息都需要先将 id 在 redis 的查一下。(但是消费者也可能消费完后就挂了,没有在 redis 里写入数据)