Presto Worker REST API¶
Presto 的協調器與 Presto Worker 通訊以執行查詢片段並擷取查詢結果。Presto Worker 彼此通訊以交換中間結果。本章說明這些通訊中使用的 REST API。
工作資源用於啟動查詢片段的執行、追蹤狀態並擷取結果。
控制平面¶
協調器使用以下 HTTP 方法啟動查詢片段的執行並追蹤執行狀態。
對
/v1/task/{taskId}
執行POST
會啟動POST
主體中指定的查詢片段執行。請求可選擇性地包含一組要處理的初始分割。請求也指定如何分割結果,例如使用指定的輸出欄將雜湊分割為指定數量的輸出緩衝區,或將所有結果合併為單一輸出緩衝區,或將合併的結果廣播到多個輸出緩衝區。後續對
/v1/task/{taskId}
執行POST
可能會提供額外的分割以進行處理,並最終指定不再有分割。對
/v1/task/{taskId}/status
執行GET
會傳回描述目前執行狀態的TaskStatus
JSON 文件。對
/v1/task/{taskId}
執行GET
會傳回包含執行狀態擴展資訊的TaskInfo
JSON 文件。對
/v1/task/{taskId}
執行DELETE
會刪除已完成的工作或取消進行中的工作。對
/v1/task
執行GET
會傳回包含所有工作之TaskInfo
清單的 JSON 文件。
來自協調器的狀態請求包含兩個 HTTP 標頭:X-Presto-Current-State
和 X-Presto-Max-Wait
。X-Presto-Current-State
指定協調器已知的工作狀態。如果 Worker 上的工作狀態不同,Worker 會立即回覆。如果 Worker 上的工作狀態與協調器上的相同,Worker 會等待工作狀態變更後再回覆。X-Presto-Max-Wait
HTTP 標頭指定最長等待時間。即使工作狀態保持不變,Worker 也會在這麼長的時間後回覆。
此設計可確保協調器及時收到工作狀態變更,而無需在緊密迴圈中輪詢 Worker。
相同的設計適用於透過對 /v1/task/{taskId}
執行 GET
來請求擴展工作資訊。
資料平面¶
協調器使用以下 HTTP 方法來擷取最終查詢結果,或下游 Worker 從上游 Worker 擷取中間結果。
對
{taskId}/results/{bufferId}/{token}
執行GET
會從指定的輸出緩衝區傳回下一批結果。確認收到前一批結果。對
{taskId}/results/{bufferId}/{token}/acknowledge
執行GET
會確認收到結果,並允許 Worker 刪除結果。如果發生錯誤,對
{taskId}/results/{bufferId}
執行DELETE
會從指定的輸出緩衝區中刪除所有結果。或者,可以對
{taskId}/results/{bufferId}
提出HEAD
請求,以擷取任何與資料頁面順序無關的標頭。使用此標頭檢查緩衝區是否已完成,或查看緩衝了多少資料,而無需擷取資料。確認收到前一批結果。
協調器和 Worker 會以區塊擷取結果。他們使用 X-Presto-Max-Size
HTTP 標頭指定區塊的最大大小(以位元組為單位)。每個區塊都由一個單調遞增的序號識別,有時稱為權杖。第一個結果請求指定序號為零。回應包括
以
X-Presto-Page-Sequence-Id
HTTP 標頭表示要求的序號,以
X-Presto-Page-End-Sequence-Id
HTTP 標頭表示用於確認收到區塊並請求下一個區塊的序號,以
X-Presto-Buffer-Complete
HTTP 標頭表示沒有更多結果,值為true
。以
X-Presto-Buffer-Remaining-Bytes
HTTP 標頭表示輸出緩衝區中剩餘的緩衝位元組數。這應該傳回一個逗號分隔的清單,其中包含下一個請求中可以傳回的頁面大小(以位元組為單位)。這可以用作對上游工作的提示,以最佳化資料交換。
回應的主體包含SerializedPage 線路格式中的頁面清單。
收到第一批結果後,用戶端會使用 X-Presto-Page-End-Sequence-Id
序號來請求下一批結果。請求下一批會自動確認收到前一批。用戶端會持續擷取結果,直到收到 X-Presto-Buffer-Complete
HTTP 標頭,值為 true
。
當用戶端決定不立即擷取下一批資料時,它會使用對 {taskId}/results/{bufferId}/{token}/acknowledge
的 GET 來傳送明確的確認。用戶端會將權杖設定為先前收到的 X-Presto-Page-End-Sequence-Id
標頭的值。
如果 Worker 逾時填入回應,或工作已失敗或中止,Worker 將傳回空的結果。用戶端可以嘗試重試請求。在工作處於終端狀態的情況下,假設控制平面最終會處理狀態變更。
如果用戶端遺失了回應,它可以重複請求,而 Worker 將再次傳送結果。在收到序號的確認後,Worker 會刪除所有序號小於該序號的結果,且用戶端無法再重新擷取這些結果。
以下是從輸出緩衝區零擷取雙區塊結果的訊息傳遞範例圖。

