周 偉
(攀枝花學院,四川 攀枝花 617000)
隨著互聯網應用的普及,為了實現個性化的推送和推薦服務,有必要利用大數據處理技術對各種應用生成的日志數據進行分析和處理,因此如何采集各應用系統生成的實時數據成了大數據技術亟待解決的問題。陳飛等結合使用Flume、Elasticsearch 以及Kibana 等技術手段提出了一種分布式的日志采集分析系統,從系統設計和架構等方面提出了新的解決思路,并針對Nginx 的訪問日志進行了實時采集和分析,完成了原型系統的實現。[1]朱濤等構建了基于改進的Flume 的實時數據采集系統,采用復合型Channel 與Flume 相結合的方式,通過Flume 采集數據,在保證數據源的豐富性和可靠性的前提下,提高采集效率。[2]李洋等提出了一種整合Hadoop 和Storm 的分布式框架,構建出一種融合了實時計算與離線計算的分布式日志實時處理系統,系統架構由數據服務層、業務邏輯層和Web 展示層組成,其中數據服務層使用Flume 實時采集日志數據。[3]
通過對以上采集方案和Flume 源碼進行分析,發現Flume 在進行實時數據采集時,如果日志系統生成的日志文件存在重命名,就會重復采集數據。本文將構建基于改進的Flume 實時數據采集系統,從而提高Flume 在進行實時數據采集時的準確率。
Flume 是一個可以從多個不同的數據源進行日志收集、聚合和傳輸大量數據的分布式、高可靠性和高可用性的日志采集系統。Flume 最主要的作用就是實時讀取服務器本地磁盤的數據,將數據傳輸到HDFS 文件系統,供大數據分析使用。
Flume 架構如圖1 所示,Flume 以Agent 為最小的獨立運行單位,一個Agent 主要由Source、Channel 和Sink 三大組件組成。Source 將外部數據源的數據封裝成Flume 數據模型的最小單位event,并把數據存儲到Channel 中,Sink 負責取出Channel 中的數據并轉發到目的地。[3]
圖1 Flume 架構
Source 即數據源,通過Source 組件可以讓Flume 讀取指定的數據,然后將數據傳遞給后面的Channel。Source 可以處理avro、thrift、exec、jms、spooling directory、taildir、syslog、http 等各種類型、各種格式的日志數據。
Sink 負責輪詢Channel 中的事件并批量刪除它們,并將這些事件批量寫入到存儲系統或發送到另外一個Flume Agent。Sink 組件可以將數據寫入HDFS、logger、avro、file、Hbase 數據庫等目的地。
Channel 是位于Source 和Sink 之間起緩沖作用的組件,允許Source 和Sink 運行在不同的速率上。Flume 自帶Memory Channel 和File Channel兩種Channel,最常用的是Memory Channel。
Flume 中有Exec、Spooldir 和Taildir 三種可以監控文件或者目錄的Source,實現實時數據采集。
(1)Exec Source:實現文件監控。Exec Source可以實時監控文件中的新增內容,類似于Linux 命令中的tail-F 的效果,根據文件名進行追蹤,并保持重試,即使該文件被刪除或改名后,如果之后再次創建相同的文件名,會繼續追蹤;由于tail-F 命令跟蹤文件內容默認顯示為最新10 行內容,通過實驗發現當Flume 采集進程由于意外停止以后,重新啟動Agent 進程進行文件采集,如果被實時采集的文件數據追加內容超過10 行,Agent 進程重新啟動采集任務以后會造成部分數據丟失。
(2)Spooling Directory Source:采集目錄中新增文件的數據。Spooldir 可以監聽一個目錄,并同步目錄中的新文件到sink,被同步完的文件可立即刪除或打上標簽,適用于同步新文件,不適合對實時追加日志的文件數據進行監聽并同步。
(3)Taildir Source:可以監控多個目錄,并且使用正則表達式匹配該目錄中的文件名進行實時采集,可以說是Spooldir Source 與Exec Source 的結合體,并具有兩者的優點。Taildir Source 適合用于監聽實時追加的多個文件和斷點續傳,它以JSON 格式記錄每個被采集文件的索引節點inode信息、絕對路徑和最新采集位置信息,從而實現斷點續傳。Agent 進程重啟后不會有重復采集問題,因此,可以選擇flume 的Taildir Source 監控web服務器或日志服務器的所有搜索類日志文件并進行同步,按日期生成目錄,并指定生成文件名前綴。
(1)Agent 配置
基于Taildir Source 的Agent 配置如圖2 所示。
圖2 基于Taildir Source 的Agent 配置
(2)啟動Agent
輸入bin/flume-ng ageng -n agl -c conf -f conf/taildir-hdfs.conf,啟動Agent。
(3)數據采集測試
數據采集測試操作步驟如圖3 所示。
圖3 Taildir Source 追加數據采集實驗
Taildir Source 通過配置文件中filegroups 配置項設置,對文件數據進行采集時可以采用正則表達式對一類或一組文件進行采集,在圖2 中通過設置配置項ag1.sources.src1.filegroups=f1 和ag1.sources.src1.filegroups.f1=/root/logs/access.*.log 可以實現對/root/logs 目錄下文件名中包含關鍵字access 的所有文件數據進行采集,采集完成后將另一個文件test.txt 中的數據追加到access.log 文件中,通過實驗發現追加的數據也被采集到了HDFS文件系統中。將圖3 中的access.log 文件重新命名為"access-+年+月+日.log" 格式,然后查看HDFS文件系統中對應的目錄,發現有新的采集文件生成,通過Web 端查看新生成的文件,發現最新生成的文件大小與前兩個文件大小之和相等,同時通過haddoop fs-cat 命令顯示內容,發現其數據與追加了test.txt 文件數據之后的access.log 文件數據相同,因此,Taildir Source 在進行實時數據采集時,如果被采集文件重命名,則存在數據重復采集,實驗過程及結果如圖4 所示。
圖4 Taildir Source 對重命名文件數據重復采集
在實際應用項目中,某些日志框架程序會對前一天產生的日志文件按照日期等格式進行重命名,使日志文件的絕對路徑發生變化,由前面分析可知Flume 基于Taildir Source 采集文件數據時會以文件的inode 值和絕對路徑作為更新和讀取文件的條件,因此Flume 會對該文件的數據進行重新采集,影響數據采集的準確率而無法發揮出Taildir Source 斷點續傳的優勢。根據Flume 的Taildir Source 數據采集特點,針對這種應用場景可以通過優化Flume 源碼解決。
通過查看Taildir Source 的采集配置文件,發現Taildir Source 在采集文件數據時會自動生成保存采集文件位置信息的json 格式的文件,打開該文件內容為[{"inode":270149,"pos":144,"file":"/root/logs/access-2022-09-29.log"}],inode 表示該文件在Linux 系統中的索引節點,Linux 系統中每一個文件的索引節點都是唯一的,除非該文件被刪除之后重建一個同名的文件,否則索引節點不會改變;pos 表示當前文件的采集位置;file 表示文件絕對路徑;Flume 在采集文件數據的時讀取json文件的inode、pos、file 信息。
通過分析發現,Flume 采集文件數據時文件是否重新上傳是由文件的inode 值和文件的path(絕對路徑)共同決定的,只要其中一個有變化(如inode 改變或者文件重命名)都會造成Flume 重新采集文件數據,因此修改Flume 源碼中對應的更新文件的條件即可對Flume 實時數據采集Taildir Source 的缺點進行改進。
(1)修改更新文件的條件
修改flume-taildir-source 目錄中對應包下TailFile 類中updatePos()方法的更新條件。
//TailFile 類的構造方法
public TailFile(File file,Map
//*********省略部分代碼*************
//path:被采集文件的絕對路徑
this.path = file.getAbsolutePath();
this.inode = inode;
}
public boolean updatePos(String path,long inode,long pos)throws IOException {
//把文件inode 值和文件的絕對路徑作為更新的條件
if (this.inode == inode&& this.path.equals(path)){
//*********省略部分代碼*************
return true;
}
return false;
}
//*********省略部分代碼*************
將TailFile.java 中的if(this.inode == inode&&this.path.equals(path))更改為if(this.inode == inode),即只把inode 值作為更新的條件。
(2)修改采集數據的條件
修改flume-taildir-source 目錄中對應包下ReliableTaildirEventReader 類的updateTailFiles(參數列表)方法的更新條件。
public List
//*********省略部分*************
TailFile tf = tailFiles.ge(tinode);
//把文件inode 值和文件的絕對路徑作為讀取數據條件
if(tf == null || !tf.getPath().equals(f.getAbsolutePath())){
long startPos = skipToEnd ? f.length():0;
tf = openFile(f,headers,inode,startPos);
} else {
//*********省略部分*************
}
tailFiles.pu(tinode,tf);
updatedInodes.add(inode);
}}
return updatedInodes;
}
//*********省略部分代碼*************
將ReliableTaildirEventReader.java 中的if(tf==null|| !tf.getPath().equals(f.getAbsolutePath()))改為if(tf == null),即只把inode 值作為讀取條件。
(3)把項目重新打包后生成新的flume-taildirsource-1.10.1.jar 文件。
(4)把重新打包生成的jar 文件上傳到Flume安裝包下替換原來的jar 文件。
(5)重新啟動Flume 進行測試
測試步驟及結果如圖5 所示。從圖5 中可以發現,對文件重命名之后,Flume 不會重新采集數據,對重新重命名后追加到文件中的數據同樣會被Flume 監控到并進行采集,不會對原有的數據進行再次采集,從而避免了數據的重復采集。
本文比較了三種Flume 實時數據采集方案的優缺點,對其中基于Taildir Source 的實時數據采集方案進行了改進,對改進前后的數據采集結果進行了實驗對比。結果表明,相比于改進前的實時數據采集方案,改進后的基于Flume 的Taildir Source 實時數據采集方案避免了文件重命名后數據重復采集,提高了數據采集準確率。