Presto Worker REST API

Presto 的協調器與 Presto Worker 通訊以執行查詢片段並擷取查詢結果。Presto Worker 彼此通訊以交換中間結果。本章說明這些通訊中使用的 REST API。

工作資源用於啟動查詢片段的執行、追蹤狀態並擷取結果。

控制平面

協調器使用以下 HTTP 方法啟動查詢片段的執行並追蹤執行狀態。

  • /v1/task/{taskId} 執行 POST 會啟動 POST 主體中指定的查詢片段執行。請求可選擇性地包含一組要處理的初始分割。請求也指定如何分割結果,例如使用指定的輸出欄將雜湊分割為指定數量的輸出緩衝區,或將所有結果合併為單一輸出緩衝區,或將合併的結果廣播到多個輸出緩衝區。

  • 後續對 /v1/task/{taskId} 執行 POST 可能會提供額外的分割以進行處理,並最終指定不再有分割。

  • /v1/task/{taskId}/status 執行 GET 會傳回描述目前執行狀態的 TaskStatus JSON 文件。

  • /v1/task/{taskId} 執行 GET 會傳回包含執行狀態擴展資訊的 TaskInfo JSON 文件。

  • /v1/task/{taskId} 執行 DELETE 會刪除已完成的工作或取消進行中的工作。

  • /v1/task 執行 GET 會傳回包含所有工作之 TaskInfo 清單的 JSON 文件。

來自協調器的狀態請求包含兩個 HTTP 標頭:X-Presto-Current-StateX-Presto-Max-WaitX-Presto-Current-State 指定協調器已知的工作狀態。如果 Worker 上的工作狀態不同,Worker 會立即回覆。如果 Worker 上的工作狀態與協調器上的相同,Worker 會等待工作狀態變更後再回覆。X-Presto-Max-Wait HTTP 標頭指定最長等待時間。即使工作狀態保持不變,Worker 也會在這麼長的時間後回覆。

此設計可確保協調器及時收到工作狀態變更,而無需在緊密迴圈中輪詢 Worker。

相同的設計適用於透過對 /v1/task/{taskId} 執行 GET 來請求擴展工作資訊。

資料平面

協調器使用以下 HTTP 方法來擷取最終查詢結果,或下游 Worker 從上游 Worker 擷取中間結果。

  • {taskId}/results/{bufferId}/{token} 執行 GET 會從指定的輸出緩衝區傳回下一批結果。確認收到前一批結果。

  • {taskId}/results/{bufferId}/{token}/acknowledge 執行 GET 會確認收到結果,並允許 Worker 刪除結果。

  • 如果發生錯誤,對 {taskId}/results/{bufferId} 執行 DELETE 會從指定的輸出緩衝區中刪除所有結果。

  • 或者,可以對 {taskId}/results/{bufferId} 提出 HEAD 請求,以擷取任何與資料頁面順序無關的標頭。使用此標頭檢查緩衝區是否已完成,或查看緩衝了多少資料,而無需擷取資料。確認收到前一批結果。

協調器和 Worker 會以區塊擷取結果。他們使用 X-Presto-Max-Size HTTP 標頭指定區塊的最大大小(以位元組為單位)。每個區塊都由一個單調遞增的序號識別,有時稱為權杖。第一個結果請求指定序號為零。回應包括

  • X-Presto-Page-Sequence-Id HTTP 標頭表示要求的序號,

  • X-Presto-Page-End-Sequence-Id HTTP 標頭表示用於確認收到區塊並請求下一個區塊的序號,

  • X-Presto-Buffer-Complete HTTP 標頭表示沒有更多結果,值為 true

  • X-Presto-Buffer-Remaining-Bytes HTTP 標頭表示輸出緩衝區中剩餘的緩衝位元組數。這應該傳回一個逗號分隔的清單,其中包含下一個請求中可以傳回的頁面大小(以位元組為單位)。這可以用作對上游工作的提示,以最佳化資料交換。

