?

基于Storm的實時大規模傳感器監控平臺的開發和實現

2019-12-12 07:06周煜敏
計算機應用與軟件 2019年12期
關鍵詞:數據流滑動分區

周煜敏 王 鵬 汪 衛

(復旦大學計算機科學技術學院 上海 200433)

0 引 言

隨著云時代的到來,大數據和云計算已經吸引了越來越多的業內外人士的關注。在諸如金融、電信和大規模傳感器監控等許多領域中,在線處理實時數據,也稱為數據流處理也得到了越來越廣泛的應用。數據流處理應用程序通常需要低延遲、快速處理和實時反饋。

在物聯網的場景下,一套完整的系統由數千個傳感器節點的分布式集合組成,每個傳感器節點能夠從環境中感測多種類型的信息并以特定頻率發送數據。這就需要非常強大的分布式處理能力來滿足計算的需求。

在Storm、Spark以及Flink等實時大數據計算引擎不斷發展的同時,傳感器的分析專家則難以掌握這些計算引擎的處理原語。因此需要用一套簡單的自定義的語言支持,幫助分析專家能夠簡單地構建計算邏輯,這樣才能夠最快速地為這些專家提供可靠的數據結果。

本文在深入分析了傳感器計算的需求后,發現絕大部分的計算都是基于傳感器數據的數值計算,而且都是基于滑動窗口的計算,將高頻的數據的統計量以秒、分鐘至小時級別的頻率計算,加以輔助的閾值設定和四則運算最終呈現結果。

本文試圖設計一套輕量但有效的方案,以支持在大量傳感器上運行的多個查詢計算。由于處理是實時的,傳統的靜態數據平臺和算法不適合傳感器的流數據。同時,由于數據的高頻率,還應該滿足處理的高吞吐量以對應數據的攝入速率。對于傳感器監測分析師,他們可以將計算需求轉化為腳本,系統會自動解析并轉化Storm的流處理程序,將高頻數據處理之后再源源不斷地產生處理結果并及時反饋給分析師。

該系統還會針對優化處理在計算的過程中出現的重復計算,衡量所涉及的通信和計算成本并通過中間結果共享的分區算法來減少網絡通信已達到更好的性能。

1 相關知識

1.1 相關工作

實時計算需要一個合適的分布式平臺輔助運行,社區已經開發了許多系統,例如早期的Aurora[1]、S4以及Storm[2]和Spark Streaming[3]等來處理秒級甚至毫秒級響應中的大量數據。Storm具有高度可擴展性,易于使用,并且具有低延遲和有保證的數據處理能力。同時,Storm提出了拓撲(Topology)的計算概念[4],相比于傳統大數據引擎Hadoop的MapReduce更加靈活并且適合實時場景。這些特性都非常契合物聯網數據處理應用的需求。與Storm不同,S4等系統無法保證每個元組都會被處理。而Spark Streaming則提出了一個新的模型,使用微批處理的方式用來近似進行分布式流處理[5],但其延遲對于實時響應來說很高,無法滿足實際的應用需求。

為了在大數據流上進行計算,已經提出了許多支持復雜事件處理(CEP)的語言,包括SQL-TS[6]、Cayuga[7]等。雖然他們設計了不同的語法規則,但某些語言不適合物聯網大數據的應用程序場景。在過去的研究中,也有相關的研究人員設計了一套實時處理應用輔助開發框架以簡化開發人員工作[9]。而本文的工作專注于數據流上的聚合和滑動窗口計算,而上述語言和系統設計初衷是處理更復雜的流處理作業,因此在分布式集群上并沒有良好的兼容性,使用這些語言會在解析子句和生成作業時會產生額外的開銷。

為了實現高吞吐量,應該充分利用分布式集群。對性能的關注需要在有限的資源下完成工作。許多先前的研究已經解決了在Storm上開發的一部分優化問題。TMSH-Storm[8]有效降低了Storm的處理延遲和通信開銷,然而在多查詢的環境下,該方法的優勢則并不明顯。

之前的許多文獻都討論了物聯網環境下傳感器的實時處理算法,例如:基于微簇的橋梁監測數據流異常識別算法[10],主要利用主成分分析提取特征,優化異常檢測的計算;基于復雜事件處理的用戶需求響應性能實時監測分析,主要在復雜事件處理上實現了R算法的內嵌和可視化的仿真[11]。這些文獻著重解決靜態數據的上下文的處理優化而并非流處理。此外,還有一些文獻作了滑動窗口相關的優化,例如:具有不同長度和不同選擇謂詞的滑動窗口上的聚合的多查詢優化[12],然而沒有利用先驗知識達到更好的效果。

1.2 滑動窗口

