Kafka 連接器

概觀

此連接器允許在 Presto 中將 Apache Kafka 主題用作表格。每個訊息在 Presto 中都呈現為一列。

主題可以是即時的:資料會隨著訊息到達而出現,並隨著訊息被捨棄而消失。如果單一查詢中多次存取相同的表格(例如,執行自我聯結),可能會導致奇怪的行為。

注意

支援 Apache Kafka 2.3.1+。

組態

若要設定 Kafka 連接器,請建立一個目錄屬性檔案 etc/catalog/kafka.properties,其中包含以下內容,並根據需要取代屬性

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port

多個 Kafka 叢集

您可以根據需要設定任意數量的目錄,因此如果您有額外的 Kafka 叢集,只需在 etc/catalog 中新增另一個屬性檔案,並使用不同的名稱(確保它以 .properties 結尾)。例如,如果您將屬性檔案命名為 sales.properties,Presto 將會使用設定的連接器建立一個名為 sales 的目錄。

組態屬性

可以使用以下組態屬性

屬性名稱

描述

kafka.table-names

此目錄提供的所有表格清單

kafka.default-schema

表格的預設綱要名稱

kafka.nodes

Kafka 叢集中的節點清單

kafka.connect-timeout

連線至 Kafka 叢集的逾時時間

kafka.max-poll-records

每次輪詢的最大記錄數

kafka.max-partition-fetch-bytes

每次輪詢從一個分割區擷取的最大位元組數

kafka.table-description-dir

包含主題描述檔案的目錄

kafka.hide-internal-columns

控制內部欄是否為表格綱要的一部分

kafka.table-names

此目錄提供的所有表格的逗號分隔清單。表格名稱可以是未限定的(簡單名稱),並且會放入預設綱要中(請參閱下文),或者可以使用綱要名稱限定 (<綱要名稱>.<表格名稱>)。

對於此處定義的每個表格,可能會存在表格描述檔案(請參閱下文)。如果不存在表格描述檔案,則表格名稱會被用作 Kafka 上的主題名稱,並且沒有資料欄會對應到表格中。表格仍將包含所有內部欄(請參閱下文)。

此屬性為必要屬性;沒有預設值,並且必須定義至少一個表格。

kafka.default-schema

定義將包含所有未定義限定綱要名稱的表格的綱要。

此屬性為選用屬性;預設值為 default

kafka.nodes

Kafka 資料節點的 hostname:port 配對的逗號分隔清單。

此屬性為必要屬性;沒有預設值,並且必須定義至少一個節點。

注意

即使此處僅指定了叢集的一個子集,Presto 仍然必須能夠連線至叢集的所有節點,因為訊息可能僅位於特定節點上。

kafka.connect-timeout

連線至資料節點的逾時時間。繁忙的 Kafka 叢集可能需要相當長的時間才能接受連線;當看到因逾時而失敗的查詢時,增加此值是一個好策略。

此屬性為選用屬性;預設值為 10 秒 (10s)。

kafka.max-poll-records

每次從 Kafka 執行 poll() 的最大記錄數。

此屬性為選用屬性;預設值為 500

kafka.max-partition-fetch-bytes``

每次輪詢從一個分割區擷取的最大位元組數

此屬性為選用屬性;預設值為 1MB

kafka.table-description-dir

參照 Presto 部署中一個包含一個或多個 JSON 檔案(必須以 .json 結尾)的資料夾,其中包含表格描述檔案。

此屬性為選用屬性;預設值為 etc/kafka

kafka.hide-internal-columns

除了表格描述檔案中定義的資料欄之外,連接器還會為每個表格維護許多額外的欄。如果這些欄被隱藏,它們仍然可以在查詢中使用,但不會顯示在 DESCRIBE <表格名稱>SELECT * 中。

此屬性為選用屬性;預設值為 true

內部欄位

對於每個定義的表格,連接器會維護以下欄

欄名稱

類型

描述

_partition_id

BIGINT

包含此列的 Kafka 分割區 ID。

_partition_offset

BIGINT

此列在 Kafka 分割區中的偏移量。

_message_corrupt

BOOLEAN

如果解碼器無法解碼此列的訊息,則為 True。如果為 true,則應該將從訊息對應的資料欄視為無效。

