?

消除規范關系連接冗余的二次排序算法研究

2018-01-02 08:40劉黎志
武漢工程大學學報 2017年5期
關鍵詞:分區排序分組

劉黎志 ,張 威

1.智能機器人湖北省重點實驗室(武漢工程大學),湖北 武漢 430205;

2.武漢工程大學計算機科學與工程學院,湖北 武漢 430205

消除規范關系連接冗余的二次排序算法研究

劉黎志1,2,張 威1,2

1.智能機器人湖北省重點實驗室(武漢工程大學),湖北 武漢 430205;

2.武漢工程大學計算機科學與工程學院,湖北 武漢 430205

使用MapReduce框架對規范的一對多關系實體進行連接操作時,一方實體的各個屬性會在連接的結果中產生大量冗余.通過對二次排序算法進行優化,重新定義Map階段的分區過程、Shuffle階段的排序及分組過程,使得Map階段的輸出為包含一方實體屬性值和多方實體排序值的組合鍵及包含多方實體屬性值的集合.Reduce階段將組合鍵進行分解,提取一方實體的主碼作為HBase表的行健,并將組合鍵中一方實體的各個屬性值及多方實體屬性值集合分別寫入HBase表中對應的列,從而既實現了連接的語義,又消除了冗余.實驗證明,優化后的算法可以消除一方實體屬性值在連接結果中的冗余,提高了對連接結果的查詢效率.

MapReduce;連接冗余;二次排序;HBase

MapReduce在對規范的一對多關系進行連接操作時,一方關系的各個屬性值會在連接的結果中產生大量的冗余,為消除冗余,可利用HBase表的稀疏存儲特性,將一方關系的各個屬性值只存儲一次,同時將其對應的多方關系進行按列多次存儲.實現的過程可借鑒二次排序算法的思想,讓一方和多方關系在Map端進行連接后,輸出的Key既包含一方關系的屬性,又包含多方關系中可排序的值,從而使得Reduce端在規約時可將Key包含的一方關系的屬性值及多方關系的經過二次排序后的屬性值直接寫入HBase表.

MapReduce是Google在2004年提出一個用于處理大數據的分布式計算框架,其數據處理的流程分為Map、Shuffle及Reduce三個階段.在Map階段,原始數據源根據其數據特征被劃分成若干數據塊,每個數據塊由集群中的節點進行Map邏輯處理,結果以Key/Value(鍵/值對)的形式輸出.Shuffle階段負責對Key/Value對進行排序及分組,Map階段的排序發生在將節點內存緩沖區的key/Value寫入到本地磁盤spill文件,及將多個本地磁盤spill文件合并為一個spill文件時,排序的過程為:首先根據key所屬的Partition(分區)排序,每個Partition再按Key進行排序.Map階段完成后,每個的Partition會被拷貝到對應的Reduce節點,由于Reduce節點會接受來自多個Map節點的數據,故Shuffle在Reduce階段的任務就是將來自不同Map節點的Partition按Key值進行歸并排序后,將Key/Value根據Key值分組為[Key,List(Value1,Value2…Valuen)]作為Reduce任務的輸入.Reduce階段負責對[Key,List< Value1,Value2…Valuen]按特定邏輯進行規約處理,并將結果輸出[1-2].

Hadoop MapReduce是Google MapReduce框架的開源實現[3],通過對 Hadoop MapReduce進行擴展,可以將HBase與MapReduce進行集成,從而使得HBase數據表和外界數據源可以以MapReduce的方式進行雙向交互,從而提高數據的處理速度和效率.HBase是建立在Hadoop之上,具有高可靠性、高性能、列存儲、可伸縮、實時讀寫特點的數據庫系統,能夠為海量的數據提供高性能的數據維護及查詢服務[4-8].可以利用MapReduce將具有相同屬性的文件進行連接操作,根據參與連接的文件大小可選擇使用Reduce端連接、Map端連接、Semi(半)連接及Reduce端+Bloom Filter連接,連接的結果可以寫入文本文件,也可以直接寫入HBase數據表.由于MapReduce的Shuffle過程默認按連接結果的Key進行排序,若需要對Value也進行排序,則需要重新定義Shuffle的排序和分組過程,進行二次排序,從而使得連接的結果首先按 Key排序,然后再按Value排序[9-15].

