Kafka 連接器¶
概觀¶
此連接器允許在 Presto 中將 Apache Kafka 主題用作表格。每個訊息在 Presto 中都呈現為一列。
主題可以是即時的:資料會隨著訊息到達而出現,並隨著訊息被捨棄而消失。如果單一查詢中多次存取相同的表格(例如,執行自我聯結),可能會導致奇怪的行為。
注意
支援 Apache Kafka 2.3.1+。
組態¶
若要設定 Kafka 連接器,請建立一個目錄屬性檔案 etc/catalog/kafka.properties
,其中包含以下內容,並根據需要取代屬性
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
多個 Kafka 叢集¶
您可以根據需要設定任意數量的目錄,因此如果您有額外的 Kafka 叢集,只需在 etc/catalog
中新增另一個屬性檔案,並使用不同的名稱(確保它以 .properties
結尾)。例如,如果您將屬性檔案命名為 sales.properties
,Presto 將會使用設定的連接器建立一個名為 sales
的目錄。
組態屬性¶
可以使用以下組態屬性
屬性名稱 |
描述 |
---|---|
|
此目錄提供的所有表格清單 |
|
表格的預設綱要名稱 |
|
Kafka 叢集中的節點清單 |
|
連線至 Kafka 叢集的逾時時間 |
|
每次輪詢的最大記錄數 |
|
每次輪詢從一個分割區擷取的最大位元組數 |
|
包含主題描述檔案的目錄 |
|
控制內部欄是否為表格綱要的一部分 |
kafka.table-names
¶
此目錄提供的所有表格的逗號分隔清單。表格名稱可以是未限定的(簡單名稱),並且會放入預設綱要中(請參閱下文),或者可以使用綱要名稱限定 (<綱要名稱>.<表格名稱>
)。
對於此處定義的每個表格,可能會存在表格描述檔案(請參閱下文)。如果不存在表格描述檔案,則表格名稱會被用作 Kafka 上的主題名稱,並且沒有資料欄會對應到表格中。表格仍將包含所有內部欄(請參閱下文)。
此屬性為必要屬性;沒有預設值,並且必須定義至少一個表格。
kafka.default-schema
¶
定義將包含所有未定義限定綱要名稱的表格的綱要。
此屬性為選用屬性;預設值為 default
。
kafka.nodes
¶
Kafka 資料節點的 hostname:port
配對的逗號分隔清單。
此屬性為必要屬性;沒有預設值,並且必須定義至少一個節點。
注意
即使此處僅指定了叢集的一個子集,Presto 仍然必須能夠連線至叢集的所有節點,因為訊息可能僅位於特定節點上。
kafka.connect-timeout
¶
連線至資料節點的逾時時間。繁忙的 Kafka 叢集可能需要相當長的時間才能接受連線;當看到因逾時而失敗的查詢時,增加此值是一個好策略。
此屬性為選用屬性;預設值為 10 秒 (10s
)。
kafka.max-poll-records
¶
每次從 Kafka 執行 poll() 的最大記錄數。
此屬性為選用屬性;預設值為 500
。
kafka.max-partition-fetch-bytes``¶
每次輪詢從一個分割區擷取的最大位元組數
此屬性為選用屬性;預設值為 1MB
。
kafka.table-description-dir
¶
參照 Presto 部署中一個包含一個或多個 JSON 檔案(必須以 .json
結尾)的資料夾,其中包含表格描述檔案。
此屬性為選用屬性;預設值為 etc/kafka
。
kafka.hide-internal-columns
¶
除了表格描述檔案中定義的資料欄之外,連接器還會為每個表格維護許多額外的欄。如果這些欄被隱藏,它們仍然可以在查詢中使用,但不會顯示在 DESCRIBE <表格名稱>
或 SELECT *
中。
此屬性為選用屬性;預設值為 true
。
內部欄位¶
對於每個定義的表格,連接器會維護以下欄
欄名稱 |
類型 |
描述 |
---|---|---|
|
BIGINT |
包含此列的 Kafka 分割區 ID。 |
|
BIGINT |
此列在 Kafka 分割區中的偏移量。 |
|
BOOLEAN |
如果解碼器無法解碼此列的訊息,則為 True。如果為 true,則應該將從訊息對應的資料欄視為無效。 |
|
VARCHAR |
訊息位元組為 UTF-8 編碼的字串。這僅適用於文字主題。 |
|
BIGINT |
訊息中的位元組數。 |
|
BOOLEAN |
如果鍵解碼器無法解碼此列的鍵,則為 True。如果為 true,則應該將從鍵對應的資料欄視為無效。 |
|
VARCHAR |
鍵位元組為 UTF-8 編碼的字串。這僅適用於文字鍵。 |
|
BIGINT |
索引鍵中的位元組數。 |
對於沒有表格定義檔的表格,_key_corrupt
和 _message_corrupt
欄位的值將永遠為 false
。
表格定義檔¶
Kafka 僅將主題維護為位元組訊息,並由生產者和消費者自行定義應如何解讀訊息。對於 Presto 而言,此資料必須對應到欄位中,才能允許對資料進行查詢。
注意
對於包含 JSON 資料的文字主題,完全可以不使用任何表格定義檔,而是使用 Presto 的 JSON 函數和運算子 來剖析包含位元組對應到 UTF-8 字串的 _message
欄位。然而,這樣做相當麻煩,並且難以撰寫 SQL 查詢。
表格定義檔包含表格的 JSON 定義。檔案名稱可以是任意的,但必須以 .json
結尾。
{
"tableName": ...,
"schemaName": ...,
"topicName": ...,
"key": {
"dataFormat": ...,
"fields": [
...
]
},
"message": {
"dataFormat": ...,
"fields": [
...
]
}
}
欄位 |
必要 |
類型 |
描述 |
---|---|---|---|
|
必要 |
字串 |
此檔案定義的 Presto 表格名稱。 |
|
可選 |
字串 |
將包含表格的綱要。如果省略,則使用預設綱要名稱。 |
|
必要 |
字串 |
對應的 Kafka 主題。 |
|
可選 |
JSON 物件 |
對應到訊息索引鍵的資料欄位定義。 |
|
可選 |
JSON 物件 |
對應到訊息本身的資料欄位定義。 |
Kafka 中的索引鍵和訊息¶
從 Kafka 0.8 開始,主題中的每個訊息都可以有一個選用的索引鍵。表格定義檔包含索引鍵和訊息兩個區段,以便將資料對應到表格欄位上。
表格定義中的每個 key
和 message
欄位都是一個 JSON 物件,必須包含兩個欄位:
欄位 |
必要 |
類型 |
描述 |
---|---|---|---|
|
必要 |
字串 |
為此欄位群組選取解碼器。 |
|
必要 |
JSON 陣列 |
欄位定義的清單。每個欄位定義都會在 Presto 表格中建立一個新的欄位。 |
每個欄位定義都是一個 JSON 物件
{
"name": ...,
"type": ...,
"dataFormat": ...,
"mapping": ...,
"formatHint": ...,
"hidden": ...,
"comment": ...
}
欄位 |
必要 |
類型 |
描述 |
---|---|---|---|
|
必要 |
字串 |
Presto 表格中的欄位名稱。 |
|
必要 |
字串 |
欄位的 Presto 類型。 |
|
可選 |
字串 |
為此欄位選取欄位解碼器。預設為此列資料格式和欄位類型的預設解碼器。 |
|
可選 |
字串 |
Avro 綱要所在的檔案路徑或 URL。僅適用於 Avro 解碼器。 |
|
可選 |
字串 |
欄位的對應資訊。這與解碼器有關,請參閱下方說明。 |
|
可選 |
字串 |
為欄位解碼器設定特定欄位的格式提示。 |
|
可選 |
布林值 |
從 |
|
可選 |
字串 |
新增欄位註解,會與 |
索引鍵或訊息的欄位描述沒有限制。
列解碼¶
對於索引鍵和訊息,會使用解碼器將訊息和索引鍵資料對應到表格欄位上。
Kafka 連接器包含下列解碼器:
raw
- 不會解讀 Kafka 訊息,原始訊息位元組的範圍會對應到表格欄位csv
- Kafka 訊息會被解讀為逗號分隔訊息,並且欄位會對應到表格欄位json
- Kafka 訊息會剖析為 JSON,並且 JSON 欄位會對應到表格欄位avro
- Kafka 訊息會根據 Avro 綱要進行剖析,並且 Avro 欄位會對應到表格欄位
注意
如果表格沒有表格定義檔,則會使用 dummy
解碼器,該解碼器不會公開任何欄位。
raw
解碼器¶
原始解碼器支援從 Kafka 訊息或索引鍵讀取原始(以位元組為基礎)的值,並將其轉換為 Presto 欄位。
對於欄位,支援以下屬性:
dataFormat
- 選取轉換的資料類型寬度type
- Presto 資料類型 (支援資料類型的清單,請參閱下方表格)mapping
-<起始>[:<結束>]
;要轉換的位元組起始和結束位置(可選)
dataFormat
屬性選取轉換的位元組數。如果不存在,則會假設為 BYTE
。所有值都是帶正負號的。
支援的值如下:
BYTE
- 一個位元組SHORT
- 兩個位元組(大端序)INT
- 四個位元組(大端序)LONG
- 八個位元組(大端序)FLOAT
- 四個位元組 (IEEE 754 格式)DOUBLE
- 八個位元組 (IEEE 754 格式)
type
屬性定義值對應到的 Presto 資料類型。
根據指派給欄位的 Presto 類型,可以使用不同的 dataFormat 值
Presto 資料類型 |
允許的 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mapping
屬性指定索引鍵或訊息中用於解碼的位元組範圍。它可以是單一數字或兩個以冒號分隔的數字 (<起始>[:<結束>]
)。
如果只指定起始位置:
對於固定寬度的類型,該欄位會使用指定
dataFormat
的適當位元組數(請參閱上方)。當解碼
VARCHAR
值時,將會使用從起始位置到訊息結尾的所有位元組。
如果指定起始和結束位置:
對於固定寬度的類型,大小必須等於指定的
dataFormat
所使用的位元組數。對於
VARCHAR
,會使用起始(含)和結束(不含)之間的所有位元組。
如果未指定 mapping
屬性,則相當於將起始位置設定為 0,並且將結束位置設定為未定義。
數值資料類型(BIGINT
、INTEGER
、SMALLINT
、TINYINT
、DOUBLE
)的解碼配置相當簡單。會從輸入訊息中讀取位元組序列,並根據下列其中一個項目進行解碼:
大端序編碼(針對整數類型)
IEEE 754 格式(針對
DOUBLE
)。
解碼位元組序列的長度由 dataFormat
暗示。
對於 VARCHAR
資料類型,會根據 UTF-8 編碼來解讀位元組序列。
csv
解碼器¶
CSV 解碼器會使用 UTF-8 編碼將代表訊息或索引鍵的位元組轉換為字串,然後將結果解讀為 CSV(逗號分隔值)行。
對於欄位,必須定義 type
和 mapping
屬性
type
- Presto 資料類型 (支援資料類型的清單,請參閱下方表格)mapping
- CSV 記錄中欄位的索引
不支援 dataFormat
和 formatHint
,並且必須省略。
下表列出 type
中可使用和解碼配置的支援 Presto 類型
Presto 資料類型 |
解碼規則 |
---|---|
BIGINT INTEGER SMALLINT TINYINT |
使用 Java |
|
使用 Java |
|
「true」字元序列對應到 |
|
依現狀使用 |
json
解碼器¶
JSON 解碼器依照 RFC 4627 將代表訊息或金鑰的位元組轉換為 JSON。請注意,訊息或金鑰必須轉換為 JSON 物件,而非陣列或簡單類型。
對於欄位,支援以下屬性:
type
- Presto 資料行的類型。dataFormat
- 要用於資料行的欄位解碼器。mapping
- 以斜線分隔的欄位名稱清單,用於從 JSON 物件中選取欄位。formatHint
- 僅適用於custom-date-time
,請參閱下方說明。
JSON 解碼器支援多個欄位解碼器,其中 _default
用於標準表格資料行,而其他一些解碼器則用於日期和時間類型。
下表列出可用於 type
的 Presto 資料類型,以及可透過 dataFormat
屬性指定的相符欄位解碼器。
Presto 資料類型 |
允許的 |
---|---|
BIGINT INTEGER SMALLINT TINYINT DOUBLE BOOLEAN VARCHAR VARCHAR(x) |
預設欄位解碼器 (省略 |
TIMESTAMP TIMESTAMP WITH TIME ZONE TIME TIME WITH TIME ZONE |
|
|
|
預設欄位解碼器¶
這是支援所有 Presto 實體資料類型的標準欄位解碼器。欄位值將會透過 JSON 轉換規則強制轉換為布林值、長整數、雙精度浮點數或字串值。對於非日期/時間基礎的資料行,應使用此解碼器。
日期和時間解碼器¶
若要將 JSON 物件中的值轉換為 Presto 的 DATE
、TIME
、TIME WITH TIME ZONE
、TIMESTAMP
或 TIMESTAMP WITH TIME ZONE
資料行,則必須使用欄位定義的 dataFormat
屬性選取特殊的解碼器。
iso8601
- 基於文字,將文字欄位解析為 ISO 8601 時間戳記。rfc2822
- 基於文字,將文字欄位解析為 RFC 2822 時間戳記。custom-date-time
- 基於文字,根據 Joda 格式模式解析文字欄位透過
formatHint
屬性指定。格式模式應符合 https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html。
milliseconds-since-epoch
- 基於數字,將文字或數字解讀為自 epoch 以來的毫秒數。seconds-since-epoch
- 基於數字,將文字或數字解讀為自 epoch 以來的秒數。
對於 TIMESTAMP WITH TIME ZONE
和 TIME WITH TIME ZONE
資料類型,如果已解碼的值中存在時區資訊,則會用於 Presto 值。否則,結果時區將設定為 UTC
。
avro
解碼器¶
Avro 解碼器會根據結構描述將 Avro 格式的訊息或金鑰的位元組轉換。訊息必須內嵌 Avro 結構描述。Presto 不支援無結構描述的 Avro 解碼。
對於金鑰/訊息,使用 avro
解碼器時,必須定義 dataSchema
。這應該指向需要解碼的訊息之有效 Avro 結構描述檔案的位置。此位置可以是遠端網頁伺服器 (例如:dataSchema: 'http://example.org/schema/avro_data.avsc'
) 或本機檔案系統 (例如:dataSchema: '/usr/local/schema/avro_data.avsc'
)。如果無法從 Presto 協調器節點存取此位置,則解碼器將會失敗。
對於欄位,支援以下屬性:
name
- Presto 表格中資料行的名稱。type
- Presto 資料行的類型。mapping
- 以斜線分隔的欄位名稱清單,用於從 Avro 結構描述中選取欄位。如果mapping
中指定的欄位在原始 Avro 結構描述中不存在,則讀取操作會傳回 NULL。
下表列出可用於 type
的支援 Presto 類型,對應的 Avro 欄位類型。
Presto 資料類型 |
允許的 Avro 資料類型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Avro 結構描述演變¶
Avro 解碼器支援具有回溯相容性的結構描述演變功能。透過回溯相容性,可以使用較新的結構描述來讀取以較舊結構描述建立的 Avro 資料。Avro 結構描述中的任何變更也必須反映在 Presto 的主題定義檔案中。新加入/重新命名的欄位必須在 Avro 結構描述檔案中具有預設值。
結構描述演變行為如下
在新結構描述中新增的資料行:使用較舊結構描述建立的資料,在使用新結構描述時,將會產生預設值。
在新結構描述中移除的資料行:使用較舊結構描述建立的資料,將不再輸出已移除的資料行中的資料。
在新結構描述中重新命名的資料行:這相當於移除資料行並新增一個新的資料行,而且當表格使用新結構描述時,使用舊結構描述建立的資料將會產生預設值。
變更新結構描述中的資料行類型:如果 Avro 支援類型強制轉換,則會進行轉換。對於不相容的類型,則會擲回錯誤。