實時查詢通常適用于無界數據流,而不是靜態數據集??紤]到內存限制,有必要設計用于維護流歷史的摘要或概要的技術。對于大多數應用程序,數據流的最新元素比舊的元素更重要。因此這種對最近數據的偏好產生了實時流數據上的滑動窗口的表達形式。窗口的大小和滑動間隔通常使用時間間隔(基于時間)或元組數來指定(基于元組)。

在本文中滑動窗口使用時間間隔標準,時間滑動窗口W定義如下。

定義1將時間長度為Lms,每次滑動時間長度為Sms定義為滑動窗口,記作W(L,S)。

W(L,S)所對應的時間窗口如圖1所示。

圖1 時間滑動窗口示意圖

當需要計算W(L,S)中的數據時,就需要在內存中存儲時間長度為L的數據以供計算,并且每Sms就需要給出當前窗口的結果反饋。

1.3 數據格式

首先介紹一下流數據和查詢的基本數據結構。每個傳感器都有一個唯一的標識號。傳入數據流采用(sensor_id,timestamp,data)的數據格式。它指的是在timestamp時間戳時,sensor_id對應傳感器的值為data。

Timestamp的類型為long,使用的是Unix時間戳。

Data的類型包括布爾類型、整數數據和實數數據。系統會將其統一轉化為單精度浮點類型進行存儲和計算。

每個傳感器將以靜態頻率發送該結構的元組。sensor_id與其頻率之間的關系存儲在數據庫中,在優化計算時該信息會被使用。

1.4 語法規則

一位分析師可以對傳感器啟動一系列查詢,并添加一些進一步的計算以獲得檢查所需的最終結果。這里將查詢組稱為工作流。不同的分析師希望監控不同傳感器上的不同參數,從而導致許多工作流一起執行。為了消除歧義,應該為工作流提供明確的定義,并為分析師提供語言標準,為此本文設計了一套腳本語言。

本文將用戶的查詢劃分為以下3類:

(1) 滑動窗口聚合功能;

(2) 流聯合函數;

(3) 基本算術計算支持。

對于第一類,我們提供四種基本聚合功能:滑動窗口的最大值、最小值、平均值、求和運算。這4種函數每一種都需要3個參數,包括輸入的sensor_id,窗口長度L和滑動間隔S。計算所產生的結果可以計為用戶定義的新的流。例如:

A1=avg("S1",1 000,1 000)

該語句即代表了傳感器“S1”在1秒鐘的滑動窗口上的平均值,計算的結果成為一個新的流,并命名為A1。

對于第二類,本文為流提供聯合函數union,其中的每個參數都是一個不同的流id,可以使用原始數據的sensor_id或者是另一個用戶定義的流id。該函數還將生成一個帶有新用戶定義id的連接流。

對于第三類,本文提供4個算術運算(加,減,乘,除)和4個聚合函數(最大值,最小值,平均值,求和運算),接收不同的流id作為運算參數,并持續計算結果輸出。與第一類計算不同,這些運算符給出結果的時間發生在任何輸入參數值發生變化的時候,而不是等待窗口時間之后更新其結果。

以下腳本是工作流的另一個示例。這意味著首先計算兩個傳感器的平均值,并在一個10分鐘的滑動窗口中以5分鐘的滑動間隔計算兩個傳感器的連接流,然后在同一時間計算三個輸出流的最大值和平均值窗口。

MD1Z=avg("8MD1-AZ",600 000,300 000);

UNI=union("8MD2-A","8MD3-A");

MD23=avg("UNI",600 000,300 000);

MD4Z=avg("8MD4-AZ",600 000,300 000);

UNIF=union("MD1Z","MD23","MD4Z");

out_MZ=max("UNIF",600 000,300 000);

out_AZ=avg("UNIF",600 000,300 000)。

2 系統架構

本節主要講述該實時計算平臺的系統架構,如圖2所示。系統內部主要分為三大模塊:腳本解析模塊、實時計算代碼生成模塊和分布式實時計算模塊。

圖2 計算平臺示意圖

2.1 腳本解析模塊

腳本解析模塊負責解析腳本語言,提取出關鍵信息供后續邏輯搭建,為后續模塊的查詢優化提供信息。

當收到所有的腳本時,這些腳本首先通過語法分析模塊生成抽象語法樹,然后再通過腳本所攜帶的額外信息通過計算圖生成模塊進一步生成計算圖。為了使分析計算組組之間的時間序列計算的計算能夠共享,需要將計算組中的每一個語句,也就是每個計算,當成一個節點看待,而運算符和與該運算所用到的底層所有其他運算之間形成有向邊,對于用戶所給出的計算組進行一個有向圖狀的描述,從而對于相同的操作可以進行有效的合并。