輸出緩衝區¶
資料隨機處理涉及下游階段中的 Worker 從上游階段中的 Worker 擷取結果。每個產生上游的 Worker 會設定與下游階段中 Worker 數量一樣多的輸出緩衝區。輸出緩衝區由從零開始的連續編號識別。每個下游 Worker 都會獲指派一個輸出緩衝區,並使用它從所有上游 Worker 擷取結果。
下圖顯示 3 個下游 Worker。它們獲指派輸出緩衝區編號 0、1 和 2。每個上游 Worker 都有 3 個輸出緩衝區。下游 Worker #0 使用緩衝區編號 0 從所有上游 Worker 擷取結果。下游 Worker #1 使用緩衝區編號 1 從所有上游 Worker 擷取結果。下游 Worker #2 使用緩衝區編號 2 從所有上游 Worker 擷取結果。

錯誤處理¶
工作失敗會透過 TaskStatus
和 TaskInfo
更新報告給協調器。
當發現工作失敗時,協調器會中止所有剩餘的工作,並向用戶端報告查詢失敗。當發生工作失敗或收到中止請求時,所有進一步的處理都會停止,且所有剩餘的工作輸出都會被捨棄。
為了防止連鎖故障,失敗或中止的任務會像平常一樣繼續回應資料平面請求。因為輸出在失敗時會被完全丟棄,所有後續的回應都是空的。 X-Presto-Buffer-Complete
標頭會被設定為 false
,以防止下游任務成功完成並產生不正確的結果。
對客戶端來說,這些回應與健康任務的回應沒有區別。為了避免請求突發,在回應空結果集之前,會套用一個標準延遲。
問題診斷¶
HTTP 請求日誌可以幫助診斷與協定相關的問題。
可以通過 config.properties
檔案啟用請求日誌。
在 Presto 中
http-server.log.enabled=true
http-server.log.path=<request_log_file_path>
在 Prestissimo 中(日誌會寫入標準日誌)
http-server.enable-access-log=true
使用 grep 來追蹤特定的協定互動。
一次 Exchange
cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results'
I0402 15:33:06.928076 625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:06] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 0 57
I0402 15:33:07.181629 625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:07] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 94024 0
I0402 15:33:25.392717 675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/1 HTTP/1.1" 200 0 0
I0402 15:33:25.393162 675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "DELETE /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213 HTTP/1.1" 200 0 0
一個 TaskStatus
更新
cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status'
I0402 15:33:34.629278 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:34] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739 1000
I0402 15:33:35.636466 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:35] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739 1000
I0402 15:33:36.644189 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739 1000
I0402 15:33:36.768704 668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 717 115
日誌記錄包含諸如回應狀態、回應大小和回應時間等資訊,這些資訊有助於理解互動流程,包括在檢查它們時的延遲和逾時。