?

BC—BSP:一個基于BSP的高可擴展并行迭代圖處理系統

2016-03-24 00:15劉恩孚冷芳玲鮑玉斌
中興通訊技術 2016年2期

劉恩孚 冷芳玲 鮑玉斌

摘要:提出了一個基于整體同步并行計算(BSP)模型的、具有磁盤暫存功能的大規模圖處理系統——BC-BSP。該系統通過提供應用程序接口(API)實現系統配置和有關策略的可擴展性,通過優化的圖數據磁盤存儲實現了數據處理規模的高可擴展性以及高性能的容錯方案,并且可以處理普通數據集的聚類和分類等需要迭代計算的數據挖掘算法。通過實驗驗證了該系統的可擴展性,其在真實數據集上性能優于Giraph1.0.0,在模擬數據集上稍遜于Giraph的內存版。

關鍵詞:BSP;大規模圖處理;迭代計算;磁盤緩存

Abstract:We describe a bulk synchronous parallel (BSP)-based parallel iterative processing system for graph data with disk caching assist. This system is called BC-BSP. The system can achieve the scalability of system configuration and policy by providing APIs, high scalability of the data scale processed, and high performance of fault-tolerant scheme by disk storage optimization to graph data. It can also execute some data mining algorithms with iterative processing, such as clustering and classification on non-graph data sets. The experimental results show that the scalability and performance of the proposed system are better than that of Giraph1.0.0 on the real data set,but it is lightly poorer than the memory version of Giraph.

Key words:BSP; large-scale graph processing; iterative computing; disk cache

圖是計算機科學中最常用的一類抽象數據結構,更具有一般性的表示能力?,F實世界中的許多應用場景都可以很自然地使用圖結構表示。例如,交通運輸網絡、社交網絡中的資源對象之間的關系以及生物信息網絡等。在大數據時代,需要分析的圖規模越來越大。以互聯網和社交網絡為例,隨著互聯網的深入使用和Web 2.0技術的推動,網頁數量增長迅猛,據中國互聯網絡信息中心(CNNIC)統計:截止2014年12月中國網頁規模達到1 899億個,年增長率26.6%;而基于互聯網的社交網絡更是如此,如全球最大的社交網絡Facebook,2014年7月已有約22億用戶,其中月活躍用戶數13億人。在中國,如QQ空間、微博、開心網等,發展也異常迅猛。因此,實際應用中圖的頂點可達10億,而邊就會更多,對應的數據文件會更大。對如此大規模圖數據的存儲和分析處理的時間和空間開銷遠遠超出了傳統集中式圖數據處理的承受能力。因此,對大規模圖的有效處理成為了一個新的挑戰。

MapReduce計算模型可以實現對大規模(圖)數據的處理,并且具有很好的容錯性和可擴展性。但是由于圖數據分析(如網頁的PageRank[1]計算、最短路徑計算、聚類分析)都需要多次迭代才能完成。每次迭代需要一個或多個開銷較大的MapReduce作業完成。為解決迭代計算的時間性能問題,谷歌公司開發了基于整體同步并行計算(BSP)模型的Pregel[2]系統,之后Apache的兩個開源項目Hama和Giraph也開展了基于BSP的迭代計算系統的開發。它們都是在內存中做數據處理,因此能夠處理的圖的規模有限。文中,我們設計開發了基于BSP模型的、能夠處理大規模(圖)數據的并行迭代計算系統——BC-BSP。該系統主要特色在于:(1)實現了具有磁盤輔助的基于BSP的大規模圖數據并行迭代處理系統,該系統在內存受限的情況下具有很好的數據處理能力,即在可用的節點規模和內存配置的情況下,可以處理的數據規模較大;(2)系統多方面考慮負載均衡,在充分考慮數據本地化的前提下考慮了各個節點的負載均衡問題,并且結點的負載均衡優先于數據本地化。我們做了大量的實驗,比較了基于BSP的大規模圖處理系統的性能和擴展性。