本模塊通過ANTLR的解析,能夠識別出對于相同傳感器的聚集操作,對于其中不同的窗口進行最大公約數的合并計算,以最大限度地節省不必要的計算。由于減少了一些重復互相有交集的時間片段數據存儲,因此在橋梁傳感器網絡監測的高頻率數據流的應用情景下,合并切分窗口來進行分析計算會節省不少內存消耗。

這些信息將通過信息收集模塊將代碼所需信息存儲起來,以便后續使用。

在1.4節中描述的腳本語言簡易的語法使得以前需要使用成千上萬行的Storm代碼才能完成的查詢邏輯,只需要幾十條語句便可以完成。當用戶遇到查詢需求變更的時候,用戶只需要把簡短的幾十條語句作輕微的修改,然后由實時計算代碼生成模塊重新生成可以執行的Storm代碼,進行部署和運行。

2.2 實時計算代碼生成模塊

實時計算代碼生成模塊通過Java反射機制,根據用戶的腳本需求,將計算圖進一步轉化為Storm的Bolt具體的處理代碼。

上一模塊產生的計算圖的結果,分別經過代碼生成模塊和優化分區結果生成模塊的解析、相應的源代碼和分區結果。

其中代碼生成模塊主要運用Java語言的反射機制,將計算圖的邏輯轉換成相對應的Java函數,并配置對應的參數。而每一種函數都對應一段具體的Storm原語的計算邏輯。

對于優化分區結果生成模塊而言,由于多個腳本之間存在重復的查詢語句,因此代碼生成模塊中還包含了查詢共享發現模塊。該模塊負責把腳本中存在的重復查詢語句組進行合并去重,減少數據流元組在網絡中的重復傳輸以及在集群中的重復計算。本文采用的查詢共享模塊的算法架構和具體實現如下:

對于一個分布式集群,如果所有的計算能夠相對平均的分配到每一個計算節點上去,那么集群的計算能力就能夠得到最大程度的發揮,計算的吞吐量也得以提升。而實際情況中,在一個工作流中通常存在對于相同的流的計算的情況,如果這些計算能夠分區在一起將會共享計算結果,減少重復的計算,從而獲得更好的性能。同時,如果兩個不同的工作流程共享同一個傳感器計算或甚至相同的窗口聚集計算,那將這兩個工作流程合并在一起也能降低通信成本,從而提高性能。

基于以上想法,采用啟發式算法進行分區優化見算法1。

算法1分區優化算法

輸入:分布式工作節點個數n,計算圖G,數據庫中存儲的傳感器數據頻率Freq[]

輸出:每個傳感器的分區結果Map:Partition

for i :=1 to n

W[i] :=0

//工作節點負載

foreach G的子圖G’

load[G’] := 0

foreach G’中所有傳感器sensor_id

load[G’] :=load[G’]+freq[sensor_id]

Arrays.sort(load)

foreach G的子圖G’(按load從大到小)

target :=W數組中最小值下標

foreach G’中所有傳感器sensor_id

Partition.put(sensor_id,target)

算法首先將每一個結點的計算復雜度作為該節點的權重,然后以子圖的粒度進行權重的計算。獲取劃分算法之后,通過加權輪詢算法判斷子圖和分區的對應關系,以達到負載均衡。

2.3 分布式實時計算框架

分布式實時計算通過分布式Storm集群實現,其Storm的拓撲結構如圖3所示。

圖3 Storm計算拓撲示意圖

在Storm拓撲中,數據源模塊將持續發送原始傳感器數據的元組,在計算之前還需要一層Filter Bolt以過濾無關傳感器的流式數據。在查詢組中所設計到的要處理的數據種類是有限的,正如橋梁檢測系統中的原始數據通道可能有幾千個,而查詢組中涉及到的通道卻有可能只有非常少量的部分。因此本文增加了這一層過濾模塊,將不必要的傳感器數據從系統中過濾出去,有效減少了整個分布式系統的負載和計算壓力。

數據流中的元組從一個組件發往另外一個組件需要指定發送的分組方式,默認的隨機分組并不能有效地解決大規模傳感器場景下數據不均衡所帶來的性能壓力。2.2節所述的分區優化算法幫助系統產生了負載更均衡的數據分區對應關系。我們利用了Storm系統提供的CustomGrouping API,將優化算法輸出的Map優化分區結果應用于Storm的數據中,進行重排分發。數據會相對均勻地交給后續Calc Bolt的每一個具體實例任務。

Calc Bolt接收代碼生成模塊傳遞的參數和代碼,能夠保證運算嚴格按照用戶腳本所定義的窗口運行。而上游的優化分組策略能夠進一步降低網絡傳輸的代價,提升整個系統的計算效率。

Result Bolt和Calc Bolt的原理很類似,它會接收Calc Bolt的計算結果,以相對較低的負載完成上層結果的計算并在命令行進行輸出。不同的是,Result Bolt的計算在接收數據的瞬間觸發而非等待窗口到達。

