新聞中心
這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
RocketMQ5.x的生產(chǎn)者,應(yīng)該怎么配?
在RocketMQ 5.x中,生產(chǎn)者的配置主要包括以下幾個方面:

1、創(chuàng)建生產(chǎn)者實例
2、設(shè)置生產(chǎn)者屬性
3、發(fā)送消息
4、關(guān)閉生產(chǎn)者
下面詳細(xì)介紹每個方面的配置方法。
創(chuàng)建生產(chǎn)者實例
在RocketMQ中,生產(chǎn)者實例是通過DefaultMQProducer類創(chuàng)建的,首先需要引入RocketMQ的依賴,然后創(chuàng)建一個DefaultMQProducer實例。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class RocketMQProducer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建一個名為"producerGroup"的生產(chǎn)者組
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
// 設(shè)置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 啟動生產(chǎn)者實例
producer.start();
}
}
設(shè)置生產(chǎn)者屬性
在創(chuàng)建完生產(chǎn)者實例后,可以通過setProperty方法設(shè)置生產(chǎn)者的屬性,以下是一些常用的屬性設(shè)置:
| 屬性名 | 默認(rèn)值 | 描述 |
sendMessageTimeout | 3000 | 發(fā)送消息的超時時間,單位為毫秒 |
retryTimesWhenSendFailed | 2 | 發(fā)送失敗時的重試次數(shù) |
maxReconsumeTimes | 18 | 消息最大重試次數(shù) |
topicPublishInfoBatchMaxSize | 1000 | 批量發(fā)送主題消息的最大數(shù)量 |
compressMsgBodyOverHowmuch | 1024 * 4 | 壓縮消息體的大小閾值,超過該值則進(jìn)行壓縮 |
batchSize | 16 | 批量發(fā)送消息的大小閾值,超過該值則進(jìn)行批量發(fā)送 |
sendMessageThreadPoolNums | 128 | 發(fā)送消息的線程池數(shù)量 |
pullMessageThreadPoolNums | 128 | 拉取消息的線程池數(shù)量 |
checkFrequency | 60000 | 檢查Broker是否可用的頻率,單位為毫秒 |
storePathRootDir | null | 消息存儲路徑的根目錄 |
storePathCommitLog | null | commitLog存儲路徑 |
mappedFileSizeCommitLog | 1GB | commitLog文件的大小閾值,超過該值則新建一個文件 |
flushIntervalCommitLog | 500ms | commitLog刷新到磁盤的時間間隔,單位為毫秒 |
cleanResourceInterval | 1000 * 60 * 60 | 清理資源的時間間隔,單位為毫秒 |
deleteWhen | 04 | commitLog文件刪除的時間點,取值為小時(024)和分鐘(059)的組合,例如04表示凌晨4點刪除文件 |
fileReservedTime | 72 | commitLog文件保留的時間,單位為小時,超過該時間則刪除文件 |
maxTransferBytesOnMessageInMemory | 1 | 消息內(nèi)存中傳輸?shù)淖畲笞止?jié)數(shù),超過該值則將消息寫入磁盤隊列中等待傳輸 |
maxTransferCountOnMessageInMemory | 1 | 消息內(nèi)存中傳輸?shù)淖畲蟠螖?shù),超過該值則將消息寫入磁盤隊列中等待傳輸 |
enableMsgTrace | false | 是否開啟消息軌跡追蹤功能,開啟后會記錄消息的發(fā)送和接收情況,用于排查問題 |
msgTraceTopic | null | 消息軌跡追蹤的主題名稱,如果為null則使用生產(chǎn)者組名作為主題名稱 |
msgTraceConsumerGroup | null | 消息軌跡追蹤的消費者組名稱,如果為null則使用生產(chǎn)者組名作為消費者組名稱 |
flushConsumerQueueLaterally | false | 是否啟用側(cè)邊隊列刷盤機制,當(dāng)消費者消費速度過快時,可以將部分消息暫時存儲在側(cè)邊隊列中,待消費者慢下來后再進(jìn)行消費,提高系統(tǒng)吞吐量 |
maxReconsumeTimeDifference | 1800000000L | 兩次重試之間的最大時間差,超過該時間差則重新投遞未被消費的消息,單位為毫秒,默認(rèn)值為1表示禁用該功能 |
maxDelayTime | 1L | 如果設(shè)置了延遲消息,則該參數(shù)表示延遲的最大時間,單位為毫秒,默認(rèn)值為1表示禁用該功能 |
maxOffsetDelta | 1L | 如果設(shè)置了延遲消息或者定時消息,則該參數(shù)表示允許的最大offset變化量,單位為毫秒,默認(rèn)值為1表示禁用該功能 |
traceDispatcherListenerStackSize | 1L | traceDispatcherListener的堆棧大小,默認(rèn)值為1表示使用JVM默認(rèn)值,建議設(shè)置為32K或更大以減少OOM異常的發(fā)生概率,注意:此參數(shù)僅在開啟traceDispatcherEnable時生效。 |
| traceDispatcherEnable
分享名稱:RocketMQ5.x的生產(chǎn)者,應(yīng)該怎么配?
路徑分享:http://www.5511xx.com/article/dhsogic.html


咨詢
建站咨詢