1 BSP模型和相關工作

BSP是一種“塊”同步模型[3],即通過消息傳遞機制,實現塊內異步并行,塊間顯式同步。一個基于BSP的計算系統是由具有處理機和存儲器的多個自治的計算服務器組成的集群,并且這個集群采用主/從結構。主節點用于協調整個集群,包括接收用戶的作業提交、作業調度、故障監控等功能,從節點(也稱為工作節點)用于存儲和處理數據。

谷歌公司開發的基于BSP模型的分布式圖計算框架Pregel主要是為了處理大規模圖數據,如網頁的PageRank計算、最短路徑等。Pregel假設處理的數據都在內存中,因此在一定的節點規模下,它能夠處理的數據規模是有限制的?;赑regel的思想,許多基于BSP的大規模圖處理系統被開發出來。例如,Apache推出了基于Java的開源項目Hama[4],它是一個純粹的基于BSP的用于大規??茖W計算(如矩陣計算、圖和網絡算法)的計算框架,同樣它的早期版本沒有考慮磁盤輔助的問題,而是假設所有數據全部位于內存中,最新的版本也在添加磁盤輔助功能,但是很不完善;而Apache的另一個開源項目Giraph,是建立在Hadoop基礎之上的Pregel的開源實現[5],可以認為它是MapReduce模型和BSP模型的結合體,即它利用MapReduce作業的Map任務實現了基于BSP模型的迭代計算,而不需要Reduce任務,整個圖處理過程只需要啟動一次MapRedcue作業,但是一旦出現故障,整個作業需要重新啟動;GraphLab是卡內基梅隆大學提出的面向大規模數據挖掘和圖計算的分布式內存計算框架[6]。更多的基于BSP模型的類Pregel的大規模數據分布式并行處理系統和框架請見文獻[7]。

2 BC-BSP概述

圖1給出了BC-BSP系統的整體結構,主要包括BSP核心層、管理接口層和接口層。BC-BSP實現了對Hadoop分布式文件系統(HDFS)、HBase、MySQL等底層存儲系統的支持,包括數據的輸入和輸出。BC-BSP系統內部核心層主要包括客戶端作業提交和數據劃分,主節點端的作業調度和集群監控,從節點端的本地計算處理、全局同步、消息通信和容錯控制;接口層主要包括應用編程接口(API)和命令行接口(CLI);管理接口層主要包括集群管理、系統自動化安裝部署、日志管理、性能管理和故障管理等工具。

從系統實現的角度,BC-BSP系統是一個主從式結構,主要分為客戶端、主控節點、工作節點、任務模塊、全局同步模塊。圖2給出了BC-BSP的運行控制機制以及系統中客戶端、主控節點、工作節點、任務模塊、全局同步模塊之間的協作關系。

在BC-BSP系統中,客戶端主要根據用戶指定的輸入路徑進行數據分片,調整分區數目,檢查作業運行的可行性,向主控節點申請作業并將作業打包提交給BSP主控節點,當作業開始運行后,負責及時反饋作業運行狀態;主控節點端管理集群工作節點的注冊、心跳信息和狀態信息收集等,并作為容錯控制的控制中心,提供各種狀態查詢接口,并以作業為單位,負責作業的初始化、調度和同步控制等;工作節點端主要負責工作節點本地的任務管理和局部同步控制以及局部聚集計算等;任務模塊端是任務運行的實體,主要負責執行用戶的業務處理邏輯和數據輸入輸出處理等;全局同步負責同一作業的所有任務在各個超步之間的全局同步工作,超步路障同步由主節點端、工作節點端及任務模塊端共同完成,在同步過程中,可以完成聚集計算,系統中的同步主要通過第三方組件Zookeeper實現;消息通信主要在每一個超步的本地計算執行過程中,負責異步地發送和接收消息,并將接收的消息暫存到本地的接收消息隊列中,當內存空間不足時,支持磁盤輔助存儲,這里主要是通過遠程過程調用協議(RPC)機制實現消息傳遞;容錯控制模塊負責容錯備份、故障檢測和故障恢復等功能,以寫檢查點機制作為主要的容錯方案,支持手動備份和自動周期備份功能;管理工具主要通過Web界面或命令行的方式為用戶提供可視化的系統管理和監控功能;接口模塊主要為用戶提供本地計算、消息發送/接收等的應用編程接口,以及為用戶提供啟動和關閉系統服務、作業提交等命令行接口。

