函數

外掛程式實作

函數框架用於實作 SQL 函數。Presto 包含許多內建函數。為了實作新的函數,您可以編寫一個外掛程式,從 getFunctions() 傳回一個或多個函數

public class ExampleFunctionsPlugin
        implements Plugin
{
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                .add(ExampleNullFunction.class)
                .add(IsNullFunction.class)
                .add(IsEqualOrNullFunction.class)
                .add(ExampleStringFunction.class)
                .add(ExampleAverageFunction.class)
                .build();
    }
}

請注意,ImmutableSet 類別是 Guava 的公用程式類別。 getFunctions() 方法包含我們將在本教學課程中實作的所有函數類別。

如需程式碼庫中的完整範例,請參閱 Presto 原始碼根目錄中的 presto-ml 模組(用於機器學習函數)或 presto-teradata-functions 模組(用於與 Teradata 相容的函數)。

純量函數實作

函數框架使用註解來指示有關函數的相關資訊,包括名稱、描述、傳回類型和參數類型。以下是實作 is_null 的範例函數

public class ExampleNullFunction
{
    @ScalarFunction("is_null", calledOnNullInput = true)
    @Description("Returns TRUE if the argument is NULL")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNull(@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice string)
    {
        return (string == null);
    }
}

函數 is_null 接受單一 VARCHAR 引數,並傳回一個 BOOLEAN,指示引數是否為 NULL。請注意,函數的引數類型為 SliceVARCHAR 使用 Slice,它本質上是 byte[] 的包裝函式,而不是 String 作為其原生容器類型。

  • @SqlType:

    @SqlType 註解用於宣告傳回類型和引數類型。請注意,Java 程式碼的傳回類型和引數必須符合對應註解的原生容器類型。

  • @SqlNullable:

    @SqlNullable 註解指示引數可能為 NULL。如果沒有此註解,框架會假設如果任何引數為 NULL,則所有函數都會傳回 NULL。當使用具有基本原生容器類型 (例如 BigintType) 的 Type 時,使用 @SqlNullable 時,請使用原生容器類型的物件包裝函式。如果方法在引數為非 Null 時可能傳回 NULL,則必須使用 @SqlNullable 註解。

參數化純量函數

具有類型參數的純量函數具有一些額外的複雜性。為了使我們之前的範例適用於任何類型,我們需要以下內容

@ScalarFunction(name = "is_null", calledOnNullInput = true)
@Description("Returns TRUE if the argument is NULL")
public final class IsNullFunction
{
    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNullSlice(@SqlNullable @SqlType("T") Slice value)
    {
        return (value == null);
    }

    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNullLong(@SqlNullable @SqlType("T") Long value)
    {
        return (value == null);
    }

    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isNullDouble(@SqlNullable @SqlType("T") Double value)
    {
        return (value == null);
    }

    // ...and so on for each native container type
}
  • @TypeParameter:

    @TypeParameter 註解用於宣告類型參數,該參數可用於引數類型 @SqlType 註解,或函數的傳回類型。它也可用於註解類型為 Type 的參數。在執行階段,引擎會將具體類型繫結至此參數。或者,可以透過將 boundedBy 類型類別提供給 @TypeParameter,將類型參數限制為特定類型的子代。 @OperatorDependency 可用於宣告需要用於操作給定類型參數的其他函數。例如,以下函數只會繫結至已定義 equals 函數的類型

@ScalarFunction(name = "is_equal_or_null", calledOnNullInput = true)
@Description("Returns TRUE if arguments are equal or both NULL")
public final class IsEqualOrNullFunction
{
    @TypeParameter("T")
    @SqlType(StandardTypes.BOOLEAN)
    public static boolean isEqualOrNullSlice(
            @OperatorDependency(operator = OperatorType.EQUAL, returnType = StandardTypes.BOOLEAN, argumentTypes = {"T", "T"}) MethodHandle equals,
            @SqlNullable @SqlType("T") Slice value1,
            @SqlNullable @SqlType("T") Slice value2)
    {
        if (value1 == null && value2 == null) {
            return true;
        }
        if (value1 == null || value2 == null) {
            return false;
        }
        return (boolean) equals.invokeExact(value1, value2);
    }

