掃二維碼與項(xiàng)目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
后續(xù)延遲隊列優(yōu)化用Springboot整合,先理解死信隊列

成都創(chuàng)新互聯(lián)公司是一家專業(yè)提供上栗企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計、html5、小程序制作等業(yè)務(wù)。10年已為上栗眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站制作公司優(yōu)惠進(jìn)行中。
com.rabbitmq
amqp-client
5.12.0
由于特定原因?qū)е玛犃兄械南⒉荒鼙幌M(fèi),這樣的消息如果沒有后續(xù)處理就可以放入死信隊列中,例如一個訂單如果超時未被支付從而自動失效,就將這個訂單放到死信隊列中。(死信隊列中的消息是可以被消費(fèi)的)
就是在規(guī)定的時間內(nèi)消息沒有被消費(fèi),(和延遲隊列不同,延遲隊列時表示到達(dá)時間消息才可以被消費(fèi))
在生產(chǎn)者代碼中設(shè)置消息過期時間:
//生產(chǎn)者發(fā)送消息,將消息設(shè)置為TTL消息
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder().expiration("10000").build();修改隊列參數(shù)argument的特殊屬性:
arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機(jī)
arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkey
arguments.put("x-message-TTL", 10000);//設(shè)置過期時間(單位毫秒)
//將死信交換機(jī)與死信隊列綁定
消費(fèi)者1
public class Consumer01 {
public static final String EXCHANGE_DIRECT = "exchange_direct";//普通交換機(jī)的名稱
public static final String EXCHANGE_DIRECT_DEAD = "exchange_direct_dead";//死信交換機(jī)的名稱
public static final String QUEUE_PLAIN = "queue_plain";//普通隊列的名稱
public static final String QUEUE_PLAIN_DEAD = "queue_plain_dead";//死信隊列的名稱
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.createChannel();
//聲明死信交換機(jī)和普通交換機(jī)
channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(EXCHANGE_DIRECT_DEAD, BuiltinExchangeType.DIRECT);
//聲明普通隊列(綁定普通隊列與死信交換機(jī)的關(guān)系,在通過rotingkey綁定死信隊列
Map arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機(jī)
arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkey
//設(shè)置過期時間(單位毫秒)
arguments.put("x-message-TTL", 10000);
channel.queueDeclare(QUEUE_PLAIN, false, false, false, arguments);
//聲明死信隊列
channel.queueDeclare(QUEUE_PLAIN_DEAD, false, false, false, null);
//普通交換機(jī)和隊列的綁定
channel.queueBind(QUEUE_PLAIN, EXCHANGE_DIRECT, "routingkey_direct");
//死信交換機(jī)和死信隊列的綁定
channel.queueBind(QUEUE_PLAIN_DEAD, EXCHANGE_DIRECT_DEAD, "routingkey_direct-dead");
//模擬超時時間消息未被消費(fèi)
Thread.sleep(1000000);
channel.basicConsume(QUEUE_PLAIN, true, (consumerTag, message) -> {
System.out.println("Consumer01.main接受到消息:" + new String(message.getBody()));
}, (consumerTag, sig) -> {
});
}
} 生產(chǎn)者
public class Produce {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.createChannel();
//生產(chǎn)者發(fā)送消息,將消息設(shè)置為TTL消息
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = i + "";
channel.basicPublish(Consumer01.EXCHANGE_DIRECT,"routingkey_direct",properties,message.getBytes(StandardCharsets.UTF_8));
}
}
}消費(fèi)者2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.createChannel();
channel.basicConsume(Consumer01.QUEUE_PLAIN_DEAD, true, (consumerTag, message) -> {
System.out.println("Consumer2.main接受死信隊列的消息:" + new String(message.getBody()));
}, (consumerTag, sig) -> {
});
}
}
/**輸出結(jié)果:
Consumer2.main接受死信隊列的消息:0
Consumer2.main接受死信隊列的消息:1
Consumer2.main接受死信隊列的消息:2
Consumer2.main接受死信隊列的消息:3
Consumer2.main接受死信隊列的消息:4
Consumer2.main接受死信隊列的消息:5
Consumer2.main接受死信隊列的消息:6
Consumer2.main接受死信隊列的消息:7
Consumer2.main接受死信隊列的消息:8
Consumer2.main接受死信隊列的消息:9
*/
將RabbiMQ的隊列的argument屬性的鍵設(shè)置為 x-max-length 表示隊列可以容納的最大條數(shù)
將自動應(yīng)答設(shè)為false
在消費(fèi)者調(diào)一個Channel.basicReject,設(shè)置參數(shù)requeue為false,表示不重新排隊,將消息丟到死信隊列
延遲隊列就是講一個消息延遲發(fā)送,例如消息在隊列中10s后才能被取出,可以通過RabbitMQ的插件或者死信隊列來實(shí)現(xiàn)
用死信隊列實(shí)現(xiàn)延遲隊列的思路:
在于死信隊列綁定的普通隊列不設(shè)置消費(fèi)者,利用TTL延遲消息,當(dāng)TTL時間過期后,到達(dá)死信隊列被消費(fèi)這樣就形成一個延遲隊列。
延遲隊列的使用場景:①典型的就是流量削峰,對于不重要的消息,可以延遲消費(fèi),有助于減輕數(shù)據(jù)庫的壓力,強(qiáng)化分布式系統(tǒng)的高可用和并發(fā)性能。②還可以實(shí)現(xiàn)一個消息提醒,例如用戶三天未登錄發(fā)送一個消息提醒。
在實(shí)際生產(chǎn)中可能存在很多不同的延遲時間要求,不可能每一個延遲要求就創(chuàng)造一個隊列,我們可以用生產(chǎn)者實(shí)現(xiàn)延遲信息,而隊列不設(shè)置TTL就可以根據(jù)生產(chǎn)的延遲消息進(jìn)行延遲發(fā)送。
但是此方法雖然實(shí)現(xiàn)了一個隊列就可以轉(zhuǎn)發(fā)不同延時時間的消息,但是有缺陷,隊列中的消息是排隊發(fā)送的,也就是說如果我第一條消息發(fā)送20s延時,接著第二條消息發(fā)送2s延時。最后卻是20s消息先消費(fèi),而2s消息后消費(fèi),因?yàn)镽abbitMQ在檢測一條消息時發(fā)生了20s的阻塞。如下:
###
GET http://localhost:8080/ttl/sendExpirationMessage/aaaaa/20000
###
GET http://localhost:8080/ttl/sendExpirationMessage/bbbbb/2000
最后輸出結(jié)果是先消費(fèi)aaaa后消費(fèi)bbbb可以通過RabbitMQ的插件實(shí)現(xiàn)延時隊列,此方法沒有這缺陷
從官網(wǎng)上下載對應(yīng)版本的延遲插件,下載后如圖:交換機(jī)類型會多出一個 x-delayed-message
在我們自定義的交換機(jī)中,這是一種新的交換機(jī)類型,該類型消息支持延遲投遞機(jī)制,消息傳遞后并不會立即投遞到目標(biāo)隊列中,而是存儲在mnesia(一個分布式數(shù)據(jù)系統(tǒng))表中,當(dāng)達(dá)到投遞時間時,才會投遞到目標(biāo)隊列中。
代碼實(shí)例:
配置類:
@Configuration
public class RabbitDelayedConfig {
//延遲交換機(jī)
public static final String DELAYED_EXCHANGE = "delayed.exchange";
//延遲隊列b
public static final String DELAYED_QUEUE = "delayed.queue";
//延遲交換機(jī)和隊列的routingkey
public static final String DELAYED_ROTINGKEY = "delayed.routingkey";
//public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map arguments) {
// super(name, durable, autoDelete, arguments);
// this.type = type;
// }
@Bean
public CustomExchange delayedExchange() {
Map arguments = new HashMap<>();
//定義延遲消息類型由那種交換機(jī)規(guī)則處置
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments);
}
@Bean
public Queue delayedQueue() {
return QueueBuilder
.nonDurable(DELAYED_QUEUE)
.build();
}
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROTINGKEY).noargs();
}
} 生產(chǎn)者:
/*延遲交換機(jī)發(fā)送消息*/
@GetMapping("/sendDelayedMessage/{message}/{delayedTTL}")
public void sendDelayedMessage(@PathVariable String message, @PathVariable Integer delayedTTL) {
log.info("當(dāng)前時間:{},發(fā)送一條延遲時間為{}的延遲消息給延遲隊列:{}", new Date().toString(), delayedTTL, message);
rabbitTemplate.convertAndSend(RabbitDelayedConfig.DELAYED_EXCHANGE,
RabbitDelayedConfig.DELAYED_ROTINGKEY,
message,
msg -> {
msg.getMessageProperties().setDelay(delayedTTL);//設(shè)置消息的延遲消息時間
return msg;
});
}消費(fèi)者:
@Slf4j
@Component
public class DelayedQueueConsumer {
@RabbitListener(queues = RabbitDelayedConfig.DELAYED_QUEUE)
public void queue(Message message) {
log.info("接受到延遲隊列的消息,當(dāng)前時間:{},消息:{}",new Date().toString(),new String(message.getBody()));
}
} 
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流