3 BC-BSP提供的API

系統給用戶提供了與作業建立相關的API,用于編寫針對圖處理或科學計算的處理程序。另外,系統還提供了用于系統功能擴展的接口。下面我們簡單介紹這些接口。

(1)消息管理接口負責消息的發送/接收功能,在每一個超步的本地計算執行過程中,并行地發送和接收消息,并將接收的消息緩存到本地的接收消息隊列中,在發送消息隊列達到一定規模的時候,執行Combine操作,然后再將消息發送給目的節點。

(2)分區數據管理接口負責在進行圖數據處理之前將待處理的圖數據按照一定的原則劃分給各個任務。本系統實現了基于Hash的劃分方法和基于Hash的均衡劃分方法。

(3)圖頂點上下文接口負責在任務處理的一個超步中,處理每個圖頂點時獲取正在處理的圖頂點的相關屬性信息和方法。

(4)消息合并接口在圖處理過程中,通常以頂點為中心進行處理,該接口為了減少在網絡上傳送的消息數量,在發送端對發給同一個頂點的消息進行合并。

(5)聚集計算接口許多圖處理/機器學習算法中需要聚集計算,實現該接口可進行超步間的聚集值計算。

(6)數據輸入輸出接口包括輸入接口和輸出接口,用于實現將數據從指定數據存儲系統中讀入和寫出。

4 BC-BSP系統的實現

本節介紹BC-BSP系統在實現上的一些主要策略和細節,主要包括圖數據的表示、主節點控制器、從節點管理器、本地計算與消息通信、圖數據劃分以及故障恢復等的實現。

4.1 主節點控制器

主控節點是整個BC-BSP集群的控制中心,負責管理所有的工作節點,監控整個集群的工作狀態,接收各工作節點的心跳信息并加以處理,完成整個作業的全局同步控制,并提供統一的信息查詢接口和作業提交接口。當集群啟動后,主控節點接收各工作節點的注冊信息,形成統一的集群資源信息,在運行過程中通過心跳信息不斷更新集群資源信息,例如,可用任務槽數量。當客戶端請求提交作業時,將其放入作業等待隊列,作業調度器按照優先級加先入先出隊列(FIFO)的策略調度作業;而完成一個作業的具體任務的調度則是按照負載均衡和數據本地化的原則。因為本系統中一個作業的所有任務需要同時運行,所以系統中的任務調度是采用由BSP主節點控制器根據上述原則將任務依次不斷下推給各個節點。

4.2 從節點管理器

工作節點是硬件上的計算單元,系統啟動后,BC-BSP集群的各個節點上啟動一個從節點管理器(WM)進程,負責完成具體的任務啟動和消息通信。每個工作節點啟動后,都首先向主控節點注冊,使自己成為BC-BSP集群中的一員;之后,工作節點定期向主控節點發送心跳信息,匯報自己的狀態;當有新任務下達時,工作節點根據新任務的指令,到HDFS上讀取作業信息并下載到本地文件系統;然后創建任務控制對象和對應的執行進程,接著運行任務。WM為在本節點上運行的每個作業建立一個WorkerAgent對象,用于收集該作業在本節點上的各個任務的心跳信息、工作狀態信息等。這樣全局同步采用兩級同步方式,即一個工作節點上的屬于同一個作業的各個任務在本節點上實現局部同步,然后再以節點為單位向Zookeeper注冊實現全局同步。工作節點以作業為單位維護在本節點上運行的隸屬于同一個作業的所有任務,進行統一管理,完成各種局部操作,例如本地聚集計算。