1 改進的二次排序算法

假設規范的一方關系為M(MKEY,MATT1,MATT2,…,MATTn),多 方 關 系 為 S(SKEY,MKEY ,SATT,SVALUE),其中MKEY為關系S的外碼,(mkey[m],matt1[m],matt2[m],…,mattn[m]),m∈[1,n]表示關系M的一個元組,關系S中的SATT屬性的取值范圍為{satt1,satt2,…,sattn},SValue的取值范圍為{svalue1,svalue2,…,svaluen},且按序號從小到大排序的整型值.則使用MKEY對關系M和S進行連接操作,并根據SVALUE值進行二次排序后的結果如下所示:

可見一方關系M的每個元組的各個屬性會在連接結果中產生了大量冗余,消除冗余的方法是將連接的結果進行轉換,寫入HBase的數據表.由于HBase表是按列存儲的,在定義表結構時只需要定義列族(Column Family),對屬于列族的列的數量沒有限制,以ColumnFamily:Qualifier的形式表示一個列名,Qualifier可以是任意的字節數組.因此可以以S:satt[k]列,k∈[1,n],S為多方關系名,satt[k]為S中的SATT屬性的在連接結果中的值,來存儲連接結果中的多方關系SVALUE屬性的值.對于連接結果中的一方關系,提取mkey[m],m∈[1,n]為 HBase表的行健,以 M:MATT[m]列,m∈[1,n],M為一方關系名,MATT[m]為M中的MATT屬性名,來存儲一方關系在連接結果中和 mkey[m]對應的(matt1[m],matt2[m],…,mattn[m])屬性值,從而使得一方關系的連接結果只存儲了一次,既實現了連接的語義,又消除了冗余.HBase存儲列值時默認按列名進行排序,故經過二次排序后的連接結果的svalue[t],t∈[1,n]可能不會按照排序后的次序進行存儲,可增加M:Seq 列存儲排序后的 svalue[t]值及其與 satt[k],k∈[1,n]的對應關系.經過二次排序后的連接結果在HBase表中的存儲結構如圖1所示.

實現將一對多的規范關系進行連接,二次排序后,直接寫入HBase表,其過程如下.

1.1 自定義組合鍵

MapReduce的Shuffle階段只能按Key對數據進行排序,因此若需要在對Key進行排序后,再對Value進行排序,必須使得Map階段輸出的Key包含多方關系S中的SVALUE值,為將一方關系M的各個屬性寫入HBase表,Key還要包含一方關系M的各個屬性值,自定義的組合鍵如下所示:

圖1 二次排序連接結果的存儲結構Fig.1 Storage structure of secondary sort join result

其中firstKey存儲M關系中的MKEY及各個MATT屬性值以字符“ ”分隔的字符串,mkey為以“ ”分隔的第一個子字符串,secoondKey存儲S關系中的SVALUE值.

1.2 實現Map端連接