    // ...and so on for each native container type
}

另一個純量函數範例

lowercaser 函數接受單一 VARCHAR 引數,並傳回一個 VARCHAR,它是轉換為小寫的引數

public class ExampleStringFunction
{
    @ScalarFunction("lowercaser")
    @Description("converts the string to alternating case")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice lowercaser(@SqlType(StandardTypes.VARCHAR) Slice slice)
    {
        String argument = slice.toStringUtf8();
        return Slices.utf8Slice(argument.toLowerCase());
    }
}

請注意,對於大多數常見的字串函數,包括將字串轉換為小寫,Slice 程式庫也提供了直接在底層 byte[] 上運作的實作,這些實作具有更好的效能。此函數沒有 @SqlNullable 註解,這表示如果引數為 NULL,則結果將自動為 NULL(不會呼叫函數)。

Codegen 純量函數實作

純量函數也可以在位元組碼中實作,讓我們可以根據 @TypeParameter 專門化和最佳化函數

  • @CodegenScalarFunction:

    @CodegenScalarFunction 註解用於宣告以位元組碼實作的純量函數。 @SqlType 註解用於宣告傳回類型。它採用 Type 作為參數,這些參數也具有 @SqlType 註解。傳回類型是 MethodHandle,它是 codegen 函數方法。

public class CodegenArrayLengthFunction
{
    @CodegenScalarFunction("array_length", calledOnNullInput = true)
    @SqlType(StandardTypes.INTEGER)
    @TypeParameter("K")
    public static MethodHandle arrayLength(@SqlType("array(K)") Type arr)
    {
        CallSiteBinder binder = new CallSiteBinder();
        ClassDefinition classDefinition = new ClassDefinition(a(Access.PUBLIC, FINAL), makeClassName("ArrayLength"), type(Object.class));
        classDefinition.declareDefaultConstructor(a(PRIVATE));

        Parameter inputBlock = arg("inputBlock", Block.class);
        MethodDefinition method = classDefinition.declareMethod(a(Access.PUBLIC, STATIC), "array_length", type(Block.class), ImmutableList.of(inputBlock));
        BytecodeBlock body = method.getBody();
        body.append(inputBlock.invoke("getPositionCount", int.class).ret());

        Class<?> clazz = defineClass(classDefinition, Object.class, binder.getBindings(), CodegenArrayLengthFunction.class.getClassLoader());
        return new methodHandle(clazz, "array_length", Block.class), Optional.of();
    }
}

彙總函數實作

彙總函數使用類似於純量函數的框架,但稍微複雜一些。

  • AccumulatorState:

    所有彙總函數都會將輸入列累加到狀態物件中;此物件必須實作 AccumulatorState。對於簡單的彙總,只需將 AccumulatorState 擴展到具有您想要的 getter 和 setter 的新介面,框架將為您產生所有實作和序列化程式。如果您需要更複雜的狀態物件,則需要實作 AccumulatorStateFactoryAccumulatorStateSerializer,並透過 AccumulatorStateMetadata 註解提供這些。

以下程式碼實作彙總函數 avg_double,它會計算 DOUBLE 欄位的平均值

@AggregationFunction("avg_double")
public class AverageAggregation
{
    @InputFunction
    public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
    {
        state.setLong(state.getLong() + 1);
        state.setDouble(state.getDouble() + value);
    }

    @CombineFunction
    public static void combine(LongAndDoubleState state, LongAndDoubleState otherState)
    {
        state.setLong(state.getLong() + otherState.getLong());
        state.setDouble(state.getDouble() + otherState.getDouble());
    }