4.3 磁盤輔助的本地計算和消息通信

任務模塊是邏輯上的計算處理單元,稱為一個任務。BSP主節點控制器中的任務調度器根據負載均衡和數據本地化原則將任務分配到具體的工作節點上,由WM創建該任務模塊進程。任務模塊啟動后,首先完成數據加載,將需要處理的數據分片從存儲介質上按照指定的輸入格式讀入本地,并進行數據劃分。計算過程中會定期地向WM的WorkerAgent對象發送心跳信息,報告任務的狀態等信息。

在Pregel系統以及基于它思想的各種實現中,都假設集群的處理節點足夠,使得待處理的數據等夠完全存放在內存中。但是實際情況卻不是這樣的:一方面對于一個給定的待處理數據集,用戶很難確定需要幾個工作節點才能使得各個任務處理的數據能夠存放在內存中;另一方面,當集群規模有限時,也希望能夠處理相對較大規模的數據。對于系統中發送(或接收)的消息也是如此。鑒于以上原因,本系統中使用了磁盤臨時存儲數據和消息(也稱之為磁盤暫存),以便能夠處理較大規模的數據。

對于消息數據,將消息數據的內存占用比例按照用戶指定的靜態劃分參數確定,系統運行時處理各種類型的消息時內存的使用單獨分配處理,每種類型的消息內存占用都具有一個獨立的閾值控制。

對于任務處理的數據而言,在迭代計算過程中常駐磁盤。對于出邊表不變的計算情況,即不增加也不刪除邊的情形,將頂點的出邊表與頂點的其他在計算中變化的部分,例如頂點的值或標簽等信息,分開存放,但是同樣使用記錄的ID的Hash映射進行劃分,如圖3所示。將圖數據分開處理的好處在于:每次迭代結束只需將本次迭代過程中變化的數據寫回本地磁盤文件即可,不變的靜態部分不需要寫回磁盤,同時也為容錯控制提供了方便。

4.4 圖的頂點類

一個圖是由頂點集合和邊集合構成,因此有頂點類和邊類。本系統中使用鄰接表的方式組織圖數據。這樣一個頂點類中除了頂點本身的屬性之外,還有與之相連的出邊信息,同時提供了對頂點和邊進行操作的方法(見圖4)。

4.5 數據劃分

數據劃分是BSP計算與MapReduce計算不同的地方。前者需要在迭代計算中能夠定位消息發送的目的地在哪里。因此,數據劃分是將各個任務與之綁定的數據分片的數據從數據源讀入,然后利用一定的數據劃分原則,例如Hash劃分,將圖數據分配給某個任務,以便形成超步迭代計算時的數據分區。

一個作業的各個數據分區大小是否均勻直接影響系統的負載均衡,但是Hash函數很難保證各個分區大小的均衡。為此,我們采用了多Hash桶合并的劃分方法,以實現數據的近似均衡劃分。合并的原則可以是各個桶中的對象數據盡可能均衡,還可以考慮數據的本地性。本系統目前是按照各個桶中數據對象近似均衡為主兼顧本地性的原則進行合并。

4.6 容錯機制

容錯是本分布式處理系統必須考慮的問題。BC-BSP系統中考慮兩類故障:一類是任務故障,例如任務進程宕掉;另一類是工作節點故障,例如一個Worker出現網絡斷開故障或者磁盤讀寫故障。系統中各個任務通過心跳機制向所在Worker的WM匯報自己的工作狀態,而各個工作節點也是通過心跳機制定期向BSP主節點控制器匯報工作狀態。

本模塊包括寫檢查點、故障檢測和故障診斷以及故障恢復等功能。寫檢查點是定期或者人工控制方式將某個時刻的作業運行快照保存到分布式文件系統,如HDFS;故障檢測與故障診斷是完成故障信息的收集與故障類型的判斷,不同階段的不同類型的故障,采用不同的恢復機制。BC-BSP系統實現了基本的基于檢查點的故障恢復策略和面向磁盤駐留的多級容錯處理策略。

