小米業(yè)務(wù)線眾多,從信息流,電商,廣告到金融等覆蓋了眾多領(lǐng)域,小米流式平臺為小米集團(tuán)各業(yè)務(wù)提供一體化的流式數(shù)據(jù)解決方案,主要包括數(shù)據(jù)采集,數(shù)據(jù)集成和流式計(jì)算三個(gè)模塊。目前每天數(shù)據(jù)量達(dá)到 1.2 萬億條,實(shí)時(shí)同步任務(wù) 1.5 萬,實(shí)時(shí)計(jì)算的數(shù)據(jù) 1 萬億條。
伴隨著小米業(yè)務(wù)的發(fā)展,流式平臺也經(jīng)歷三次大升級改造,滿足了眾多業(yè)務(wù)的各種需求。最新的一次迭代基于 Apache Flink,對于流式平臺內(nèi)部模塊進(jìn)行了徹底的重構(gòu),同時(shí)小米各業(yè)務(wù)也在由 Spark Streaming 逐步切換到 Flink。
背景介紹
小米流式平臺的愿景是為小米所有的業(yè)務(wù)線提供流式數(shù)據(jù)的一體化、平臺化解決方案。具體來講包括以下三個(gè)方面:
流式數(shù)據(jù)存儲:流式數(shù)據(jù)存儲指的是消息隊(duì)列,小米開發(fā)了一套自己的消息隊(duì)列,其類似于 Apache kafka,但它有自己的特點(diǎn),小米流式平臺提供消息隊(duì)列的存儲功能;
流式數(shù)據(jù)接入和轉(zhuǎn)儲:有了消息隊(duì)列來做流式數(shù)據(jù)的緩存區(qū)之后,繼而需要提供流式數(shù)據(jù)接入和轉(zhuǎn)儲的功能;
流式數(shù)據(jù)處理:指的是平臺基于 Flink、Spark Streaming 和 Storm 等計(jì)算引擎對流式數(shù)據(jù)進(jìn)行處理的過程。
下圖展示了流式平臺的整體架構(gòu)。從左到右第一列橙色部分是數(shù)據(jù)源,包含兩部分,即 User 和 Database。
User 指的是用戶各種各樣的埋點(diǎn)數(shù)據(jù),如用戶 APP 和 WebServer 的日志,其次是 Database 數(shù)據(jù),如 MySQL、HBase 和其他的 RDS 數(shù)據(jù)。
中間藍(lán)色部分是流式平臺的具體內(nèi)容,其中 Talos 是小米實(shí)現(xiàn)的消息隊(duì)列,其上層包含 Consumer SDK 和 Producer SDK。
此外小米還實(shí)現(xiàn)了一套完整的 Talos Source,主要用于收集剛才提到的用戶和數(shù)據(jù)庫的全場景的數(shù)據(jù)。
Talos Sink 和 Source 共同組合成一個(gè)數(shù)據(jù)流服務(wù),主要負(fù)責(zé)將 Talos 的數(shù)據(jù)以極低的延遲轉(zhuǎn)儲到其他系統(tǒng)中;Sink 是一套標(biāo)準(zhǔn)化的服務(wù),但其不夠定制化,后續(xù)會基于 Flink SQL 重構(gòu) Talos Sink 模塊。
下圖展示了小米的業(yè)務(wù)規(guī)模。在存儲層面小米每天大概有 1.2 萬億條消息,峰值流量可以達(dá)到 4300 萬條每秒。轉(zhuǎn)儲模塊僅 Talos Sink 每天轉(zhuǎn)儲的數(shù)據(jù)量就高達(dá) 1.6 PB,轉(zhuǎn)儲作業(yè)目前將近有 1.5 萬個(gè)。每天的流式計(jì)算作業(yè)超過 800 個(gè),F(xiàn)link 作業(yè)超過 200 個(gè),F(xiàn)link 每天處理的消息量可以達(dá)到 7000 億條,數(shù)據(jù)量在 1 PB 以上。
小米流式平臺發(fā)展歷史
小米流式平臺發(fā)展歷史分為如下三個(gè)階段:
Streaming Platform 1.0:小米流式平臺的 1.0 版本構(gòu)建于 2010 年,其最初使用的是 Scribe、Kafka 和 Storm,其中 Scribe 是一套解決數(shù)據(jù)收集和數(shù)據(jù)轉(zhuǎn)儲的服務(wù)。
Streaming Platform 2.0:由于 1.0 版本存在的種種問題,我們自研了小米自己的消息隊(duì)列 Talos,還包括 Talos Source、Talos Sink,并接入了 Spark Streaming。
Streaming Platform 3.0:該版本在上一個(gè)版本的基礎(chǔ)上增加了 Schema 的支持,還引入了 Flink 和 Stream SQL。
Streaming Platform 1.0 整體是一個(gè)級聯(lián)的服務(wù),前面包括 Scribe Agent 和 Scribe Server 的多級級聯(lián),主要用于收集數(shù)據(jù),然后滿足離線計(jì)算和實(shí)時(shí)計(jì)算的場景。離線計(jì)算使用的是 HDFS 和 Hive,實(shí)時(shí)計(jì)算使用的是 Kafka 和 Storm。雖然這種離線加實(shí)時(shí)的方式可以基本滿足小米當(dāng)時(shí)的業(yè)務(wù)需求,但也存在一系列的問題。
首先是 Scribe Agent 過多,而配置和包管理機(jī)制缺乏,導(dǎo)致維護(hù)成本非常高;
Scribe 采用的 Push 架構(gòu),異常情況下無法有效緩存數(shù)據(jù),同時(shí) HDFS / Kafka 數(shù)據(jù)相互影響;
最后數(shù)據(jù)鏈級聯(lián)比較長的時(shí)候,整個(gè)全鏈路數(shù)據(jù)黑盒,缺乏監(jiān)控和數(shù)據(jù)檢驗(yàn)機(jī)制。
為了解決 Streaming Platform 1.0 的問題,小米推出了 Streaming Platform 2.0 版本。該版本引入了 Talos,將其作為數(shù)據(jù)緩存區(qū)來進(jìn)行流式數(shù)據(jù)的存儲,左側(cè)是多種多樣的數(shù)據(jù)源,右側(cè)是多種多樣的 Sink,即將原本的級聯(lián)架構(gòu)轉(zhuǎn)換成星型架構(gòu),優(yōu)點(diǎn)是方便地?cái)U(kuò)展。
由于 Agent 自身數(shù)量及管理的流較多(具體數(shù)據(jù)均在萬級別),為此該版本實(shí)現(xiàn)了一套配置管理和包管理系統(tǒng),可以支持 Agent 一次配置之后的自動更新和重啟等。
此外,小米還實(shí)現(xiàn)了去中心化的配置服務(wù),配置文件設(shè)定好后可以自動地分發(fā)到分布式結(jié)點(diǎn)上去。
最后,該版本還實(shí)現(xiàn)了數(shù)據(jù)的端到端監(jiān)控,通過埋點(diǎn)來監(jiān)控?cái)?shù)據(jù)在整個(gè)鏈路上的數(shù)據(jù)丟失情況和數(shù)據(jù)傳輸延遲情況等。
Streaming Platform 2.0 的優(yōu)勢主要有:
引入了 Multi Source & Multi Sink,之前兩個(gè)系統(tǒng)之間導(dǎo)數(shù)據(jù)需要直接連接,現(xiàn)在的架構(gòu)將系統(tǒng)集成復(fù)雜度由原來的 O(M*N) 降低為 O(M+N);
引入配置管理和包管理機(jī)制,徹底解決系統(tǒng)升級、修改和上線等一系列問題,降低運(yùn)維的壓力;
引入端到端數(shù)據(jù)監(jiān)控機(jī)制,實(shí)現(xiàn)全鏈路數(shù)據(jù)監(jiān)控,量化全鏈路數(shù)據(jù)質(zhì)量;
產(chǎn)品化解決方案,避免重復(fù)建設(shè),解決業(yè)務(wù)運(yùn)維問題。
下圖詳細(xì)介紹一下 MySQL 同步的案例,場景是將 MySQL 的一個(gè)表通過上述的機(jī)制同步到消息隊(duì)列 Talos。具體流程是 Binlog 服務(wù)偽裝成 MySQL 的 Slave,向 MySQL 發(fā)送 Dump binlog 請求;MySQL 收到 Dump 請求后,開始推動 Binlog 給 Binlog 服務(wù);Binlog 服務(wù)將 binlog 以嚴(yán)格有序的形式轉(zhuǎn)儲到 Talos。之后會接入 Spark Streaming 作業(yè),對 binlog 進(jìn)行解析,解析結(jié)果寫入到 Kudu 表中。目前平臺支持寫入到 Kudu 中的表的數(shù)量級超過 3000 個(gè)。
Agent Source 的功能模塊如下圖所示。其支持 RPC、Http 協(xié)議,并可以通過 File 來監(jiān)聽本地文件,實(shí)現(xiàn)內(nèi)存和文件雙緩存,保證數(shù)據(jù)的高可靠。平臺基于 RPC 協(xié)議實(shí)現(xiàn)了 Logger Appender 和 RPC 協(xié)議的 SDK;對于 Http 協(xié)議實(shí)現(xiàn)了 HttpClient;對于文件實(shí)現(xiàn)了 File Watcher 來對本地文件進(jìn)行自動地發(fā)現(xiàn)和掃描,Offset Manager 自動記錄 offset;Agent 機(jī)制與 K8S 環(huán)境深度整合,可以很容易地和后端的流式計(jì)算等相結(jié)合。
下圖是 Talos Sink 的邏輯流程圖,其基于 Spark Streaming 來實(shí)現(xiàn)一系列流程。最左側(cè)是一系列 Talos Topic 的 Partition 分片,基于每個(gè) batch 抽象公共邏輯,如 startProcessBatch() 和 stopProcessBatch(),不同 Sink 只需要實(shí)現(xiàn) Write 邏輯;不同的 Sink 獨(dú)立為不同的作業(yè),避免相互影響;Sink 在 Spark Streaming 基礎(chǔ)上進(jìn)行了優(yōu)化,實(shí)現(xiàn)了根據(jù) Topic 流量進(jìn)行動態(tài)資源調(diào)度,保證系統(tǒng)延遲的前提下最大限度節(jié)省資源。
下圖是平臺實(shí)現(xiàn)的端到端數(shù)據(jù)監(jiān)控機(jī)制。具體實(shí)現(xiàn)是為每個(gè)消息都有一個(gè)時(shí)間戳 EventTime,表示這個(gè)消息真正生成的時(shí)間,根據(jù) EventTime 來劃分時(shí)間窗口,窗口大小為一分鐘,數(shù)據(jù)傳輸?shù)拿恳惶y(tǒng)計(jì)當(dāng)前時(shí)間窗口內(nèi)接受到的消息數(shù)量,最后統(tǒng)計(jì)出消息的完整度。延遲是計(jì)算某一跳 ProcessTime 和 EventTime 之間的差值。
Streaming Platform 2.0 目前的問題主要有三點(diǎn):
Talos 數(shù)據(jù)缺乏 Schema 管理,Talos 對于傳入的數(shù)據(jù)是不理解的,這種情況下無法使用 SQL 來消費(fèi) Talos 的數(shù)據(jù);
Talos Sink 模塊不支持定制化需求,例如從 Talos 將數(shù)據(jù)傳輸?shù)?Kudu 中,Talos 中有十個(gè)字段,但 Kudu 中只需要 5 個(gè)字段,該功能目前無法很好地支持;
Spark Streaming 自身問題,不支持 Event Time,端到端 Exactly Once 語義。
基于 Flink 的實(shí)時(shí)數(shù)倉
為了解決 Streaming Platform 2.0 的上述問題,小米進(jìn)行了大量調(diào)研,也和阿里的實(shí)時(shí)計(jì)算團(tuán)隊(duì)做了一系列溝通和交流,最終決定將使用 Flink 來改造平臺當(dāng)前的流程,下面具體介紹小米流式計(jì)算平臺基于Flink的實(shí)踐。
使用 Flink 對平臺進(jìn)行改造的設(shè)計(jì)理念如下:
全鏈路 Schema 支持,這里的全鏈路不僅包含 Talos 到 Flink 的階段,而是從最開始的數(shù)據(jù)收集階段一直到后端的計(jì)算處理。需要實(shí)現(xiàn)數(shù)據(jù)校驗(yàn)機(jī)制,避免數(shù)據(jù)污染;字段變更和兼容性檢查機(jī)制,在大數(shù)據(jù)場景下,Schema 變更頻繁,兼容性檢查很有必要,借鑒 Kafka 的經(jīng)驗(yàn),在 Schema 引入向前、向后或全兼容檢查機(jī)制。
借助 Flink 社區(qū)的力量全面推進(jìn) Flink 在小米的落地,一方面 Streaming 實(shí)時(shí)計(jì)算的作業(yè)逐漸從 Spark、Storm 遷移到 Flink,保證原本的延遲和資源節(jié)省,目前小米已經(jīng)運(yùn)行了超過 200 個(gè) Flink 作業(yè);另一方面期望用 Flink 改造 Sink 的流程,提升運(yùn)行效率的同時(shí),支持 ETL,在此基礎(chǔ)上大力推進(jìn) Streaming SQL;
實(shí)現(xiàn) Streaming 產(chǎn)品化,引入 Streaming Job 和 Streaming SQL 的平臺化管理;
基于 Flink SQL 改造 Talos Sink,支持業(yè)務(wù)邏輯定制化
下圖是 Streaming Platform 3.0 版本的架構(gòu)圖,與 2.0 版本的架構(gòu)設(shè)計(jì)類似,只是表達(dá)的角度不同。具體包含以下幾個(gè)模塊:
抽象 Table:該版本中各種存儲系統(tǒng)如 MySQL 和 Hive 等都會抽象成 Table,為 SQL 化做準(zhǔn)備。
Job 管理:提供 Streaming 作業(yè)的管理支持,包括多版本支持、配置與Jar分離、編譯部署和作業(yè)狀態(tài)管理等常見的功能。
SQL 管理:SQL 最終要轉(zhuǎn)換為一個(gè) Data Stream 作業(yè),該部分功能主要有 Web IDE 支持、Schema 探查、UDF/維表 Join、SQL 編譯、自動構(gòu)建 DDL 和 SQL 存儲等。
Talos Sink:該模塊基于 SQL 管理對 2.0 版本的 Sink 重構(gòu),包含的功能主要有一鍵建表、Sink 格式自動更新、字段映射、作業(yè)合并、簡單 SQL 和配置管理等。前面提到的場景中,基于 Spark Streaming 將 Message 從 Talos 讀取出來,并原封不動地轉(zhuǎn)到 HDFS 中做離線數(shù)倉的分析,此時(shí)可以直接用 SQL 表達(dá)很方便地實(shí)現(xiàn)。未來希望實(shí)現(xiàn)該模塊與小米內(nèi)部的其他系統(tǒng)如 ElasticSearch 和 Kudu 等進(jìn)行深度整合,具體的場景是假設(shè)已有 Talos Schema,基于 Talos Topic Schema 自動幫助用戶創(chuàng)建 Kudu 表。
平臺化:為用戶提供一體化、平臺化的解決方案,包括調(diào)試開發(fā)、監(jiān)控報(bào)警和運(yùn)維等。
Job 管理
Job 管理提供 Job 全生命周期管理、Job 權(quán)限管理和 Job 標(biāo)簽管理等功能;支持Job 運(yùn)行歷史展示,方便用戶追溯;支持 Job 狀態(tài)與延遲監(jiān)控,可以實(shí)現(xiàn)失敗作業(yè)自動拉起。
SQL 管理
主要包括以下四個(gè)環(huán)節(jié):
將外部表轉(zhuǎn)換為 SQL DDL,對應(yīng) Flink 1.9 中標(biāo)準(zhǔn)的 DDL 語句,主要包含 Table Schema、Table Format 和 Connector Properities。
基于完整定義的外部 SQL 表,增加 SQL 語句,既可以得到完成的表達(dá)用戶的需求。即 SQL Config 表示完整的用戶預(yù)計(jì)表達(dá),由 Source Table DDL、Sink Table DDL 和 SQL DML語句組成。
將 SQL Config 轉(zhuǎn)換成 Job Config,即轉(zhuǎn)換為 Stream Job 的表現(xiàn)形式。
將 Job Config 轉(zhuǎn)換為 JobGraph,用于提交 Flink Job。
外部表轉(zhuǎn)換成 SQL DDL 的流程如下圖所示。
首先根據(jù)外部表獲取 Table Schema 和 Table Format 信息,后者用于反解數(shù)據(jù),如對于 Hive 數(shù)據(jù)反序列化;
然后再后端生成默認(rèn)的 Connector 配置,該配置主要分為三部分,即不可修改的、帶默認(rèn)值的用戶可修改的、不帶默認(rèn)值的用戶必須配置的。
不可修改的配置情況是假設(shè)消費(fèi)的是 Talos 組件,那么 connector.type 一定是 talos,則該配置不需要改;而默認(rèn)值是從 Topic 頭部開始消費(fèi),但用戶可以設(shè)置從尾部開始消費(fèi),這種情況屬于帶默認(rèn)值但是用戶可修改的配置;而一些權(quán)限信息是用戶必須配置的。
之所以做三層配置管理,是為了盡可能減少用戶配置的復(fù)雜度。Table Schema、Table Format 和 Connector 1 其他配置信息,組成了SQL DDL。將 SQL Config 返回給用戶之后,對于可修改的需要用戶填寫,這樣便可以完成從外部表到 SQL DDL 的轉(zhuǎn)換,紅色字體表示的是用戶修改的信息。
SQL 管理引入了一個(gè) External Table 的特性。假設(shè)用戶在平臺上選擇消費(fèi)某個(gè) Topic 的時(shí)候,該特性會自動地獲取上面提到的 Table 的 Schema 和 Format 信息,并且顯示去掉了注冊 Flink Table 的邏輯;獲取 Schema 時(shí),該特性會將外部表字段類型自動轉(zhuǎn)換為 Flink Table 字段類型,并自動注冊為 Flink Tab 了。同時(shí)將 Connector Properties 分成三類,參數(shù)帶默認(rèn)值,只有必須項(xiàng)要求用戶填寫;所有參數(shù)均采用 Map 的形式表達(dá),非常便于后續(xù)轉(zhuǎn)化為 Flink 內(nèi)部的 TableDescriptor。
上面介紹了 SQL DDL 的創(chuàng)建過程,在已經(jīng)創(chuàng)建的 SQL DDL 的基礎(chǔ)上,如 Source SQL DDL 和 Sink SQL DDL,要求用戶填寫 SQL query 并返回給后端,后端會對 SQL 進(jìn)行驗(yàn)證,然后會生成一個(gè) SQL Config,即一個(gè) SQL 語句的完整表達(dá)。
SQL Config 轉(zhuǎn)換為 Job Config 的流程如下圖所示。
首先在 SQL Config 的基礎(chǔ)上增加作業(yè)所需要的資源、Job 的相關(guān)配置(Flink 的 state 參數(shù)等);
然后將 SQLConfig 編譯成一個(gè) Job Descriptor,即 Job Config 的描述,如 Job 的 Jar 包地址、MainClass 和 MainArgs 等。
下圖展示了 Job Config 轉(zhuǎn)換為 Job Graph 的過程。對于 DDL 中的 Schema、Format 和 Property 是和 Flink 中的 Table Descriptor 是一一對應(yīng)的,這種情況下只需要調(diào)用 Flink 的相關(guān)內(nèi)置接口就可以很方便地將信息轉(zhuǎn)換為 Table Descriptor,如 CreateTableSource()、RegistorTableSource() 等。通過上述過程,DDL 便可以注冊到 Flink 系統(tǒng)中直接使用。對于 SQL 語句,可以直接使用 TableEnv 的 sqlUpdate() 可以完成轉(zhuǎn)換。
SQL Config 轉(zhuǎn)換為一個(gè) Template Job 的流程如下所示。前面填寫的 Jar 包地址即該 Template 的 Jar 地址,MainClass 是該 Template Job。假設(shè)已經(jīng)有了 SQL DDL,可以直接轉(zhuǎn)換成 Table Descriptor,然后通過 TableFactorUtil 的 findAndCreateTableSource() 方法得到一個(gè) Table Source,Table Sink 的轉(zhuǎn)換過程類似。完成前兩步操作后,最后進(jìn)行 sqlUpdate() 操作。這樣便可以將一個(gè) SQL Job 轉(zhuǎn)換為最后可執(zhí)行的 Job Graph 提交到集群上運(yùn)行。
Talos Sink 采用了下圖所示的三種模式:
Row:Talos 的數(shù)據(jù)原封不動地灌到目標(biāo)系統(tǒng)中,這種模式的好處是數(shù)據(jù)讀取和寫入的時(shí)候無需進(jìn)行序列化和反序列化,效率較高;
ID mapping:即左右兩邊字段進(jìn)行 mapping,name 對應(yīng) field_name,timestamp 對應(yīng) timestamp,其中 Region 的字段丟掉;
SQL:通過 SQL 表達(dá)來表示邏輯上的處理。
未來規(guī)劃
小米流式平臺未來的計(jì)劃主要有以下幾點(diǎn):
在 Flink 落地的時(shí)候持續(xù)推進(jìn) Streaming Job 和平臺化建設(shè);
使用 Flink SQL 統(tǒng)一離線數(shù)倉和實(shí)時(shí)數(shù)倉;
評論
查看更多