?

RabbitMQ 小消息確認機制優化①

2018-04-21 01:38焦文彬
計算機系統應用 2018年3期
關鍵詞:序列號投遞生產者

徐 震, 焦文彬

1(中國科學院 計算機網絡信息中心,北京 100190)

2(中國科學院大學,北京 100049)

RabbitMQ是開源的基于Erlang的高效部署分布式消息隊列,實現了AMQP協議,具有良好的可靠性、穩定性,可運行在多種操作系統,便于集群運行[1-6].RabbitMQ支持多種編程語言的客戶端,可以通過安裝插件擴展功能. RabbitMQ可以解耦應用程序,將不同語言開發的程序粘合在一起,在完全不同的應用之間共享數據.

Erlang采用輕量級并發模型,用于高并發、分布式“軟實時系統”編程,支持運行系統中的軟件升級[7].Erlang程序簡短緊湊,采用函數式編程,自動存儲管理.Erlang進程不共享內存,進程間通信通過消息傳遞進行.

RabbitMQ投遞消息的速度受軟硬件配置影響. 硬件方面有: 處理器、內存、磁盤、網絡配置等; 軟件方面有: 消息持久化機制、消息確認機制、交換器類型等. 要以高速度向消費者投遞消息,應盡可能保持隊列為空.

對RabbitMQ進行優化有較多方法. 如對生產者確認機制的優化: 直接建立channel與消息存儲之間的聯系,減少插入、刪除、消息傳遞等操作,可以大幅降低處理時間[8]. 還可以優化topic路由匹配算法; 批量發送消息; 優化消息持久化機制; 開啟Erlang HiPE編譯選項; 使用位運算等.

生產者發送一條消息到broker,消息可能被n個消費者接收. 同時啟用生產者確認與消費者確認,生產者仍無法獲知n個消費者是否全部接收到消息. 本文對小消息情況下RabbitMQ的確認機制進行優化,在broker收到n個消費者的確認消息后,向生產者發送確認消息. 生產者收到確認消息則表明消費者已成功接收到消息. 若消息丟失,由生產者負責重發消息. 對不同生產者、消費者、隊列數量的情況進行測試,分析比較優化前與優化后的持久化小消息發送速率.

1 RabbitMQ架構與相關模塊簡介

1.1 RabbitMQ架構

如圖1所示,生產者發送消息到交換器,隊列通過路由鍵綁定到交換器,根據交換器類型與路由鍵將消息路由到隊列,消費者從隊列接收消息. 常用的交換器類型有direct、fanout和topic. 對于direct交換器,如果路由鍵匹配,消息就被投入相應的隊列; fanout交換器將收到的消息廣播到綁定的隊列; topic交換器對路由鍵進行模式匹配,消息被路由到匹配的隊列. 消費者通過basic.consume命令自動從隊列獲取下一條消息;通過basic.get命令獲取單條消息. 隊列具有多個消費者時,采用round-robin方式向消費者發送消息. broker是消息隊列服務器實體,一個broker中可以有多個虛擬主機.

1.2 相關模塊功能

1) channel接收reader解析的來自客戶端的協議幀; 使用writer向客戶端發送幀; 路由消息給隊列進程;處理AMQP方法; 發出AMQP命令. 一條TCP連接中可以有多個channel.

2) 支持隊列(backing queue,BQ),一般情況下默認為rabbit_variable_queue. 隊列進程使用BQ實現隊列功能. 隊列中消息具有4種狀態: alpha、beta、gamma、delta. 持久化消息只可能處于alpha、gamma、delta三種狀態之一. BQ具有5個內部隊列: q1、q2、q3、q4、delta. q1和q4中只有alpha狀態的消息; q2和q3包含beta和gamma狀態的消息; delta隊列不在內存中,只有delta狀態的消息.

3) 隊列索引(queue index)用于在磁盤上記錄隊列中消息的順序. 每個隊列有一個隊列索引. 消息依次被發布、投遞、確認. 發布記錄包括消息ID、消息在隊列中的序列號等內容. 發布記錄也可能包括完整的消息. 投遞和確認記錄只包括消息在隊列中的序列號. 隊列索引使用日志文件(journal)避免過多磁盤尋址. 日志文件具有固定的長度,默認為32 768,由queue_index_max_journal_entries參數配置.

4) 消息存儲(message store)用于將消息寫入磁盤或將消息從磁盤加載到內存. 存儲的消息是引用計數的,ID相同的消息多次寫入時只會存儲一次.

圖1 RabbitMQ架構圖

1.3 小消息嵌入隊列索引

RabbitMQ 3.5.0版本引入小消息嵌入隊列索引. 小于queue_index_embed_msgs_below參數值的消息屬于小消息,該參數默認值為4096 bytes. 小消息的持久化操作直接在隊列進程中進行,不使用消息存儲,只需要寫一次磁盤,可以減少I/O和內存消耗,提高10%左右的性能[9].