所謂的面向磁盤駐留的多級容錯處理策略,是利用了本系統的磁盤輔助機制的一些措施,即將圖數據分成不變的常駐磁盤的靜態部分(例如圖頂點的出邊表)和每次迭代計算幾乎都會變化的需要寫回磁盤的動態部分。因此在進行系統快照備份時,實現增量備份,即對圖數據的靜態部分只需要備份一次即可,而每次迭代計算時只需增量地備份動態變化部分。當然每次備份時需要備份本次收到的所有消息。

5 BC-BSP系統應用示例

本節討論使用本系統進行圖數據的PageRank計算和多維數值型數據集的k-means聚類分析的示例。在k-means示例中,可以論證BC-BSP系統也可以有效地處理非圖數據的數據挖掘算法。

5.1 PageRank

使用BC-BSP系統實現PageRank計算中,首先將一個頂點的PageRank值按照一定的規則(如各個出邊頂點平分),通過發送消息的方式發送給出邊頂點,同時獲得來自入邊頂點的消息;之后按照PageRank算法的PageRank值計算公式,將一個頂點的消息值(即PageRank貢獻值)累加,計算當前頂點新的PageRank值。因此用戶可以提供combine方法實現消息發送前的合并,再基于頂點的新PageRank值重復上面的計算過程,直到滿足收斂條件結束計算,并按預先的用戶配置輸出計算結果。

5.2 多維數值型數據集的k-means

聚類

使用BC-BSP系統對多維數值型數據集進行k-means聚類,不需要進行頂點間的消息傳遞,但是需要利用聚集器計算新的聚類中心,可以通過各個聚簇的所有數據點的累計和與累計數據點計數兩種聚集器實現。因此,用戶可以實現BC-BSP系統提供的staffStartup接口,完成整個聚類作業開始之前的聚類中心初始化工作,例如讀取預先設定好的存儲在分布式文件中的初始聚類中心,利用系統提供的聚集器接口實現聚簇內數據點累計和與累計計數計算新的聚類中心,這樣就需要每個任務計算自己任務內的局部累計和與累計計數,然后在BSP主節點控制器計算各個類的總累計和以及總類內數據點數,在新的超步開始時計算聚集中心。

當k-means聚類的k值較?。ɡ鐜资畟€)時,這種利用聚集器的方法是可行的。然而,實驗中我們發現:當k值上百或更大時,就會出現異常。這是因為需要向Zookeeper寫的內容太多。因為系統框架中聚集器的實現利用了Zookeeper,所以在實現k-means聚類時,使用了分布式文件暫存各個任務的局部聚集結果。在執行超步計算前讀取這些臨時文件,計算新的聚類中心,可以解決k值較大時引起的異常問題。

6 BC-BSP系統的實驗

選擇同樣基于BSP模型的Hama[4]和Giraph[5]作為參照比較系統,并且使用它們的API實現了PageRank算法。實驗軟硬件配置是:30個工作節點,一個作為控制節點,29個用作存儲和計算的工作節點,Java虛擬機(JVM)的內存設置為2 GB。每個節點的配置如下:Intel Core i3-2100雙核中央處理器(CPU)、8 GB雙倍速率同步動態隨機存儲器(DDR)3內存、500 G/7200 RPM磁盤,安裝了Red Hat Centos 6.0操作系統、JDK1.6.0-30、Hadoop-0.20.2和Zookeeper-3.3.2。統計了運行PageRank 10次迭代的運行時間開銷。

測試數據采用不同規模的真實數據和人工合成數據;人工合成數據集由數據生成器生成。實驗中我們選擇了定點規模不同的5個真實數據集[8],它們的統計信息見表1。

6.1 真實數據集測試結果

