Luigi是一款管道作業(yè)流程配置方案,可以幫助用戶快速構(gòu)建自動化流程管道,將復(fù)雜的任務(wù)建立聯(lián)系,為任務(wù)配置執(zhí)行方式,設(shè)置調(diào)度方案,讓任務(wù)按照您配置的工作流程自動運行,方便以后快速檢測哪個環(huán)節(jié)出現(xiàn)故障,對于需要建立工作流,需要可視化工作任務(wù)的朋友很有幫助,在處理數(shù)據(jù)的時候就可以通過這款軟件建立自動化工作方式,可以使用內(nèi)置的任務(wù)模板快速創(chuàng)建工作流管理方案,可以設(shè)置任務(wù)依存關(guān)系,可以設(shè)置任務(wù)數(shù)據(jù)輸出方式,可以設(shè)置作業(yè)相關(guān)范圍,通過流程化的方式讓你的工作可視化!
Luigi軟件功能
Luigi是一個Python(已測試3.6、3.7、3.8、3.9)軟件包,可幫助您構(gòu)建批處理作業(yè)的復(fù)雜管道。它處理依賴關(guān)系解析,工作流管理,可視化,處理故障,命令行集成等。
Luigi的目的是解決通常與長時間運行的批處理相關(guān)的所有管道問題。您希望將許多任務(wù)鏈接起來,使其自動化,否則將發(fā)生故障。這些任務(wù)可以是任何事情,但通常都是長期運行的任務(wù),例如 Hadoop作業(yè),向數(shù)據(jù)庫轉(zhuǎn)儲數(shù)據(jù)或從數(shù)據(jù)庫轉(zhuǎn)儲數(shù)據(jù),運行機器學(xué)習(xí)算法或其他任何事情。
還有其他一些軟件包專注于較低級別的數(shù)據(jù)處理,例如Hive, Pig或 Cascading。Luigi并不是替代這些框架。相反,它可以幫助您將許多任務(wù)組合在一起,其中每個任務(wù)可以是Hive查詢,Java中的Hadoop作業(yè),Scala或Python中的 Spark作業(yè),Python代碼段, 從數(shù)據(jù)庫中轉(zhuǎn)儲表或其他任何內(nèi)容。建立包含數(shù)千個任務(wù)且需要幾天或幾周才能完成的長期運行的管道很容易。Luigi負(fù)責(zé)許多工作流管理,因此您可以專注于任務(wù)本身及其依賴。
您幾乎可以構(gòu)建任何所需的任務(wù),但是Luigi還附帶了一個包含多個常用任務(wù)模板的工具箱。它包括對在Hadoop中運行Python mapreduce作業(yè)以及 Hive和Pig作業(yè)的支持。它還帶有 HDFS的文件系統(tǒng)象和本地文件,以確保所有文件系統(tǒng)作都是原子的。這很重要,因為這意味著您的數(shù)據(jù)管道在包含部分?jǐn)?shù)據(jù)的狀態(tài)下不會崩潰。
Luigi軟件特色
可視化器頁面
Luigi也帶有Web界面,因此您可以在所有任務(wù)中進行搜索和過濾。
依賴圖示例
只是為了讓您了解Luigi的功能,這是我們在生產(chǎn)中正在運行的內(nèi)容的屏幕截圖。使用Luigi的可視化工具,我們可以很好地直觀地看到工作流程的依賴關(guān)系圖。每個節(jié)點代表一個必須運行的任務(wù)。綠色任務(wù)已經(jīng)完成,而任務(wù)尚未運行。這些任務(wù)大多數(shù)是Hadoop作業(yè),但也有些事情在本地運行并建立數(shù)據(jù)文件。
從概念上講,Luigi與U Make相似,其中您有某些任務(wù),而這些任務(wù)又可能依賴于其他任務(wù)。與Oozie 和Azkaban也有一些相似之處。一個主要的區(qū)別是Luigi不僅是專門為Hadoop構(gòu)建的,而且很容易通過其他類型的任務(wù)對其進行擴展。
Luigi中的所有內(nèi)容都在Python中。代替XML配置或類似的外部數(shù)據(jù)文件中,依賴圖被指定內(nèi)的Python。這使構(gòu)建任務(wù)的復(fù)雜依賴關(guān)系圖變得容易,其中依賴關(guān)系可能涉及期代數(shù)或?qū)ν蝗蝿?wù)其他版本的遞歸引用。但是,工作流可能會觸發(fā)Python以外的事件,例如運行 Pig腳本 或scp’ing文件。
Luigi教程
執(zhí)行模型
Luigi有一個非常簡單的執(zhí)行和觸發(fā)模型。
工人和任務(wù)執(zhí)行最重要的方面是沒有執(zhí)行被轉(zhuǎn)移。當(dāng)您運行Luigi工作流時,將計劃所有任務(wù),并在流程中執(zhí)行這些任務(wù)。
這種方案的好處是,由于所有執(zhí)行都在流程中進行,因此它非常易于調(diào)試。這也使部署成為非事件。在開發(fā)過程中,通常可以從命令行運行Luigi工作流,而在部署它時,可以使用cntab或任何其他調(diào)度程序來觸發(fā)它。
缺點是Luigi不會免費提供可伸縮。實際上,在開始運行數(shù)千個任務(wù)之前,這不是問題。
Luigi自動化和安排這些工作流程的目的不是嗎?在某種程度上。Luigi可幫助您對任務(wù)的依賴進行編碼并建立鏈。此外,Luigi的調(diào)度程序可確保對依賴關(guān)系圖有一個集中的視圖,并且不會由多個同時執(zhí)行同一作業(yè)。
排程器客戶端僅在run()單線程調(diào)度程序允許的情況下啟動任務(wù)的方法。由于任務(wù)數(shù)量通常很少(與一個任務(wù)正在處理的PB數(shù)據(jù)相比),我們可以負(fù)擔(dān)得起簡單的集中式的便利。
觸發(fā)任務(wù)
Luigi不包括其自身的觸發(fā),因此您必須依外部調(diào)度程序(例如cntab)來實際觸發(fā)工作流程。
在實踐中,這并不是一個大障礙,因為Luigi避免了通常由它引起的所有混亂。使用例如,安排復(fù)雜的工作流程非常簡單。cntab。
將來,Luigi可能會實現(xiàn)自己的觸發(fā)。對cntab的依賴(或任何外部觸發(fā)機制)有點尷尬,可以避免。
觸發(fā)范例例如,如果您每天都有一個外部數(shù)據(jù)轉(zhuǎn)儲,并且您的工作流依賴于此,則編寫一個依賴于此數(shù)據(jù)轉(zhuǎn)儲的工作流。,Cntab可以每分鐘觸發(fā)一次此工作流程,以數(shù)據(jù)是否到達(dá)。如果有,它將運行完整的依賴關(guān)系圖。
在您的cnline中,您會遇到類似
30 0 * * * my-user luigi RunAll –module my_tasks
您可以從cntab甚至在多臺計算機之間觸發(fā)任意次數(shù)的觸發(fā),因為調(diào)度程序?qū)⒋_保每個AggregationTask任務(wù)最多同時運行。請注意,這實際上可能意味著可以運行多個任務(wù),因為存在具有不同參數(shù)的實例,并且這可以為您提供某種形式的并行化(例如,AggregationTask(2013-01-09)可以與并行運行AggregationTask(2013-01-08))。
當(dāng)然,某些Task類型(例如HadoopJobTask)可以將執(zhí)行轉(zhuǎn)移到其他地方,但這取決于每個Task的定義。
Luigi圖案
代碼重用
Luigi的一個優(yōu)點是,依賴于其他存儲庫中定義的任務(wù)非常容易。在執(zhí)行路徑中包含“ forks”也很簡單,在該路徑中,一項任務(wù)的輸出可能成為許多其他任務(wù)的輸入。
當(dāng)前,不支持“中間”輸出的語義,這意味著所有輸出將無限期保留。這樣做的好處是,如果嘗試運行X-> Y,并且Y崩潰,則可以使用以前構(gòu)建的X恢復(fù)。缺點是,文件系統(tǒng)上會有很多中間結(jié)果。一種有用的模式是將這些文件放在一個特殊的目錄中,并進行某種定期的垃圾收集清理。
觸發(fā)許多任務(wù)
一種方便的模式是在幾個依賴關(guān)系鏈的末尾有一個虛擬Task,因此您可以通過在命令行中僅指定一個任務(wù)來觸發(fā)多個管道,類似于make的 工作方式。
這個簡單的任務(wù)本身不會做任何事情,但是會調(diào)用許多其他任務(wù)。每次調(diào)用時,Luigi將執(zhí)行盡可能多的起作業(yè)(具有所有依賴項的作業(yè))。
您將需要使用WrapperTask它而不是通常的Task類,因為此作業(yè)不會產(chǎn)生自己的任何輸出,因此需要一種方式指示完成的時間。此類用于僅包裝其他任務(wù)的任務(wù),并且根據(jù)定義,如果所有任務(wù)均存在,則完成該任務(wù)。
觸發(fā)周期任務(wù)
一個常見的要求是每晚產(chǎn)生一份每報告(或其他報告)。有時,由于各種原因,任務(wù)將持續(xù)崩潰或缺少所需依賴項的時間超過一天,這將導(dǎo)致某個期缺少可交付成果。哎呀。
為了確保上述AllReports任務(wù)最終每天完成一次(date參數(shù)的值),可以例如在require。方法中添加一個循環(huán),以產(chǎn)生對self.date之前的幾天的依賴關(guān)系。,只要Luigi不斷被調(diào)用,解決間歇問題后,積壓的工作量就會很好地追上來。
Luigi實際上帶有一個可重用的工具來實現(xiàn)這一目標(biāo),稱為RangeDailyBase(resp。RangeHourlyBase)。簡單地說
luigi –module all_reports RangeDailyBase –of AllReports –start 2015-01-01
從2015年1月1起,您的cntab中的代碼將很容易避免出現(xiàn)差距。注意:從2015年1月1到當(dāng)前時間,它不會一直循環(huán)播放,但默認(rèn)情況下最多為3個月前-請參閱RangeDailyBase此文檔以及更多調(diào)整行為的旋鈕。另請參見下面的監(jiān)控。
有效觸發(fā)重復(fù)任務(wù)
如上所述,RangeDailyBase之所以這樣命名,是因為存在一個更有效的子類RangeDaily(resp。RangeHourly),該子類是針對數(shù)百個任務(wù)類量身定制的,這些任務(wù)類是在連續(xù)多年的連續(xù)需求下同時進行調(diào)度的(這將導(dǎo)致使用天真的循環(huán)方法進行冗余完整和調(diào)度程序過載)。 ) 用法:
luigi –module all_reports RangeDaily –of AllReports –start 2015-01-01
它具有與RangeDailyBase相同的旋鈕,但有一些附加要求。也就是說,該任務(wù)必須實現(xiàn)有效的bulk_complete方法,或者必須將輸出寫入文件系統(tǒng)Target中,且期參數(shù)值始終在文件路徑中表示。
回填任務(wù)
這也是一個常見的用例,有時您已經(jīng)調(diào)整了現(xiàn)有的重復(fù)任務(wù)代碼,并且出于某種原因或其他原因,希望將其重新計算的時間間隔定為一個期。最方便的是使用上述范圍工具,只需指定start(包括)和stop(排除)參數(shù)即可:
luigi –module all_reports RangeDaily –of AllReportsV2 –start 2014-10-31 –stop 2014-12-25
將多個參數(shù)值批處理為單次運行
有時,將多個作業(yè)作為一處理一起運行要比單獨運行每個作業(yè)更快。在這種情況下,您可以在構(gòu)造函數(shù)中使用batch_method標(biāo)記某些參數(shù),以告訴如何組合多個值。一種常見的實現(xiàn)方法是簡單地運行最大值。這對于在運行較新的數(shù)據(jù)時覆蓋較舊的數(shù)據(jù)的任務(wù)很有用。您可以通過將batch_method設(shè)置為max來完成此作,如下所示:
令人興奮的是,如果您向調(diào)度程序發(fā)送多個As,它可以將它們組合并返回一個。所以,如果A(date=2016-07-28),A(date=2016-07-29)并且 A(date=2016-07-30)都準(zhǔn)備運行,你將開始運行A(date=2016-07-30)。雖然這是正在運行,調(diào)度程序?qū)@示A(date=2016-07-28),A(date=2016-07-29)批量運行,同時 A(date=2016-07-30)運行。當(dāng)A(date=2016-07-30)完成運行,成為失敗或完成,另外兩個任務(wù)將更新為相同的狀態(tài)。
如果要限制批生產(chǎn)的數(shù)量,只需設(shè)置max_batch_size。所以如果你有
那么調(diào)度程序最多將一起批處理10個作業(yè)。您可能不希望使用max batch方法執(zhí)行此作,但是如果您使用其他方法,則可能會有所幫助。您可以使用任何采用參數(shù)值列表并返回單個參數(shù)值的方法。
如果您有兩個最大批處理參數(shù),則將獲得兩個參數(shù)的最大值。如果您的參數(shù)沒有批處理方法,則會將它們分別匯總。所以如果你有一個像
你創(chuàng)建任務(wù),, ,你會得到他們分批為和。A(p1=1, p2=2, p3=0)A(p1=2, p2=3, p3=0)A(p1=3, p2=4, p3=1)A(p1=2, p2=3, p3=0)A(p1=3, p2=4, p3=1)
請注意,批處理任務(wù)不會占用[resources],只有最終運行的任務(wù)才會使用資源。調(diào)度程序僅在將每個任務(wù)一起批處理之前,才分別每個任務(wù)是否有足夠的資源。
定期覆蓋相同數(shù)據(jù)源的任務(wù)
如果每次運行都覆蓋相同的數(shù)據(jù)源,則需要確保不能同時運行兩處理。您可以通過將batch_method設(shè)置為max并設(shè)置唯一的資源來輕松地做到這一點:
現(xiàn)在,如果你有多個任務(wù),如A(date=2016-06-01), A(date=2016-06-02),A(date=2016-06-03),調(diào)度只會讓你運行的最高可用一個并標(biāo)記較下層為batch_running。如果有新任務(wù)可用,而其他任務(wù)正在運行,則使用唯一資源將阻止多個任務(wù)同時寫入同一位置。
避免同時寫入單個文件
從多個任務(wù)更新單個文件幾乎總是一個壞主意,并且在執(zhí)行此作之前,您必須非常確信沒有其他好的解決方案。但是,如果沒有其他選擇,則可能至少需要確保沒有兩個任務(wù)試圖_simultaneously_寫入文件。
通過將“資源”轉(zhuǎn)換為Python屬,它可以返回取決于任務(wù)參數(shù)或其他動態(tài)屬的值:
由于默認(rèn)情況下,資源的使用限制為1,如果它們具有相同的Important_file_name屬,則將不會運行任務(wù)A的兩個實例。
減少正在運行的任務(wù)的資源
在調(diào)度時,luigi調(diào)度程序需要知道任務(wù)運行后可能具有的最大資源消耗。但是,對于某些任務(wù),在其運行方法內(nèi)的兩個步驟之間減少消耗的資源量(例如在進行大量計算之后)可能是有益的。在這種情況下,已經(jīng)可以調(diào)度等待該特定資源的其他任務(wù)。
監(jiān)視任務(wù)管道
Luigi帶有一些現(xiàn)有的方法,luigi.notifications可以在任務(wù)崩潰時接收。電子郵件是最常見的方式。
上面提到的用于重復(fù)任務(wù)的范圍工具不僅可以為您實現(xiàn)可的調(diào)度,而且還可以發(fā)出可用于設(shè)置延遲監(jiān)視的事件。這樣,您就可以在作業(yè)因長時間缺少輸入數(shù)據(jù)或需要其他注意而卡住時實施警報。
原子寫問題
luigi水管工經(jīng)常犯的一個普遍錯誤是將數(shù)據(jù)部分寫入最終目的地,而不是原子地。出現(xiàn)問題是因為luigi中的完成與運行一樣幼稚 luigi.target.Target.exists()。在許多情況下,這僅意味著磁盤上是否存在文件夾。在我們部分寫入數(shù)據(jù)的時間內(nèi),根據(jù)該輸出執(zhí)行的任務(wù)將認(rèn)為其輸入已完成。可能會產(chǎn)生的影響,例如在感恩節(jié)錯誤中。
可以通過想象我們處理本地磁盤上存儲的數(shù)據(jù)并通過運行命令來說明該概念:
如前所述,問題在于在一段時間內(nèi)僅存在部分?jǐn)?shù)據(jù),但是我們認(rèn)為該數(shù)據(jù)是complete()因為輸出文件夾已經(jīng)存在。這是一個強大的版本:
確實,好的方法并不那么瑣碎。它涉及到提供一個唯一的目錄名和一個相當(dāng)復(fù)雜的mv行,之所以mv需要這些都是因為我們不想mv將目錄移到潛在的現(xiàn)有目錄中。在特殊情況下,例如,當(dāng)鎖定失敗并且同一任務(wù)將以某種方式同時運行兩次時,目錄可能已經(jīng)存在。最后,在一種從未移動過文件的特殊情況下,可能要刪除從未使用過的臨時目錄。
請注意,這是一個示例,其中存儲位于本地磁盤上。但是,對于每個存儲(硬盤文件,hdfs文件,數(shù)據(jù)庫表等),此過程將有所不同。但是每個luigi用戶都需要實現(xiàn)這種復(fù)雜嗎?不,幸運的是,luigi開發(fā)人員已經(jīng)意識到了這些,并且luigi附帶了許多內(nèi)置解決方案。如果您正在處理文件系統(tǒng)(FileTarget),則應(yīng)考慮使用 temporary_path()。對于其他目標(biāo),應(yīng)確保編寫最終輸出目錄的方式是原子的。
向任務(wù)發(fā)送消息
調(diào)度程序能夠?qū)⑾l(fā)送到特定任務(wù)。當(dāng)正在運行的任務(wù)接受消息時,它可以訪問 存儲傳入消息的multipcessing.Queue對象。您可以實現(xiàn)自定義行為來響應(yīng)和響應(yīng)消息:
可以直接從調(diào)度程序UI發(fā)送消息,該UI還顯示響應(yīng)(如果有)。請注意,僅當(dāng)將調(diào)度程序配置為發(fā)送消息(請參閱[scheduler]配置),并且將任務(wù)配置為接受消息時,此功能才可用。