_message

VARCHAR

訊息位元組為 UTF-8 編碼的字串。這僅適用於文字主題。

_message_length

BIGINT

訊息中的位元組數。

_key_corrupt

BOOLEAN

如果鍵解碼器無法解碼此列的鍵,則為 True。如果為 true,則應該將從鍵對應的資料欄視為無效。

_key

VARCHAR

鍵位元組為 UTF-8 編碼的字串。這僅適用於文字鍵。

_key_length

BIGINT

索引鍵中的位元組數。

對於沒有表格定義檔的表格,_key_corrupt_message_corrupt 欄位的值將永遠為 false

表格定義檔

Kafka 僅將主題維護為位元組訊息,並由生產者和消費者自行定義應如何解讀訊息。對於 Presto 而言,此資料必須對應到欄位中,才能允許對資料進行查詢。

注意

對於包含 JSON 資料的文字主題,完全可以不使用任何表格定義檔,而是使用 Presto 的 JSON 函數和運算子 來剖析包含位元組對應到 UTF-8 字串的 _message 欄位。然而,這樣做相當麻煩,並且難以撰寫 SQL 查詢。

表格定義檔包含表格的 JSON 定義。檔案名稱可以是任意的,但必須以 .json 結尾。

{
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    },
    "message": {
        "dataFormat": ...,
        "fields": [
            ...
       ]
    }
}

欄位

必要

類型

描述

tableName

必要

字串

此檔案定義的 Presto 表格名稱。

schemaName

可選

字串

將包含表格的綱要。如果省略,則使用預設綱要名稱。

topicName

必要

字串

對應的 Kafka 主題。

key

可選

JSON 物件

對應到訊息索引鍵的資料欄位定義。

message

可選

JSON 物件

對應到訊息本身的資料欄位定義。

Kafka 中的索引鍵和訊息

從 Kafka 0.8 開始,主題中的每個訊息都可以有一個選用的索引鍵。表格定義檔包含索引鍵和訊息兩個區段,以便將資料對應到表格欄位上。

表格定義中的每個 keymessage 欄位都是一個 JSON 物件,必須包含兩個欄位:

欄位

必要

類型

描述

dataFormat

必要

字串

為此欄位群組選取解碼器。

fields

必要

JSON 陣列

欄位定義的清單。每個欄位定義都會在 Presto 表格中建立一個新的欄位。

每個欄位定義都是一個 JSON 物件

{
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
}

欄位

必要

類型

描述

name

必要

字串

Presto 表格中的欄位名稱。

type

必要

字串

欄位的 Presto 類型。

dataFormat

可選

字串

為此欄位選取欄位解碼器。預設為此列資料格式和欄位類型的預設解碼器。

dataSchema

可選

字串

Avro 綱要所在的檔案路徑或 URL。僅適用於 Avro 解碼器。

mapping

可選

字串

欄位的對應資訊。這與解碼器有關,請參閱下方說明。

formatHint

可選

字串

為欄位解碼器設定特定欄位的格式提示。

hidden

可選

布林值

DESCRIBE <表格名稱>SELECT * 中隱藏欄位。預設為 false

comment

可選

字串

新增欄位註解,會與 DESCRIBE <表格名稱> 一起顯示。

索引鍵或訊息的欄位描述沒有限制。

列解碼

對於索引鍵和訊息,會使用解碼器將訊息和索引鍵資料對應到表格欄位上。

Kafka 連接器包含下列解碼器:

  • raw - 不會解讀 Kafka 訊息,原始訊息位元組的範圍會對應到表格欄位

  • csv - Kafka 訊息會被解讀為逗號分隔訊息,並且欄位會對應到表格欄位

  • json - Kafka 訊息會剖析為 JSON,並且 JSON 欄位會對應到表格欄位

  • avro - Kafka 訊息會根據 Avro 綱要進行剖析,並且 Avro 欄位會對應到表格欄位

注意

如果表格沒有表格定義檔,則會使用 dummy 解碼器,該解碼器不會公開任何欄位。

raw 解碼器

原始解碼器支援從 Kafka 訊息或索引鍵讀取原始(以位元組為基礎)的值,並將其轉換為 Presto 欄位。

