掃二維碼與項(xiàng)目經(jīng)理溝通
我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
大家好呀,我是樓仔。

創(chuàng)新互聯(lián)是網(wǎng)站建設(shè)技術(shù)企業(yè),為成都企業(yè)提供專業(yè)的成都做網(wǎng)站、網(wǎng)站建設(shè),網(wǎng)站設(shè)計(jì),網(wǎng)站制作,網(wǎng)站改版等技術(shù)服務(wù)。擁有十年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制適合企業(yè)的網(wǎng)站。十年品質(zhì),值得信賴!
RabbitMQ 的文章之前寫過,但是當(dāng)時(shí)給的示例是 Demo 版的,這篇文章主要是結(jié)合之前寫的理論知識(shí),將 RabbitMQ 集成到技術(shù)派項(xiàng)目中。
不 BB,上文章目錄:
下面我們先回顧一下理論知識(shí),如果對(duì)這塊知識(shí)已經(jīng)清楚的同學(xué),可以直接跳到實(shí)戰(zhàn)部分。
消息隊(duì)列目前主要 2 種模式,分別為“點(diǎn)對(duì)點(diǎn)模式”和“發(fā)布/訂閱模式”。
一個(gè)具體的消息只能由一個(gè)消費(fèi)者消費(fèi),多個(gè)生產(chǎn)者可以向同一個(gè)消息隊(duì)列發(fā)送消息,但是一個(gè)消息在被一個(gè)消息者處理的時(shí)候,這個(gè)消息在隊(duì)列上會(huì)被鎖住或者被移除并且其他消費(fèi)者無法處理該消息。
需要額外注意的是,如果消費(fèi)者處理一個(gè)消息失敗了,消息系統(tǒng)一般會(huì)把這個(gè)消息放回隊(duì)列,這樣其他消費(fèi)者可以繼續(xù)處理。
單個(gè)消息可以被多個(gè)訂閱者并發(fā)的獲取和處理。一般來說,訂閱有兩種類型:
RabbitMQ 2007 年發(fā)布,是使用 Erlang 語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于 AMQP 協(xié)議來實(shí)現(xiàn)。
提到RabbitMQ,就不得不提AMQP協(xié)議。AMQP協(xié)議是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
先了解一下AMQP協(xié)議中間的幾個(gè)重要概念:
AMQP 協(xié)議模型由三部分組成:生產(chǎn)者、消費(fèi)者和服務(wù)端,執(zhí)行流程如下:
RabbitMQ常用的交換器類型有direct、topic、fanout、headers四種:
因?yàn)槲矣玫氖荕ac,所以直接可以參考官網(wǎng):
https://www.rabbitmq.com/install-homebrew.html
需要注意的是,一定需要先執(zhí)行:
brew update然后再執(zhí)行:
brew install rabbitmq
之前沒有執(zhí)行brew update,直接執(zhí)行brew install rabbitmq時(shí),會(huì)報(bào)各種各樣奇怪的錯(cuò)誤,其中“403 Forbidde”居多。
但是在執(zhí)行“brew install rabbitmq”,會(huì)自動(dòng)安裝其它的程序,如果你使用源碼安裝Rabbitmq,因?yàn)閱?dòng)該服務(wù)依賴erlang環(huán)境,所以你還需手動(dòng)安裝erlang,但是目前官方已經(jīng)一鍵給你搞定,會(huì)自動(dòng)安裝Rabbitmq依賴的所有程序,是不是很棒!
最后執(zhí)行成功的輸出如下:
啟動(dòng)服務(wù):
# 啟動(dòng)方式1:后臺(tái)啟動(dòng)
brew services start rabbitmq
# 啟動(dòng)方式2:當(dāng)前窗口啟動(dòng)
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server在瀏覽器輸入:
http://localhost:15672/會(huì)出現(xiàn)RabbitMQ后臺(tái)管理界面(用戶名和密碼都為guest):
通過brew安裝,一行命令搞定,真香!
添加賬號(hào):
## 添加賬號(hào)
./rabbitmqctl add_user admin admin
## 添加訪問權(quán)限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設(shè)置超級(jí)權(quán)限
./rabbitmqctl set_user_tags admin administratorpom 引入依賴:
com.rabbitmq
amqp-client
5.5.1
先整一個(gè) ConnectionFactory 單例,每臺(tái)機(jī)器都有自己的 ConnectionFactory,防止每次都初始化(在后面的迭代中,我會(huì)把這個(gè)去掉,整成連接池)。
/**
* @author Louzai
* @date 2023/5/10
*/
public class RabbitmqUtil {
/**
* 每個(gè)key都有自己的工廠
*/
private static Map executors = new ConcurrentHashMap<>();
/**
* 初始化一個(gè)工廠
*
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory init(String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(passport);
factory.setVirtualHost(virtualhost);
return factory;
}
/**
* 工廠單例,每個(gè)key都有屬于自己的工廠
*
* @param key
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory getOrInitConnectionFactory(String key,
String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory connectionFactory = executors.get(key);
if (null == connectionFactory) {
synchronized (RabbitmqUtil.class) {
connectionFactory = executors.get(key);
if (null == connectionFactory) {
connectionFactory = init(host, port, username, passport, virtualhost);
executors.put(key, connectionFactory);
}
}
}
return connectionFactory;
}
} 獲取 RabbitmqClient:
/**
* @author Louzai
* @date 2023/5/10
*/
@Component
public class RabbitmqClient {
@Autowired
private RabbitmqProperties rabbitmqProperties;
/**
* 創(chuàng)建一個(gè)工廠
* @param key
* @return
*/
public ConnectionFactory getConnectionFactory(String key) {
String host = rabbitmqProperties.getHost();
Integer port = rabbitmqProperties.getPort();
String userName = rabbitmqProperties.getUsername();
String password = rabbitmqProperties.getPassport();
String virtualhost = rabbitmqProperties.getVirtualhost();
return RabbitmqUtil.getOrInitConnectionFactory(key, host, port, userName,password, virtualhost);
}
}重點(diǎn)!敲黑板?。。∵@里就是 RabbmitMQ 的核心邏輯了。
我們使用的交換機(jī)類型是 Direct Exchange,此交換機(jī)需要綁定一個(gè)隊(duì)列,要求該消息與一個(gè)特定的路由鍵完全匹配,簡(jiǎn)單點(diǎn)說就是一對(duì)一的,點(diǎn)對(duì)點(diǎn)的發(fā)送。
至于為什么不用廣播和主題交換機(jī)模式,因?yàn)榧夹g(shù)派的使用場(chǎng)景就是發(fā)送單個(gè)消息,點(diǎn)到點(diǎn)發(fā)送和消費(fèi)的模式完全可以滿足我們的需求。
下面 3 個(gè)方法都很簡(jiǎn)單:
@Component
public class RabbitmqServiceImpl implements RabbitmqService {
@Autowired
private RabbitmqClient rabbitmqClient;
@Autowired
private NotifyService notifyService;
@Override
public void publishMsg(String exchange,
BuiltinExchangeType exchangeType,
String toutingKey,
String message) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(toutingKey);
// TODO: 這種并發(fā)量起不來,需要改造成連接池
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動(dòng)刪除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 發(fā)布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Publish msg:" + message);
channel.close();
connection.close();
}
@Override
public void consumerMsg(String exchange,
String queue,
String routingKey) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(routingKey);
// TODO: 這種并發(fā)量起不來,需要改造成連接池
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息信道
final Channel channel = connection.createChannel();
//消息隊(duì)列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊(duì)列到交換機(jī)
channel.queueBind(queue, exchange, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer msg:" + message);
// 獲取Rabbitmq消息,并保存到DB
// 說明:這里僅作為示例,如果有多種類型的消息,可以根據(jù)消息判定,簡(jiǎn)單的用 if...else 處理,復(fù)雜的用工廠 + 策略模式
notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 取消自動(dòng)ack
channel.basicConsume(queue, false, consumer);
}
@Override
public void processConsumerMsg() {
System.out.println("Begin to processConsumerMsg.");
Integer stepTotal = 1;
Integer step = 0;
// TODO: 這種方式非常 Low,后續(xù)會(huì)改造成阻塞 I/O 模式
while (true) {
step ++;
try {
System.out.println("processConsumerMsg cycle.");
consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE,
CommonConstants.QUERE_KEY_PRAISE);
if (step.equals(stepTotal)) {
Thread.sleep(10000);
step = 0;
}
} catch (Exception e) {
}
}
}
}這里只是給個(gè)示例,如果要真正用到生產(chǎn)環(huán)境,你覺得有哪些問題呢? 你自己先想想,文末再告訴你。
其實(shí)之前我們是通過 Java 的內(nèi)置異步調(diào)用方式,為了方便驗(yàn)證,我把文章點(diǎn)贊的功能遷移到 RabbitMQ 中,只要是點(diǎn)贊,就走 RabbitMQ 模式。
// 點(diǎn)贊消息走 RabbitMQ,其它走 Java 內(nèi)置消息機(jī)制
if (notifyType.equals(NotifyTypeEnum.PRAISE) && rabbitmqProperties.getSwitchFlag()) {
rabbitmqService.publishMsg(
CommonConstants.EXCHANGE_NAME_DIRECT,
BuiltinExchangeType.DIRECT,
CommonConstants.QUERE_KEY_PRAISE,
JsonUtil.toStr(foot));
} else {
Optional.ofNullable(notifyType).ifPresent(notify -> SpringUtil.publishEvent(new NotifyMsgEvent<>(this, notify, foot)));
}那消費(fèi)入口放哪里呢?其實(shí)是在程序啟動(dòng)的時(shí)候,我們就啟動(dòng) RabbitMQ 進(jìn)行消費(fèi),然后整個(gè)進(jìn)程一直在程序中跑。
@Override
public void run(ApplicationArguments args) {
// 設(shè)置類型轉(zhuǎn)換, 主要用于mybatis讀取varchar/json類型數(shù)據(jù)據(jù),并寫入到j(luò)son格式的實(shí)體Entity中
JacksonTypeHandler.setObjectMapper(new ObjectMapper());
// 應(yīng)用啟動(dòng)之后執(zhí)行
GlobalViewConfig config = SpringUtil.getBean(GlobalViewConfig.class);
if (webPort != null) {
config.setHost("http://127.0.0.1:" + webPort);
}
// 啟動(dòng) RabbitMQ 進(jìn)行消費(fèi)
if (rabbitmqProperties.getSwitchFlag()) {
taskExecutor.execute(() -> rabbitmqService.processConsumerMsg());
}
log.info("啟動(dòng)成功,點(diǎn)擊進(jìn)入首頁: {}", config.getHost());
}
我們多次點(diǎn)擊“點(diǎn)贊”按鈕,觸發(fā) RammitMQ 消息發(fā)送。
可以通過日志,也可以看到發(fā)送和消費(fèi)過的消息。
我靠!好多沒有關(guān)閉的鏈接。
還有一堆沒有關(guān)閉的 channel。
估計(jì)再多跑一會(huì),內(nèi)存全部吃光,機(jī)器就死機(jī)了,怎么破?答案是連接池!
為了方便大家學(xué)習(xí)功能演變的過程,每個(gè)模塊都會(huì)單獨(dú)開個(gè)分支,包括后面的升級(jí)版:
如果需要運(yùn)行 RabbitMQ,下面的配置需要改成 true,因?yàn)榇a默認(rèn)是 false。
這篇文章,讓大家知道 RabbitMQ 的基本原理,以及如何去集成 RabbitMQ,但是還不能用到實(shí)際生產(chǎn)環(huán)境,但是這個(gè)確實(shí)是我寫的第一個(gè)版本,存粹是搞著玩的,因?yàn)槔锩娲嬖诘膯栴}還非常多。
我簡(jiǎn)單列舉一下:
如果你對(duì)上面的問題也非常感興趣,可以直接基于分支 feature/add_rabbitmq_20230506,然后給我提 PR,技術(shù)嘛,我喜歡邊玩邊學(xué)。

我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流