如果小消息被一個交換器路由到多個隊列,這條消息需要被寫入多個隊列索引; 若使用消息存儲,則只需要寫一次. 從磁盤讀取消息時,每個隊列索引需要在內存中保持至少1個段文件. 段文件包含16 384條消息記錄. 因此queue_index_embed_msgs_below參數的少量增加會導致大量的內存使用[10].

2 RabbitMQ消息確認過程分析

如圖2所示,生產者確認是異步的,生產者發送消息到broker,可以在等待確認的同時發送下一條. 為了在broker重啟或崩潰時不丟失消息,消息投遞給消費者前需要進行持久化,消息寫入磁盤后向生產者發送確認消息. 消費者收到消息后必須進行確認,可以發送

basic.ack命令進行顯示確認,也可以使用自動確認. 若使用自動確認,消費者接收到消息,即視其確認了消息.broker收到消費者發送的確認消息,將確認記錄追加到隊列索引的日志文件.

圖2 消息確認過程與消息持久化關系圖

開啟生產者確認與消費者確認,持久化小消息在生產者、消費者、RabbitMQ相關模塊間的傳遞過程如圖 3所示. 1-6: 生產者發送消息到消費者; 7-10: 消費者確認相關過程; 11-14: 生產者確認相關過程.

圖3 小消息確認過程圖

2.1 生產者確認過程分析

生產者確認過程會依次在channel、隊列進程、隊列索引、BQ處記錄生產者確認相關信息. 消息寫入磁盤后,已確認的消息ID從隊列索引依次傳遞給BQ、隊列進程、channel,各處均會將已確認記錄刪除.

channel收到生產者發送的消息,為消息分配一個唯一的序列號,組裝#delivery,獲取需要投遞的隊列記錄列表Qs,將#delivery投遞到Qs中的隊列. channel使用dtree記錄消息被投遞到哪些隊列,格式為: {消息在channel中的序列號,隊列進程pid列表,交換器名稱}.若channel將消息投遞到m個隊列,channel收到相應的m個隊列發送的確認消息才會向生產者發送確認消息.

隊列進程收到消息,判斷隊列的消費者是否滿足消息投遞條件. 若有消費者滿足投遞條件且消息隊列為空,則消息不會進入隊列,而是直接投遞給消費者端channel. 需要組裝消息狀態(message status). 將包含小消息的發布記錄與只包括消息在隊列中序列號的投遞記錄追加到隊列索引的日志文件.

若沒有消費者滿足投遞條件或消息隊列非空,則將消息進隊. 需要組裝消息狀態. 將發布記錄追加到隊列索引的日志文件. 將消息狀態加入消息隊列. 從消息隊列中取消息時,若消息隊列非空且有消費者滿足消息投遞條件,則將消息從消息隊列中移除. 將投遞記錄追加到隊列索引的日志文件. 將取出的消息投遞給消費者端channel.

隊列進程收到生產者端channel投遞的消息,使用gb_trees記錄未確認的消息ID、發送消息的channel和該消息在channel中的序列號,格式為: {消息ID,{channel pid,消息在channel中的序列號}}.

在發布記錄寫入日志文件前,隊列索引使用gb_sets記錄未確認的消息ID. 隊列索引的日志文件可能在兩種情況下寫入磁盤.

1) 隊列進程設置同步定時器,每200毫秒向自身發送sync_timeout消息. 隊列進程收到消息后,對隊列索引的日志文件執行sync操作.

2) 當日志文件中記錄數目達到一定數量時,將內存中預分割的日志文件寫入段文件.

消息持久化操作完成后,BQ使用gb_sets記錄未確認的消息ID.

2.2 消費者確認過程分析

消費者確認過程會依次在BQ、隊列進程、channel處記錄消費者確認相關信息. 收到消費者發送的確認消息后,會按照相反的順序從未確認記錄中刪除已確認記錄,最終將包括消息在隊列中序列號的確認記錄追加到隊列索引的日志文件.

消息到達隊列進程直接投遞給消費者端channel時或從消息隊列中取消息時,會在BQ相應的gb_trees中添加未確認記錄,格式為: {消息在隊列中的序列號,消息狀態}.

隊列進程將消息投遞給channel,在Erlang queue中添加未確認記錄,格式為: {消息在隊列中的序列號,消費者標簽}.

channel使用writer將消息投遞給消費者,在Erlang queue中添加未確認記錄,格式為: {投遞標簽,消費者標簽,{隊列進程pid,消息在隊列中的序列號}}. 收到消費者發送的確認消息后,channel根據確認消息中的投遞標簽與multiple字段從未確認記錄中獲取已確認記錄,將已確認的消息序列號發送給相應的隊列進程.

2.3 對性能的影響

