0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

RocketMQ生產(chǎn)者為什么需要負(fù)載均衡?

馬哥Linux運維 ? 來源:稀土掘金 ? 2023-11-13 11:04 ? 次閱讀

RocketMQ生產(chǎn)者為什么需要負(fù)載均衡?

在RocketMQ中,隊列是消息發(fā)送的基本單位。每個Topic下可能存在多個隊列,因此一個生產(chǎn)者實例可以向不同的隊列發(fā)送消息。當(dāng)生產(chǎn)者發(fā)送消息時,如果不能均衡的將消息發(fā)送到不同的隊列,那么會導(dǎo)致隊列里的消息分布不均衡,這樣最終會導(dǎo)致消息性能下降,因此生產(chǎn)者負(fù)載均衡機(jī)制也是非常重要的。

RocketMQ生產(chǎn)者原理分析

既然生產(chǎn)者負(fù)載均衡如此重要,我們看下是如何實現(xiàn)的。

我們通常使用如下方法發(fā)送消息:

構(gòu)建消息
Message msg = new Message("TopicTest",
    "TagA",
    "OrderID188",
    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//發(fā)送消息    
SendResult sendResult = producer.send(msg);

RocketMQ發(fā)送消息的核心邏輯在DefaultMQProducerImpl類sendDefaultImpl。

9ad25470-81c0-11ee-939d-92fbcf53809c.jpg

在發(fā)送消息流程利里面有一行非常關(guān)鍵的邏輯,selectOneMessageQueue,看方法名稱就可以知道其含義,選擇一個消息隊列。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

里面是通過策略類來實現(xiàn)的。

9aee8ece-81c0-11ee-939d-92fbcf53809c.jpg

策略類最終通過org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 實現(xiàn)。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //生產(chǎn)者第一次發(fā)消息
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //非第一次,重試發(fā)消息的情況,
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //重試的情況,不取上一個broker的隊列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
第一次發(fā)消息選擇隊列核心邏輯在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()


//線程安全的index
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();


public MessageQueue selectOneMessageQueue() {
        //獲取一個基礎(chǔ)索引,每次自增1 這個全局存在TopicPublishInfo 每一個topic
        int index = this.sendWhichQueue.getAndIncrement();
        // 基礎(chǔ)索引和 消息寫隊列大小 進(jìn)行取模 用來實現(xiàn)輪訓(xùn)的算法
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
            
        return this.messageQueueList.get(pos);
    }

哈哈,這里就是生產(chǎn)者負(fù)載均衡輪詢機(jī)制的核心邏輯了,使用到了ThreadLocal技術(shù),sendWhichQueue為每個生產(chǎn)者線程維護(hù)一個自己的下標(biāo)索引。

基礎(chǔ)索引計算器,使用ThreadLocal技術(shù)針對不同的生產(chǎn)者線程第一次隨機(jī),后面遞增,可以更加負(fù)載均衡。

public class ThreadLocalIndex {
    //關(guān)鍵技術(shù)
    private final ThreadLocal threadLocalIndex = new ThreadLocal();
    private final Random random = new Random();


    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            //第一次隨機(jī)
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }
        //第二次索引位置開始自增1
        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;


        this.threadLocalIndex.set(index);
        return index;
    }
}

哈哈,有沒有覺得這個實現(xiàn)非常巧妙了。不同的生產(chǎn)者線程都擁有自己的索引因子,分配隊列更加均衡。

總結(jié)

本文分析了RocketMQ生產(chǎn)者底層的實現(xiàn),設(shè)計地方有巧妙之處,值得我們學(xué)習(xí),上面是發(fā)送非順序消息的場景, 如果是順序消息,我們作為使用者可以指定負(fù)載均衡策略。

編輯:黃飛

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 負(fù)載均衡
    +關(guān)注

    關(guān)注

    0

    文章

    101

    瀏覽量

    12346
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    502

    瀏覽量

    19612
  • 消息隊列
    +關(guān)注

    關(guān)注

    0

    文章

    32

    瀏覽量

    2958