利用表1中描述的5個真實數據集,在Giraph1.0.0的內存版(Giraph 1.0.0_MEM)和磁盤版(Giraph 1.0.0_HDD)、Hama 0.6.4和BC-BSP 2.0系統上分別運行了PageRank算法,得到了圖5所示的結果。

由圖5展示的結果可得出:BC-BSP2.0的性能優于另外3個對比系統,總體上比Giraph1.0.0的內存版的性能好。

6.2虛擬數據集測試結果

通過測試虛擬數據集進行系統可擴展性的對比,我們可知:數據從1 000萬頂點至11 000萬頂點,主要用于測試系統的可擴展性和計算性能,平均出度規模為11.5。

由圖6展示的結果可得出:圖數據的頂點從1 000萬到11 000萬,BC-BSP 2.0在數據吞吐量以及在相同數據集的處理效率上都要優于HAMA-0.6.4,并優于GIRAPH-1.0.0_HDD,效率略低于GIRAPH-1.0.0_MEM,但可擴展性更好。

7 結束語

文章描述了在Java語言環境下基于BSP模型實現的用于大規模圖數據迭代處理的系統BC-BSP。該系統在Pregel思想的基礎上,實現了它的基本功能,同時增加了若干優化策略,包括增加了均衡的數據劃分策略,使得每個任務處理的節點數量盡可能相近,圖數據處理和消息通信過程中的磁盤暫存使得在計算節點及其內存資源有限的情況下可以處理較大的數據,具有更高的可擴展性。

盡管在系統開發過程中已經做了大量的優化工作,但是系統還有可優化的地方。例如,關于圖數據結構的優化與改進:(1)目前不論是圖頂點對象還是邊對象都采用字符串方式存儲,可以改成支持泛型的實現;(2)系統利用寫檢查點機制實現了故障恢復,但是對于故障類型的捕獲和診斷還有待進一步加強;(3)在系統實現中發現Java環境對內存的開銷巨大,因此對數據結構的設計以及使用需要仔細地斟酌。

致謝

本研究得到東北大學于戈教授和谷峪副教授的幫助,以及中國移動(蘇州)研發中心錢嶺博士的支持,謹致謝意!

本系統開發工作是由東北大學計算機軟件所王志剛博士研究生以及許多已經畢業的研究生共同完成,對他們謹致謝意!

參考文獻

[1] SERGEY B, LARRY P. The Anatomy of a Large-Scale Hypertextual Web Search Engine [J]. Computer Networks and ISDN Systems, 1998, 30(98): 1-7

[2] GUERON M, LLIA R, MARGULIS G. Pregel: A System for Large-Scale Graph Processing [J]. American Journal of Emergency Medicine, 2009, 18(18):135-146

[3] VALIANT L G. Bulk-Synchrony: A Bridging Model for Parallel Computation [J]. Communications of the ACM, 1990, 33(8):103-111

[4] Welcome to Hama Project [EB/OL].[2011-07-13] . http://incubator.apache.org/hama/

[5] AVERY C, CHRISTAN K. Giraph: Large-Scalegraph Processing Infrastructure on Hadoop [EB/OL]. [2011-06-29]. Hadoop Summit 2011, https://github.com/aching/Giraph

[6] LOW Y, BICKSON D, GONZALEZ J, GUESTRIN C, et al. Distributed GraphLab: A Framework for Machine Learning and Data Mining in the Cloud [J]. Proceedings of the VLDB Endowment, 2012, 5(8): 716-727

[7] MAMOU H. An Experimental Comparison of Pregel-Like Graph Processing Systems [C]// Proceedings of Vldb Endowment. USA: ACM 2014: 7(12):1047-1058

[8] Using the Stanford Large Network Dataset Collection [EB/OL], https://snap.stanford.edu /data/index.html

91香蕉高清国产线观看免费-97夜夜澡人人爽人人喊a-99久久久无码国产精品9-国产亚洲日韩欧美综合