對於欄位,支援以下屬性:

  • dataFormat - 選取轉換的資料類型寬度

  • type - Presto 資料類型 (支援資料類型的清單,請參閱下方表格)

  • mapping - <起始>[:<結束>];要轉換的位元組起始和結束位置(可選)

dataFormat 屬性選取轉換的位元組數。如果不存在,則會假設為 BYTE。所有值都是帶正負號的。

支援的值如下:

  • BYTE - 一個位元組

  • SHORT - 兩個位元組(大端序)

  • INT - 四個位元組(大端序)

  • LONG - 八個位元組(大端序)

  • FLOAT - 四個位元組 (IEEE 754 格式)

  • DOUBLE - 八個位元組 (IEEE 754 格式)

type 屬性定義值對應到的 Presto 資料類型。

根據指派給欄位的 Presto 類型,可以使用不同的 dataFormat 值

Presto 資料類型

允許的 dataFormat

BIGINT

BYTESHORTINTLONG

INTEGER

BYTESHORTINT

SMALLINT

BYTESHORT

TINYINT

BYTE

DOUBLE

DOUBLEFLOAT

BOOLEAN

BYTESHORTINTLONG

VARCHAR / VARCHAR(x)

BYTE

mapping 屬性指定索引鍵或訊息中用於解碼的位元組範圍。它可以是單一數字或兩個以冒號分隔的數字 (<起始>[:<結束>])。

如果只指定起始位置:

  • 對於固定寬度的類型,該欄位會使用指定 dataFormat 的適當位元組數(請參閱上方)。

  • 當解碼 VARCHAR 值時,將會使用從起始位置到訊息結尾的所有位元組。

如果指定起始和結束位置:

  • 對於固定寬度的類型,大小必須等於指定的 dataFormat 所使用的位元組數。

  • 對於 VARCHAR,會使用起始(含)和結束(不含)之間的所有位元組。

如果未指定 mapping 屬性,則相當於將起始位置設定為 0,並且將結束位置設定為未定義。

數值資料類型(BIGINTINTEGERSMALLINTTINYINTDOUBLE)的解碼配置相當簡單。會從輸入訊息中讀取位元組序列,並根據下列其中一個項目進行解碼:

  • 大端序編碼(針對整數類型)

  • IEEE 754 格式(針對 DOUBLE)。

解碼位元組序列的長度由 dataFormat 暗示。

對於 VARCHAR 資料類型,會根據 UTF-8 編碼來解讀位元組序列。

csv 解碼器

CSV 解碼器會使用 UTF-8 編碼將代表訊息或索引鍵的位元組轉換為字串,然後將結果解讀為 CSV(逗號分隔值)行。

對於欄位,必須定義 typemapping 屬性

  • type - Presto 資料類型 (支援資料類型的清單,請參閱下方表格)

  • mapping - CSV 記錄中欄位的索引

不支援 dataFormatformatHint,並且必須省略。

下表列出 type 中可使用和解碼配置的支援 Presto 類型

Presto 資料類型

解碼規則

BIGINT
INTEGER
SMALLINT
TINYINT

使用 Java Long.parseLong() 進行解碼

DOUBLE

使用 Java Double.parseDouble() 進行解碼

BOOLEAN

「true」字元序列對應到 true;其他字元序列對應到 false

VARCHAR / VARCHAR(x)

依現狀使用

json 解碼器

JSON 解碼器依照 RFC 4627 將代表訊息或金鑰的位元組轉換為 JSON。請注意,訊息或金鑰必須轉換為 JSON 物件,而非陣列或簡單類型。

對於欄位,支援以下屬性:

  • type - Presto 資料行的類型。

  • dataFormat - 要用於資料行的欄位解碼器。

  • mapping - 以斜線分隔的欄位名稱清單,用於從 JSON 物件中選取欄位。

  • formatHint - 僅適用於 custom-date-time,請參閱下方說明。

JSON 解碼器支援多個欄位解碼器,其中 _default 用於標準表格資料行,而其他一些解碼器則用於日期和時間類型。

下表列出可用於 type 的 Presto 資料類型,以及可透過 dataFormat 屬性指定的相符欄位解碼器。