生產者確認與消費者確認過程涉及較多dtree、gb_trees、gb_sets和Erlang queue操作,包括插入、刪除、查找、集合運算等. 隊列索引的日志文件會定時地或在記錄達到一定數量時寫入磁盤,對性能影響較大.

使用RabbitMQ 2.8.1進行簡單測試,CPU為雙Xeon E5530,RAM為40GB,Erlang R15B,開啟HiPE,1個生產者,1個消費者[11]. 不使用生產者確認與消費者確認,不進行消息持久化,消息發送速率為: 44824 msg/s;開啟消費者確認后: 32005 msg/s; 接著開啟生產者確認:26103 msg/s; 在此基礎上對消息進行持久化: 4725 msg/s[12].可見消息確認機制對消息發送速率有一定影響,消息持久化機制對消息發送速率有較大影響.

3 優化方法

3.1 小消息確認機制的優化

如圖4所示,優化后持久化與非持久化小消息的確認過程是相同的. 需要將生產者確認過程與消費者確認過程銜接起來. 生產者發送消息到broker,消息投遞給消費者前,不進行消息持久化操作. 不會向日志文件追加記錄,不寫段文件; 不會設置同步定時器,不執行代價較大的sync操作. 隊列索引與BQ不記錄未確認的消息ID. 消費者收到消息后,向broker發送確認消息. broker收到消費者確認消息,向生產者發送確認消息. 若消費者沒有收到消息,生產者不會收到確認消息,此時由生產者重發該消息. 該方法保證了在生產者收到確認消息時消費者已成功接收到消息.

圖4 優化后小消息確認過程圖

1) 小消息到達隊列進程,隊列進程需要記錄生產者確認相關信息. 需要修改隊列進程模塊的send_or_record_confirms/2函數. SenderPid是發送消息給隊列進程的channel pid,MsgSeqNo是消息在channel中的序列號,MTC用于記錄未確認消息ID對應的{SenderPid,MsgSeqNo}. 添加未確認記錄后,更新隊列進程狀態.

2) 隊列進程在消費者確認過程結束后,向生產者端channel發送確認消息. 需要修改隊列進程模塊的ack/3函數. MsgIds是已確認的消息ID列表. 需要獲取MsgIds對應的channel pid和消息在channel中的序列號,將包含消息在channel中序列號的確認消息發送給相應的channel. 更新隊列進程狀態.

3.2 繼續優化消費者確認過程

可以在上述優化的基礎上減少消費者確認過程中的插入、刪除等操作,提高性能,減少內存使用. BQ和隊列進程不記錄消費者確認相關消息,消費者端channel記錄: {消息投遞標簽,消費者標簽,{隊列進程pid,消息ID}}.

1) 隊列進程向channel投遞消息,格式為:{deliver,ConsumerTag,AckRequired,Msg}. ConsumerTag是消費者標簽,AckRequired取值為true或false. Msg類型為 rabbit_amqqueue:qmsg(),格式為: {隊列名稱,隊列進程pid,消息在隊列中的序列號,Redelivered,Message}.Redelivered取值為true或false,Message類型為#basic_message. 使用模式匹配從Message中提取消息ID. 需要修改channel模塊的record_sent/4函數.

2) 消費者端channel向隊列進程發送的消息中包括已確認的消息ID列表MsgIds. 隊列進程收到確認消息,向生產者端channel發送相應的確認消息. 更新隊列進程狀態. 需要修改隊列進程模塊的handle_cast/2函數.

4 性能測試

4.1 測試環境與方法

生產者、消費者、RabbitMQ在同一臺機器上. 開啟生產者確認與消費者確認. 持久化小消息,消息大小為1500 bytes. 性能測試工具為PerfTest. 測試環境配置如表1所示.

表1 性能測試環境配置表

在1個虛擬主機中啟動不同數量的持久化隊列,每個隊列有2個生產者、3個消費者,每個生產者連接中有2個channel,每個消費者連接中有3個channel.隊列數量小于等于15時,綁定到同一個持久化direct交換器; 隊列數量大于15時,綁定到兩個持久化direct交換器. 開啟management插件與top插件. 分別記錄優化前與優化后的消息發送速率,每種情況測試10分鐘,測試3次取平均值.

消息發送速率提高百分比的計算方法為: (優化后消息發送速率-優化前消息發送速率)/優化前消息發送速率*100%.

消息發送速率平均提高百分比為: 不同隊列數量時,消息發送速率提高百分比的算術平均值.

4.2 測試結果與分析