由于一方關系M的數據一般較小,故可將其數據文件復制多份,讓每個map節點內存中保存一份(如存放到HashMap中),然后掃描多方關系S:對于S中的每一條記錄,在HashMap中查找是否有相同的MKEY的記錄,如果有,則連接后輸出.在MapReduce的任務啟動時,通過job.addCache-File(“hdfs://namenode:9000/M.txt”)指定要復制的一方數據文件M.txt,JobTracker在作業啟動之前會獲取這個URI列表,并將相應的文件拷貝到各個TaskTracker的本地磁盤上,Map函數在重載的setup方法中通過context.getCacheFiles()可以獲取到緩存到本地的文件.實現Map端連接的過程如下:

//定義Map的輸出為< CombinationKey,Text>

HBaseMapper extends Mapper<LongWritable, Text,CombinationKey,Text>

{private HashMap<String,String>cache_M=new Hash-Map<String,String>();

protected void setup(Context context)

{BufferedReader br=null;URI[]distributePaths=context.getCacheFiles();

String mInfo=null;File mFile=new File("./M.txt");

br=new BufferedReader(new FileReader(mFile.getPath()));//讀緩存文件,并放到內存中

while(null!=(mInfo=br.readLine())){

String[]mParts=mInfo.split(" ");cache_M.put(mParts[0],mInfo)}//mPart[0]為MKEY值

}

protected void map(LongWritable key,Text value,Context context)

{string mkey=得到S關系每一行數據的MKEY值;

string sattr=得到S關系每一行數據的SATT值;

IntWritable svalue=得到S關系每一行數據的SVALUE值;

Text mInfo=new Text(cache_M.get(mkey));

if(mInfo!=null){

CombinationKey cbkey=new CombinationKey();

cbkey.setFirstKey(mInfo);cbkey.setSecondKey(svalue);

context.write(cbkey,new Text(sattr+“ ”+svalue));

}}}

1.3 重新定義分區函數和排序依據及分組函數

由于在Map端就進行一方關系M和多方關系S的連接操作,故需要重新定義分區函數,使得firstKey中具有相同mkey的連接結果分到同一個區(Partition),自定義分區類的定義如下:

class CusPartition extends Partitioner<CombinationKey,Text>{

public int getPartition(CombinationKey key,Text value,int numPartitions){

string mkey=取出key中firstKey部分的mkey;//mkey為分組依據

return (mkey.hashCode()&Integer.MAX_VALUE)%numPartitions;}

}//numPartitions的值為集群中reduce節點的數量.

在Map和Reduce階段都需要對處在同一個分區的連接結果首先按firstKey中的mkey進行排序,再按secondKey進行排序,自定義的排序比較類定義如下:

class CusComparator extends WritableComparator{

public int compare(WritableComparable cbKeyOne,WritableComparable cbKeyTwo){

CombinationKey c1=(CombinationKey)cbKeyOne;

CombinationKey c2=(CombinationKey)cbKeyTwo;

string mkey1=從c1中取出firstKey部分的mkey;IntWritabe svalue1=取出c1中的secondKey;

string mkey2=從c2中取出firstKey部分的mkey;IntWritabe svalue2=取出c2中的secondKey;

if(!mkey1.equals(mkey2)) {return mkey1.compareTo(mkey2);}//以字符方式比較mkey

else{return svalue1-svalue2;//以數值方式比較second-Key}}}

比較方法返回值值分別以小于零的值、零值、大于零的值表示小于、等于和大于.

在Reduce階段將具有相同combinationkey的連接結果分在同一組,形成[combinationkey,List(sattr[k]“ ”svalue1,sattr[k]“ ”svalue2...sattr[k]“ ”svaluen)],k∈[1,n].分組的依據仍然為 firstkey的mkey部分,自定義的分組類如下所示:

class CusGrouping extends WritableComparator{

public int compare(WritableComparable cbKeyOne,WritableComparable cbKeyTwo){

CombinationKey c1=(CombinationKey)cbKeyOne;

CombinationKey c2=(CombinationKey)cbKeyTwo;

string mkey1=從c1中取出firstKey部分的mkey;

string mkey2=從c2中取出firstKey部分的mkey;

return mkey1.compareTo(mkey2);}}

1.4 在Reduce階段將連接結果寫入HBase表

Reduce首先對輸入的combinationkey進行分解,取出firstKey中的mkey作為HBase表的行健,然后將firstKey中的其它屬性值依次以M:MATT[m],m∈[1,n]列存儲.對已經按svalue排序好的集合 List(sattr[k]“ ”svalue1,sattr[k]“ ”svalue2...sattr[k]“ ”svaluen),以 S:sattr[k],k∈[1,n]列存儲對應的svalue值.由于HBase默認按列的名稱S:satt[k]進行排序,故存儲的次序可能與排序的結果不一致,可以增加M:seq列,存儲排序后的svalue值,Reduce的過程定義如下:

class SCHBTReducer extends TableReducer<CombinationKey,Text,ImmutableBytesWritable>{

public void reduce(CombinationKey key,Iterable<Text>values,Context context){

string mSeq=“”;

string[]mParts=key.getFirstKey().toString().split(" ");

Put put=new Put(mParts[0].getBytes());//行健為mkey

for(int i=1;i<mParts.length;i++){//存儲一方關系 M的各個屬性值

put.add(“M”.getBytes(),MATT[i].getBytes(),mParts[i].getBytes())}

while(values.iterator().hasNext()){//存儲多方關系的satt及svalue值

string[]sParts=ite.next().toString().split(" ");

put.add(“S.”getBytes(),sParts[0].getBytes(),,sParts[1].get-Bytes());

mSeq+=sParts[0]+":"+sParts[1]+";";}

put.add(“M”.getBytes(),”mSeq”.getBytes(),mSeq.get-Bytes());//存儲按svalue排序的結果

context.write(new ImmutableBytesWritable(mParts[0].get-Bytes()),put);//寫入 HBase表

}}

2 實驗部分

實驗環境為包含有4個節點的Hadoop集群,1個主節點,4個數據節點(主節點也為數據節點),節點計算機的配置為Pentium(R)Dual-Core E500 2.5 GHz,2 GiB內存,160 GiB硬盤.集群使用的操作系統為 ubuntu-14.04.1-server-i386,Hadoop版本為2.4.1,HBase版本為0.98,7,客戶端使用Eclipse 4.2.1,Java SE 1.7為開發環境.以學生選課關系模擬規范的一對多關系,模擬數據如圖2所示.

在集群的HDFS上建立SecSort_MFileIn目錄,上傳std.txt文件到該目錄,建立SecSort_SFileIn目錄,上傳course.txt文件到該目錄.在Main函數中對Job進行如下的配置:

String[]ioArgs=new String[]{"hdfs://MainDataNode:9000/SecSort_MFileIn/std.txt",

"hdfs://MainDataNode:9000/SecSort_SFileIn"};

job.addCacheFile(new URI(ioArgs[0]));//設置 Map端連接需要復制到每個節點內存的一方文件

job.setJarByClass(SecSortHBase.class);//設置任務的類名

job.setMapperClass(HBaseMapper.class);//設置Map類名

job.setReducerClass(SCHBTReducer.class);//設 置 Re-duce類名

job.setPartitionerClass(CusPartition.class);//設置自定義分區類

job.setSortComparatorClass(CusComparator.class);//設置自定義排序比較類

job.setGroupingComparatorClass(CusGrouping.class);//設置自定義分區類

job.setMapOutputKeyClass(CombinationKey.class);//設置Map輸出的Key類型

job.setMapOutputValueClass(Text.class);//設置 Map輸出的Value類型

FileInputFormat.setInputPaths(job,new Path(ioArgs[1]));//設置任務的輸入路徑

job.setOutputFormatClass(TableOutputFormat.class);//設置任務輸出為HBase表

job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,"t_sc");//設置輸出的表名

System.exit(job.waitForCompletion(true)?0:1); //運行任務

任務在集群中運行后,將結果寫入到HBase的t_sc表中的結果如圖3所示.

圖2 實驗模擬數據Fig.2 Experimental simulation data

從實驗結果看出,std.txt文件中每個學生的屬性只存儲了一次,每個學生所選課程存儲了多次,實現了連接的語義,消除了冗余,二次排序的學生成績可直接通過查詢std:CourseSeq列得到.因以HBase表存儲連接結果,可以利用HBase提供的各類方法進行數據檢索,提高了數據的查詢效率.

3 結 語

利用MapReduce對具有相同屬性的關系進行連接時,不可避免會產生冗余,將Map端輸出的Key進行組合,使其包含產生冗余的屬性,并重新定義分區、排序及分組過程得到期望的Reduce端的輸入,在Reduce端將連接結果寫入HBase表,可以優化規范一對多關系的連接結果冗余.如何對多個關聯的實體進行連接,實現連接的語義,又避免連接結果中不必要的冗余,將是以后研究的主要方向.

[1] 王珊,王會舉.架構大數據:挑戰、現狀與展望[J].計算機學報,2011,34(10):1741-1751.WANG S, WANG H J.Architectingbigdata:challenges,studies and forecasts[J].Chinese Journal of Computers,2011,34(10):1741-1751.

[2] 孟小峰,慈祥.大數據管理:概念、技術與挑戰[J].計算機研究與發展,2013,50(1):146-169.MENG X F,CI X.Big data management:concepts,techniques and challenges[J].Journal of Computer Research and Development,2013,50(1):146-159.

[3] 陳吉榮,樂嘉錦.基于Hadoop生態系統的大數據解決方案綜述[J].計算機工程與科學,2013,35(10):25-35.CHEN J R,LE J J.Reviewing the big data solution based on Hadoop ecosystem[J].Computer Engineering&Science,2013,35(10):25-35.

[4] LARS G.HBase:the definitive guide[M].Sebastopol:O’REILLY,2011.

[5] ZIKOPOULOS P C,EATON C,DEROOS D,et al.Understanding big data:analytics for enterprise class Hadoop and streaming data [M]. New Youk:McGraw-Hill,2012.

[6] AIYER A,BAUTIN M,CHEN G J,et al.Storage Infrastructure Behind Facebook Messages Using HBase at Scale[J].Bulletin of the Technical Committee on Data Engineering,2012,35(2):996-999.

[7] VENNER J.Pro Hadoop[M].Berkeley:Appress,2009.

[8] 蔡睿誠.基于HDFS的小文件處理與相關MapReduce計算模型性能的優化與改進[D].長春:吉林大學,2012.

[9] LU W,SHEN Y Y,CHEN S,et al.Efficient processing of k nearest neighbor joins using MapReduce[J].PVLDB,2016,5(10):1184-195.

[10] PANSARE N,BORKAR V R,JERMAINE C,et al.Online aggregation for large MapReduce jobs[J].PBLDB,2014,4(11):1135-1145.

[11] OKCAN I,RIEDEWALD M.Processing theta-joins using MapReduce[C]//ACM SIGMOD International Conference on Management of Data.ACM,2011:949-960.

[12] AFRARTI F N,DAS S A,MENESTRINA D,et al.Fuzzy joins using MapReduce[C]//IEEE International Conference on Data Engineering.IEEE,2012:498-509.

[13] ZHANG X F,SHEN L,WANG M.Efficient multi-way theta-join processing using MapReduce[J].PVLDB,2016,5(11):1184-1195.

[14] BABU S.Towards automatic optimization of MapReduce programs[C]//ACM Symposium on Cloud Computing.ACM,2010:137-142.

[15] SILBA Y N,REED J M.Exploiting MapReduce based similarity joins[C]//ACM SIGMOD International Conference on Management of Data.ACM,2012:693-696.

Secondary Sort-Based Algorithm for Eliminating Normative Join Redundancy

LIU Lizhi1,2,ZHANG Wei1,2
1.Hubei Key Laboratory of Intelligent Robot(Wuhan Institute of Technology),Wuhan 430205,China;
2.School of Computer Science and Engineering,Wuhan Institute of Technology,Wuhan 430205,China

The join results of two entities with normative one-to-many relationship by MapReduce may contain some redundancy of one side entity.A combination key with one side entity properties and multi-side sorted values and a list of multi-side entity properties can be got as the input of reduce stage,by optimizing secondary sort-based algorithm and redefining the partition function of map stage,sort and group function of shuffle stage.After splitting the combination key at reduce stage,the key of one side entity was extracted as rowkey of the HBase table to store the join results,and the other properties of the one side entity and the list containing multiside entity properties were put in the corresponding columns of the HBase table,so the join semantics was realized and the redundancy was eliminated.The examination proves that the optimized algorithm can eliminate the redundancy of one side entity properties and promote the data query efficiency of the join results.

MapReduce;join redundancy;secondary sort;HBase

TP311

A

10.3969/j.issn.1674-2869.2017.05.018

1674-2869(2017)05-0508-06

2016-12-01

劉黎志,碩士,副教授.E-mail:llz73@163.com

劉黎志,張威.消除規范關系連接冗余的二次排序算法研究[J].武漢工程大學學報,2017,39(5):508-513.

LIU L Z,ZHANG W.Research on the secondary sort algorithm for eliminating normative join redundancy[J].Journal of Wuhan Institute of Technology,2017,39(5):508-513.

陳小平

猜你喜歡
分區排序分組
貴州省地質災害易發分區圖
上海實施“分區封控”
排序不等式
恐怖排序
分組搭配
節日排序
怎么分組
浪莎 分區而治
分組
大空間建筑防火分區設計的探討
91香蕉高清国产线观看免费-97夜夜澡人人爽人人喊a-99久久久无码国产精品9-国产亚洲日韩欧美综合