RocketMQ生產(chǎn)者為什么需要負(fù)載均衡?
在RocketMQ中,隊列是消息發(fā)送的基本單位。每個Topic下可能存在多個隊列,因此一個生產(chǎn)者實例可以向不同的隊列發(fā)送消息。當(dāng)生產(chǎn)者發(fā)送消息時,如果不能均衡的將消息發(fā)送到不同的隊列,那么會導(dǎo)致隊列里的消息分布不均衡,這樣最終會導(dǎo)致消息性能下降,因此生產(chǎn)者負(fù)載均衡機(jī)制也是非常重要的。
RocketMQ生產(chǎn)者原理分析
既然生產(chǎn)者負(fù)載均衡如此重要,我們看下是如何實現(xiàn)的。
我們通常使用如下方法發(fā)送消息:
構(gòu)建消息 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //發(fā)送消息 SendResult sendResult = producer.send(msg);
RocketMQ發(fā)送消息的核心邏輯在DefaultMQProducerImpl類sendDefaultImpl。
在發(fā)送消息流程利里面有一行非常關(guān)鍵的邏輯,selectOneMessageQueue,看方法名稱就可以知道其含義,選擇一個消息隊列。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
里面是通過策略類來實現(xiàn)的。
策略類最終通過org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 實現(xiàn)。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //生產(chǎn)者第一次發(fā)消息 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //非第一次,重試發(fā)消息的情況, for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //重試的情況,不取上一個broker的隊列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } 第一次發(fā)消息選擇隊列核心邏輯在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue() //線程安全的index private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); public MessageQueue selectOneMessageQueue() { //獲取一個基礎(chǔ)索引,每次自增1 這個全局存在TopicPublishInfo 每一個topic int index = this.sendWhichQueue.getAndIncrement(); // 基礎(chǔ)索引和 消息寫隊列大小 進(jìn)行取模 用來實現(xiàn)輪訓(xùn)的算法 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
哈哈,這里就是生產(chǎn)者負(fù)載均衡輪詢機(jī)制的核心邏輯了,使用到了ThreadLocal技術(shù),sendWhichQueue為每個生產(chǎn)者線程維護(hù)一個自己的下標(biāo)索引。
基礎(chǔ)索引計算器,使用ThreadLocal技術(shù)針對不同的生產(chǎn)者線程第一次隨機(jī),后面遞增,可以更加負(fù)載均衡。
public class ThreadLocalIndex { //關(guān)鍵技術(shù) private final ThreadLocalthreadLocalIndex = new ThreadLocal (); private final Random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); if (null == index) { //第一次隨機(jī) index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } //第二次索引位置開始自增1 index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } }
哈哈,有沒有覺得這個實現(xiàn)非常巧妙了。不同的生產(chǎn)者線程都擁有自己的索引因子,分配隊列更加均衡。
總結(jié)
本文分析了RocketMQ生產(chǎn)者底層的實現(xiàn),設(shè)計地方有巧妙之處,值得我們學(xué)習(xí),上面是發(fā)送非順序消息的場景, 如果是順序消息,我們作為使用者可以指定負(fù)載均衡策略。
編輯:黃飛
-
負(fù)載均衡
+關(guān)注
關(guān)注
0文章
101瀏覽量
12346 -
線程
+關(guān)注
關(guān)注
0文章
502瀏覽量
19612 -
消息隊列
+關(guān)注
關(guān)注
0文章
32瀏覽量
2958
原文標(biāo)題:RocketMQ生產(chǎn)者負(fù)載均衡(輪詢機(jī)制)核心原理
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論