如圖5所示,在1個虛擬主機中,隨著隊列數量增加,消息發送速率先增加然后緩慢下降. 優化后消息發送速率提高的比例是逐漸下降的. 1個隊列時,優化后的消息發送速率是優化前的3.08倍; 2個隊列時,優化后的消息發送速率是優化前的2.01倍; 15個隊列時,消息發送速率提高40.9%; 30個隊列時,消息發送速率提高40.3%. 隊列數量大于3時,消息發送速率平均提高42.9%.如圖6所示,對消費者確認過程進一步優化后,在同樣的測試環境中,隨著隊列數量增加,優化后的消息發送速率逐漸下降. 1個隊列時,優化后的消息發送速率取得最大值,是優化前的3.48倍; 2個隊列時,優化后的消息發送速率是優化前的2.16倍; 15個隊列時,

圖5 優化后消息發送速率對比圖

消息發送速率提高52.6%; 30個隊列時,消息發送速率提高53.4%. 隊列數量大于3時,消息發送速率平均提高56.5%.

圖6 繼續優化后消息發送速率對比圖

優化后不執行消息持久化操作,減少部分內存操作,使消息發送速率得到提高. 繼續優化后,不會在BQ和隊列進程處記錄消費者確認相關信息,減少了插入、查找、刪除等操作,進一步提高了消息發送速率,但是在隊列數量較多時可靠性略有下降. 上述兩種優化方法需要確保每個隊列至少有一個消費者,適用于消費速度很快的情形. 在生產者、消費者、隊列數量較少時可以獲得更大的性能提升. 生產者重發消息的策略,可以根據實際應用場景確定. 與改進前類似,在異常情況下,消費者可能收到重復的消息. 第一種優化方法保留了完整的消費者確認過程,能夠較好地處理basic.reject與basic.nack等命令,消費者可以拒絕接收某些消息. 第二種優化方法簡化了消費者確認過程,無法處理消費者拒絕消息的情況,適用于消費者只對消息進行確認的情況. 可以根據應用程序對性能、可靠性的不同需求使用相應的優化方法.

5 結語

本文詳細分析了RabbitMQ中持久化小消息的確認過程,將生產者確認過程與消費者確認過程結合起來進行優化,使生產者可以獲知消費者成功接收到消息,提高了持久化小消息的發送速率. 要使消息發送速率得到根本提高,可以重新設計RabbitMQ的架構: 使用多個輕量級進程實現邏輯隊列與邏輯channel,或集群部署使用.

1Rostanski M,Grochla K,Seman A. Evaluation of highly available and fault-tolerant middleware clustered architectures using RabbitMQ. Federated Conference on Computer Science and Information Systems. Warsaw,Poland. 2014. 879-884.

2Videla A,Williams JJW. RabbitMQ實戰: 高效部署分布式消息隊列. 汪佳南,譯. 北京: 電子工業出版社,2015.

3Ionescu VM. The analysis of the performance of RabbitMQ and ActiveMQ. 14th RoEduNet International Conference - Networking in Education and Research (RoEduNet NER).Craiova,Romania. 2015. 132-137.

4Vandikas K,Tsiatsis V. Performance evaluation of an IoT platform. 8th International Conference on Next Generation Mobile Apps,Services and Technologies. Oxford,UK. 2014.141-146.

5Dawar S,Fallon E,Bennet T,et al. An extensible architecture for mobile network management event distribution and rule processing - a performance evaluation.1st International Conference on Artificial Intelligence,Modelling and Simulation. Kota Kinabalu,Malaysia. 2013.451-456.

6Yang WJ,Liu XG,Zhang L,et al. Big data real-time processing based on storm. 12th IEEE International Conference on Trust,Security and Privacy in Computing and Communications. Melbourne,VIC,Australia. 2013.1784-1787.

7Cesarini F,Thompson S. Erlang編程指南. 慕尼黑Isar工作組,楊劍,譯. 北京: 機械工業出版社,2011.

8袁佳. 基于主機日志的入侵檢測系統的設計與實現[碩士學位論文]. 北京: 北京郵電大學,2014.

9李帥. RabbitMQ進程結構分析與性能調優. https://www.qcloud.com/community/article/164816001481011847.[2016-10-10].

10Persistence Configuration. http://www.rabbitmq.com/persistence-conf.html. [2017-06-01]

11RabbitMQ performance measurements,part 1. http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performancemeasurements-part-1/. [2012-04-17]

12RabbitMQ performance measurements,part 2. http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performancemeasurements-part-2/. [2012-04-25]

猜你喜歡
序列號投遞生產者
傳統與文化的“投遞”
1月巴西生產者價格指數上漲3.92%
一種離線電子錢包交易的雙向容錯控制方法
2020德國iF設計獎
一種控制器硬件序列號的更新方法
關于《國家稅務總局 工業和信息化部關于加強車輛配置序列號管理有關事項的公告》的解讀
大迷宮
手機使用中的“秘訣”
派發廣告分工做得好 人人努力效率高
會安慰自己的人
91香蕉高清国产线观看免费-97夜夜澡人人爽人人喊a-99久久久无码国产精品9-国产亚洲日韩欧美综合