    @OutputFunction(StandardTypes.DOUBLE)
    public static void output(LongAndDoubleState state, BlockBuilder out)
    {
        long count = state.getLong();
        if (count == 0) {
            out.appendNull();
        }
        else {
            double value = state.getDouble();
            DOUBLE.writeDouble(out, value / count);
        }
    }
}

平均值有兩個部分:欄位中每列 DOUBLE 的總和,以及看到的列數的 LONG 計數。 LongAndDoubleState 是一個介面,會擴充 AccumulatorState

public interface LongAndDoubleState
        extends AccumulatorState
{
    long getLong();

    void setLong(long value);

    double getDouble();

    void setDouble(double value);
}

如上所述,對於簡單的 AccumulatorState 物件,只需定義具有 getter 和 setter 的介面即可,框架會為您產生實作。

接下來將深入探討與撰寫彙總函數相關的各種註解

  • @InputFunction:

    @InputFunction 註解宣告一個函數,該函數接受輸入列並將它們儲存在 AccumulatorState 中。類似於純量函數,您必須使用 @SqlType 註解參數。請注意,與上述純量範例中,使用 Slice 來保存 VARCHAR 不同,這裡輸入函數的參數使用原始的 double 型別。在這個例子中,輸入函數只是追蹤正在執行的列數(透過 setLong())和執行總和(透過 setDouble())。

  • @CombineFunction:

    @CombineFunction 註解宣告一個函數,用於合併兩個狀態物件。此函數用於合併所有部分聚合狀態。它接受兩個狀態物件,並將結果合併到第一個物件中(在上面的範例中,僅僅是將它們相加)。

  • @OutputFunction:

    @OutputFunction 是計算聚合時最後呼叫的函數。它接受最終狀態物件(合併所有部分狀態的結果),並將結果寫入 BlockBuilder

  • 序列化發生在哪裡?什麼是 GroupedAccumulatorState

    @InputFunction 通常在與 @CombineFunction 不同的 worker 上執行,因此狀態物件會在這些 worker 之間由聚合框架序列化和傳輸。當執行 GROUP BY 聚合時,會使用 GroupedAccumulatorState,如果您沒有指定 AccumulatorStateFactory,則會自動為您生成一個實作。

進階使用案例

原始區塊輸入

純量和聚合函數註解都允許您定義操作原生型別的方法。在 Java 中,這些原生型別是 booleanSlicelong。對於參數化實作或參數型別,標準 Java 型別無法使用,因為它們無法表示輸入資料。

若要定義可以接受任何型別的方法處理程序,請結合使用 @BlockPosition@BlockIndex 參數。與 @SqlNullable 註解類似,使用 @NullablePosition 註解來表示當區塊位置為 NULL 時應呼叫該函數。

這適用於純量和聚合函數實作。

@ScalarFunction("example")
public static Block exampleFunction(
        @BlockPosition @NullablePosition @SqlType("array(int)") Block block,
        @BlockIndex int index) { /* ...implementation */ }

使用 @BlockPosition 套用泛型型別

當函數使用 @TypeParameter 註解定義時,使用 @BlockPosition 語法的函數簽名能夠對泛型型別進行操作。在 @BlockPosition 引數中加入額外的 @SqlType("T") 註解,表示它接受與泛型型別相對應的引數。這適用於純量和聚合函數實作。

@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
        @BlockPosition @SqlType("T") Block block,
        @BlockIndex int index) { /* ...implementation */ }

使用 @TypeParameter 擷取泛型型別

在函數的引數列表開頭加入 @TypeParameter 註解,以允許實作執行型別特定的邏輯。將一個帶有 @TypeParameter 註解的 Type 型別引數作為函數簽名的第一個引數,以存取 Type。這適用於純量和聚合函數。

@ScalarFunction("example")
@TypeParameter("T")
public static Block exampleFunction(
        @TypeParameter("T") Type type,
        @BlockPosition @SqlType("T") Block block,
        @BlockIndex int index) { /* ...implementation */ }