Presto 資料類型

允許的 dataFormat

BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
BOOLEAN
VARCHAR
VARCHAR(x)

預設欄位解碼器 (省略 dataFormat 屬性)

TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIME
TIME WITH TIME ZONE

custom-date-timeiso8601rfc2822milliseconds-since-epochseconds-since-epoch

DATE

custom-date-timeiso8601rfc2822

預設欄位解碼器

這是支援所有 Presto 實體資料類型的標準欄位解碼器。欄位值將會透過 JSON 轉換規則強制轉換為布林值、長整數、雙精度浮點數或字串值。對於非日期/時間基礎的資料行,應使用此解碼器。

日期和時間解碼器

若要將 JSON 物件中的值轉換為 Presto 的 DATETIMETIME WITH TIME ZONETIMESTAMPTIMESTAMP WITH TIME ZONE 資料行,則必須使用欄位定義的 dataFormat 屬性選取特殊的解碼器。

  • iso8601 - 基於文字,將文字欄位解析為 ISO 8601 時間戳記。

  • rfc2822 - 基於文字,將文字欄位解析為 RFC 2822 時間戳記。

  • custom-date-time - 基於文字,根據 Joda 格式模式解析文字欄位

    透過 formatHint 屬性指定。格式模式應符合 https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html

  • milliseconds-since-epoch - 基於數字,將文字或數字解讀為自 epoch 以來的毫秒數。

  • seconds-since-epoch - 基於數字,將文字或數字解讀為自 epoch 以來的秒數。

對於 TIMESTAMP WITH TIME ZONETIME WITH TIME ZONE 資料類型,如果已解碼的值中存在時區資訊,則會用於 Presto 值。否則,結果時區將設定為 UTC

avro 解碼器

Avro 解碼器會根據結構描述將 Avro 格式的訊息或金鑰的位元組轉換。訊息必須內嵌 Avro 結構描述。Presto 不支援無結構描述的 Avro 解碼。

對於金鑰/訊息,使用 avro 解碼器時,必須定義 dataSchema。這應該指向需要解碼的訊息之有效 Avro 結構描述檔案的位置。此位置可以是遠端網頁伺服器 (例如:dataSchema: 'http://example.org/schema/avro_data.avsc') 或本機檔案系統 (例如:dataSchema: '/usr/local/schema/avro_data.avsc')。如果無法從 Presto 協調器節點存取此位置,則解碼器將會失敗。

對於欄位,支援以下屬性:

  • name - Presto 表格中資料行的名稱。

  • type - Presto 資料行的類型。

  • mapping - 以斜線分隔的欄位名稱清單,用於從 Avro 結構描述中選取欄位。如果 mapping 中指定的欄位在原始 Avro 結構描述中不存在,則讀取操作會傳回 NULL。

下表列出可用於 type 的支援 Presto 類型,對應的 Avro 欄位類型。

Presto 資料類型

允許的 Avro 資料類型

BIGINT

INTLONG

DOUBLE

DOUBLEFLOAT

BOOLEAN

BOOLEAN

VARCHAR / VARCHAR(x)

STRING

VARBINARY

FIXEDBYTES

ARRAY

ARRAY

MAP

MAP

Avro 結構描述演變

Avro 解碼器支援具有回溯相容性的結構描述演變功能。透過回溯相容性,可以使用較新的結構描述來讀取以較舊結構描述建立的 Avro 資料。Avro 結構描述中的任何變更也必須反映在 Presto 的主題定義檔案中。新加入/重新命名的欄位必須在 Avro 結構描述檔案中具有預設值。

結構描述演變行為如下

  • 在新結構描述中新增的資料行:使用較舊結構描述建立的資料,在使用新結構描述時,將會產生預設值。

  • 在新結構描述中移除的資料行:使用較舊結構描述建立的資料,將不再輸出已移除的資料行中的資料。

  • 在新結構描述中重新命名的資料行:這相當於移除資料行並新增一個新的資料行,而且當表格使用新結構描述時,使用舊結構描述建立的資料將會產生預設值。

  • 變更新結構描述中的資料行類型:如果 Avro 支援類型強制轉換,則會進行轉換。對於不相容的類型,則會擲回錯誤。