原文標(biāo)題:RocketMQ生產(chǎn)者負(fù)載均衡(輪詢機(jī)制)核心原理

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    labviEW一個生產(chǎn)者,多個消費問題

    大家好,我的程序的出發(fā)點是希望實現(xiàn)一個生產(chǎn)者,十六個消費模塊的形式。即生產(chǎn)者循環(huán)中的事件結(jié)構(gòu)有十六個處理分支,對應(yīng)每一個分支,它產(chǎn)生一個“開始”元素入隊列,相應(yīng)的消費模塊中元素出隊
    發(fā)表于 04-05 16:42

    生產(chǎn)者與消費循環(huán)相關(guān)問題

    我是labview初學(xué)者,想請問一下各位大神,如果采集卡有緩存那還需要生產(chǎn)者與消費循環(huán)嗎?
    發(fā)表于 10-21 14:05

    生產(chǎn)者與消費注冊時間的應(yīng)用

    生產(chǎn)者與消費注冊時間的應(yīng)用
    發(fā)表于 03-29 15:02

    生產(chǎn)者消費模式(事件結(jié)構(gòu))

    現(xiàn)小弟學(xué)習(xí)生產(chǎn)者消費的事件結(jié)構(gòu)模式(用隊列傳遞消息),在生產(chǎn)者中用事件結(jié)構(gòu),但是當(dāng)我點擊其中一個按鈕響應(yīng)事件后就無法再點擊其它的按鈕了,這是怎么搞的,請大俠貼出圖片讓小弟看看是什么情況。
    發(fā)表于 12-23 14:14

    生產(chǎn)者與消費循環(huán)結(jié)構(gòu)當(dāng)生產(chǎn)者停止發(fā)送數(shù)據(jù)為什么消費還要循環(huán)兩次?

    各位大神: 今天用生產(chǎn)者與消費結(jié)構(gòu)做一個程序,需要消費循環(huán)每執(zhí)行一次計數(shù)+1。但是發(fā)現(xiàn)當(dāng)生產(chǎn)者停止發(fā)送數(shù)據(jù)后,消費
    發(fā)表于 09-17 23:08

    生產(chǎn)者是怎么把要發(fā)送的信息傳送到生產(chǎn)者模式里面的?

    誰有關(guān)于生產(chǎn)者與消費模式的講解,就是生產(chǎn)者是怎么把要發(fā)送的信息傳送到生產(chǎn)者模式里面的,就是誰可以講解下,或是哪里有歷程的視頻講解。先行謝過。
    發(fā)表于 10-28 20:57

    生產(chǎn)者消費的事件結(jié)構(gòu)模式(用隊列傳遞消息)

    現(xiàn)小弟學(xué)習(xí)生產(chǎn)者消費的事件結(jié)構(gòu)模式(用隊列傳遞消息),在生產(chǎn)者中用事件結(jié)構(gòu),但是當(dāng)我點擊其中一個按鈕響應(yīng)事件后,再點擊其它的按鈕了需要點兩次,這是怎么搞的,請大俠貼出圖片讓小弟看看
    發(fā)表于 01-17 14:53

    生產(chǎn)者消費循環(huán)

    有木有大神知道生產(chǎn)者消費循環(huán)中隊列的大小,默認(rèn)值一般為多少?此外這個大小能否改變?
    發(fā)表于 11-28 19:59

    生產(chǎn)者與消費循環(huán)程序

    生產(chǎn)者與消費循環(huán)程序
    發(fā)表于 12-02 19:57

    生產(chǎn)者與消費

    生產(chǎn)者與消費
    發(fā)表于 12-22 20:46

    labview的生產(chǎn)者/消費模式

    生產(chǎn)者/消費模式以前在沒有學(xué)習(xí)隊列這塊,看到生產(chǎn)者/消費模式的時候總認(rèn)為很困難。今天仔細(xì)學(xué)習(xí)了隊列后,回頭再看著塊時就不是多么難理解。這個編程模式使用到了隊列的函數(shù)。首先,字面理解
    發(fā)表于 05-05 09:36

    生產(chǎn)者消費循環(huán)的問題

    如果將生產(chǎn)者消費循環(huán)中的一個生產(chǎn)者同時對應(yīng)兩個消費的時候,會有一些問題。如圖所示,生產(chǎn)者循環(huán)將一個數(shù)據(jù)入列,然后下面是兩個消費
    發(fā)表于 03-25 10:02

    基于生產(chǎn)者消費完整測試程序

    [hide][url=]基于生產(chǎn)者消費完整測試 ...[/url] [/hide]
    發(fā)表于 11-01 17:13

    電池生產(chǎn)者與消費要知道的常識

    電池生產(chǎn)者與消費要知道的常識  一、常用電池型號、俗稱及日常適用范圍 國
    發(fā)表于 10-22 10:39 ?548次閱讀

    RocketMQ協(xié)議是什么?RocketMQ協(xié)議特點

    分布式消息系統(tǒng)中生產(chǎn)者和消費之間的高效可靠通信。它支持同步和異步消息傳遞模式,可以實現(xiàn)靈活和響應(yīng)迅速的通信方式。 RocketMQ協(xié)議基于發(fā)布-訂閱消息模式,生產(chǎn)者將消息發(fā)布到特定的
    的頭像 發(fā)表于 01-03 16:11 ?744次閱讀