回應的主體包含SerializedPage 線路格式中的頁面清單。

收到第一批結果後,用戶端會使用 X-Presto-Page-End-Sequence-Id 序號來請求下一批結果。請求下一批會自動確認收到前一批。用戶端會持續擷取結果,直到收到 X-Presto-Buffer-Complete HTTP 標頭,值為 true

當用戶端決定不立即擷取下一批資料時,它會使用對 {taskId}/results/{bufferId}/{token}/acknowledge 的 GET 來傳送明確的確認。用戶端會將權杖設定為先前收到的 X-Presto-Page-End-Sequence-Id 標頭的值。

如果 Worker 逾時填入回應,或工作已失敗或中止,Worker 將傳回空的結果。用戶端可以嘗試重試請求。在工作處於終端狀態的情況下,假設控制平面最終會處理狀態變更。

如果用戶端遺失了回應,它可以重複請求,而 Worker 將再次傳送結果。在收到序號的確認後,Worker 會刪除所有序號小於該序號的結果,且用戶端無法再重新擷取這些結果。

以下是從輸出緩衝區零擷取雙區塊結果的訊息傳遞範例圖。

../_images/worker-protocol-results.png

輸出緩衝區

資料隨機處理涉及下游階段中的 Worker 從上游階段中的 Worker 擷取結果。每個產生上游的 Worker 會設定與下游階段中 Worker 數量一樣多的輸出緩衝區。輸出緩衝區由從零開始的連續編號識別。每個下游 Worker 都會獲指派一個輸出緩衝區,並使用它從所有上游 Worker 擷取結果。

下圖顯示 3 個下游 Worker。它們獲指派輸出緩衝區編號 0、1 和 2。每個上游 Worker 都有 3 個輸出緩衝區。下游 Worker #0 使用緩衝區編號 0 從所有上游 Worker 擷取結果。下游 Worker #1 使用緩衝區編號 1 從所有上游 Worker 擷取結果。下游 Worker #2 使用緩衝區編號 2 從所有上游 Worker 擷取結果。

../_images/worker-protocol-output-buffers.png

錯誤處理

工作失敗會透過 TaskStatusTaskInfo 更新報告給協調器。

當發現工作失敗時,協調器會中止所有剩餘的工作,並向用戶端報告查詢失敗。當發生工作失敗或收到中止請求時,所有進一步的處理都會停止,且所有剩餘的工作輸出都會被捨棄。

為了防止連鎖故障,失敗或中止的任務會像平常一樣繼續回應資料平面請求。因為輸出在失敗時會被完全丟棄,所有後續的回應都是空的。 X-Presto-Buffer-Complete 標頭會被設定為 false,以防止下游任務成功完成並產生不正確的結果。

對客戶端來說,這些回應與健康任務的回應沒有區別。為了避免請求突發,在回應空結果集之前,會套用一個標準延遲。

問題診斷

HTTP 請求日誌可以幫助診斷與協定相關的問題。

可以通過 config.properties 檔案啟用請求日誌。

在 Presto 中

http-server.log.enabled=true
http-server.log.path=<request_log_file_path>

在 Prestissimo 中(日誌會寫入標準日誌)

http-server.enable-access-log=true

使用 grep 來追蹤特定的協定互動。

一次 Exchange

cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results'
I0402 15:33:06.928076   625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:06] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 0   57
I0402 15:33:07.181629   625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:07] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 94024   0
I0402 15:33:25.392717   675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/1 HTTP/1.1" 200 0   0
I0402 15:33:25.393162   675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "DELETE /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213 HTTP/1.1" 200 0   0

一個 TaskStatus 更新

cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status'
I0402 15:33:34.629278   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:34] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:35.636466   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:35] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:36.644189   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:36.768704   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 717   115

日誌記錄包含諸如回應狀態、回應大小和回應時間等資訊,這些資訊有助於理解互動流程,包括在檢查它們時的延遲和逾時。