本系統結合物聯網傳感器計算的窗口聚合計算占比較大,計算同質性較大等特點,完成了一個基于匹配的查詢優化算法,使得對于海量流式數據在分布式系統中的處理更加平衡,從而節約資源,提高查詢效率和性能。

3 實 驗

3.1 測試集群

實驗環境使用由1個Master和4個Slave組成的Storm集群。每個節點都有64 GB內存,6×2.0 GHz(Intel Xeon E5-2620)CPU和6 TB磁盤空間。所有節點都通過1 GB以太網連接。

3.2 數據集

實驗使用真實的上海市的大橋傳感器數據,傳感器種類達21種,總數量達到了1 000,其中大多數傳感器的數據傳輸速率達到了20 Hz以上,每秒鐘傳輸的數據量約30 MB。數據通過傳感器采集系統的加密socket協議進入系統。

工作流程由經驗豐富的工程師設計,因此監控結果在實際應用中具有重要意義。此外,工程師來自不同的領域,包括一些交叉領域。不同類型的傳感器以復雜的方式使用。本文從工程師那里收集了1 024個不同的工作流程。

3.3 實驗結果及分析

通過套接字獲取數據源并將其放入Apache Kafka[13]作為Spout的數據生成器。作為對比實驗,本文在sensor_id上使用fieldGrouping來對所有數據進行分區(圖例中naive算法),而Storm的拓撲結構保持不變。這樣,所有的傳感器會以隨機的方式分配到不同的計算節點上進行運算,可以有效地檢驗本文所述優化和算法的有效性。

本文使用兩個性能指標:通信成本和節省的代碼行。通信成本通過每分鐘Filter Bolt與Calc Bolt任務之間傳遞的數據單元的數量來衡量。節省的代碼行是將腳本語言的行數和直接在Storm上編寫代碼執行所有流處理邏輯的代碼行數的比較,該參數可以直觀地測量為傳感器監測專家節省的工作量。

本文使用兩個評估參數:計算涉及傳感器的數量n和計算中共享的傳感器的數量ns。n可以反映工作量的復雜性,而ns可以反映可以共享的數據量,也就是整個計算圖的連接性。

系統的通信成本遠低于對比方法,在各種計算量的實驗中,平均提高了20%,并且特別在大計算量的實驗中更顯著(見圖4)。這是因為當計算量更大時,會存在更多的重復計算的優化空間,證明了分區算法的優勢。

圖4 網絡傳輸量對比實驗結果

本文使用16個相同數據發送頻率的傳感器進行了另一組腳本實驗,并且構造了相同的計算邏輯。唯一的區別是本文改變不同的sensor_id以獲得不同的ns來改變數據計算的可共享性。當ns增加時,系統的表現遠遠好于對比算法(參見圖5),實驗表明本文算法更好地利用了可利用的先驗知識讓計算盡可能在本地進行??梢钥闯?,本系統在共享8個傳感器的計算中,達到最多20%的網絡傳輸減少,進一步體現了本文算法的有效性。

圖5 網絡傳輸量和計算可共享傳感器數量關系實驗結果

另外本文測試了直接編寫Java代碼來直接實現計算邏輯,并和精簡的腳本語言進行比較。實驗發現:如果代碼不是由系統自動生成的,將會需要完成大量的重復編碼工作。從表1中看出,尤其是當查詢數量很大時,腳本行數相對于Java的代碼行數有了極大的減少,這說明了本文提供的腳本語言為傳感器監控專家節省了大量的精力。

表1 腳本優化情況比較

4 結 語

本文提出了大規模傳感器流數據中的實時聚合計算框架的方法。主要貢獻是提供了適合這一類計算的簡單易用的腳本語言和相應的分布式計算系統。腳本語言使分析人員能夠在滑動窗口的聚合的組合中構建自己的計算邏輯。同時,該系統平臺可以將腳本語言解釋為Storm拓撲,使用智能計算和分組方法來顯著提高性能。實驗證明了本文所述的系統使得傳感器監控專家從編碼工作中解脫,在計算大規模傳感器的應用中降低了流處理的通信成本,從而能夠在分布式環境中處理大量的查詢。

猜你喜歡
數據流滑動分區
用于彎管機的鋼管自動上料裝置
優先級驅動的泛化航電網絡實時性能分析
貴州省地質災害易發分區圖
上海實施“分區封控”
汽車維修數據流基礎(上)
汽車維修數據流基礎(下)
基于XML的數據流轉換在民航離港系統中應用
針對移動端設計的基于滑動響應方式的驗證碼研究
Big Little lies: No One Is Perfect
大型數據庫分區表研究
91香蕉高清国产线观看免费-97夜夜澡人人爽人人喊a-99久久久无码国产精品9-国产亚洲日韩欧美综合